Use MutableSideEffect to access dynamic config from inside workflows #7268
	defaultVisibilityGrace = 3 * time.Minute
)

func DrainageWorkflowWithDC(dc *dynamicconfig.Collection, ns string) func(ctx workflow.Context, version *deploymentspb.WorkerDeploymentVersion, first bool) error {
	return func(ctx workflow.Context, version *deploymentspb.WorkerDeploymentVersion, first bool) error {
I would prefer not to write the whole workflow in an anonymous function inside the main one. Just return a closure that calls the workflow function with the config, so the workflow itself stays at the top level and the code that binds in the config can be mostly ignored.
	defaultVisibilityGrace = 3 * time.Minute
)

func DrainageWorkflowWithDC(dc *dynamicconfig.Collection, ns string) func(ctx workflow.Context, version *deploymentspb.WorkerDeploymentVersion, first bool) error {
Here and in the other workflows too: Instead of passing in the whole Collection, I think it would be good to follow the usual pattern of making a Config struct that has only the 1-2 needed values as functions, already bound to the Collection. It's easier to test that way. (Though I have mixed feelings on this one, when it gets big enough I think the Config struct pattern gets annoying)
	defaultVisibilityGrace = 3 * time.Minute
)

func DrainageWorkflowWithDC(dc *dynamicconfig.Collection, ns string) func(ctx workflow.Context, version *deploymentspb.WorkerDeploymentVersion, first bool) error {
Here and in the other workflows too: The config or dc or whatever parameter/field it is should be named something like unsafeConfig or nondeterministicConfig or something better, and with a big comment, to make it clear that you can't just use it from the workflow, it has to be done in a [mutable]sideeffect. I feel like it's inviting bugs otherwise.
		}
	}
}

func getRefreshInterval(ctx workflow.Context, dc *dynamicconfig.Collection, ns string) (time.Duration, error) {
Can we consolidate all these so there's one mutablesideeffect that returns a struct of all the needed values?
	aDur, ok := a.(time.Duration)
	if !ok {
		return false
	}
	bDur, ok := b.(time.Duration)
	if !ok {
		return false
	}
	return aDur == bDur
does this not work?
-	aDur, ok := a.(time.Duration)
-	if !ok {
-		return false
-	}
-	bDur, ok := b.(time.Duration)
-	if !ok {
-		return false
-	}
-	return aDur == bDur
+	return a == b
		}
	}
}

