Documentation
¶
Index ¶
- func FilterWorkflowHistoriesBundle(bundle []byte, tryReplay func(singleExecutionBundle []byte) error) ([]byte, int, error)
- func GetWorkflowHistoriesBundle(ctx context.Context, client *Client, w WorkflowDeclaration) ([]byte, error)
- func ReplayWorkflow(historiesBytes []byte, fn any, opts worker.WorkflowReplayerOptions) error
- type Activity
- func (a Activity[Param, Return]) AppendFuture(ctx workflow.Context, futures *[]workflow.Future, param Param)
- func (a Activity[Param, Return]) Execute(ctx workflow.Context, param Param) workflow.Future
- func (a Activity[Param, Return]) Run(ctx workflow.Context, param Param) (Return, error)
- func (a Activity[Param, Return]) WithImplementation(fn func(context.Context, Param) (Return, error)) *ActivityWithImpl
- type ActivityWithImpl
- type Client
- type QueryHandler
- type Queue
- type Registerable
- type Worker
- type Workflow
- func (w Workflow[Param, Return]) Execute(ctx context.Context, temporalClient *Client, opts client.StartWorkflowOptions, ...) (client.WorkflowRun, error)
- func (w Workflow[Param, Return]) ExecuteChild(ctx workflow.Context, opts workflow.ChildWorkflowOptions, param Param) workflow.ChildWorkflowFuture
- func (w Workflow[Param, Return]) Name() string
- func (w Workflow[Param, Return]) ReplayWorkflow(historiesBytes []byte, fn func(workflow.Context, Param) (Return, error), ...) error
- func (w Workflow[Param, Return]) Run(ctx context.Context, temporalClient *Client, opts client.StartWorkflowOptions, ...) (Return, error)
- func (w Workflow[Param, Return]) RunChild(ctx workflow.Context, opts workflow.ChildWorkflowOptions, param Param) (Return, error)
- func (w Workflow[Param, Return]) SetSchedule(ctx context.Context, temporalClient *Client, opts client.ScheduleOptions, ...) error
- func (w Workflow[Param, Return]) WithImplementation(fn func(workflow.Context, Param) (Return, error)) *WorkflowWithImpl[Param, Return]
- type WorkflowDeclaration
- type WorkflowSignal
- func (s *WorkflowSignal[WP, WR, SP]) AddToSelector(ctx workflow.Context, selector workflow.Selector, fn func(SP)) workflow.Selector
- func (s *WorkflowSignal[WP, WR, SP]) Name() string
- func (s *WorkflowSignal[WP, WR, SP]) Receive(ctx workflow.Context) SP
- func (s *WorkflowSignal[WP, WR, SP]) Signal(ctx context.Context, temporalClient *Client, workflowID, runID string, ...) error
- func (s *WorkflowSignal[WP, WR, SP]) SignalWithStart(ctx context.Context, temporalClient *Client, opts client.StartWorkflowOptions, ...) (client.WorkflowRun, error)
- func (s *WorkflowSignal[WP, WR, SP]) TryReceive(ctx workflow.Context) (value SP, ok bool)
- type WorkflowWithImpl
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func FilterWorkflowHistoriesBundle ¶
func FilterWorkflowHistoriesBundle(bundle []byte, tryReplay func(singleExecutionBundle []byte) error) ([]byte, int, error)
FilterWorkflowHistoriesBundle filters a histories bundle to keep only executions where tryReplay returns nil. It returns the filtered bundle as serialized bytes. If no executions pass the filter, it returns an error.
func GetWorkflowHistoriesBundle ¶
func GetWorkflowHistoriesBundle(ctx context.Context, client *Client, w WorkflowDeclaration) ([]byte, error)
GetWorkflowHistoriesBundle connects to the temporal server and fetches the most recent 10 open and 10 closed executions. It returns a byte seralized piece of data that can be used immediately or in the future to call ReplayWorkflow.
func ReplayWorkflow ¶
func ReplayWorkflow(historiesBytes []byte, fn any, opts worker.WorkflowReplayerOptions) error
ReplayWorkflow is meant to be used in tests with the output of GetWorkflowHistoriesBundle to check if the given workflow implementation (fn) is compatible with previous executions captured at the time when GetWorkflowHistoriesBundle was run.
Types ¶
type Activity ¶
Activity is used for interacting with activities in a safe way that takes into account the input and output types, queue name and other properties.
func NewActivity ¶
NewActivity declares the existence of an activity on a given queue with a given name.
func NewActivityPositional ¶
NewActivityPositional declares the existence of an activity on a given queue with a given name. Instead of passing the Param struct directly to the activity, it passes each field of the struct as a separate positional argument in the order they are defined.
func (Activity[Param, Return]) AppendFuture ¶
func (a Activity[Param, Return]) AppendFuture(ctx workflow.Context, futures *[]workflow.Future, param Param)
AppendFuture executes the activity and appends the resulting future to the provided slice of futures.
func (Activity[Param, Return]) Execute ¶
Execute asynchronously executes the activity and returns a promise.
func (Activity[Param, Return]) Run ¶
Run synchronously executes the activity and returns the result.
func (Activity[Param, Return]) WithImplementation ¶
func (a Activity[Param, Return]) WithImplementation(fn func(context.Context, Param) (Return, error)) *ActivityWithImpl
WithImplementation should be called to create the parameters for NewWorker(). It declares which function implements the activity.
type ActivityWithImpl ¶
type ActivityWithImpl struct {
// contains filtered or unexported fields
}
ActivityWithImpl is a temporary struct that implements Registerable. It's meant to be passed into `tempts.NewWorker`.
type Client ¶
Client is a wrapper for the temporal SDK client that keeps track of which namepace the client is connected to to return more useful errors if the wrong namespace is used.
func NewFromSDK ¶
NewFromSDK allows the caller to pass in an existing temporal SDK client and manually specify which name that client was connected to.
func NewLazyClient ¶
NewLazyClient is equivalent to Dial, but doesn't conect to the server until necessary.
type QueryHandler ¶
type QueryHandler[Param, Return any] struct { // contains filtered or unexported fields }
QueryHandler is used for interacting with queries on workflows. Currently there's no way to enforce the connection between the query and the workflow it should be valid on.
func NewQueryHandler ¶
func NewQueryHandler[Param, Return any](queryName string) *QueryHandler[Param, Return]
NewQueryHandler declares the name and types for a query to a workflow.
func (*QueryHandler[Param, Return]) Query ¶
func (q *QueryHandler[Param, Return]) Query(ctx context.Context, temporalClient *Client, workflowID, runID string, p Param) (Return, error)
Query executes the query and returns the response.
func (*QueryHandler[Param, Return]) SetHandler ¶
func (q *QueryHandler[Param, Return]) SetHandler(ctx workflow.Context, fn func(Param) (Return, error))
SetHandler should be called by a workflow to define how the query should be handled when sent to this workflow to execute.
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue is the declaration of a temporal, queue, which is used for routing workflows and activities to workers.
func (*Queue) RegisterMockFallbacks ¶
func (q *Queue) RegisterMockFallbacks(r registry)
RegisterMockFallbacks registers fake activities and workflows for queue in the context of a unit test. This is necessary so that the test environment knows their types and they can be mocked. Any unmocked activities or workflows trigger a panic and fail the test with a descriptive error message.
type Registerable ¶
type Registerable interface {
// contains filtered or unexported methods
}
Registerable can be created by calling WithImplementation() on activity or workflow definitions. It's a parameter to `tempts.NewWorker()`.
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker represents a temporal worker that connects to the temporal server to execute activities and workflows.
func NewWorker ¶
func NewWorker(queue *Queue, registerables []Registerable) (*Worker, error)
NewWorker defines a worker along with all of the workflows and activities. Example usage:
wrk, err := tempts.NewWorker(queueMain, []tempts.Registerable{
activityTypeFormatName.WithImplementation(activityFormatName),
activityTypeGreet.WithImplementation(activityGreet),
workflowTypeFormatAndGreet.WithImplementation(workflowFormatAndGreet),
workflowTypeJustGreet.WithImplementation(workflowJustGreet),
})
type Workflow ¶
Workflow is used for interacting with workflows in a safe way that takes into account the input and output types, queue name and other properties. Workflows are resumable functions registered on workers that execute activities.
func NewWorkflow ¶
NewWorkflow declares the existence of a workflow on a given queue with a given name.
func NewWorkflowPositional ¶
func NewWorkflowPositional[Param any, Return any](queue *Queue, name string) Workflow[Param, Return]
NewWorkflowPositional declares the existence of a workflow on a given queue with a given name. Instead of passing the Param struct directly to the workflow, it passes each field of the struct as a separate positional argument in the order they are defined.
func (Workflow[Param, Return]) Execute ¶
func (w Workflow[Param, Return]) Execute(ctx context.Context, temporalClient *Client, opts client.StartWorkflowOptions, param Param) (client.WorkflowRun, error)
Execute asynchronously executes the workflow and returns a promise.
func (Workflow[Param, Return]) ExecuteChild ¶
func (w Workflow[Param, Return]) ExecuteChild(ctx workflow.Context, opts workflow.ChildWorkflowOptions, param Param) workflow.ChildWorkflowFuture
Execute asynchronously executes the workflow from another parent workflow and returns a promise.
func (Workflow[Param, Return]) ReplayWorkflow ¶
func (w Workflow[Param, Return]) ReplayWorkflow(historiesBytes []byte, fn func(workflow.Context, Param) (Return, error), opts worker.WorkflowReplayerOptions) error
ReplayWorkflow is a typed version of the package-level ReplayWorkflow that handles positional workflow replay automatically. For positional workflows, fn is wrapped to accept positional arguments matching the on-wire format before reconstructing the Param struct.
func (Workflow[Param, Return]) Run ¶
func (w Workflow[Param, Return]) Run(ctx context.Context, temporalClient *Client, opts client.StartWorkflowOptions, param Param) (Return, error)
Run executes the workflow and synchronously returns the output.
func (Workflow[Param, Return]) RunChild ¶
func (w Workflow[Param, Return]) RunChild(ctx workflow.Context, opts workflow.ChildWorkflowOptions, param Param) (Return, error)
Run executes the workflow from another parent workflow and synchronously returns the output.
func (Workflow[Param, Return]) SetSchedule ¶
func (w Workflow[Param, Return]) SetSchedule(ctx context.Context, temporalClient *Client, opts client.ScheduleOptions, param Param) error
SetSchedule creates or updates the schedule to match the given definition. WARNING: This feature is not as seamless as it could be because of the complex API exposed by temporal. In some cases, when the schedule has been modified in some non-updateable way, this method can't update the schedule and it returns an error.
func (Workflow[Param, Return]) WithImplementation ¶
func (w Workflow[Param, Return]) WithImplementation(fn func(workflow.Context, Param) (Return, error)) *WorkflowWithImpl[Param, Return]
WithImplementation should be called to create the parameters for NewWorker(). It declares which function implements the workflow.
type WorkflowDeclaration ¶
type WorkflowDeclaration interface {
Name() string
// contains filtered or unexported methods
}
WorkflowDeclaration always contains Workflow but doesn't have type parameters, so it can be passed into non-generic functions.
type WorkflowSignal ¶
type WorkflowSignal[WorkflowParam, WorkflowReturn, SignalParam any] struct { // contains filtered or unexported fields }
WorkflowSignal is a type-safe signal scoped to a specific workflow. It captures the workflow's parameter and return types to enable type-safe SignalWithStart.
func NewWorkflowSignal ¶
func NewWorkflowSignal[SignalParam any, WorkflowParam, WorkflowReturn any]( w *Workflow[WorkflowParam, WorkflowReturn], signalName string, ) *WorkflowSignal[WorkflowParam, WorkflowReturn, SignalParam]
NewWorkflowSignal declares a signal for a specific workflow. The signal is scoped to this workflow, enabling type-safe SignalWithStart operations.
The signalName is used as the Temporal channel name (via workflow.GetSignalChannel) and must match when sending signals from external code.
func (*WorkflowSignal[WP, WR, SP]) AddToSelector ¶
func (s *WorkflowSignal[WP, WR, SP]) AddToSelector(ctx workflow.Context, selector workflow.Selector, fn func(SP)) workflow.Selector
AddToSelector adds this signal to a workflow.Selector with a typed callback. This is the recommended way to handle multiple signals in a select-style pattern.
Example:
selector := workflow.NewSelector(ctx)
signal1.AddToSelector(ctx, selector, func(val Signal1Param) {
// handle signal1
})
signal2.AddToSelector(ctx, selector, func(val Signal2Param) {
// handle signal2
})
selector.Select(ctx)
func (*WorkflowSignal[WP, WR, SP]) Name ¶
func (s *WorkflowSignal[WP, WR, SP]) Name() string
Name returns the signal name, which is also the underlying Temporal channel name.
func (*WorkflowSignal[WP, WR, SP]) Receive ¶
func (s *WorkflowSignal[WP, WR, SP]) Receive(ctx workflow.Context) SP
Receive blocks until a signal is received and returns the typed value. For handling multiple signals, use AddToSelector instead.
func (*WorkflowSignal[WP, WR, SP]) Signal ¶
func (s *WorkflowSignal[WP, WR, SP]) Signal( ctx context.Context, temporalClient *Client, workflowID, runID string, param SP, ) error
Signal sends the signal to an already-running workflow.
func (*WorkflowSignal[WP, WR, SP]) SignalWithStart ¶
func (s *WorkflowSignal[WP, WR, SP]) SignalWithStart( ctx context.Context, temporalClient *Client, opts client.StartWorkflowOptions, workflowParam WP, signalParam SP, ) (client.WorkflowRun, error)
SignalWithStart atomically starts the workflow if it doesn't exist, or signals it if it does. This is useful for ensuring exactly-once workflow creation with an initial signal.
func (*WorkflowSignal[WP, WR, SP]) TryReceive ¶
func (s *WorkflowSignal[WP, WR, SP]) TryReceive(ctx workflow.Context) (value SP, ok bool)
TryReceive attempts to receive a signal without blocking. Returns the value and whether a value was received. For handling multiple signals, use AddToSelector instead.
type WorkflowWithImpl ¶
WorkflowWithImpl is a temporary struct that implements Registerable. It's meant to be passed into `tempts.NewWorker`.
func (WorkflowWithImpl[Param, Return]) ExecuteInTest ¶
func (w WorkflowWithImpl[Param, Return]) ExecuteInTest(e testEnvironment, p Param) (Return, error)
ExecuteInTest executes the given workflow implementation in a unit test and returns the output of the workflow.
func (WorkflowWithImpl[Param, Return]) Name ¶
func (w WorkflowWithImpl[Param, Return]) Name() string
Name returns the name of the workflow being implemented.