Support ramp in Matching #7126
…hab/ramp

# Conflicts:
#	common/testing/testvars/test_vars.go
#	go.mod
#	go.sum
#	service/matching/matching_engine.go
# Conflicts:
#	client/frontend/client_gen.go
#	client/frontend/metric_client_gen.go
#	client/frontend/retryable_client_gen.go
#	common/dynamicconfig/constants.go
#	common/rpc/interceptor/logtags/workflow_service_server_gen.go
#	common/rpc/interceptor/redirection.go
#	common/testing/mockapi/workflowservicemock/v1/service_grpc.pb.mock.go
#	common/testing/testvars/test_vars.go
#	go.mod
#	go.sum
#	proto/internal/temporal/server/api/matchingservice/v1/request_response.proto
#	service/frontend/service.go
#	service/frontend/workflow_handler.go
#	service/history/workflow/mutable_state_impl.go
#	service/history/workflow/mutable_state_impl_test.go
#	service/history/workflow/util.go
#	service/history/workflow/workflow_task_state_machine.go
#	service/matching/config.go
#	service/matching/physical_task_queue_manager.go
#	service/worker/workerdeployment/fx.go
#	tests/deployment_version_test.go
// Find new current
for _, v := range deployments.GetVersions() {
	// [cleanup-old-wv]
// [cleanup-old-wv]
// Find ramping version
for _, v := range deployments.GetVersions() {
	// [cleanup-old-wv]
// [cleanup-old-wv]
	if v.RoutingUpdateTime != nil && v.GetRampPercentage() > 0 {
		if t := v.RoutingUpdateTime.AsTime(); t.After(ramping.GetRoutingUpdateTime().AsTime()) {
			ramping = v
		}
	}
Why not do this in the same for-loop as the one that finds the current version?
// FindDeploymentVersionForWorkflowID returns the deployment version that should be used for a
// particular workflow ID based on the versioning info of the task queue.
Suggested change
- // FindDeploymentVersionForWorkflowID returns the deployment version that should be used for a
- // particular workflow ID based on the versioning info of the task queue.
+ // FindDeploymentVersionForWorkflowID returns the deployment version that should be used for a
+ // particular workflow ID based on the versioning info of the task queue. Nil means unversioned.
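To illustrate what a function like `FindDeploymentVersionForWorkflowID` has to decide, here is a hedged sketch of per-workflow ramp routing. The types are hypothetical stand-ins for the protobuf messages, and hashing the workflow ID with FNV-1a into a [0, 100) bucket is an assumption made for illustration, not necessarily what the server does. The properties that matter are that the same workflow ID always gets the same decision, and that a nil result means unversioned (including when ramping to unversioned):

```go
package main

import "hash/fnv"

// workerDeploymentVersion is a hypothetical stand-in for deploymentpb.WorkerDeploymentVersion.
type workerDeploymentVersion struct {
	DeploymentName string
	Version        string
}

// versioningInfo is a hypothetical, trimmed-down TaskQueueVersioningInfo. A nil
// RampingVersion with a non-zero RampingVersionPercentage means "ramping to unversioned".
type versioningInfo struct {
	CurrentVersion           *workerDeploymentVersion
	RampingVersion           *workerDeploymentVersion
	RampingVersionPercentage float32 // range [0, 100]
}

// rampBucket maps a workflow ID to a stable value in [0, 100) so the same workflow
// always lands on the same side of the ramp. FNV-1a is an illustrative choice only.
func rampBucket(workflowID string) float32 {
	h := fnv.New32a()
	h.Write([]byte(workflowID)) // hash.Hash.Write never returns an error
	return float32(h.Sum32()%10000) / 100
}

// findVersionForWorkflowID returns the version a workflow should be routed to.
// A nil result means unversioned.
func findVersionForWorkflowID(info *versioningInfo, workflowID string) *workerDeploymentVersion {
	if info == nil {
		return nil
	}
	if info.RampingVersionPercentage > 0 && rampBucket(workflowID) < info.RampingVersionPercentage {
		return info.RampingVersion // may be nil: ramping to unversioned
	}
	return info.CurrentVersion
}
```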
oneof operation {
	// The deployment version and its data that is being updated.
	temporal.server.api.persistence.v1.DeploymentVersionData update_version_data = 6;
	// Pass to clean up this version's data from the task queue.
Suggested change
- // Pass to clean up this version's data from the task queue.
+ // The version whose data should be cleaned from the task queue.
message DeploymentVersionData {
	// Special case: `version.version` may be absent (but not `version.deployment_name`). That is
	// only valid when `ramp_percentage` is non-zero, representing ramping to unversioned in a
	// particular deployment name.
Suggested change
- // particular deployment name.
+ // particular worker deployment.
	google.protobuf.Timestamp routing_update_time = 2;
	// If this version is the current version of its deployment.
	bool is_current = 3;
	// Range: [0, 100]. Must be zero if is_current is true. Must be non-zero if version is null.
I don't understand what you mean by `Must be non-zero if version is null.` It seems like `DeploymentVersionData.version == nil` is invalid: if we are ramping to unversioned, we still need the deployment name inside `version`.
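The invariants under discussion can be written down as a check. This is a hypothetical validator over simplified stand-in types (not the generated proto code), following the reviewer's reading: the deployment name must always be present, and only `version.version` may be empty, and only while ramping:

```go
package main

import "errors"

// Hypothetical stand-ins for deploymentpb.WorkerDeploymentVersion and
// persistencespb.DeploymentVersionData.
type workerDeploymentVersion struct {
	DeploymentName string
	Version        string
}

type deploymentVersionData struct {
	Version        *workerDeploymentVersion
	IsCurrent      bool
	RampPercentage float32
}

// validateDeploymentVersionData checks the constraints described in the proto comments.
func validateDeploymentVersionData(d *deploymentVersionData) error {
	if d.Version == nil || d.Version.DeploymentName == "" {
		return errors.New("deployment name is required")
	}
	if d.RampPercentage < 0 || d.RampPercentage > 100 {
		return errors.New("ramp_percentage must be in [0, 100]")
	}
	if d.IsCurrent && d.RampPercentage != 0 {
		return errors.New("ramp_percentage must be zero for the current version")
	}
	if d.Version.Version == "" && d.RampPercentage == 0 {
		return errors.New("an empty version is only valid while ramping to unversioned")
	}
	return nil
}
```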
service/matching/deployment_util.go (Outdated)
	return -1
}

func hasDeploymentVersion(deployments *persistencespb.DeploymentData, deployment *deploymentpb.Deployment) bool {
I think it would be nicer if `hasDeploymentVersion` took a `dv *deploymentpb.WorkerDeploymentVersion` instead of the old `deployment *deploymentpb.Deployment`, but it's not a big deal.
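A sketch of the suggested signature, where a match requires both the deployment name and the version string to be equal, and the index helper keeps the `return -1` convention seen in the hunk above. The types and the `findDeploymentVersion` helper are hypothetical simplifications, not the actual `persistencespb`/`deploymentpb` messages:

```go
package main

// Hypothetical stand-ins for deploymentpb.WorkerDeploymentVersion and
// persistencespb.DeploymentVersionData.
type workerDeploymentVersion struct {
	DeploymentName string
	Version        string
}

type deploymentVersionData struct {
	Version *workerDeploymentVersion
}

// findDeploymentVersion returns the index of dv in versions, or -1 if it is absent.
func findDeploymentVersion(versions []*deploymentVersionData, dv *workerDeploymentVersion) int {
	if dv == nil {
		return -1
	}
	for i, v := range versions {
		// Struct equality compares both DeploymentName and Version.
		if v.Version != nil && *v.Version == *dv {
			return i
		}
	}
	return -1
}

// hasDeploymentVersion reports whether dv is present in versions.
func hasDeploymentVersion(versions []*deploymentVersionData, dv *workerDeploymentVersion) bool {
	return findDeploymentVersion(versions, dv) >= 0
}
```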
// forwarded Query/Nexus task requests do not expire rapidly in contrast to forwarded activity/workflow tasks
// that only try up to 200ms sync-match. Therefore, to prevent blocking the request on the wrong build ID, it's
// more important to allow the parent partition to make a fresh versioning decision in case the child partition
// did not have up-to-date User Data when selecting a dispatch build ID.
Why do you want to delete this comment? It seems like we're still passing a nil `forwardInfo`.
# Conflicts:
#	common/rpc/interceptor/redirection.go
#	go.mod
#	go.sum
#	service/frontend/workflow_handler.go
#	tests/deployment_version_test.go
func (tv *TestVars) WithDeploymentNameNumber(n int) *TestVars {
	return tv.cloneSetN("deployment_name", n)
func (tv *TestVars) DeploymentVersion() *deploymentpb.WorkerDeploymentVersion {
nit: We should probably rename this to `WorkerDeploymentVersion`, IMO. I say this because I almost thought it would return the deployment version (a string) rather than the `WorkerDeploymentVersion` struct it currently returns.
	// Special case: `version.version` may be empty (but not `version.deployment_name`). That is
	// only valid when `ramp_percentage` is non-zero, representing ramping to unversioned in a
	// particular worker deployment.
	temporal.api.deployment.v1.WorkerDeploymentVersion version = 1;
Quite similar to my previous comment: this is intended to represent the combination of `deploymentName` + `version`. IMO, we shouldn't call this attribute `version`, since that name could plainly signify just the deployment version. I think we should call it `worker_deployment_version` or some other, better name.
Even the `version.version` in the comment feels off for the same reason. WDYT?
	// only valid when `ramp_percentage` is non-zero, representing ramping to unversioned in a
	// particular worker deployment.
Suggested wording: "...when `ramp_percentage` is non-zero and we are ramping to unversioned in a particular worker deployment."
The current comment makes it sound like whenever `ramp_percentage` is non-zero, we are moving to unversioned, which isn't always true.
if v.RoutingUpdateTime != nil && v.GetIsCurrent() {
	if t := v.RoutingUpdateTime.AsTime(); t.After(current.GetRoutingUpdateTime().AsTime()) {
		current = v
Curious: why do we need the combined check `if v.RoutingUpdateTime != nil && v.GetIsCurrent()`? A current deployment version will always have a `RoutingUpdateTime`, right? Or is this just defensive programming, to guard against potential errors?
}
if ramping.GetRampPercentage() > 0 {
	info.RampingVersionPercentage = ramping.GetRampPercentage()
	if ramping.GetVersion().GetVersion() != "" {
Stressing the earlier comment: `ramping.GetVersion().GetVersion()` looks a bit off to me.
	// The deployment version and its data that is being updated.
	temporal.server.api.persistence.v1.DeploymentVersionData update_version_data = 6;
	// The version whose data should be cleaned from the task queue.
	temporal.api.deployment.v1.WorkerDeploymentVersion forget_version = 7;
nit: `delete_version`?
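For context on the `oneof operation` whose field is being named here, a hedged sketch of how a receiver might apply either branch: `update_version_data` upserts the entry for that version, and `forget_version` removes it. The Go types and `applySync` are hypothetical stand-ins, not the actual matching-service request handling:

```go
package main

// Hypothetical stand-ins for deploymentpb.WorkerDeploymentVersion and
// persistencespb.DeploymentVersionData.
type workerDeploymentVersion struct {
	DeploymentName string
	Version        string
}

type deploymentVersionData struct {
	Version        *workerDeploymentVersion
	IsCurrent      bool
	RampPercentage float32
}

// syncRequest models `oneof operation`: exactly one field is expected to be non-nil.
type syncRequest struct {
	UpdateVersionData *deploymentVersionData   // update_version_data = 6
	ForgetVersion     *workerDeploymentVersion // forget_version = 7
}

// sameVersion reports whether two versions share deployment name and version string.
func sameVersion(a, b *workerDeploymentVersion) bool {
	return a != nil && b != nil && *a == *b
}

// applySync returns a new slice with the operation applied; the input slice is not modified.
func applySync(versions []*deploymentVersionData, req syncRequest) []*deploymentVersionData {
	out := append([]*deploymentVersionData(nil), versions...)
	switch {
	case req.UpdateVersionData != nil:
		for i, v := range out {
			if sameVersion(v.Version, req.UpdateVersionData.Version) {
				out[i] = req.UpdateVersionData // replace the existing entry for this version
				return out
			}
		}
		return append(out, req.UpdateVersionData) // first time this version is seen
	case req.ForgetVersion != nil:
		kept := out[:0] // filter in place over our own copy
		for _, v := range out {
			if !sameVersion(v.Version, req.ForgetVersion) {
				kept = append(kept, v)
			}
		}
		return kept
	}
	return out // neither branch set: nothing to do
}
```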
@@ -272,5 +275,6 @@ func getTaskQueueCurrentDeployment(
 		// The TQ is unversioned
 		return nil, nil
 	}
-	return worker_versioning.FindCurrentDeployment(tqData.GetDeploymentData()), nil
+	versioningInfo := worker_versioning.CalculateTaskQueueVersioningInfo(tqData.GetDeploymentData())
+	return worker_versioning.FindDeploymentVersionForWorkflowID(versioningInfo, workflowId), nil
I like this function's readability: I was able to understand everything in one pass. Very well written 💡
if pm.partition.Kind() != enumspb.TASK_QUEUE_KIND_STICKY {
	perTypeUserData, _, err := pm.getPerTypeUserData()
	if err != nil {
		return nil, err
	}
	resp.DescResponse.VersioningInfo = worker_versioning.CalculateTaskQueueVersioningInfo(perTypeUserData.GetDeploymentData())
}
return resp, nil
Any specific reason for touching the legacy API? Ideally, isn't the plan to deprecate (or at least reduce usage of) the legacy API and encourage folks to move to Enhanced mode?
v1 = &deploymentpb.WorkerDeploymentVersion{
	Version:        "v1",
	DeploymentName: "foo",
}
v2 = &deploymentpb.WorkerDeploymentVersion{
	Version:        "v2",
	DeploymentName: "foo",
}
v3 = &deploymentpb.WorkerDeploymentVersion{
	Version:        "v3",
	DeploymentName: "foo",
}
Won't block on this, but since this is a new file, I think you could have used `testVars` to get these variables (with a newly added method that returns a struct with a different version but the same deployment name).
{name: "two current + two ramping", want: &taskqueuepb.TaskQueueVersioningInfo{CurrentVersion: v2, UpdateTime: t3, RampingVersion: v3, RampingVersionPercentage: 20},
	data: &persistencespb.DeploymentData{
		Versions: []*persistencespb.DeploymentVersionData{
			{Version: v1, IsCurrent: true, RoutingUpdateTime: t1},
			{Version: v2, IsCurrent: true, RoutingUpdateTime: t2},
			{Version: v1, RampPercentage: 50, RoutingUpdateTime: t2},
			{Version: v3, RampPercentage: 20, RoutingUpdateTime: t3},
		},
	},
},
Confused: can we have two current and two ramping versions in the same deployment at the same point in time?
## What changed? <!-- Describe what has changed in this PR --> Matching to Worker Deployment Ramping Version and its percentage when routing tasks. User-facing APIs are not implemented yet. The implementation also supports ramping from a Deployment Version to unversioned workers. ## Why? <!-- Tell your future self why have you made these changes --> Part of Worker Versioning features. ## How did you test it? <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> Added unit and functional tests. ## Potential risks <!-- Assuming the worst case, what can be broken when deploying this change to production? --> None. ## Documentation <!-- Have you made sure this change doesn't falsify anything currently stated in `docs/`? If significant new behavior is added, have you described that in `docs/`? --> None. ## Is hotfix candidate? <!-- Is this PR a hotfix candidate or does it require a notification to be sent to the broader community? (Yes/No) --> No --------- Co-authored-by: Shivam Saraf <shivam.saraf@temporal.io>
What changed?
Makes Matching take the Worker Deployment Ramping Version and its percentage into account when routing tasks. User-facing APIs are not implemented yet.
The implementation also supports ramping from a Deployment Version to unversioned workers.
Why?
Part of Worker Versioning features.
How did you test it?
Added unit and functional tests.
Potential risks
None.
Documentation
None.
Is hotfix candidate?
No