Conversation

@stephanos (Contributor) commented Jun 24, 2025

What changed?

Added task queue stats reporting to the `DescribeWorkerDeploymentVersion` API.

Why?

To allow inspecting the backlog and other stats of the task queues belonging to a particular worker deployment version.

How did you test it?

  • built
  • ran locally and tested manually
  • covered by existing tests
  • added new unit test(s)
  • added new functional test(s)

@stephanos force-pushed the worker-deployment-stats branch 4 times, most recently from d5ecba5 to 71bb52e on June 25, 2025 16:40
resp := &matchingservice.DescribeVersionedTaskQueuesResponse{}
for _, tq := range request.VersionTaskQueues {
	tqResp, err := e.DescribeTaskQueue(ctx,
		&matchingservice.DescribeTaskQueueRequest{

@stephanos (Author):

I might make these concurrently. Didn't get there yet.

		})
}
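
For reference, a minimal sketch of what the concurrent version could look like, using golang.org/x/sync/errgroup. The types and the `describeOne` callback are stand-ins for the matchingservice types and `e.DescribeTaskQueue`, not the PR's actual code:

```go
package example

import (
	"context"

	"golang.org/x/sync/errgroup"
)

// Stand-ins for the real matchingservice types.
type VersionTaskQueue struct {
	Name string
	Type int32
}

type DescribeTaskQueueResponse struct{} // stats, pollers, etc. elided

// describeAll fans out one describe call per task queue, with bounded
// concurrency and cancel-on-first-error, preserving input order in results.
func describeAll(
	ctx context.Context,
	tqs []VersionTaskQueue,
	describeOne func(context.Context, VersionTaskQueue) (*DescribeTaskQueueResponse, error),
) ([]*DescribeTaskQueueResponse, error) {
	results := make([]*DescribeTaskQueueResponse, len(tqs))
	g, gctx := errgroup.WithContext(ctx)
	g.SetLimit(10) // cap the fan-out so a large version doesn't overwhelm matching
	for i, tq := range tqs {
		i, tq := i, tq // capture loop variables (needed before Go 1.22)
		g.Go(func() error {
			r, err := describeOne(gctx, tq)
			if err != nil {
				return err // first error cancels gctx and fails the whole fan-out
			}
			results[i] = r // each goroutine writes a distinct index: no mutex needed
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		return nil, err
	}
	return results, nil
}
```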

pm.PutCache(cacheKey, resp)

@stephanos (Author):

I had considered moving the cache into the matchingEngineImpl as part of this PR (it's outstanding tech debt), but that was difficult since the cache needs to be initialised with the TTL, and the TTL is a namespace-scoped config, so its lifetime doesn't match up with the engine's.
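
A toy illustration of that lifetime mismatch, with invented names (in the real code the TTL is a dynamic config lookup):

```go
package example

import "time"

// Invented stand-in: in the real code this is a dynamic config value.
type Config struct {
	// Namespace-scoped: the value depends on which namespace is asking,
	// so there is no single TTL available when the engine is constructed.
	DescribeCacheTTL func(namespace string) time.Duration
}

// The engine is built once per host, before any namespace is in play, so a
// TTL cache can't be initialised here; it has to live with a namespace-scoped
// component (today, the task queue partition manager) instead.
type matchingEngine struct {
	config *Config
}
```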

@stephanos force-pushed the worker-deployment-stats branch 3 times, most recently from 9f51c31 to 2aa69b3 on June 25, 2025 17:03
DescRequest: &workflowservice.DescribeTaskQueueRequest{
	TaskQueue: &taskqueuepb.TaskQueue{
		Name: tq.Name,
		Kind: enumspb.TaskQueueKind(tq.Type),

Contributor:

This seems wrong; in this case it's always the NORMAL kind.

@stephanos (Author):

You're right; good catch. I probably auto-completed that. Now I wonder why the tests are passing ...
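
For the record, a hedged sketch of the fix: the queue's Kind is always NORMAL here, and tq.Type (workflow vs. activity) presumably belongs in the request's TaskQueueType field rather than in Kind:

```go
DescRequest: &workflowservice.DescribeTaskQueueRequest{
	TaskQueue: &taskqueuepb.TaskQueue{
		Name: tq.Name,
		Kind: enumspb.TASK_QUEUE_KIND_NORMAL, // always NORMAL in this code path
	},
	TaskQueueType: enumspb.TaskQueueType(tq.Type), // the task *type*, not the kind
```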

@ShahabT (Contributor) left a comment:

looks good to me otherwise.

	}
	taskQueues = append(taskQueues, element)
}
infos := make([]*deploymentpb.WorkerDeploymentVersionInfo_VersionTaskQueueInfo, 0, len(taskQueueInfos))

Contributor:

This part will be removed once the deprecated field is cleaned up, right?

@stephanos (Author) commented Jun 25, 2025:

Exactly!

if tqRespTQ, ok := tqRespMap[tqKey(tq.Name, tq.Type)]; ok {
	tqOutputs[i].Stats = tqRespTQ.Stats
} else {
	// Setting empty stats instead of leaving nil (which is only used when not querying for stats).

@ShahabT (Contributor) commented Jun 25, 2025:

Not sure about this. Empty stats will mislead users into thinking the stats are all zero, which is a valid stats value in itself. It's dangerous if users think the backlog is empty while it actually is not.

Options are reporting a partial result with some nils in the list, or failing the whole call.

@stephanos (Author):

I was thinking about this; it's not clear to me yet why/when we wouldn't have a result there.

Contributor:

Yeah, other than random edge cases I cannot think of one either... but in that case, why not just return an error for the whole thing? Better to be safe than to return incorrect results.

@stephanos (Author):

👍 okidoki
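
The agreed-upon behavior, sketched against the diff above (the error type and message are assumptions; this assumes `fmt` and go.temporal.io/api/serviceerror are imported and slots into the loop shown earlier):

```go
tqRespTQ, ok := tqRespMap[tqKey(tq.Name, tq.Type)]
if !ok {
	// Fail the whole call rather than report empty stats, which would read
	// as a valid all-zero result (e.g. an empty backlog) to the caller.
	return nil, serviceerror.NewInternal(
		fmt.Sprintf("missing stats for task queue %q", tq.Name))
}
tqOutputs[i].Stats = tqRespTQ.Stats
```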

@@ -1354,6 +1363,56 @@ func (e *matchingEngineImpl) DescribeTaskQueue(
	return descrResp, nil
}

func (e *matchingEngineImpl) DescribeVersionedTaskQueues(

Contributor:

So this API is for saving the tq fan-out by caching the version results, while DescribeTaskQueue is already caching the per-tq results, right?

It makes sense; I just wonder whether we had to do it now or could've waited for more signals that the tq fan-out will be a practical problem. But it's good that we have it.

@stephanos (Author):

Exactly! Since we have the cache, it was easy to add 🤷

	buildIds = []string{worker_versioning.WorkerDeploymentVersionToStringV31(request.Version)}
}

cacheKey := "dtq_default:" + strings.Join(buildIds, ",")

@stephanos (Author) commented Jun 26, 2025:

This needs to be keyed by the requested version, or the cached results for "all versions" vs. a single version collide.

Contributor:

Not for this PR, but could we always put the per-version stats in the cache, keyed by version string, so they don't collide? It'd mean the total result here would need to check the cache for all individual versions, but that should be fine, right?

@stephanos (Author):

👍 Good idea.
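
A hypothetical sketch of that per-version keying; GetCache mirrors the PutCache call seen earlier, and describeVersion/mergeVersionStats are invented helpers, not existing code:

```go
resp := &matchingservice.DescribeVersionedTaskQueuesResponse{}
for _, buildId := range buildIds {
	// One cache entry per version: "all versions" and "one version" requests
	// then share entries instead of colliding under a single key.
	key := "dtq:" + buildId
	if cached, ok := pm.GetCache(key); ok { // hypothetical read counterpart to PutCache
		mergeVersionStats(resp, cached) // hypothetical merge helper
		continue
	}
	versionResp, err := describeVersion(ctx, buildId) // hypothetical per-version describe
	if err != nil {
		return nil, err
	}
	pm.PutCache(key, versionResp)
	mergeVersionStats(resp, versionResp)
}
```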

Comment on lines +1404 to +1416
localPM, _, _ := e.getTaskQueuePartitionManager(ctx, partition, false, 0)
if localPM != nil {
	// If available, query the local partition manager to save a network call.
	tqResp, err = e.DescribeTaskQueue(ctx, tqReq)
} else {
	// Otherwise, query the other matching service instance.
	tqResp, err = e.matchingRawClient.DescribeTaskQueue(ctx, tqReq)
}
if err != nil {
	return nil, err // some other error, return it
}

@stephanos (Author):

I realized that our functional test setup doesn't cover this; but in production it would need to actually use the matching client for task queues that aren't on the instance.

Note that getTaskQueuePartitionManager, AFAIK, can also return nil when the partition has been unloaded but should be on this instance. I don't know how to check for that, so right now it would make a network call despite that.

Contributor:

Makes sense; it seems good to me.

For the future, I wonder whether we can improve the routing logic inside the client to use the local matching engine if the target instance ends up being the same.

Contributor:

Hmm, this best-effort thing is a little odd. My thought with the DescribeTaskQueue fanout was: the DescribeTaskQueue call goes to the root. So the root partition should be in the same process. So when asking the root, look up the pm, otherwise do an rpc. The "is it loaded locally" check can theoretically be wrong in both directions, as you point out.

In this case if we're doing a fanout to multiple task queues I think we should just do rpcs.

Contributor:

Hah, that's what #6733 does. But that's way overkill for this.

@stephanos (Author) commented Jun 27, 2025:

Oh, I didn't realize it can go wrong in both directions? I only assumed it could falsely state that the tq root isn't on this host (i.e., unloaded) and then make an avoidable RPC.

I'm trying to assess the impact here, and whether I need to patch it for the release ASAP. Are you saying it's incorrect, or unnecessary/overly complex?

Contributor:

Well, I suppose it could only be wrong in the positive direction for a few seconds, since we have the unload-on-membership-change monitor. The cache TTL is a few seconds, so... I would say it's not a correctness concern.

I just get worried about stuff like this because of the potential inconsistencies: the source of truth for partition ownership is membership, not the map in the matching engine. The map in the matching engine eventually follows membership, hopefully within a few seconds. So I'd rather just go to the source (by doing an rpc that gets routed by membership) than ask the map directly.

In the DescribeTaskQueue case, the rpc to the root just arrived, so that seems safe.

@stephanos (Author) commented Jun 27, 2025:

👍 I'll simplify it to always making the rpc.
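
i.e., roughly this, replacing the local-pm branch shown above (a sketch, not the final diff):

```go
// Always route through the matching client: ownership is resolved by
// membership (the source of truth) rather than the engine's local pm map.
tqResp, err := e.matchingRawClient.DescribeTaskQueue(ctx, tqReq)
if err != nil {
	return nil, err
}
```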


@stephanos force-pushed the worker-deployment-stats branch from abea51b to 7c46b92 on June 26, 2025 00:32
stephanos added a commit to temporalio/api that referenced this pull request Jun 26, 2025

**What changed?**

(1) Deprecated `task_queue_infos` in
`deployment.WorkerDeploymentVersionInfo`.
(2) Added `version_task_queues` to
`DescribeWorkerDeploymentVersionResponse`.

**Why?**

We want to report task queue stats for each task queue that is part of a
worker deployment version.

The challenge is that the `taskqueue` package depends on the
`deployment` package. So adding `TaskQueueStats` to
`deployment.WorkerDeploymentVersionInfo` causes a cycle import error.

Weighing our options, we decided to effectively _move_ the task
queue-related data from within the deployment package into the response
message.

**Breaking changes**

Not yet; but in subsequent releases the deprecated field
`task_queue_infos` will be removed.

**Server PR**

temporalio/temporal#7959 (draft)

---------

Co-authored-by: Spencer Judge <sjudge@hey.com>
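
From a Go caller's perspective, the API change above would be consumed roughly like this (a hedged sketch; the version-selector field is elided, and the accessors on the new version_task_queues entries are assumptions beyond the field name itself):

```go
package example

import (
	"context"
	"fmt"

	"go.temporal.io/api/workflowservice/v1"
	"go.temporal.io/sdk/client"
)

func printVersionTaskQueueStats(ctx context.Context, c client.Client) error {
	resp, err := c.WorkflowService().DescribeWorkerDeploymentVersion(ctx,
		&workflowservice.DescribeWorkerDeploymentVersionRequest{
			Namespace: "default",
			// ... version selector elided ...
		})
	if err != nil {
		return err
	}
	// version_task_queues replaces the deprecated task_queue_infos and now
	// carries per-task-queue stats.
	for _, vtq := range resp.GetVersionTaskQueues() {
		fmt.Println(vtq.GetName(), vtq.GetStats()) // assumed accessors
	}
	return nil
}
```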
temporal-cicd bot pushed a commit to temporalio/api-go that referenced this pull request Jun 26, 2025
@stephanos marked this pull request as ready for review June 26, 2025 18:34
@stephanos requested a review from a team as a code owner June 26, 2025 18:34
@stephanos force-pushed the worker-deployment-stats branch 7 times, most recently from c86b8b9 to 361f03f on June 26, 2025 22:33
@stephanos enabled auto-merge (squash) June 26, 2025 23:11
@stephanos force-pushed the worker-deployment-stats branch from f008ee9 to 29b5466 on June 26, 2025 23:29
@stephanos force-pushed the worker-deployment-stats branch from 29b5466 to 54a15c6 on June 26, 2025 23:54
@stephanos force-pushed the worker-deployment-stats branch from 54a15c6 to 64daee2 on June 27, 2025 00:05
@stephanos merged commit 2730e2a into temporalio:main Jun 27, 2025
52 checks passed
@stephanos deleted the worker-deployment-stats branch June 27, 2025 00:35
@@ -134,6 +136,7 @@ type (
	gaugeMetrics   gaugeMetrics // per-namespace task queue counters
	config         *Config
	versionChecker headers.VersionChecker
	cache          cache.Cache

Contributor:

unused?

@stephanos (Author) commented Jun 27, 2025:

Ah, good catch! That's from when I realized this won't work due to the caching TTL being ns-scoped.

Fix in #7975.
