
@rodrigozhou (Contributor) commented Jan 14, 2025

What changed?

Add OnConflictOptions to StartWorkflowExecution.

  • if the workflow ID conflict policy is not USE_EXISTING, it is a no-op;
  • if the field is nil, it is a no-op;
  • otherwise, it adds a WorkflowExecutionOptionsUpdatedEvent to the existing running workflow.
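The three rules amount to a small decision function. The Go sketch below is illustrative only: `ConflictPolicy`, `OnConflictOptions`, `Action` and `decideOnConflict` are hypothetical, simplified stand-ins for the real API and server types, and the option field names are assumptions.

```go
package main

// ConflictPolicy is a hypothetical, simplified stand-in for the
// workflow ID conflict policy enum.
type ConflictPolicy int

const (
	ConflictPolicyFail ConflictPolicy = iota
	ConflictPolicyUseExisting
	ConflictPolicyTerminateExisting
)

// OnConflictOptions is a hypothetical stand-in; field names are assumptions.
type OnConflictOptions struct {
	AttachRequestID           bool
	AttachCompletionCallbacks bool
	AttachLinks               bool
}

// Action describes what happens to the existing running workflow.
type Action int

const (
	NoOp Action = iota
	AddOptionsUpdatedEvent
)

// decideOnConflict applies the rules from the PR description, in order.
func decideOnConflict(policy ConflictPolicy, opts *OnConflictOptions) Action {
	if policy != ConflictPolicyUseExisting {
		return NoOp // rule 1: only USE_EXISTING can attach anything
	}
	if opts == nil {
		return NoOp // rule 2: no options were requested
	}
	// rule 3: record the attached data as an options-updated history event
	return AddOptionsUpdatedEvent
}
```

Checking the policy first means a request with options but a different conflict policy is silently ignored rather than rejected.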

Why?

Ability to attach completion callbacks to existing running workflows.

How did you test it?

WIP: writing tests

Potential risks

Documentation

Is hotfix candidate?

@rodrigozhou requested a review from a team as a code owner January 14, 2025 19:09
@rodrigozhou force-pushed the rodrigozhou/on-conflict-options branch from b3cd07b to 74c0732 January 14, 2025 19:55
@rodrigozhou force-pushed the rodrigozhou/on-conflict-options branch 2 times, most recently from d2b40ac to b7ed8a7 January 15, 2025 18:11
@bergundy (Member) left a comment

I want to see functional test coverage for as many of the possible cases as we can. I don't feel comfortable merging this otherwise.
Some tests off the top of my head:

  • Test that the event is created as expected with links, callbacks, and request ID attached.
  • Test that callbacks appear in the describe response.
  • Test that request-ID-based deduping works for the attached request ID.

Comment on lines 126 to 132
RequestID string
RunID string
State enumsspb.WorkflowExecutionState
Status enumspb.WorkflowExecutionStatus
LastWriteVersion int64
StartTime *time.Time
AttachedRequestIDs []string
Member

Please document the difference between the RequestID and AttachedRequestIDs fields and when to use either.
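One way to document the distinction, sketched in Go under assumed, simplified types (`CurrentWorkflowInfo` and `IsDuplicateStart` are hypothetical, not the server's code): RequestID identifies the start request that created the run, while AttachedRequestIDs hold IDs of later start requests that were attached to the already-running run. Per the review, both should dedupe a retried start request.

```go
package main

// CurrentWorkflowInfo is a hypothetical mirror of some of the fields quoted
// above; it is not the server's actual type.
type CurrentWorkflowInfo struct {
	// RequestID is the ID of the start request that created this run.
	RequestID string
	RunID     string
	// AttachedRequestIDs are IDs of later start requests that were attached
	// to this already-running run (e.g. via OnConflictOptions) instead of
	// creating a new run.
	AttachedRequestIDs []string
}

// IsDuplicateStart reports whether requestID was already handled by this run,
// either as the creating request or as an attached one.
func (c *CurrentWorkflowInfo) IsDuplicateStart(requestID string) bool {
	if requestID == c.RequestID {
		return true
	}
	for _, id := range c.AttachedRequestIDs {
		if id == requestID {
			return true
		}
	}
	return false
}
```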

go.mod Outdated
@@ -56,7 +56,7 @@ require (
go.opentelemetry.io/otel/sdk v1.31.0
go.opentelemetry.io/otel/sdk/metric v1.31.0
go.opentelemetry.io/otel/trace v1.31.0
-go.temporal.io/api v1.43.1-0.20241219191931-461db5a23056
+go.temporal.io/api v1.43.2-0.20250115015852-6bdd96365d41
Member
We should merge the API PR before this server PR.

Comment on lines +478 to +484
if err := wh.validateOnConflictOptions(request.OnConflictOptions); err != nil {
	return nil, err
}
Member
Redundant for now but 🤷

Contributor Author
Yes, I know it's unnecessary for now. I'm just being preemptive by adding the validation function.

Comment on lines 397 to 401
versioningOverride *workflowpb.VersioningOverride,
attachRequestID string,
attachCompletionCallbacks []*commonpb.Callback,
links []*commonpb.Link,
Member
Maybe instead accept a historypb.WorkflowExecutionOptionsUpdatedEventAttributes and links here to avoid this getting out of control?

Contributor Author
Hm, I think it would make sense to use historyservice.UpdateWorkflowExecutionOptionsRequest, but it would add some logic to the event factory (doing the mask processing here), and I'm not sure that makes sense.
I'd leave it as is for now and refactor later. I'm already planning to refactor the StartWorkflow API to call the UpdateWorkflowExecutionOptions API, and I can address this issue as part of that.

Comment on lines 415 to 418
versioningOverride *workflowpb.VersioningOverride,
attachRequestID string,
attachCompletionCallbacks []*commonpb.Callback,
Member
Same as above, probably better to accept the attributes here.

versioningOverride *workflowpb.VersioningOverride,
attachRequestID string,
attachCompletionCallbacks []*commonpb.Callback,
Member
Same, accept the event attributes here instead.
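The reviewer's suggestion in these threads is the parameter-object pattern: pass one attributes value instead of adding a parameter per attribute. A minimal Go sketch, assuming hypothetical placeholder types (`Callback`, `Link`, `OptionsUpdatedAttrs`, `Event`) and a hypothetical `newOptionsUpdatedEvent` function rather than the real `historypb` types or the server's event factory:

```go
package main

// Callback and Link are hypothetical placeholders for commonpb.Callback
// and commonpb.Link.
type Callback struct{ URL string }
type Link struct{ Target string }

// OptionsUpdatedAttrs is a hypothetical stand-in for the options-updated
// event attributes: a new attribute becomes a new field here, not a new
// function parameter.
type OptionsUpdatedAttrs struct {
	AttachedRequestID           string
	AttachedCompletionCallbacks []*Callback
}

// Event is a minimal history event for the sketch.
type Event struct {
	EventID int64
	Attrs   *OptionsUpdatedAttrs
	Links   []*Link
}

// newOptionsUpdatedEvent takes the attributes as one value, so its signature
// stays the same when another attribute is added.
func newOptionsUpdatedEvent(eventID int64, attrs *OptionsUpdatedAttrs, links []*Link) *Event {
	return &Event{EventID: eventID, Attrs: attrs, Links: links}
}
```

With this shape, adding an attribute touches the struct and the code that reads it, not every call site of the factory.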

@rodrigozhou force-pushed the rodrigozhou/on-conflict-options branch from b7ed8a7 to 33229ff January 27, 2025 18:17
Comment on lines 128 to 152
func WorkflowExecutionRequestIDsToBlob(requestIDs *persistencespb.WorkflowExecutionRequestIDs) (*commonpb.DataBlob, error) {
	return proto3Encode(requestIDs)
}

func WorkflowExecutionRequestIDsFromBlob(blob []byte, encoding string) (*persistencespb.WorkflowExecutionRequestIDs, error) {
	if blob == nil {
		return nil, nil
	}
	result := &persistencespb.WorkflowExecutionRequestIDs{}
	return result, proto3Decode(blob, encoding, result)
}
Contributor Author

@rodrigozhou Jan 27, 2025

This is duplicated code for now (from the serializer.go file).
We need to refactor the code to pass the serializer object to the DB plugins so they can serialize/deserialize fields from the DB.

Member
Will you do the refactor? Not blocking this PR, but ideally we'd at least track this.
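The refactor discussed in this thread (injecting the serializer into the DB plugins instead of duplicating the encode/decode helpers) could look roughly like this. It is a hypothetical sketch: `RequestIDs`, `Serializer`, `jsonSerializer` and `sqlPlugin` are illustrative names, and JSON stands in for the proto3 encoding the server uses only to keep the example stdlib-only.

```go
package main

import "encoding/json"

// RequestIDs is a hypothetical stand-in for
// persistencespb.WorkflowExecutionRequestIDs (request ID -> illustrative info).
type RequestIDs struct {
	IDs map[string]string `json:"ids"`
}

// Serializer is the dependency a DB plugin would receive instead of keeping
// its own copy of the encode/decode helpers.
type Serializer interface {
	RequestIDsToBlob(r *RequestIDs) ([]byte, error)
	RequestIDsFromBlob(blob []byte) (*RequestIDs, error)
}

// jsonSerializer is a stdlib-only stand-in for the real proto3 serializer.
type jsonSerializer struct{}

func (jsonSerializer) RequestIDsToBlob(r *RequestIDs) ([]byte, error) {
	return json.Marshal(r)
}

func (jsonSerializer) RequestIDsFromBlob(blob []byte) (*RequestIDs, error) {
	if blob == nil {
		return nil, nil // mirrors the nil-blob handling in the quoted helper
	}
	result := &RequestIDs{}
	return result, json.Unmarshal(blob, result)
}

// sqlPlugin is a hypothetical DB plugin that is handed the serializer.
type sqlPlugin struct {
	serializer Serializer
}

func newSQLPlugin(s Serializer) *sqlPlugin { return &sqlPlugin{serializer: s} }
```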

Base automatically changed from rodrigozhou/refactor to main January 27, 2025 21:31
@bergundy (Member) left a comment

Note that we also need to ensure that the WorkflowExecutionOptionsUpdated event is cherry-picked (reapplied) on conflict resolution.
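To illustrate what reapplying means here, a minimal Go sketch under a simplified, hypothetical event model (`EventType`, `HistoryEvent`, `reapplicable` and `eventsToReapply` are not server code, and the reapply set is illustrative): events from the discarded branch whose type is in the reapply set are carried over to the winning branch.

```go
package main

// EventType is a hypothetical, simplified history event type.
type EventType int

const (
	EventActivityTaskScheduled EventType = iota
	EventWorkflowExecutionSignaled
	EventWorkflowExecutionOptionsUpdated
)

// HistoryEvent is a minimal event for the sketch.
type HistoryEvent struct {
	ID   int64
	Type EventType
}

// reapplicable lists event types copied from a discarded branch onto the
// winning branch. The set is illustrative; the point is that the
// options-updated event must be in it, or attached request IDs and
// callbacks are lost after conflict resolution.
var reapplicable = map[EventType]bool{
	EventWorkflowExecutionSignaled:       true,
	EventWorkflowExecutionOptionsUpdated: true,
}

// eventsToReapply returns, in their original order, the events from the
// discarded branch that should be reapplied.
func eventsToReapply(discarded []HistoryEvent) []HistoryEvent {
	var out []HistoryEvent
	for _, e := range discarded {
		if reapplicable[e.Type] {
			out = append(out, e)
		}
	}
	return out
}
```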

executionStateDatablob, err := serialization.WorkflowExecutionStateToBlob(
&persistencespb.WorkflowExecutionState{
RunId: runID,
CreateRequestId: createRequestID,
Member
Should we stop storing this now that we have the map?

Contributor Author
I'd leave it for a while first, to transition to using the new field.

@@ -122,13 +122,18 @@ type (

// CurrentWorkflowConditionFailedError represents a failed conditional update for current workflow record
CurrentWorkflowConditionFailedError struct {
Member
Should we just return the entire *persistencespb.WorkflowExecutionState object here?

Contributor Author
Hm, yes, we can do that, but it's kind of a big refactor. I can do this in another PR.

Member
Sounds good.

"github.com/pborman/uuid"
"github.com/google/uuid"
Member
Thanks! I've been wanting to get rid of this dependency and forbid it in our linter but haven't gotten around to it yet.

Comment on lines 397 to +400
versioningOverride *workflowpb.VersioningOverride,
unsetVersioningOverride bool,
attachRequestID string,
attachCompletionCallbacks []*commonpb.Callback,
Member
Still think you should just accept *historypb.WorkflowExecutionOptionsUpdatedEventAttributes here and avoid this function signature growing every time we add another attribute.

Contributor Author

@rodrigozhou Jan 28, 2025

It's a bit complicated. This is the event factory, i.e., the function that builds *historypb.WorkflowExecutionOptionsUpdatedEventAttributes. I want to change it to take the history request, but then it would need some logic to translate the request input into event attributes, which is not something we do so far.

Member
You should be able to accept the attributes in the event factory, not blocking this PR though.

Comment on lines 684 to 687
err := api.ResolveWorkflowIDReusePolicy(
	workflowKey,
	currentWorkflowConditionFailed.Status,
	currentWorkflowConditionFailed.RequestID,
	s.request.StartRequest.GetWorkflowIdReusePolicy(),
)
Member
Is currentWorkflowConditionFailed.RequestID the current request's ID? Should we be checking that the request IDs in the DB and in the request match? (Asking because I lack context on this part of the code.)

Contributor Author
currentWorkflowConditionFailed.RequestID is the request ID of the existing current workflow, which may still be running or may have already completed.

@rodrigozhou force-pushed the rodrigozhou/on-conflict-options branch from 33229ff to 51e80d1 January 29, 2025 00:27
@rodrigozhou requested a review from bergundy January 29, 2025 00:27
@rodrigozhou force-pushed the rodrigozhou/on-conflict-options branch from 51e80d1 to 4c09e73 January 29, 2025 02:33
Comment on lines +497 to +503
if expectedValue == nil {
	if !actualV.IsNil() {
		require.Failf(h.t, "", "Value of property %s.%s for EventID=%v expected to be nil", attrPrefix, attrName, eventID)
	}
	continue
}

Contributor Author
This is a fix in the history event attributes assertion.
If the field is expected to be nil, the assertion below at L527 fails because actualV is something like var actualV any = (*T)(nil), which is different from var expectedValue any = nil.
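The pitfall described in this comment can be reproduced in a few lines of Go: an interface holding a typed nil pointer does not compare equal to an untyped nil interface, so the nil check has to go through reflection. `isNilValue` is a hypothetical helper for illustration, not the test helper from this PR.

```go
package main

import "reflect"

// T is any pointed-to type; used only to build a typed nil.
type T struct{}

// isNilValue reports whether v is nil, including the case where v is an
// interface holding a typed nil pointer, map, slice, etc. (which is != nil).
func isNilValue(v any) bool {
	if v == nil {
		return true // untyped nil interface
	}
	rv := reflect.ValueOf(v)
	switch rv.Kind() {
	case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Ptr, reflect.Slice:
		return rv.IsNil() // typed nil stored in the interface
	}
	return false // other kinds (structs, ints, ...) can never be nil
}
```

Calling `rv.IsNil()` on kinds outside that list panics, which is why the kind check comes first.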

@rodrigozhou force-pushed the rodrigozhou/on-conflict-options branch 2 times, most recently from abcee87 to 7ff6845 January 31, 2025 01:11
@bergundy (Member) left a comment

Overall LGTM, but I'm missing a few test cases:

  • Start without OnConflictOptions and a different request ID fails (for good measure).
  • A start that fails validation (e.g., max callback limit reached) does not attach a request ID to the current executions table (e.g., a following request doesn't get deduped).
  • XDC test and/or reset test that verifies the update event is applied and start requests get properly deduped

@@ -1152,3 +1161,28 @@ func updateExecution(

return nil
}

func workflowExecutionStateFromCurrentExecutionsRow(
Member
Shall we change other places that use the current execution row to use the blob instead, e.g., GetCurrentExecution in sql/execution.go?

We still double-write, so it doesn't really matter right now.

Contributor Author
I thought about the same, but I'll check whether it's going to be an improvement in another PR.

@rodrigozhou force-pushed the rodrigozhou/on-conflict-options branch from 7ff6845 to 46835e2 February 3, 2025 17:56
@rodrigozhou (Contributor Author)

> XDC test and/or reset test that verifies the update event is applied and start requests get properly deduped

@bergundy I created a separate PR that reapplies the WorkflowExecutionOptionsUpdated history event: #7221

rodrigozhou added a commit to temporalio/api that referenced this pull request Feb 3, 2025
_**READ BEFORE MERGING:** All PRs require approval by both Server AND
SDK teams before merging! This is why the number of required approvals
is "2" and not "1"--two reviewers from the same team is NOT sufficient.
If your PR is not approved by someone in BOTH teams, it may be summarily
reverted._

<!-- Describe what has changed in this PR -->
**What changed?**
Add `OnConflictOptions` to `StartWorkflowExecutionRequest`

<!-- Tell your future self why have you made these changes -->
**Why?**
When the workflow ID conflict policy is `USE_EXISTING`,
`OnConflictOptions` adds the ability to change the existing running
workflow.

<!-- Are there any breaking changes on binary or code level? -->
**Breaking changes**


<!-- If this breaks the Server, please provide the Server PR to merge
right after this PR was merged. -->
**Server PR**
temporalio/temporal#7080
temporal-cicd bot pushed a commit to temporalio/api-go that referenced this pull request Feb 3, 2025
@rodrigozhou force-pushed the rodrigozhou/on-conflict-options branch from 9c3304b to 924f1d9 February 3, 2025 20:38
@rodrigozhou force-pushed the rodrigozhou/on-conflict-options branch from 924f1d9 to d01c49a February 3, 2025 20:39
@rodrigozhou merged commit 72c6218 into main Feb 3, 2025
50 checks passed
@rodrigozhou deleted the rodrigozhou/on-conflict-options branch February 3, 2025 21:53

3 participants