Contributor

@carlydf carlydf commented Feb 6, 2025

What changed?

Use MutableSideEffect to access dynamic config from inside workflows.

Why?

To prevent non-determinism in the workflow code.
To allow the workflows to use the latest dynamic config values at all times.

How did you test it?

Tests are failing for other reasons, so I'm unable to test until I pull in those changes.

Potential risks

Documentation

Is hotfix candidate?

@carlydf carlydf requested a review from a team as a code owner February 6, 2025 04:15
@carlydf carlydf changed the title from "pass dynamic config collection to workflows" to "Use MutableSideEffect to access dynamic config from inside workflows" Feb 6, 2025
@ShahabT ShahabT force-pushed the versioning-3.1-merge branch 3 times, most recently from 122d338 to 9140a68 February 6, 2025 17:17
defaultVisibilityGrace = 3 * time.Minute
)

func DrainageWorkflowWithDC(dc *dynamicconfig.Collection, ns string) func(ctx workflow.Context, version *deploymentspb.WorkerDeploymentVersion, first bool) error {
return func(ctx workflow.Context, version *deploymentspb.WorkerDeploymentVersion, first bool) error {
Contributor

I would prefer not writing the whole workflow in an anonymous function inside the main one. Just return a closure that calls the workflow function with the config so it's still at the top level, and the thing that binds in the config can be mostly ignored.

defaultVisibilityGrace = 3 * time.Minute
)

func DrainageWorkflowWithDC(dc *dynamicconfig.Collection, ns string) func(ctx workflow.Context, version *deploymentspb.WorkerDeploymentVersion, first bool) error {
Contributor

Here and in the other workflows too: Instead of passing in the whole Collection, I think it would be good to follow the usual pattern of making a Config struct that has only the 1-2 needed values as functions, already bound to the Collection. It's easier to test that way. (Though I have mixed feelings on this one, when it gets big enough I think the Config struct pattern gets annoying)

defaultVisibilityGrace = 3 * time.Minute
)

func DrainageWorkflowWithDC(dc *dynamicconfig.Collection, ns string) func(ctx workflow.Context, version *deploymentspb.WorkerDeploymentVersion, first bool) error {
Contributor

Here and in the other workflows too: The config or dc or whatever parameter/field it is should be named something like unsafeConfig or nondeterministicConfig or something better, and with a big comment, to make it clear that you can't just use it from the workflow, it has to be done in a [mutable]sideeffect. I feel like it's inviting bugs otherwise

}
}
}

func getRefreshInterval(ctx workflow.Context, dc *dynamicconfig.Collection, ns string) (time.Duration, error) {
Contributor

Can we consolidate all these so there's one mutablesideeffect that returns a struct of all the needed values?

Comment on lines 129 to 137
aDur, ok := a.(time.Duration)
if !ok {
	return false
}
bDur, ok := b.(time.Duration)
if !ok {
	return false
}
return aDur == bDur
Contributor

does this not work?

Suggested change
- aDur, ok := a.(time.Duration)
- if !ok {
- 	return false
- }
- bDur, ok := b.(time.Duration)
- if !ok {
- 	return false
- }
- return aDur == bDur
+ return a == b

}
}
}

