Skip to content

The TSO request may have a high latency after the leader changes #8835

@rleungx

Description

@rleungx

Enhancement Task

If the TSO request fails, it will try to update the members to get the new leader.

pd/client/tso_dispatcher.go

Lines 403 to 416 in 41ec8dc

err = td.processRequests(stream, tsoBatchController, done)
// If error happens during tso stream handling, reset stream and run the next trial.
if err == nil {
// A nil error returned by `processRequests` indicates that the request batch is started successfully.
// In this case, the `tsoBatchController` will be put back to the pool when the request is finished
// asynchronously (either successful or not). This infers that the current `tsoBatchController` object will
// be asynchronously accessed after the `processRequests` call. As a result, we need to use another
// `tsoBatchController` for collecting the next batch. Do to this, we set the `tsoBatchController` to nil so that
// another one will be fetched from the pool at the beginning of the batching loop.
// Otherwise, the `tsoBatchController` won't be processed in other goroutines concurrently, and it can be
// reused in the next loop safely.
tsoBatchController = nil
} else {
exit := !td.handleProcessRequestError(ctx, bo, streamURL, cancel, err)

svcDiscovery.ScheduleCheckMemberChanged()

And there is a backoff, which the minimum time is 100ms

func (c *pdServiceDiscovery) updateMemberLoop() {
defer c.wg.Done()
ctx, cancel := context.WithCancel(c.ctx)
defer cancel()
ticker := time.NewTicker(memberUpdateInterval)
defer ticker.Stop()
bo := retry.InitialBackoffer(updateMemberBackOffBaseTime, updateMemberTimeout, updateMemberBackOffBaseTime)
for {
select {
case <-ctx.Done():
log.Info("[pd] exit member loop due to context canceled")
return
case <-ticker.C:
case <-c.checkMembershipCh:
}
failpoint.Inject("skipUpdateMember", func() {
failpoint.Continue()
})
if err := bo.Exec(ctx, c.updateMember); err != nil {
log.Error("[pd] failed to update member", zap.Strings("urls", c.GetServiceURLs()), errs.ZapError(err))
}
}
}

At the same time, the request can still be put into the channel and wait for handling:

c.getDispatcher().push(request)

And the request might be affected by the backoff because we need to wait for the stream to be re-established.

Metadata

Metadata

Assignees

No one assigned

    Labels

    type/enhancementThe issue or PR belongs to an enhancement.

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions