Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ require (
go.opentelemetry.io/otel/sdk v1.34.0
go.opentelemetry.io/otel/sdk/metric v1.34.0
go.opentelemetry.io/otel/trace v1.34.0
go.temporal.io/api v1.50.1-0.20250716005608-066a1646935b
go.temporal.io/api v1.50.1-0.20250718211104-1dc43113346a
go.temporal.io/sdk v1.34.0
go.uber.org/automaxprocs v1.6.0
go.uber.org/fx v1.23.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -399,8 +399,8 @@ go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4=
go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4=
go.temporal.io/api v1.50.1-0.20250716005608-066a1646935b h1:LYSZOIWDNtnQ3vX+2YZKoAjIuLtxRv3oTMD8/Zot0w4=
go.temporal.io/api v1.50.1-0.20250716005608-066a1646935b/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM=
go.temporal.io/api v1.50.1-0.20250718211104-1dc43113346a h1:6FHXvm124f1Sbge/4VrARNeEMpDkQyNr5axDfebE580=
go.temporal.io/api v1.50.1-0.20250718211104-1dc43113346a/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM=
go.temporal.io/sdk v1.34.0 h1:VLg/h6ny7GvLFVoQPqz2NcC93V9yXboQwblkRvZ1cZE=
go.temporal.io/sdk v1.34.0/go.mod h1:iE4U5vFrH3asOhqpBBphpj9zNtw8btp8+MSaf5A0D3w=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
Expand Down
123 changes: 108 additions & 15 deletions service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4505,6 +4505,8 @@ func (wh *WorkflowHandler) StartBatchOperation(
var resetParams batcher.ResetParams
var updateOptionsParams batcher.UpdateOptionsParams
var unpauseActivitiesParams batcher.UnpauseActivitiesParams
var resetActivitiesParams batcher.ResetActivitiesParams
var updateActivitiesOptionsParams batcher.UpdateActivitiesOptionsParams
switch op := request.Operation.(type) {
case *workflowservice.StartBatchOperationRequest_TerminationOperation:
identity = op.TerminationOperation.GetIdentity()
Expand Down Expand Up @@ -4553,7 +4555,9 @@ func (wh *WorkflowHandler) StartBatchOperation(
identity = op.UpdateWorkflowOptionsOperation.GetIdentity()
operationType = batcher.BatchTypeUpdateOptions
updateOptionsParams.WorkflowExecutionOptions = op.UpdateWorkflowOptionsOperation.GetWorkflowExecutionOptions()
updateOptionsParams.UpdateMask = op.UpdateWorkflowOptionsOperation.GetUpdateMask()
if updateMask := op.UpdateWorkflowOptionsOperation.GetUpdateMask(); updateMask != nil {
updateOptionsParams.UpdateMask = &batcher.FieldMask{Paths: updateMask.Paths}
}
// TODO(carlydf): remove hacky usage of deprecated fields later, after adding support for oneof in BatchParams encoder
if o := updateOptionsParams.WorkflowExecutionOptions.VersioningOverride; o.GetOverride() != nil {
deprecatedOverride := &workflowpb.VersioningOverride{}
Expand Down Expand Up @@ -4583,7 +4587,7 @@ func (wh *WorkflowHandler) StartBatchOperation(
visibilityQuery = fmt.Sprintf("(%s) AND (%s)", visibilityQuery, unpauseCause)
unpauseActivitiesParams.ActivityType = a.Type
case *batchpb.BatchOperationUnpauseActivities_MatchAll:
if a.MatchAll == false {
if !a.MatchAll {
return nil, serviceerror.NewInvalidArgument("Either activity type must be set, or match all should be set to true")
}
wildCardUnpause := fmt.Sprintf("%s STARTS_WITH 'property:activityType='", searchattribute.TemporalPauseInfo)
Expand All @@ -4594,24 +4598,107 @@ func (wh *WorkflowHandler) StartBatchOperation(
unpauseActivitiesParams.ResetAttempts = op.UnpauseActivitiesOperation.ResetAttempts
unpauseActivitiesParams.ResetHeartbeat = op.UnpauseActivitiesOperation.ResetHeartbeat
unpauseActivitiesParams.Jitter = op.UnpauseActivitiesOperation.Jitter.AsDuration()
case *workflowservice.StartBatchOperationRequest_ResetActivitiesOperation:
operationType = batcher.BatchTypeResetActivities
if op.ResetActivitiesOperation == nil {
return nil, serviceerror.NewInvalidArgument("reset activities operation is not set")
}
if op.ResetActivitiesOperation.GetActivity() == nil {
return nil, serviceerror.NewInvalidArgument("activity filter must be set")
}

switch a := op.ResetActivitiesOperation.GetActivity().(type) {
case *batchpb.BatchOperationResetActivities_Type:
if len(a.Type) == 0 {
return nil, serviceerror.NewInvalidArgument("Either activity type must be set, or match all should be set to true")
}
resetActivitiesParams.ActivityType = a.Type
case *batchpb.BatchOperationResetActivities_MatchAll:
if !a.MatchAll {
return nil, serviceerror.NewInvalidArgument("Either activity type must be set, or match all should be set to true")
}
resetActivitiesParams.MatchAll = true
}

resetActivitiesParams.ResetAttempts = op.ResetActivitiesOperation.ResetAttempts
resetActivitiesParams.ResetHeartbeat = op.ResetActivitiesOperation.ResetHeartbeat
resetActivitiesParams.Jitter = op.ResetActivitiesOperation.Jitter.AsDuration()
resetActivitiesParams.KeepPaused = op.ResetActivitiesOperation.KeepPaused
resetActivitiesParams.RestoreOriginalOptions = op.ResetActivitiesOperation.RestoreOriginalOptions
resetActivitiesParams.Identity = op.ResetActivitiesOperation.GetIdentity()
case *workflowservice.StartBatchOperationRequest_UpdateActivityOptionsOperation:
operationType = batcher.BatchTypeUpdateActivitiesOptions
if op.UpdateActivityOptionsOperation == nil {
return nil, serviceerror.NewInvalidArgument("update activity options operation is not set")
}
if op.UpdateActivityOptionsOperation.GetActivityOptions() != nil && op.UpdateActivityOptionsOperation.GetRestoreOriginal() {
return nil, serviceerror.NewInvalidArgument("cannot set both activity options and restore original")
}
if op.UpdateActivityOptionsOperation.GetActivityOptions() == nil && !op.UpdateActivityOptionsOperation.GetRestoreOriginal() {
return nil, serviceerror.NewInvalidArgument("Either activity type must be set, or restore original should be set to true")
}

switch a := op.UpdateActivityOptionsOperation.GetActivity().(type) {
case *batchpb.BatchOperationUpdateActivityOptions_Type:
if len(a.Type) == 0 {
return nil, serviceerror.NewInvalidArgument("Either activity type must be set, or match all should be set to true")
}
updateActivitiesOptionsParams.ActivityType = a.Type
case *batchpb.BatchOperationUpdateActivityOptions_MatchAll:
if !a.MatchAll {
return nil, serviceerror.NewInvalidArgument("Either activity type must be set, or match all should be set to true")
}
updateActivitiesOptionsParams.MatchAll = true
}

updateActivitiesOptionsParams.RestoreOriginal = op.UpdateActivityOptionsOperation.GetRestoreOriginal()
updateActivitiesOptionsParams.Identity = op.UpdateActivityOptionsOperation.GetIdentity()
if updateMask := op.UpdateActivityOptionsOperation.GetUpdateMask(); updateMask != nil {
updateActivitiesOptionsParams.UpdateMask = &batcher.FieldMask{Paths: updateMask.Paths}
}
if ao := op.UpdateActivityOptionsOperation.GetActivityOptions(); ao != nil {
updateActivitiesOptionsParams.ActivityOptions = &batcher.ActivityOptions{
ScheduleToStartTimeout: ao.ScheduleToStartTimeout.AsDuration(),
ScheduleToCloseTime: ao.ScheduleToCloseTimeout.AsDuration(),
StartToCloseTimeout: ao.StartToCloseTimeout.AsDuration(),
HeartbeatTimeout: ao.HeartbeatTimeout.AsDuration(),
}
if rp := ao.RetryPolicy; rp != nil {
updateActivitiesOptionsParams.ActivityOptions.RetryPolicy = &batcher.RetryPolicy{
InitialInterval: rp.InitialInterval.AsDuration(),
MaximumInterval: rp.MaximumInterval.AsDuration(),
BackoffCoefficient: rp.BackoffCoefficient,
NonRetryableErrorTypes: rp.NonRetryableErrorTypes,
MaximumAttempts: rp.MaximumAttempts,
}
}
if tq := ao.TaskQueue; tq != nil {
updateActivitiesOptionsParams.ActivityOptions.TaskQueue = &batcher.TaskQueue{
Name: tq.Name,
Kind: int32(tq.Kind),
}
}
}
default:
return nil, serviceerror.NewInvalidArgumentf("The operation type %T is not supported", op)
}

input := &batcher.BatchParams{
Namespace: request.GetNamespace(),
Query: visibilityQuery,
Executions: request.GetExecutions(),
Reason: request.GetReason(),
BatchType: operationType,
RPS: float64(request.GetMaxOperationsPerSecond()),
TerminateParams: batcher.TerminateParams{},
CancelParams: batcher.CancelParams{},
SignalParams: signalParams,
DeleteParams: batcher.DeleteParams{},
ResetParams: resetParams,
UpdateOptionsParams: updateOptionsParams,
UnpauseActivitiesParams: unpauseActivitiesParams,
Namespace: request.GetNamespace(),
Query: visibilityQuery,
Executions: request.GetExecutions(),
Reason: request.GetReason(),
BatchType: operationType,
RPS: float64(request.GetMaxOperationsPerSecond()),
TerminateParams: batcher.TerminateParams{},
CancelParams: batcher.CancelParams{},
SignalParams: signalParams,
DeleteParams: batcher.DeleteParams{},
ResetParams: resetParams,
UpdateOptionsParams: updateOptionsParams,
UnpauseActivitiesParams: unpauseActivitiesParams,
ResetActivitiesParams: resetActivitiesParams,
UpdateActivitiesOptionsParams: updateActivitiesOptionsParams,
}
inputPayload, err := sdk.PreferProtoDataConverter.ToPayloads(input)
if err != nil {
Expand Down Expand Up @@ -4777,6 +4864,12 @@ func (wh *WorkflowHandler) DescribeBatchOperation(
operationType = enumspb.BATCH_OPERATION_TYPE_RESET
case batcher.BatchTypeUpdateOptions:
operationType = enumspb.BATCH_OPERATION_TYPE_UPDATE_EXECUTION_OPTIONS
case batcher.BatchTypeUpdateActivitiesOptions:
operationType = enumspb.BATCH_OPERATION_TYPE_UPDATE_ACTIVITY_OPTIONS
case batcher.BatchTypeResetActivities:
operationType = enumspb.BATCH_OPERATION_TYPE_RESET_ACTIVITY
case batcher.BatchTypeUnpauseActivities:
operationType = enumspb.BATCH_OPERATION_TYPE_UNPAUSE_ACTIVITY
default:
operationType = enumspb.BATCH_OPERATION_TYPE_UNSPECIFIED
wh.throttledLogger.Warn("Unknown batch operation type", tag.NewStringTag("batch-operation-type", operationTypeString))
Expand Down
75 changes: 74 additions & 1 deletion service/worker/batcher/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/pborman/uuid"
activitypb "go.temporal.io/api/activity/v1"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
Expand All @@ -25,6 +26,7 @@ import (
"go.temporal.io/server/common/sdk"
"golang.org/x/time/rate"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/fieldmaskpb"
)

const (
Expand Down Expand Up @@ -366,10 +368,81 @@ func startTaskProcessor(
RunId: runID,
},
WorkflowExecutionOptions: batchParams.UpdateOptionsParams.WorkflowExecutionOptions,
UpdateMask: batchParams.UpdateOptionsParams.UpdateMask,
UpdateMask: &fieldmaskpb.FieldMask{Paths: batchParams.UpdateOptionsParams.UpdateMask.Paths},
})
return err
})

case BatchTypeResetActivities:
err = processTask(ctx, limiter, task,
func(workflowID, runID string) error {
resetRequest := &workflowservice.ResetActivityRequest{
Namespace: batchParams.Namespace,
Execution: &commonpb.WorkflowExecution{
WorkflowId: workflowID,
RunId: runID,
},
Identity: batchParams.ResetActivitiesParams.Identity,
Activity: &workflowservice.ResetActivityRequest_Type{Type: batchParams.ResetActivitiesParams.ActivityType},
ResetHeartbeat: batchParams.ResetActivitiesParams.ResetHeartbeat,
Jitter: durationpb.New(batchParams.ResetActivitiesParams.Jitter),
KeepPaused: batchParams.ResetActivitiesParams.KeepPaused,
RestoreOriginalOptions: batchParams.ResetActivitiesParams.RestoreOriginalOptions,
}

if batchParams.ResetActivitiesParams.MatchAll {
resetRequest.Activity = &workflowservice.ResetActivityRequest_MatchAll{MatchAll: true}
} else {
resetRequest.Activity = &workflowservice.ResetActivityRequest_Type{Type: batchParams.ResetActivitiesParams.ActivityType}
}

_, err = frontendClient.ResetActivity(ctx, resetRequest)
return err
})
case BatchTypeUpdateActivitiesOptions:
err = processTask(ctx, limiter, task,
func(workflowID, runID string) error {
updateRequest := &workflowservice.UpdateActivityOptionsRequest{
Namespace: batchParams.Namespace,
Execution: &commonpb.WorkflowExecution{
WorkflowId: workflowID,
RunId: runID,
},
Activity: &workflowservice.UpdateActivityOptionsRequest_Type{Type: batchParams.UpdateActivitiesOptionsParams.ActivityType},
UpdateMask: &fieldmaskpb.FieldMask{Paths: batchParams.UpdateActivitiesOptionsParams.UpdateMask.Paths},
RestoreOriginal: batchParams.UpdateActivitiesOptionsParams.RestoreOriginal,
Identity: batchParams.UpdateActivitiesOptionsParams.Identity,
}

if ao := batchParams.UpdateActivitiesOptionsParams.ActivityOptions; ao != nil {
updateRequest.ActivityOptions = &activitypb.ActivityOptions{
ScheduleToStartTimeout: durationpb.New(ao.ScheduleToStartTimeout),
ScheduleToCloseTimeout: durationpb.New(ao.ScheduleToCloseTime),
StartToCloseTimeout: durationpb.New(ao.StartToCloseTimeout),
HeartbeatTimeout: durationpb.New(ao.HeartbeatTimeout),
}

if rp := ao.RetryPolicy; rp != nil {
updateRequest.ActivityOptions.RetryPolicy = &commonpb.RetryPolicy{
InitialInterval: durationpb.New(rp.InitialInterval),
BackoffCoefficient: rp.BackoffCoefficient,
MaximumInterval: durationpb.New(rp.MaximumInterval),
MaximumAttempts: rp.MaximumAttempts,
NonRetryableErrorTypes: rp.NonRetryableErrorTypes,
}
}
}

if batchParams.UpdateActivitiesOptionsParams.MatchAll {
updateRequest.Activity = &workflowservice.UpdateActivityOptionsRequest_MatchAll{MatchAll: true}
} else {
updateRequest.Activity = &workflowservice.UpdateActivityOptionsRequest_Type{Type: batchParams.UpdateActivitiesOptionsParams.ActivityType}
}

_, err = frontendClient.UpdateActivityOptions(ctx, updateRequest)
return err
})
// QUESTION seankane (2025-07-18): why do we not have a default case and return an error? @yuri/@chetan
}
if err != nil {
metrics.BatcherProcessorFailures.With(metricsHandler).Record(1)
Expand Down
68 changes: 66 additions & 2 deletions service/worker/batcher/workflow.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package batcher

import (
"errors"
"fmt"
"time"

Expand All @@ -11,7 +12,6 @@ import (
"go.temporal.io/sdk/workflow"
"go.temporal.io/server/common/searchattribute"
"go.temporal.io/server/common/worker_versioning"
"google.golang.org/protobuf/types/known/fieldmaskpb"
)

const (
Expand Down Expand Up @@ -42,6 +42,10 @@ const (
BatchTypeUpdateOptions = "update_options"
// BatchTypePauseActivities is batch type for unpausing activities
BatchTypeUnpauseActivities = "unpause_activities"
// BatchTypeUpdateActivitiesOptions is batch type for updating the options of activities
BatchTypeUpdateActivitiesOptions = "update_activity_options"
// BatchTypeResetActivities is batch type for resetting activities
BatchTypeResetActivities = "reset_activities"
)

var (
Expand Down Expand Up @@ -92,7 +96,7 @@ type (
// UpdateOptionsParams is the parameters for updating workflow execution options
UpdateOptionsParams struct {
WorkflowExecutionOptions *workflowpb.WorkflowExecutionOptions
UpdateMask *fieldmaskpb.FieldMask
UpdateMask *FieldMask
}

UnpauseActivitiesParams struct {
Expand All @@ -103,6 +107,52 @@ type (
Jitter time.Duration
}

UpdateActivitiesOptionsParams struct {
Identity string
ActivityType string
MatchAll bool
ActivityOptions *ActivityOptions
UpdateMask *FieldMask
RestoreOriginal bool
}

ActivityOptions struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is suboptimal solution. Instead it would have been better to serialize the proto structs manually or to define the workflow inputs with protos.

TaskQueue *TaskQueue
ScheduleToCloseTime time.Duration
ScheduleToStartTimeout time.Duration
StartToCloseTimeout time.Duration
HeartbeatTimeout time.Duration
RetryPolicy *RetryPolicy
}

TaskQueue struct {
Name string
Kind int32
}

RetryPolicy struct {
InitialInterval time.Duration
BackoffCoefficient float64
MaximumInterval time.Duration
MaximumAttempts int32
NonRetryableErrorTypes []string
}

ResetActivitiesParams struct {
Identity string
ActivityType string
MatchAll bool
ResetAttempts bool
ResetHeartbeat bool
KeepPaused bool
Jitter time.Duration
RestoreOriginalOptions bool
}

FieldMask struct {
Paths []string
}

// BatchParams is the parameters for batch operation workflow
BatchParams struct {
// Target namespace to execute batch operation
Expand Down Expand Up @@ -131,6 +181,10 @@ type (
UpdateOptionsParams UpdateOptionsParams
// UnpauseActivitiesParams is params only for BatchTypeUnpauseActivities
UnpauseActivitiesParams UnpauseActivitiesParams
// UpdateActivitiesOptionsParams is params only for BatchTypeUpdateActivitiesOptions
UpdateActivitiesOptionsParams UpdateActivitiesOptionsParams
// ResetActivitiesParams is params only for BatchTypeResetActivities
ResetActivitiesParams ResetActivitiesParams

// RPS sets the requests-per-second limit for the batch.
// The default (and max) is defined by `worker.BatcherRPS` in the dynamic config.
Expand Down Expand Up @@ -257,6 +311,16 @@ func validateParams(params BatchParams) error {
return fmt.Errorf("must provide ActivityType or MatchAll")
}
return nil
case BatchTypeResetActivities:
if params.ResetActivitiesParams.ActivityType == "" && !params.ResetActivitiesParams.MatchAll {
return errors.New("must provide ActivityType or MatchAll")
}
return nil
case BatchTypeUpdateActivitiesOptions:
if params.UpdateActivitiesOptionsParams.ActivityType == "" && !params.UpdateActivitiesOptionsParams.MatchAll {
return errors.New("must provide ActivityType or MatchAll")
}
return nil
default:
return fmt.Errorf("not supported batch type: %v", params.BatchType)
}
Expand Down
Loading
Loading