func getRefreshInterval(ctx workflow.Context, dc *dynamicconfig.Collection, ns string) (time.Duration, error) {
get := func(ctx workflow.Context) interface{} {
Contributor

use any everywhere, it's more readable

@@ -154,7 +161,7 @@ func (d *WorkflowRunner) run(ctx workflow.Context) error {
// Note, if update requests come in faster than they
// are handled, there will not be a moment where the workflow has
// nothing pending which means this will run forever.
- return workflow.NewContinueAsNewError(ctx, Workflow, d.WorkerDeploymentWorkflowArgs)
+ return workflow.NewContinueAsNewError(ctx, WorkflowWithDC(d.dc), d.WorkerDeploymentWorkflowArgs)
Contributor

Use the name here, I'm worried it'll get confused with all the closures.

Suggested change
- return workflow.NewContinueAsNewError(ctx, WorkflowWithDC(d.dc), d.WorkerDeploymentWorkflowArgs)
+ return workflow.NewContinueAsNewError(ctx, WorkerDeploymentWorkflowType, d.WorkerDeploymentWorkflowArgs)

getMaxVersionsInDeployment := func(ctx workflow.Context) interface{} {
return dynamicconfig.MatchingMaxVersionsInDeployment.Get(d.dc)(d.a.namespace.Name().String())
}
intEq := func(a, b interface{}) bool {
Contributor

same == here?

d.logger.Error("error decoding max versions: ", err)
return err
}
if len(d.State.Versions) >= maxVersions {
Contributor

mixing the sideeffect and logic in one function feels a little weird, can you have one function to handle all the side effect business, returning a struct of concrete values (or a single one) and another function for the workflow logic (comparison)?

@@ -411,7 +411,9 @@ func (d *VersionWorkflowRunner) handleRegisterWorker(ctx workflow.Context, args
err = workflow.Await(ctx, func() bool { return d.VersionState.StartedDeploymentWorkflow })
if err != nil {
d.logger.Error("Update canceled before deployment workflow started")
return err
// TODO: This is likely due to too many deployments, but make sure we exculed other possible errors here and send a proper error message all the time.
Contributor Author

Suggested change
- // TODO: This is likely due to too many deployments, but make sure we exculed other possible errors here and send a proper error message all the time.
+ // TODO (Carly): This is likely due to too many deployments, but make sure we exculed other possible errors here and send a proper error message all the time.

RequestId: d.newUUID(ctx),
}).Get(ctx, nil)
if err != nil {
// TODO: make sure the error message that goes to the user is informative and has the limit mentioned
Contributor Author

Suggested change
- // TODO: make sure the error message that goes to the user is informative and has the limit mentioned
+ // TODO (Carly): make sure the error message that goes to the user is informative and has the limit mentioned

@@ -411,7 +411,9 @@ func (d *VersionWorkflowRunner) handleRegisterWorker(ctx workflow.Context, args
err = workflow.Await(ctx, func() bool { return d.VersionState.StartedDeploymentWorkflow })
if err != nil {
d.logger.Error("Update canceled before deployment workflow started")
return err
// TODO: This is likely due to too many deployments, but make sure we exculed other possible errors here and send a proper error message all the time.
// TODO: mention the limit in here or make sure matching does in the error returned to the poller
Contributor Author

Suggested change
- // TODO: mention the limit in here or make sure matching does in the error returned to the poller
+ // TODO (Carly): mention the limit in here or make sure matching does in the error returned to the poller

maxVersions := d.getMaxVersions(ctx)

if len(d.State.Versions) >= maxVersions {
return temporal.NewApplicationError(fmt.Sprintf("cannot add version, already at max versions %d", maxVersions), errTooManyVersions)
Contributor Author

Suggested change
- return temporal.NewApplicationError(fmt.Sprintf("cannot add version, already at max versions %d", maxVersions), errTooManyVersions)
+ err := d.tryDeleteVersion(ctx)
+ if err != nil {
+ 	return temporal.NewApplicationError(fmt.Sprintf("cannot add version, already at max versions %d", maxVersions), errTooManyVersions)
+ }

Contributor Author

important

Contributor Author

The test that covers this was skipped, which is why it didn't catch this regression.

Contributor

Oh, good catch!

Contributor Author

@carlydf carlydf left a comment

I approve this once d.tryDeleteVersion is added back. I can't technically approve it since it's my PR though

Contributor

@dnr dnr left a comment

some suggestions and nitpicks but nothing blocking

Comment on lines +102 to +106
refresh, err := getSafeDurationConfig(ctx, "getDrainageRefreshInterval", unsafeRefreshIntervalGetter, defaultVisibilityRefresh)
if err != nil {
return err
}
_ = workflow.Sleep(ctx, refresh)
Contributor

The delay of a workflow.Sleep is allowed to change without a determinism error, so you can actually use the config directly here. It's also OK to use a side effect to be consistent if you want.

@@ -139,3 +144,18 @@ func DecodeWorkerDeploymentMemo(memo *commonpb.Memo) *deploymentspb.WorkerDeploy
}
return &workerDeploymentWorkflowMemo
}

func getSafeDurationConfig(ctx workflow.Context, id string, unsafeGetter func() any, defaultValue time.Duration) (time.Duration, error) {
Contributor

this can easily be generic if you want to do that now

Contributor

have to do it later

@@ -411,7 +411,9 @@ func (d *VersionWorkflowRunner) handleRegisterWorker(ctx workflow.Context, args
err = workflow.Await(ctx, func() bool { return d.VersionState.StartedDeploymentWorkflow })
if err != nil {
d.logger.Error("Update canceled before deployment workflow started")
return err
// TODO: This is likely due to too many deployments, but make sure we exculed other possible errors here and send a proper error message all the time.
Contributor

Suggested change
- // TODO: This is likely due to too many deployments, but make sure we exculed other possible errors here and send a proper error message all the time.
+ // TODO: This is likely due to too many deployments, but make sure we excluded other possible errors here and send a proper error message all the time.

return nil
}

func (d *WorkflowRunner) getMaxVersions(ctx workflow.Context) int {
Contributor

make getSafeDurationConfig generic and use it here?

@ShahabT ShahabT merged commit be8d90a into versioning-3.1-merge Feb 6, 2025
8 of 9 checks passed
@ShahabT ShahabT deleted the cdf/wv-limits branch February 6, 2025 21:58
ShahabT added a commit that referenced this pull request Feb 6, 2025
…7268)

## What changed?
Use MutableSideEffect to access dynamic config from inside workflows.

## Why?
To prevent non-determinism in the workflow code.
To allow the workflows to use the latest dynamic config values at all
times.

## How did you test it?
Tests are failing for other reasons, so I'm unable to test until I pull
in those changes.

## Potential risks
<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->

## Documentation
<!-- Have you made sure this change doesn't falsify anything currently
stated in `docs/`? If significant
new behavior is added, have you described that in `docs/`? -->

## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->

---------

Co-authored-by: ShahabT <shahab.tajik@temporal.io>
3 participants