Add OnConflictOptions to StartWorkflowExecution#7080
Conversation
b3cd07b to
74c0732
Compare
41335d0 to
3a5a6f6
Compare
d2b40ac to
b7ed8a7
Compare
bergundy
left a comment
There was a problem hiding this comment.
I want to see functional test coverage for as many of the possible cases. I don't feel comfortable merging this otherwise.
Some tests off the top of my head:
- Test that the event is created as expected with links, callbacks, and request ID attached.
- Test that callbacks appear in the describe response.
- Test request ID based deduping works for the attached request ID
| RequestID string | ||
| RunID string | ||
| State enumsspb.WorkflowExecutionState | ||
| Status enumspb.WorkflowExecutionStatus | ||
| LastWriteVersion int64 | ||
| StartTime *time.Time | ||
| AttachedRequestIDs []string |
There was a problem hiding this comment.
Please document the difference between the RequestID and AttachedRequestIDs fields and when to use either.
go.mod
Outdated
| go.opentelemetry.io/otel/sdk/metric v1.31.0 | ||
| go.opentelemetry.io/otel/trace v1.31.0 | ||
| go.temporal.io/api v1.43.1-0.20241219191931-461db5a23056 | ||
| go.temporal.io/api v1.43.2-0.20250115015852-6bdd96365d41 |
There was a problem hiding this comment.
We should merge the API PR before this server PR.
proto/internal/temporal/server/api/persistence/v1/executions.proto
Outdated
Show resolved
Hide resolved
proto/internal/temporal/server/api/persistence/v1/executions.proto
Outdated
Show resolved
Hide resolved
| if err := wh.validateOnConflictOptions(request.OnConflictOptions); err != nil { | ||
| return nil, err | ||
| } |
There was a problem hiding this comment.
Yes, I know is unnecessary for now. Just being preemptive adding the function to validate.
| versioningOverride *workflowpb.VersioningOverride, | ||
| attachRequestID string, | ||
| attachCompletionCallbacks []*commonpb.Callback, | ||
| links []*commonpb.Link, |
There was a problem hiding this comment.
Maybe instead accept a historypb.WorkflowExecutionOptionsUpdatedEventAttributes and links here to avoid this getting out of control?
There was a problem hiding this comment.
Hm, I think it would make sense to use historyservice.UpdateWorkflowExecutionOptionsRequest, but it would add some logic in the event factory (do the mask processing here), which I'm not sure it makes sense.
I'd leave as it is for now, and refactor later. I'm already planning to refactor the startworkflow api to call the updateworkflowwithoptions api in which I can address this issue together.
| versioningOverride *workflowpb.VersioningOverride, | ||
| attachRequestID string, | ||
| attachCompletionCallbacks []*commonpb.Callback, |
There was a problem hiding this comment.
Same as above, probably better to accept the attributes here.
| versioningOverride *workflowpb.VersioningOverride, | ||
| attachRequestID string, | ||
| attachCompletionCallbacks []*commonpb.Callback, |
There was a problem hiding this comment.
Same, accept the event attributes here instead.
b7ed8a7 to
33229ff
Compare
3a5a6f6 to
9031420
Compare
| func WorkflowExecutionRequestIDsToBlob(requestIDs *persistencespb.WorkflowExecutionRequestIDs) (*commonpb.DataBlob, error) { | ||
| return proto3Encode(requestIDs) | ||
| } | ||
|
|
||
| func WorkflowExecutionRequestIDsFromBlob(blob []byte, encoding string) (*persistencespb.WorkflowExecutionRequestIDs, error) { | ||
| if blob == nil { | ||
| return nil, nil | ||
| } | ||
| result := &persistencespb.WorkflowExecutionRequestIDs{} | ||
| return result, proto3Decode(blob, encoding, result) | ||
| } |
There was a problem hiding this comment.
This is code duplication for now (from the serializer.go file).
We need to refactor the code to pass the serializer object to the DB plugins to be able to serialize/deserialize fields from DB.
There was a problem hiding this comment.
Will you do the refactor? Not blocking this PR, but ideally we'd at least track this.
proto/internal/temporal/server/api/persistence/v1/executions.proto
Outdated
Show resolved
Hide resolved
proto/internal/temporal/server/api/persistence/v1/executions.proto
Outdated
Show resolved
Hide resolved
bergundy
left a comment
There was a problem hiding this comment.
Note that we also need to ensure that the WorkflowExecutionOptionsUpdated event is cherry picked (reapplied) on conflict resolution.
| executionStateDatablob, err := serialization.WorkflowExecutionStateToBlob( | ||
| &persistencespb.WorkflowExecutionState{ | ||
| RunId: runID, | ||
| CreateRequestId: createRequestID, |
There was a problem hiding this comment.
Should we stop storing this now that we have the map?
There was a problem hiding this comment.
I'd leave for a while first to transition to use the new field.
| @@ -122,13 +122,18 @@ type ( | |||
|
|
|||
| // CurrentWorkflowConditionFailedError represents a failed conditional update for current workflow record | |||
| CurrentWorkflowConditionFailedError struct { | |||
There was a problem hiding this comment.
Should we just return the entire *persistencespb.WorkflowExecutionState object here?
There was a problem hiding this comment.
Hm, yes, we can do that, but it's kind of a big refactor. I can do this in another PR.
| "time" | ||
|
|
||
| "github.com/pborman/uuid" | ||
| "github.com/google/uuid" |
There was a problem hiding this comment.
Thanks! I've been wanting to get rid of this dependency and forbid it in our linter but haven't gotten around to it yet.
proto/internal/temporal/server/api/persistence/v1/executions.proto
Outdated
Show resolved
Hide resolved
| versioningOverride *workflowpb.VersioningOverride, | ||
| unsetVersioningOverride bool, | ||
| attachRequestID string, | ||
| attachCompletionCallbacks []*commonpb.Callback, |
There was a problem hiding this comment.
Still think you should just accept *historypb.WorkflowExecutionOptionsUpdatedEventAttributes here and avoid this function signature growing every time we add another attribute.
There was a problem hiding this comment.
It's a bit complicated. This is the event factory, ie., that's the function that builds *historypb.WorkflowExecutionOptionsUpdatedEventAttributes. I want to change to get the history request, but then it would add some logic to translate the request input to event attributes, which is not something we do so far.
There was a problem hiding this comment.
You should be able to accept the attributes in the event factory, not blocking this PR though.
| err := api.ResolveWorkflowIDReusePolicy( | ||
| workflowKey, | ||
| currentWorkflowConditionFailed.Status, | ||
| currentWorkflowConditionFailed.RequestID, | ||
| s.request.StartRequest.GetWorkflowIdReusePolicy(), | ||
| ) |
There was a problem hiding this comment.
Is currentWorkflowConditionFailed.RequestID the current request's ID? Should we be checking that the request IDs in the DB and request matches? (asking for lack of context on this part of the code).
There was a problem hiding this comment.
currentWorkflowConditionFailed.RequestID is the existing current workflow, which can be running or completed already.
33229ff to
51e80d1
Compare
51e80d1 to
4c09e73
Compare
There was a problem hiding this comment.
This is a fix in the history event attributes assertion.
If the field is expected to be nil, the assertion below at L527 will fail because actualV is something like var actualV any = (*T)(nil), and this is different than var expectedValue any = nil.
proto/internal/temporal/server/api/persistence/v1/executions.proto
Outdated
Show resolved
Hide resolved
abcee87 to
7ff6845
Compare
bergundy
left a comment
There was a problem hiding this comment.
Overall LGTM but I'm missing a few test cases:
- Start without OnConflictOptions and same different request ID fails (for good measure)
- Start fails due to validation (e.g. max callback limit reached), does not attach a request ID to the current executions table (e.g. a following request doesn't get duduped).
- XDC test and/or reset test that verifies the update event is applied and start requests get properly deduped
There was a problem hiding this comment.
shall we change other places that uses current execution row to use the blob instead? e.g. GetCurrentExecution in sql/execution.go.
we still do double write so doesn't really matter right now.
There was a problem hiding this comment.
I thought about the same, but I'll check and see if it's gonna be an improvement or not in another PR.
7ff6845 to
46835e2
Compare
_**READ BEFORE MERGING:** All PRs require approval by both Server AND SDK teams before merging! This is why the number of required approvals is "2" and not "1"--two reviewers from the same team is NOT sufficient. If your PR is not approved by someone in BOTH teams, it may be summarily reverted._ <!-- Describe what has changed in this PR --> **What changed?** Add `OnConflictOptions` to `StartWorkflowExecutionRequest` <!-- Tell your future self why have you made these changes --> **Why?** When the workflow id conflict policy is `USE_EXISTING`, `OnConflictOptions` add the ability to change the existing running workflow. <!-- Are there any breaking changes on binary or code level? --> **Breaking changes** <!-- If this breaks the Server, please provide the Server PR to merge right after this PR was merged. --> **Server PR** temporalio/temporal#7080
_**READ BEFORE MERGING:** All PRs require approval by both Server AND SDK teams before merging! This is why the number of required approvals is "2" and not "1"--two reviewers from the same team is NOT sufficient. If your PR is not approved by someone in BOTH teams, it may be summarily reverted._ <!-- Describe what has changed in this PR --> **What changed?** Add `OnConflictOptions` to `StartWorkflowExecutionRequest` <!-- Tell your future self why have you made these changes --> **Why?** When the workflow id conflict policy is `USE_EXISTING`, `OnConflictOptions` add the ability to change the existing running workflow. <!-- Are there any breaking changes on binary or code level? --> **Breaking changes** <!-- If this breaks the Server, please provide the Server PR to merge right after this PR was merged. --> **Server PR** temporalio/temporal#7080
9c3304b to
924f1d9
Compare
924f1d9 to
d01c49a
Compare
What changed?
Add
OnConflictOptionstoStartWorkflowExecution.USE_EXISTING, then no-op;WorkflowExecutionOptionsUpdatedEventto the existing running workflow.Why?
Ability to attach completion callbacks to existing running workflows.
How did you test it?
WIP: writing tests
Potential risks
Documentation
Is hotfix candidate?