Add OnConflictOptions to StartWorkflowExecution #7080
I want to see functional test coverage for as many of the possible cases as we can manage. I don't feel comfortable merging this otherwise.
Some tests off the top of my head:
- Test that the event is created as expected with links, callbacks, and request ID attached.
- Test that callbacks appear in the describe response.
- Test request ID based deduping works for the attached request ID
RequestID          string
RunID              string
State              enumsspb.WorkflowExecutionState
Status             enumspb.WorkflowExecutionStatus
LastWriteVersion   int64
StartTime          *time.Time
AttachedRequestIDs []string
Please document the difference between the `RequestID` and `AttachedRequestIDs` fields and when to use either.
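To make the question concrete, here is a minimal sketch of how the two fields might be documented and used for deduplication. The `currentExecutionInfo` type, its doc comments, and the `isDuplicate` helper are hypothetical; the semantics shown (one ID for the request that created the run, a list of IDs for later requests attached to it) are an assumption drawn from this discussion, not the PR's actual code.

```go
package main

import "fmt"

// currentExecutionInfo is a hypothetical stand-in for the struct under review.
type currentExecutionInfo struct {
	// RequestID is assumed to be the ID of the request that created the run.
	RequestID string
	// AttachedRequestIDs is assumed to hold the IDs of later start requests
	// that were attached to the already running workflow.
	AttachedRequestIDs []string
}

// isDuplicate reports whether requestID matches the creating request
// or any request attached afterwards.
func (c currentExecutionInfo) isDuplicate(requestID string) bool {
	if requestID == c.RequestID {
		return true
	}
	for _, id := range c.AttachedRequestIDs {
		if id == requestID {
			return true
		}
	}
	return false
}

func main() {
	info := currentExecutionInfo{
		RequestID:          "req-start",
		AttachedRequestIDs: []string{"req-attach-1"},
	}
	fmt.Println(info.isDuplicate("req-start"))    // creating request
	fmt.Println(info.isDuplicate("req-attach-1")) // attached request
	fmt.Println(info.isDuplicate("req-new"))      // unseen request
}
```

Under these assumptions, a caller would consult both fields before deciding that a retried start request is new.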
go.mod
@@ -56,7 +56,7 @@ require (
 go.opentelemetry.io/otel/sdk v1.31.0
 go.opentelemetry.io/otel/sdk/metric v1.31.0
 go.opentelemetry.io/otel/trace v1.31.0
-go.temporal.io/api v1.43.1-0.20241219191931-461db5a23056
+go.temporal.io/api v1.43.2-0.20250115015852-6bdd96365d41
We should merge the API PR before this server PR.
if err := wh.validateOnConflictOptions(request.OnConflictOptions); err != nil {
	return nil, err
}
Redundant for now but 🤷
Yes, I know it's unnecessary for now. I'm just being preemptive by adding the validation function.
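For illustration only, a validation hook of this kind could look like the sketch below. The `onConflictOptions` type, its field names, and the rule that callbacks require an attached request ID are all hypothetical; the thread only says the function is redundant for now, so the real check may well accept everything.

```go
package main

import (
	"errors"
	"fmt"
)

// onConflictOptions is a hypothetical local mirror of the options discussed
// in this PR; the field names are assumptions.
type onConflictOptions struct {
	AttachRequestID           bool
	AttachCompletionCallbacks bool
	AttachLinks               bool
}

// errCallbacksNeedRequestID is a hypothetical validation error.
var errCallbacksNeedRequestID = errors.New(
	"attaching completion callbacks requires attaching the request ID")

// validateOnConflictOptions accepts nil options and rejects one
// illustrative (assumed) invalid combination.
func validateOnConflictOptions(opts *onConflictOptions) error {
	if opts == nil {
		return nil // nothing to validate
	}
	if opts.AttachCompletionCallbacks && !opts.AttachRequestID {
		return errCallbacksNeedRequestID
	}
	return nil
}

func main() {
	fmt.Println(validateOnConflictOptions(nil))
	fmt.Println(validateOnConflictOptions(&onConflictOptions{AttachCompletionCallbacks: true}))
}
```

Keeping the hook in place, even while it does little, gives later rules a single home.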
versioningOverride *workflowpb.VersioningOverride,
attachRequestID string,
attachCompletionCallbacks []*commonpb.Callback,
links []*commonpb.Link,
Maybe instead accept a historypb.WorkflowExecutionOptionsUpdatedEventAttributes and links here to avoid this getting out of control?
Hm, I think it would make sense to use `historyservice.UpdateWorkflowExecutionOptionsRequest`, but that would add some logic to the event factory (the mask processing would have to happen here), and I'm not sure that makes sense.
I'd leave it as is for now and refactor later. I'm already planning to refactor the `startworkflow` API to call the `updateworkflowwithoptions` API, and I can address this issue as part of that.
versioningOverride *workflowpb.VersioningOverride,
attachRequestID string,
attachCompletionCallbacks []*commonpb.Callback,
Same as above, probably better to accept the attributes here.
versioningOverride *workflowpb.VersioningOverride,
attachRequestID string,
attachCompletionCallbacks []*commonpb.Callback,
Same, accept the event attributes here instead.
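The suggestion repeated in these three comments can be sketched as follows. All types here are hypothetical local stand-ins (the real attributes type is `historypb.WorkflowExecutionOptionsUpdatedEventAttributes`, whose fields are not reproduced here); the point is only that a factory taking one attributes value keeps its signature stable when new attributes are added.

```go
package main

import "fmt"

// callback and link are hypothetical stand-ins for the commonpb types.
type callback struct{ URL string }
type link struct{ Target string }

// optionsUpdatedAttributes is a hypothetical stand-in for the event
// attributes message; its fields are assumptions for illustration.
type optionsUpdatedAttributes struct {
	AttachedRequestID           string
	AttachedCompletionCallbacks []callback
}

// event is a simplified, hypothetical history event.
type event struct {
	EventID    int64
	Attributes optionsUpdatedAttributes
	Links      []link
}

// newOptionsUpdatedEvent takes the attributes as a single value, so adding
// another attribute later does not change this signature.
func newOptionsUpdatedEvent(eventID int64, attrs optionsUpdatedAttributes, links []link) event {
	return event{EventID: eventID, Attributes: attrs, Links: links}
}

func main() {
	ev := newOptionsUpdatedEvent(
		7,
		optionsUpdatedAttributes{
			AttachedRequestID:           "req-attach-1",
			AttachedCompletionCallbacks: []callback{{URL: "http://example.invalid/cb"}},
		},
		[]link{{Target: "caller-workflow"}},
	)
	fmt.Printf("%+v\n", ev)
}
```

The trade-off raised in the replies still applies: something upstream has to translate the request into these attributes before calling the factory.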
func WorkflowExecutionRequestIDsToBlob(requestIDs *persistencespb.WorkflowExecutionRequestIDs) (*commonpb.DataBlob, error) {
	return proto3Encode(requestIDs)
}

func WorkflowExecutionRequestIDsFromBlob(blob []byte, encoding string) (*persistencespb.WorkflowExecutionRequestIDs, error) {
	if blob == nil {
		return nil, nil
	}
	result := &persistencespb.WorkflowExecutionRequestIDs{}
	return result, proto3Decode(blob, encoding, result)
}
This is code duplication for now (from the `serializer.go` file).
We need to refactor the code to pass the serializer object to the DB plugins so they can serialize/deserialize fields from the DB.
Will you do the refactor? Not blocking this PR, but ideally we'd at least track this.
Note that we also need to ensure that the WorkflowExecutionOptionsUpdated event is cherry picked (reapplied) on conflict resolution.
executionStateDatablob, err := serialization.WorkflowExecutionStateToBlob(
	&persistencespb.WorkflowExecutionState{
		RunId:           runID,
		CreateRequestId: createRequestID,
Should we stop storing this now that we have the map?
I'd keep it for a while so we can transition to the new field first.
@@ -122,13 +122,18 @@ type (

	// CurrentWorkflowConditionFailedError represents a failed conditional update for current workflow record
	CurrentWorkflowConditionFailedError struct {
Should we just return the entire `*persistencespb.WorkflowExecutionState` object here?
Hm, yes, we can do that, but it's kind of a big refactor. I can do this in another PR.
Sounds good.
-"github.com/pborman/uuid"
+"github.com/google/uuid"
Thanks! I've been wanting to get rid of this dependency and forbid it in our linter but haven't gotten around to it yet.
versioningOverride *workflowpb.VersioningOverride,
unsetVersioningOverride bool,
attachRequestID string,
attachCompletionCallbacks []*commonpb.Callback,
Still think you should just accept `*historypb.WorkflowExecutionOptionsUpdatedEventAttributes` here and avoid this function signature growing every time we add another attribute.
It's a bit complicated. This is the event factory, i.e., the function that builds `*historypb.WorkflowExecutionOptionsUpdatedEventAttributes`. I'd like to change it to take the history request, but that would add logic to translate the request input into event attributes, which is not something we do so far.
You should be able to accept the attributes in the event factory, not blocking this PR though.
err := api.ResolveWorkflowIDReusePolicy(
	workflowKey,
	currentWorkflowConditionFailed.Status,
	currentWorkflowConditionFailed.RequestID,
	s.request.StartRequest.GetWorkflowIdReusePolicy(),
)
Is `currentWorkflowConditionFailed.RequestID` the current request's ID? Should we be checking that the request IDs in the DB and in the request match? (Asking for lack of context on this part of the code.)
`currentWorkflowConditionFailed.RequestID` belongs to the existing current workflow, which may be running or already completed.
if expectedValue == nil {
	if !actualV.IsNil() {
		require.Failf(h.t, "", "Value of property %s.%s for EventID=%v expected to be nil", attrPrefix, attrName, eventID)
	}
	continue
}
This is a fix in the history event attributes assertion.
If the field is expected to be `nil`, the assertion below at L527 will fail because `actualV` is something like `var actualV any = (*T)(nil)`, and this is different from `var expectedValue any = nil`.
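The typed-nil behavior described here can be reproduced in isolation. The sketch below is not the test helper from the PR; `T` and `isNilValue` are hypothetical names used only to show why a direct comparison fails and how a reflection-based nil check sidesteps it.

```go
package main

import (
	"fmt"
	"reflect"
)

// T is a placeholder type for whatever the attribute field points to.
type T struct{}

// isNilValue reports whether v is a nil interface or wraps a nil
// pointer, map, slice, channel, function, or interface value.
func isNilValue(v any) bool {
	if v == nil {
		return true
	}
	rv := reflect.ValueOf(v)
	switch rv.Kind() {
	case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Ptr, reflect.Slice:
		return rv.IsNil()
	}
	return false
}

func main() {
	var actualV any = (*T)(nil) // interface holding a typed nil pointer
	var expectedValue any = nil // nil interface with no dynamic type

	// The interfaces differ because actualV carries the dynamic type *T.
	fmt.Println(actualV == expectedValue) // false
	// Reflection looks through the interface at the pointer itself.
	fmt.Println(isNilValue(actualV)) // true
}
```

An equality-based assertion sees two different values here, which is why the nil case needs its own branch before the general comparison.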
Overall LGTM but I'm missing a few test cases:
- Start without OnConflictOptions and same different request ID fails (for good measure)
- Start fails due to validation (e.g. max callback limit reached) and does not attach a request ID to the current executions table (e.g. a following request doesn't get deduped).
- XDC test and/or reset test that verifies the update event is applied and start requests get properly deduped
@@ -1152,3 +1161,28 @@ func updateExecution(

	return nil
}

func workflowExecutionStateFromCurrentExecutionsRow(
Shall we change the other places that use the current execution row to use the blob instead? E.g. GetCurrentExecution in sql/execution.go.
We still double write, so it doesn't really matter right now.
I thought about the same, but I'll check and see if it's gonna be an improvement or not in another PR.
_**READ BEFORE MERGING:** All PRs require approval by both Server AND SDK teams before merging! This is why the number of required approvals is "2" and not "1"--two reviewers from the same team is NOT sufficient. If your PR is not approved by someone in BOTH teams, it may be summarily reverted._ <!-- Describe what has changed in this PR --> **What changed?** Add `OnConflictOptions` to `StartWorkflowExecutionRequest` <!-- Tell your future self why have you made these changes --> **Why?** When the workflow id conflict policy is `USE_EXISTING`, `OnConflictOptions` add the ability to change the existing running workflow. <!-- Are there any breaking changes on binary or code level? --> **Breaking changes** <!-- If this breaks the Server, please provide the Server PR to merge right after this PR was merged. --> **Server PR** temporalio/temporal#7080
What changed?
Add `OnConflictOptions` to `StartWorkflowExecution`.
[...] `USE_EXISTING`, then no-op; [...] `WorkflowExecutionOptionsUpdatedEvent` to the existing running workflow.
Why?
Ability to attach completion callbacks to existing running workflows.
How did you test it?
WIP: writing tests
Potential risks
Documentation
Is hotfix candidate?