Add Stats to DescribeWorkerDeploymentVersion #7959
Conversation
Force-pushed from d5ecba5 to 71bb52e
service/matching/matching_engine.go (outdated)
resp := &matchingservice.DescribeVersionedTaskQueuesResponse{}
for _, tq := range request.VersionTaskQueues {
    tqResp, err := e.DescribeTaskQueue(ctx,
        &matchingservice.DescribeTaskQueueRequest{
I might make these concurrently. Didn't get there yet.
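A rough sketch of what the concurrent version might look like, assuming an errgroup fan-out; describeOne is a hypothetical stand-in for building and issuing one per-queue request:

// Sketch only: fan the per-task-queue DescribeTaskQueue calls out concurrently.
g, gctx := errgroup.WithContext(ctx) // golang.org/x/sync/errgroup
results := make([]*matchingservice.DescribeTaskQueueResponse, len(request.VersionTaskQueues))
for i, tq := range request.VersionTaskQueues {
    i, tq := i, tq // capture loop variables for the goroutine (pre-Go 1.22 semantics)
    g.Go(func() error {
        r, err := describeOne(gctx, tq) // hypothetical per-queue helper
        if err != nil {
            return err
        }
        results[i] = r
        return nil
    })
}
if err := g.Wait(); err != nil {
    return nil, err
}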
    })
}

pm.PutCache(cacheKey, resp)
I had considered moving the cache into the matchingEngineImpl as part of this PR (it's outstanding tech debt), but that was difficult since it needs to be initialised with the TTL. But the TTL is a namespace-scoped config, so the lifetimes don't match up with the engine.
Force-pushed from 9f51c31 to 2aa69b3
service/matching/matching_engine.go (outdated)
DescRequest: &workflowservice.DescribeTaskQueueRequest{
    TaskQueue: &taskqueuepb.TaskQueue{
        Name: tq.Name,
        Kind: enumspb.TaskQueueKind(tq.Type),
this seems wrong. In this case it's always NORMAL kind
You're right; good catch. I probably auto-completed that. Now I wonder why the tests are passing ...
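For reference, a minimal sketch of the corrected construction, with the kind pinned to NORMAL (the surrounding request wiring stays as in the diff above):

TaskQueue: &taskqueuepb.TaskQueue{
    Name: tq.Name,
    // These task queues are always of NORMAL kind; tq.Type (workflow vs. activity)
    // belongs in the request's task queue type, not in Kind.
    Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
},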
looks good to me otherwise.
    }
    taskQueues = append(taskQueues, element)
}
infos := make([]*deploymentpb.WorkerDeploymentVersionInfo_VersionTaskQueueInfo, 0, len(taskQueueInfos))
this part will be removed once the deprecated field is cleaned up, right?
Exactly!
if tqRespTQ, ok := tqRespMap[tqKey(tq.Name, tq.Type)]; ok {
    tqOutputs[i].Stats = tqRespTQ.Stats
} else {
    // Setting empty stats instead of leaving nil (which is only used when not querying for stats).
Not sure about this. Empty stats will mislead users into thinking the stats are all zero, which is a valid result in its own right. It's dangerous if users think the backlog is empty while it actually isn't.
Options are reporting a partial result with some nils in the list, or failing the whole call.
I was thinking about this; it's not clear to me yet why/when we wouldn't have a result there.
Yeah, other than random edge cases I cannot think of one either... but in that case, why not just return error for the whole thing? Better to be safe than returning incorrect results.
👍 okidoki
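A sketch of the agreed-upon behavior, failing the call rather than reporting misleading all-zero stats (the exact error constructor is an assumption):

if tqRespTQ, ok := tqRespMap[tqKey(tq.Name, tq.Type)]; ok {
    tqOutputs[i].Stats = tqRespTQ.Stats
} else {
    // Missing stats fail the whole call instead of being reported as an all-zero
    // backlog, which a user could mistake for a real (empty) result.
    return nil, serviceerror.NewInternal(fmt.Sprintf("missing stats for task queue %q", tq.Name))
}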
@@ -1354,6 +1363,56 @@ func (e *matchingEngineImpl) DescribeTaskQueue(
    return descrResp, nil
}

func (e *matchingEngineImpl) DescribeVersionedTaskQueues(
So this API is for saving the tq fan-out by caching the version results, while DescribeTaskQueue is already caching the per-tq results, right?
It makes sense; I just wonder if we had to do it now or could've waited for more signals that the tq fan-out will be a practical problem. But it's good that we have it now.
Exactly! Since we have the cache, it was easy to add 🤷
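In other words, the version-level response is cached as a whole on top of the per-task-queue caching that DescribeTaskQueue already does. A rough sketch of the pattern; only PutCache appears in the diff, so the GetCache read and the fan-out helper below are assumptions:

// Sketch of the version-level caching around the task queue fan-out.
if cached := pm.GetCache(cacheKey); cached != nil { // assumed read counterpart of PutCache
    return cached.(*matchingservice.DescribeVersionedTaskQueuesResponse), nil
}
resp, err := e.describeVersionTaskQueues(ctx, request) // hypothetical fan-out over the task queues
if err != nil {
    return nil, err
}
pm.PutCache(cacheKey, resp)
return resp, nil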
    buildIds = []string{worker_versioning.WorkerDeploymentVersionToStringV31(request.Version)}
}

cacheKey := "dtq_default:" + strings.Join(buildIds, ",")
This needs to be keyed by the requested version, or the cached results of "all versions" vs. one version collide.
Not for this PR, but could we always put the per-version stats in the cache, keyed by version string so they do not collide? It'd mean the total result here would need to check cache for all individual versions but that should be fine, right?
👍 Good idea.
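A sketch of that per-version keying idea; GetCache, versionResultFor, and mergeInto are hypothetical helpers standing in for the read side, the per-version fan-out, and the aggregation:

// Sketch: cache one entry per version so "all versions" and single-version requests
// never collide; an aggregate request checks and fills each entry individually.
resp := &matchingservice.DescribeVersionedTaskQueuesResponse{}
for _, buildId := range buildIds {
    key := "dtq_version:" + buildId // per-version key instead of a joined list
    cached := pm.GetCache(key)      // assumed read counterpart of PutCache
    if cached == nil {
        versionResp, err := versionResultFor(ctx, buildId) // hypothetical per-version fan-out
        if err != nil {
            return nil, err
        }
        pm.PutCache(key, versionResp)
        cached = versionResp
    }
    mergeInto(resp, cached) // hypothetical merge of the per-version result into the response
}
return resp, nil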
localPM, _, _ := e.getTaskQueuePartitionManager(ctx, partition, false, 0)
if localPM != nil {
    // If available, query the local partition manager to save a network call.
    tqResp, err = e.DescribeTaskQueue(ctx, tqReq)
} else {
    // Otherwise, query the other matching service instance.
    tqResp, err = e.matchingRawClient.DescribeTaskQueue(ctx, tqReq)
}
if err != nil {
    return nil, err // some other error, return it
}
I realized that our functional test setup doesn't cover this; but in production it would need to actually use the matching client for task queues that aren't on the instance.
Note that getTaskQueuePartitionManager, AFAIK, can also return nil when the partition has been unloaded but should be on this instance. I don't know how to check for that, so right now it would make a network call despite that.
Makes sense, it seems good to me.
For the future, I wonder if we can improve the routing logic inside the client to use the local matching_engine when the target instance ends up being the same.
Hmm, this best-effort thing is a little odd. My thought with the DescribeTaskQueue fanout was: the DescribeTaskQueue call goes to the root. So the root partition should be in the same process. So when asking the root, look up the pm, otherwise do an rpc. The "is it loaded locally" check can theoretically be wrong in both directions, as you point out.
In this case if we're doing a fanout to multiple task queues I think we should just do rpcs.
Hah, that's what #6733 does. But that's way overkill for this.
Oh, I didn't realize it can go wrong in both directions? I only assumed that it could falsely state that the tq root isn't on this host (ie unloaded) and then make an avoidable RPC.
I'm trying to assess the impact here and whether I need to patch it for the release asap. Are you saying it's incorrect or unnecessary/overly complex?
Well, I suppose it could only be wrong in the positive direction for a few seconds, since we have the unload on membership change monitor. The cache ttl is a few seconds so... I would say not a correctness concern.
I just get worried about stuff like this because of the potential inconsistencies: the source of truth for partition owner is membership, not the map in matching engine. The map in matching engine eventually follows membership, hopefully within a few seconds. So I'd rather just go to the source (by doing an rpc that gets routed by membership) than ask the map directly.
In the DescribeTaskQueue case, the rpc to the root just arrived, so that seems safe.
👍 I'll simplify it to always make the RPC.
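The simplified version then drops the local lookup entirely and lets the client route by membership, roughly:

// Sketch of the simplification: always go through the matching client, which routes
// by membership (the source of truth for partition ownership), instead of consulting
// the possibly-stale local partition map.
tqResp, err := e.matchingRawClient.DescribeTaskQueue(ctx, tqReq)
if err != nil {
    return nil, err
}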
Force-pushed from abea51b to 7c46b92
**What changed?** (quoted from the companion API PR)
(1) Deprecated `task_queue_infos` in `deployment.WorkerDeploymentVersionInfo`. (2) Added `version_task_queues` to `DescribeWorkerDeploymentVersionResponse`.
**Why?** We want to report task queue stats for each task queue that is part of a worker deployment version. The challenge is that the `taskqueue` package depends on the `deployment` package, so adding `TaskQueueStats` to `deployment.WorkerDeploymentVersionInfo` causes an import cycle. Weighing our options, we decided to effectively _move_ the task queue-related data from within the deployment package into the response message.
**Breaking changes** Not yet; but in subsequent releases the deprecated field `task_queue_infos` will be removed.
**Server PR** temporalio/temporal#7959 (draft)
Co-authored-by: Spencer Judge <sjudge@hey.com>
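For illustration, a hedged sketch of how a caller might handle the deprecation window on the Go side, preferring the new response-level field; the getter names are inferred from the proto field names above and should be treated as assumptions:

// Sketch: prefer the new response-level field, falling back to the deprecated one
// for servers that don't populate it yet.
taskQueues := resp.GetVersionTaskQueues()
if len(taskQueues) == 0 {
    // Deprecated location: per-task-queue info nested inside WorkerDeploymentVersionInfo.
    legacyInfos := resp.GetWorkerDeploymentVersionInfo().GetTaskQueueInfos()
    _ = legacyInfos // convert or inspect as needed; no stats are available here
}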
Force-pushed from c86b8b9 to 361f03f
Force-pushed from f008ee9 to 29b5466
Force-pushed from 29b5466 to 54a15c6
Force-pushed from 54a15c6 to 64daee2
@@ -134,6 +136,7 @@ type (
    gaugeMetrics   gaugeMetrics // per-namespace task queue counters
    config         *Config
    versionChecker headers.VersionChecker
    cache          cache.Cache
unused?
Ah, good catch! That's from when I realized this won't work due to the caching TTL being namespace-scoped.
Fix is in #7975.
What changed?
Added task queue stats reporting to DescribeWorkerDeploymentVersion API.
Why?
To allow inspecting the backlog, etc., of the task queues belonging to a particular worker version.
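For example, a hedged sketch of inspecting the reported backlog per task queue; the exact shape of the version_task_queues entries (Name, Stats getters) is assumed, while ApproximateBacklogCount comes from the existing TaskQueueStats message:

// Sketch: read the per-task-queue stats returned by DescribeWorkerDeploymentVersion.
resp, err := client.DescribeWorkerDeploymentVersion(ctx, req) // workflowservice client
if err != nil {
    return err
}
for _, vtq := range resp.GetVersionTaskQueues() {
    stats := vtq.GetStats()
    fmt.Printf("%s: backlog=%d\n", vtq.GetName(), stats.GetApproximateBacklogCount())
}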
How did you test it?