Open
Labels
type/enhancement: The issue or PR belongs to an enhancement.
Description
Enhancement Task
If a TSO request fails, the client tries to update the members in order to find the new leader.
Lines 403 to 416 in 41ec8dc
err = td.processRequests(stream, tsoBatchController, done)
// If error happens during tso stream handling, reset stream and run the next trial.
if err == nil {
	// A nil error returned by `processRequests` indicates that the request batch is started successfully.
	// In this case, the `tsoBatchController` will be put back to the pool when the request is finished
	// asynchronously (either successful or not). This infers that the current `tsoBatchController` object will
	// be asynchronously accessed after the `processRequests` call. As a result, we need to use another
	// `tsoBatchController` for collecting the next batch. Do to this, we set the `tsoBatchController` to nil so that
	// another one will be fetched from the pool at the beginning of the batching loop.
	// Otherwise, the `tsoBatchController` won't be processed in other goroutines concurrently, and it can be
	// reused in the next loop safely.
	tsoBatchController = nil
} else {
	exit := !td.handleProcessRequestError(ctx, bo, streamURL, cancel, err)
Line 436 in 41ec8dc
svcDiscovery.ScheduleCheckMemberChanged()
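A call like the one above typically only signals that a member check is wanted; the check itself runs elsewhere. One common way to build such a signal is a non-blocking send on a channel of capacity 1, so that repeated failures coalesce into a single pending check. The sketch below illustrates that pattern only. The `memberChecker` type and its `scheduleCheck` method are hypothetical names, and this is not claimed to be how the PD client implements `ScheduleCheckMemberChanged`.

```go
package main

import "fmt"

// memberChecker is a hypothetical stand-in for a service discovery component.
type memberChecker struct {
	// checkCh has capacity 1, so at most one check can be pending at a time.
	checkCh chan struct{}
}

func newMemberChecker() *memberChecker {
	return &memberChecker{checkCh: make(chan struct{}, 1)}
}

// scheduleCheck requests a membership check without blocking the caller.
// It reports whether a new check was queued; false means one was already pending.
func (m *memberChecker) scheduleCheck() bool {
	select {
	case m.checkCh <- struct{}{}:
		return true
	default:
		return false
	}
}

func main() {
	m := newMemberChecker()
	fmt.Println(m.scheduleCheck()) // true: a check is now pending
	fmt.Println(m.scheduleCheck()) // false: coalesced with the pending check
	<-m.checkCh                    // the update loop picks up the signal
	fmt.Println(m.scheduleCheck()) // true: a new check can be queued again
}
```

With this pattern the failing caller never blocks, but how soon the check actually runs depends entirely on the loop that drains the channel.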
The member update uses a backoff whose minimum interval is 100ms:
pd/client/pd_service_discovery.go
Lines 532 to 556 in 41ec8dc
func (c *pdServiceDiscovery) updateMemberLoop() {
	defer c.wg.Done()
	ctx, cancel := context.WithCancel(c.ctx)
	defer cancel()
	ticker := time.NewTicker(memberUpdateInterval)
	defer ticker.Stop()
	bo := retry.InitialBackoffer(updateMemberBackOffBaseTime, updateMemberTimeout, updateMemberBackOffBaseTime)
	for {
		select {
		case <-ctx.Done():
			log.Info("[pd] exit member loop due to context canceled")
			return
		case <-ticker.C:
		case <-c.checkMembershipCh:
		}
		failpoint.Inject("skipUpdateMember", func() {
			failpoint.Continue()
		})
		if err := bo.Exec(ctx, c.updateMember); err != nil {
			log.Error("[pd] failed to update member", zap.Strings("urls", c.GetServiceURLs()), errs.ZapError(err))
		}
	}
}
Meanwhile, new requests can still be pushed into the dispatcher's channel, where they wait to be handled:
Line 528 in 41ec8dc
c.getDispatcher().push(request)
As a result, these queued requests may be delayed by the backoff, because they must wait for the stream to be re-established.
AndreMouche