func getRefreshInterval(ctx workflow.Context, dc *dynamicconfig.Collection, ns string) (time.Duration, error) {
	get := func(ctx workflow.Context) interface{} {
Use any everywhere, it's more readable.
@@ -154,7 +161,7 @@ func (d *WorkflowRunner) run(ctx workflow.Context) error {
 	// Note, if update requests come in faster than they
 	// are handled, there will not be a moment where the workflow has
 	// nothing pending which means this will run forever.
-	return workflow.NewContinueAsNewError(ctx, Workflow, d.WorkerDeploymentWorkflowArgs)
+	return workflow.NewContinueAsNewError(ctx, WorkflowWithDC(d.dc), d.WorkerDeploymentWorkflowArgs)
Use the name here, I'm worried it'll get confused with all the closures.
-	return workflow.NewContinueAsNewError(ctx, WorkflowWithDC(d.dc), d.WorkerDeploymentWorkflowArgs)
+	return workflow.NewContinueAsNewError(ctx, WorkerDeploymentWorkflowType, d.WorkerDeploymentWorkflowArgs)
	getMaxVersionsInDeployment := func(ctx workflow.Context) interface{} {
		return dynamicconfig.MatchingMaxVersionsInDeployment.Get(d.dc)(d.a.namespace.Name().String())
	}
	intEq := func(a, b interface{}) bool {
same == here?
		d.logger.Error("error decoding max versions: ", err)
		return err
	}
	if len(d.State.Versions) >= maxVersions {
mixing the sideeffect and logic in one function feels a little weird, can you have one function to handle all the side effect business, returning a struct of concrete values (or a single one) and another function for the workflow logic (comparison)?
…v-limits # Conflicts: # service/worker/workerdeployment/workflow.go
@@ -411,7 +411,9 @@ func (d *VersionWorkflowRunner) handleRegisterWorker(ctx workflow.Context, args
	err = workflow.Await(ctx, func() bool { return d.VersionState.StartedDeploymentWorkflow })
	if err != nil {
		d.logger.Error("Update canceled before deployment workflow started")
		return err
		// TODO: This is likely due to too many deployments, but make sure we exculed other possible errors here and send a proper error message all the time.
-		// TODO: This is likely due to too many deployments, but make sure we exculed other possible errors here and send a proper error message all the time.
+		// TODO (Carly): This is likely due to too many deployments, but make sure we exculed other possible errors here and send a proper error message all the time.
		RequestId: d.newUUID(ctx),
	}).Get(ctx, nil)
	if err != nil {
		// TODO: make sure the error message that goes to the user is informative and has the limit mentioned
-		// TODO: make sure the error message that goes to the user is informative and has the limit mentioned
+		// TODO (Carly): make sure the error message that goes to the user is informative and has the limit mentioned
@@ -411,7 +411,9 @@ func (d *VersionWorkflowRunner) handleRegisterWorker(ctx workflow.Context, args
	err = workflow.Await(ctx, func() bool { return d.VersionState.StartedDeploymentWorkflow })
	if err != nil {
		d.logger.Error("Update canceled before deployment workflow started")
		return err
		// TODO: This is likely due to too many deployments, but make sure we exculed other possible errors here and send a proper error message all the time.
		// TODO: mention the limit in here or make sure matching does in the error returned to the poller
-		// TODO: mention the limit in here or make sure matching does in the error returned to the poller
+		// TODO (Carly): mention the limit in here or make sure matching does in the error returned to the poller
	maxVersions := d.getMaxVersions(ctx)

	if len(d.State.Versions) >= maxVersions {
		return temporal.NewApplicationError(fmt.Sprintf("cannot add version, already at max versions %d", maxVersions), errTooManyVersions)
-		return temporal.NewApplicationError(fmt.Sprintf("cannot add version, already at max versions %d", maxVersions), errTooManyVersions)
+		err := d.tryDeleteVersion(ctx)
+		if err != nil {
+			return temporal.NewApplicationError(fmt.Sprintf("cannot add version, already at max versions %d", maxVersions), errTooManyVersions)
+		}
important
The test that covers this was skipped, which is why it didn't catch this regression.
Oh, good catch!
I approve this once d.tryDeleteVersion is added back. I can't technically approve it since it's my PR though
some suggestions and nitpicks but nothing blocking
	refresh, err := getSafeDurationConfig(ctx, "getDrainageRefreshInterval", unsafeRefreshIntervalGetter, defaultVisibilityRefresh)
	if err != nil {
		return err
	}
	_ = workflow.Sleep(ctx, refresh)
The delay of a workflow.Sleep is allowed to change without a determinism error, so you can actually use the config directly here. It's also OK to use a side effect to be consistent, if you want.
@@ -139,3 +144,18 @@ func DecodeWorkerDeploymentMemo(memo *commonpb.Memo) *deploymentspb.WorkerDeploy
	}
	return &workerDeploymentWorkflowMemo
}

func getSafeDurationConfig(ctx workflow.Context, id string, unsafeGetter func() any, defaultValue time.Duration) (time.Duration, error) {
this can easily be generic if you want to do that now
have to do it later
@@ -411,7 +411,9 @@ func (d *VersionWorkflowRunner) handleRegisterWorker(ctx workflow.Context, args
	err = workflow.Await(ctx, func() bool { return d.VersionState.StartedDeploymentWorkflow })
	if err != nil {
		d.logger.Error("Update canceled before deployment workflow started")
		return err
		// TODO: This is likely due to too many deployments, but make sure we exculed other possible errors here and send a proper error message all the time.
-		// TODO: This is likely due to too many deployments, but make sure we exculed other possible errors here and send a proper error message all the time.
+		// TODO: This is likely due to too many deployments, but make sure we excluded other possible errors here and send a proper error message all the time.
	return nil
}

func (d *WorkflowRunner) getMaxVersions(ctx workflow.Context) int {
Make getSafeDurationConfig generic and use it here?
…7268) ## What changed? Use MutableSideEffect to access dynamic config from inside workflows. ## Why? To prevent non-determinism in the workflow code. To allow the workflows to use the latest dynamic config values at all times. ## How did you test it? Tests are failing for other reasons, so I'm unable to test until I pull in those changes. ## Potential risks <!-- Assuming the worst case, what can be broken when deploying this change to production? --> ## Documentation <!-- Have you made sure this change doesn't falsify anything currently stated in `docs/`? If significant new behavior is added, have you described that in `docs/`? --> ## Is hotfix candidate? <!-- Is this PR a hotfix candidate or does it require a notification to be sent to the broader community? (Yes/No) --> --------- Co-authored-by: ShahabT <shahab.tajik@temporal.io>
What changed?
Use MutableSideEffect to access dynamic config from inside workflows.
Why?
To prevent non-determinism in the workflow code.
To allow the workflows to use the latest dynamic config values at all times.
How did you test it?
Tests are failing for other reasons, so I'm unable to test until I pull in those changes.
Potential risks
Documentation
Is hotfix candidate?