Documentation
¶
Index ¶
- Constants
- Variables
- func GetNextFibonacciValues(m, n int) (int, int)
- func IsCompatibleK8S(kw *KubeUnit, versionStr string) bool
- func IsComplete(workState int) bool
- func IsPending(err error) bool
- func NewPythonUnit(baseWorkUnit BaseWorkUnitForWorkUnit, plugin string, function string, ...) *pythonUnit
- func ParseTime(s string) *time.Time
- func ReadFileToString(filename string) (string, error)
- func ShouldUseReconnect(kw *KubeUnit) bool
- func WorkStateToString(workState int) string
- type BaseWorkUnit
- func (bwu *BaseWorkUnit) CancelContext()
- func (bwu *BaseWorkUnit) Debug(format string, v ...interface{})
- func (bwu *BaseWorkUnit) Error(format string, v ...interface{})
- func (bwu *BaseWorkUnit) GetCancel() context.CancelFunc
- func (bwu *BaseWorkUnit) GetContext() context.Context
- func (bwu *BaseWorkUnit) GetStatusCopy() StatusFileData
- func (bwu *BaseWorkUnit) GetStatusLock() *sync.RWMutex
- func (bwu *BaseWorkUnit) GetStatusWithoutExtraData() *StatusFileData
- func (bwu *BaseWorkUnit) GetWorkceptor() *Workceptor
- func (bwu *BaseWorkUnit) ID() string
- func (bwu *BaseWorkUnit) Info(format string, v ...interface{})
- func (bwu *BaseWorkUnit) Init(w *Workceptor, unitID string, workType string, fs FileSystemer)
- func (bwu *BaseWorkUnit) LastUpdateError() error
- func (bwu *BaseWorkUnit) Load() error
- func (bwu *BaseWorkUnit) MonitorLocalStatus()
- func (bwu *BaseWorkUnit) Release(force bool) error
- func (bwu *BaseWorkUnit) Save() error
- func (bwu *BaseWorkUnit) SetFromParams(_ map[string]string) error
- func (bwu *BaseWorkUnit) SetStatusExtraData(ed interface{})
- func (bwu *BaseWorkUnit) SetWorkceptor(w *Workceptor)
- func (bwu *BaseWorkUnit) Status() *StatusFileData
- func (bwu *BaseWorkUnit) StatusFileName() string
- func (bwu *BaseWorkUnit) StdoutFileName() string
- func (bwu *BaseWorkUnit) UnitDir() string
- func (bwu *BaseWorkUnit) UnredactedStatus() *StatusFileData
- func (bwu *BaseWorkUnit) UpdateBasicStatus(state int, detail string, stdoutSize int64)
- func (bwu *BaseWorkUnit) UpdateFullStatus(statusFunc func(*StatusFileData))
- func (bwu *BaseWorkUnit) Warning(format string, v ...interface{})
- type BaseWorkUnitForWorkUnit
- type CommandExtraData
- type CommandWorkerCfg
- type FileReadCloser
- type FileSystem
- type FileSystemer
- type FileWriteCloser
- type KubeAPIWrapper
- func (ku KubeAPIWrapper) BuildConfigFromFlags(masterURL string, kubeconfigPath string) (*rest.Config, error)
- func (ku KubeAPIWrapper) Create(ctx context.Context, clientset kubernetes.Interface, namespace string, ...) (*corev1.Pod, error)
- func (ku KubeAPIWrapper) Delete(ctx context.Context, clientset kubernetes.Interface, namespace string, ...) error
- func (ku KubeAPIWrapper) Get(ctx context.Context, clientset kubernetes.Interface, namespace string, ...) (*corev1.Pod, error)
- func (ku KubeAPIWrapper) GetLogs(clientset kubernetes.Interface, namespace string, name string, ...) *rest.Request
- func (ku KubeAPIWrapper) InClusterConfig() (*rest.Config, error)
- func (ku KubeAPIWrapper) List(ctx context.Context, clientset kubernetes.Interface, namespace string, ...) (*corev1.PodList, error)
- func (ku KubeAPIWrapper) NewClientConfigFromBytes(configBytes []byte) (clientcmd.ClientConfig, error)
- func (ku KubeAPIWrapper) NewDefaultClientConfigLoadingRules() *clientcmd.ClientConfigLoadingRules
- func (ku KubeAPIWrapper) NewFakeAlwaysRateLimiter() flowcontrol.RateLimiter
- func (ku KubeAPIWrapper) NewFakeNeverRateLimiter() flowcontrol.RateLimiter
- func (ku KubeAPIWrapper) NewForConfig(c *rest.Config) (kubernetes.Interface, error)
- func (ku KubeAPIWrapper) NewNotFound(qualifiedResource schema.GroupResource, name string) *apierrors.StatusError
- func (ku KubeAPIWrapper) NewSPDYExecutor(config *rest.Config, method string, url *url.URL) (remotecommand.Executor, error)
- func (ku KubeAPIWrapper) OneTermEqualSelector(k string, v string) fields.Selector
- func (ku KubeAPIWrapper) StreamWithContext(ctx context.Context, exec remotecommand.Executor, ...) error
- func (ku KubeAPIWrapper) SubResource(clientset kubernetes.Interface, podName string, podNamespace string) *rest.Request
- func (ku KubeAPIWrapper) UntilWithSync(ctx context.Context, lw cache.ListerWatcher, objType runtime.Object, ...) (*watch.Event, error)
- func (ku KubeAPIWrapper) Watch(ctx context.Context, clientset kubernetes.Interface, namespace string, ...) (watch.Interface, error)
- type KubeAPIer
- type KubeExtraData
- type KubePodStateHelper
- type KubeUnit
- func (kw *KubeUnit) Cancel() error
- func (kw *KubeUnit) CreatePod(env map[string]string) error
- func (kw *KubeUnit) GetKubeRetryCount() int
- func (kw *KubeUnit) GetKubeTimeoutStart() time.Duration
- func (kw *KubeUnit) GetSleepDuration(multipler int) time.Duration
- func (kw *KubeUnit) KubeLoggingWithReconnect(streamWait *sync.WaitGroup, stdout *STDoutWriter, stdinErr *error, ...)
- func (kw KubeUnit) PodContainerHealthy(pod *corev1.Pod, containerName string) (bool, error)
- func (kw KubeUnit) PodHealthy(pod *corev1.Pod, containerName string) (bool, error)
- func (kw *KubeUnit) ProcessLogLine(line string, sinceTime time.Time, successfulWrite bool) (msg string, newSinceTime time.Time, shouldSkip bool)
- func (kw *KubeUnit) Release(force bool) error
- func (kw *KubeUnit) Restart() error
- func (kw *KubeUnit) RunWorkUsingLogger()
- func (kw *KubeUnit) SetClientset(clientset kubernetes.Interface)
- func (kw *KubeUnit) SetFromParams(params map[string]string) error
- func (kw *KubeUnit) Start() error
- func (kw *KubeUnit) Status() *StatusFileData
- func (kw *KubeUnit) UnredactedStatus() *StatusFileData
- type KubeWorkerCfg
- func (cfg KubeWorkerCfg) GetVerifySignature() bool
- func (cfg KubeWorkerCfg) GetWorkType() string
- func (cfg KubeWorkerCfg) NewWorker(bwu BaseWorkUnitForWorkUnit, w *Workceptor, unitID string, workType string) WorkUnit
- func (cfg KubeWorkerCfg) NewkubeWorker(bwu BaseWorkUnitForWorkUnit, w *Workceptor, unitID string, workType string, ...) WorkUnit
- func (cfg KubeWorkerCfg) Prepare() error
- func (cfg KubeWorkerCfg) Run() error
- type NetceptorForWorkceptor
- type NewWorkerFunc
- type RemoteExtraData
- type STDinReader
- type STDoutWriter
- type ServerForWorkceptor
- type SigningKeyPrivateCfg
- type StatusFileData
- func (sfd *StatusFileData) Load(filename string) error
- func (sfd *StatusFileData) Save(filename string) error
- func (sfd *StatusFileData) UpdateBasicStatus(filename string, state int, detail string, stdoutSize int64) error
- func (sfd *StatusFileData) UpdateFullStatus(filename string, statusFunc func(*StatusFileData)) error
- type VerifyingKeyPublicCfg
- type WorkPythonCfg
- type WorkUnit
- type Workceptor
- func (w *Workceptor) AllocateRemoteUnit(remoteNode, remoteWorkType, workUnitID string, tlsClient, ttl string, ...) (WorkUnit, error)
- func (w *Workceptor) AllocateUnit(workTypeName string, workUnitID string, params map[string]string) (WorkUnit, error)
- func (w *Workceptor) CancelUnit(unitID string) error
- func (w *Workceptor) GetResults(ctx context.Context, unitID string, startPos int64) (chan []byte, error)
- func (w *Workceptor) ListKnownUnitIDs() []string
- func (w *Workceptor) RegisterWithControlService(cs ServerForWorkceptor) error
- func (w *Workceptor) RegisterWorker(typeName string, newWorkerFunc NewWorkerFunc, verifySignature bool) error
- func (w *Workceptor) ReleaseUnit(unitID string, force bool) error
- func (w *Workceptor) ShouldVerifySignature(workType string, signWork bool) bool
- func (w *Workceptor) StartUnit(unitID string) error
- func (w *Workceptor) UnitStatus(unitID string) (*StatusFileData, error)
- func (w *Workceptor) VerifySignature(signature string) error
- type WorkerConfig
Constants ¶
const ( SuccessWorkSleep = 1 * time.Second // Normal time to wait between checks MaxWorkSleep = 1 * time.Minute // Max time to ever wait between checks )
Work sleep constants.
const ( WorkStatePending = 0 WorkStateRunning = 1 WorkStateSucceeded = 2 WorkStateFailed = 3 WorkStateCanceled = 4 )
Work state constants.
const WorkerContainerName = "worker"
Variables ¶
var ErrImagePullBackOff = fmt.Errorf("container failed to start")
ErrImagePullBackOff is returned when the image for the container in the Pod cannot be pulled.
var ErrPending = fmt.Errorf("operation pending")
ErrPending is returned when an operation hasn't succeeded or failed yet.
var ErrPodCompleted = fmt.Errorf("pod ran to completion")
ErrPodCompleted is returned when pod has already completed before we could attach.
var ErrPodFailed = fmt.Errorf("pod failed to start")
ErrPodFailed is returned when pod has failed before we could attach.
Functions ¶
func GetNextFibonacciValues ¶ added in v1.6.2
GetNextFibonacciValues gets the next values in the Fibonacci sequence. Returned values will not be negative or larger than 1000.
func IsCompatibleK8S ¶ added in v1.4.5
func IsComplete ¶
IsComplete returns true if a given WorkState indicates the job is finished.
func NewPythonUnit ¶ added in v1.5.4
func NewPythonUnit(baseWorkUnit BaseWorkUnitForWorkUnit, plugin string, function string, config map[string]any) *pythonUnit
NewPythonUnit creates a new pythonUnit using the given parameters.
func ReadFileToString ¶ added in v1.5.6
ReadFileToString reads a file and returns its contents as a string. If filename is empty, it returns an empty string.
func ShouldUseReconnect ¶ added in v1.4.5
func WorkStateToString ¶
WorkStateToString returns a string representation of a WorkState.
Types ¶
type BaseWorkUnit ¶
type BaseWorkUnit struct {
// contains filtered or unexported fields
}
BaseWorkUnit includes data common to all work units, and partially implements the WorkUnit interface.
func (*BaseWorkUnit) CancelContext ¶ added in v1.4.2
func (bwu *BaseWorkUnit) CancelContext()
func (*BaseWorkUnit) Debug ¶ added in v1.3.0
func (bwu *BaseWorkUnit) Debug(format string, v ...interface{})
Debug logs message with unitID prepended.
func (*BaseWorkUnit) Error ¶ added in v1.3.0
func (bwu *BaseWorkUnit) Error(format string, v ...interface{})
Error logs message with unitID prepended.
func (*BaseWorkUnit) GetCancel ¶ added in v1.4.4
func (bwu *BaseWorkUnit) GetCancel() context.CancelFunc
func (*BaseWorkUnit) GetContext ¶ added in v1.4.4
func (bwu *BaseWorkUnit) GetContext() context.Context
func (*BaseWorkUnit) GetStatusCopy ¶ added in v1.4.4
func (bwu *BaseWorkUnit) GetStatusCopy() StatusFileData
func (*BaseWorkUnit) GetStatusLock ¶ added in v1.4.4
func (bwu *BaseWorkUnit) GetStatusLock() *sync.RWMutex
func (*BaseWorkUnit) GetStatusWithoutExtraData ¶ added in v1.4.4
func (bwu *BaseWorkUnit) GetStatusWithoutExtraData() *StatusFileData
func (*BaseWorkUnit) GetWorkceptor ¶ added in v1.4.4
func (bwu *BaseWorkUnit) GetWorkceptor() *Workceptor
func (*BaseWorkUnit) ID ¶
func (bwu *BaseWorkUnit) ID() string
ID returns the unique identifier of this work unit.
func (*BaseWorkUnit) Info ¶ added in v1.3.0
func (bwu *BaseWorkUnit) Info(format string, v ...interface{})
Info logs message with unitID prepended.
func (*BaseWorkUnit) Init ¶
func (bwu *BaseWorkUnit) Init(w *Workceptor, unitID string, workType string, fs FileSystemer)
Init initializes the basic work unit data, in memory only.
func (*BaseWorkUnit) LastUpdateError ¶
func (bwu *BaseWorkUnit) LastUpdateError() error
LastUpdateError returns the last error (including nil) resulting from an UpdateBasicStatus or UpdateFullStatus.
func (*BaseWorkUnit) MonitorLocalStatus ¶ added in v1.4.2
func (bwu *BaseWorkUnit) MonitorLocalStatus()
MonitorLocalStatus watches a unit dir and keeps the in-memory workUnit up to date with status changes.
func (*BaseWorkUnit) Release ¶
func (bwu *BaseWorkUnit) Release(force bool) error
Release releases this unit of work, deleting its files.
func (*BaseWorkUnit) SetFromParams ¶
func (bwu *BaseWorkUnit) SetFromParams(_ map[string]string) error
SetFromParams sets the in-memory state from parameters.
func (*BaseWorkUnit) SetStatusExtraData ¶ added in v1.4.4
func (bwu *BaseWorkUnit) SetStatusExtraData(ed interface{})
func (*BaseWorkUnit) SetWorkceptor ¶ added in v1.4.4
func (bwu *BaseWorkUnit) SetWorkceptor(w *Workceptor)
func (*BaseWorkUnit) Status ¶
func (bwu *BaseWorkUnit) Status() *StatusFileData
Status returns a copy of the status currently loaded in memory (use Load to get it from disk).
func (*BaseWorkUnit) StatusFileName ¶
func (bwu *BaseWorkUnit) StatusFileName() string
StatusFileName returns the full path to the status file in the unit dir.
func (*BaseWorkUnit) StdoutFileName ¶
func (bwu *BaseWorkUnit) StdoutFileName() string
StdoutFileName returns the full path to the stdout file in the unit dir.
func (*BaseWorkUnit) UnitDir ¶
func (bwu *BaseWorkUnit) UnitDir() string
UnitDir returns the unit directory of this work unit.
func (*BaseWorkUnit) UnredactedStatus ¶
func (bwu *BaseWorkUnit) UnredactedStatus() *StatusFileData
UnredactedStatus returns a copy of the status currently loaded in memory, including secrets.
func (*BaseWorkUnit) UpdateBasicStatus ¶
func (bwu *BaseWorkUnit) UpdateBasicStatus(state int, detail string, stdoutSize int64)
UpdateBasicStatus atomically updates key fields in the status metadata file. Errors are logged rather than returned. Passing -1 as stdoutSize leaves it unchanged.
func (*BaseWorkUnit) UpdateFullStatus ¶
func (bwu *BaseWorkUnit) UpdateFullStatus(statusFunc func(*StatusFileData))
UpdateFullStatus atomically updates the whole status record. Changes should be made in the callback function. Errors are logged rather than returned.
func (*BaseWorkUnit) Warning ¶ added in v1.3.0
func (bwu *BaseWorkUnit) Warning(format string, v ...interface{})
Warning logs message with unitID prepended.
type BaseWorkUnitForWorkUnit ¶ added in v1.4.4
type BaseWorkUnitForWorkUnit interface {
CancelContext()
ID() string
Init(w *Workceptor, unitID string, workType string, fs FileSystemer)
LastUpdateError() error
Load() error
MonitorLocalStatus()
Release(force bool) error
Save() error
SetFromParams(_ map[string]string) error
Status() *StatusFileData
StatusFileName() string
StdoutFileName() string
UnitDir() string
UnredactedStatus() *StatusFileData
UpdateBasicStatus(state int, detail string, stdoutSize int64)
UpdateFullStatus(statusFunc func(*StatusFileData))
GetStatusCopy() StatusFileData
GetStatusWithoutExtraData() *StatusFileData
SetStatusExtraData(interface{})
GetStatusLock() *sync.RWMutex
GetWorkceptor() *Workceptor
SetWorkceptor(*Workceptor)
GetContext() context.Context
GetCancel() context.CancelFunc
}
type CommandExtraData ¶ added in v1.4.4
CommandExtraData is the content of the ExtraData JSON field for a command worker.
type CommandWorkerCfg ¶ added in v1.4.0
type CommandWorkerCfg struct {
WorkType string `required:"true" description:"Name for this worker type"`
Command string `required:"true" description:"Command to run to process units of work"`
Params string `description:"Command-line parameters"`
AllowRuntimeParams bool `description:"Allow users to add more parameters" default:"false"`
VerifySignature bool `description:"Verify a signed work submission" default:"false"`
}
CommandWorkerCfg is the cmdline configuration object for a worker that runs a command.
func (CommandWorkerCfg) GetVerifySignature ¶ added in v1.4.0
func (cfg CommandWorkerCfg) GetVerifySignature() bool
func (CommandWorkerCfg) GetWorkType ¶ added in v1.4.0
func (cfg CommandWorkerCfg) GetWorkType() string
func (CommandWorkerCfg) NewWorker ¶ added in v1.4.0
func (cfg CommandWorkerCfg) NewWorker(bwu BaseWorkUnitForWorkUnit, w *Workceptor, unitID string, workType string) WorkUnit
func (CommandWorkerCfg) Run ¶ added in v1.4.0
func (cfg CommandWorkerCfg) Run() error
Run runs the action.
type FileReadCloser ¶ added in v1.4.2
type FileReadCloser interface {
io.ReadCloser
}
FileReadCloser wraps io.ReadCloser.
type FileSystem ¶ added in v1.4.2
type FileSystem struct{}
FileSystem represents the real filesystem.
func (FileSystem) Open ¶ added in v1.4.2
func (FileSystem) Open(name string) (*os.File, error)
Open opens a file.
func (FileSystem) RemoveAll ¶ added in v1.4.2
func (FileSystem) RemoveAll(path string) error
RemoveAll removes path and any children it contains.
type FileSystemer ¶ added in v1.4.2
type FileSystemer interface {
OpenFile(name string, flag int, perm os.FileMode) (*os.File, error)
Stat(name string) (os.FileInfo, error)
Open(name string) (*os.File, error)
RemoveAll(path string) error
}
FileSystemer represents a filesystem.
type FileWriteCloser ¶ added in v1.4.2
type FileWriteCloser interface {
io.WriteCloser
}
FileWriteCloser wraps io.WriteCloser.
type KubeAPIWrapper ¶ added in v1.4.5
type KubeAPIWrapper struct{}
func (KubeAPIWrapper) BuildConfigFromFlags ¶ added in v1.4.5
func (KubeAPIWrapper) Create ¶ added in v1.4.5
func (ku KubeAPIWrapper) Create(ctx context.Context, clientset kubernetes.Interface, namespace string, pod *corev1.Pod, opts metav1.CreateOptions) (*corev1.Pod, error)
func (KubeAPIWrapper) Delete ¶ added in v1.4.5
func (ku KubeAPIWrapper) Delete(ctx context.Context, clientset kubernetes.Interface, namespace string, name string, opts metav1.DeleteOptions) error
func (KubeAPIWrapper) Get ¶ added in v1.4.5
func (ku KubeAPIWrapper) Get(ctx context.Context, clientset kubernetes.Interface, namespace string, name string, opts metav1.GetOptions) (*corev1.Pod, error)
func (KubeAPIWrapper) GetLogs ¶ added in v1.4.5
func (ku KubeAPIWrapper) GetLogs(clientset kubernetes.Interface, namespace string, name string, opts *corev1.PodLogOptions) *rest.Request
func (KubeAPIWrapper) InClusterConfig ¶ added in v1.4.5
func (ku KubeAPIWrapper) InClusterConfig() (*rest.Config, error)
func (KubeAPIWrapper) List ¶ added in v1.4.5
func (ku KubeAPIWrapper) List(ctx context.Context, clientset kubernetes.Interface, namespace string, opts metav1.ListOptions) (*corev1.PodList, error)
func (KubeAPIWrapper) NewClientConfigFromBytes ¶ added in v1.4.5
func (ku KubeAPIWrapper) NewClientConfigFromBytes(configBytes []byte) (clientcmd.ClientConfig, error)
func (KubeAPIWrapper) NewDefaultClientConfigLoadingRules ¶ added in v1.4.5
func (ku KubeAPIWrapper) NewDefaultClientConfigLoadingRules() *clientcmd.ClientConfigLoadingRules
func (KubeAPIWrapper) NewFakeAlwaysRateLimiter ¶ added in v1.4.5
func (ku KubeAPIWrapper) NewFakeAlwaysRateLimiter() flowcontrol.RateLimiter
func (KubeAPIWrapper) NewFakeNeverRateLimiter ¶ added in v1.4.5
func (ku KubeAPIWrapper) NewFakeNeverRateLimiter() flowcontrol.RateLimiter
func (KubeAPIWrapper) NewForConfig ¶ added in v1.4.5
func (ku KubeAPIWrapper) NewForConfig(c *rest.Config) (kubernetes.Interface, error)
func (KubeAPIWrapper) NewNotFound ¶ added in v1.4.5
func (ku KubeAPIWrapper) NewNotFound(qualifiedResource schema.GroupResource, name string) *apierrors.StatusError
func (KubeAPIWrapper) NewSPDYExecutor ¶ added in v1.4.5
func (ku KubeAPIWrapper) NewSPDYExecutor(config *rest.Config, method string, url *url.URL) (remotecommand.Executor, error)
func (KubeAPIWrapper) OneTermEqualSelector ¶ added in v1.4.5
func (ku KubeAPIWrapper) OneTermEqualSelector(k string, v string) fields.Selector
func (KubeAPIWrapper) StreamWithContext ¶ added in v1.4.5
func (ku KubeAPIWrapper) StreamWithContext(ctx context.Context, exec remotecommand.Executor, options remotecommand.StreamOptions) error
func (KubeAPIWrapper) SubResource ¶ added in v1.4.5
func (ku KubeAPIWrapper) SubResource(clientset kubernetes.Interface, podName string, podNamespace string) *rest.Request
func (KubeAPIWrapper) UntilWithSync ¶ added in v1.4.5
func (ku KubeAPIWrapper) UntilWithSync(ctx context.Context, lw cache.ListerWatcher, objType runtime.Object, precondition watch2.PreconditionFunc, conditions ...watch2.ConditionFunc) (*watch.Event, error)
func (KubeAPIWrapper) Watch ¶ added in v1.4.5
func (ku KubeAPIWrapper) Watch(ctx context.Context, clientset kubernetes.Interface, namespace string, opts metav1.ListOptions) (watch.Interface, error)
type KubeAPIer ¶ added in v1.4.5
type KubeAPIer interface {
NewNotFound(schema.GroupResource, string) *apierrors.StatusError
OneTermEqualSelector(string, string) fields.Selector
NewForConfig(*rest.Config) (kubernetes.Interface, error)
GetLogs(kubernetes.Interface, string, string, *corev1.PodLogOptions) *rest.Request
Get(context.Context, kubernetes.Interface, string, string, metav1.GetOptions) (*corev1.Pod, error)
Create(context.Context, kubernetes.Interface, string, *corev1.Pod, metav1.CreateOptions) (*corev1.Pod, error)
List(context.Context, kubernetes.Interface, string, metav1.ListOptions) (*corev1.PodList, error)
Watch(context.Context, kubernetes.Interface, string, metav1.ListOptions) (watch.Interface, error)
Delete(context.Context, kubernetes.Interface, string, string, metav1.DeleteOptions) error
SubResource(kubernetes.Interface, string, string) *rest.Request
InClusterConfig() (*rest.Config, error)
NewDefaultClientConfigLoadingRules() *clientcmd.ClientConfigLoadingRules
BuildConfigFromFlags(string, string) (*rest.Config, error)
NewClientConfigFromBytes([]byte) (clientcmd.ClientConfig, error)
NewSPDYExecutor(*rest.Config, string, *url.URL) (remotecommand.Executor, error)
StreamWithContext(context.Context, remotecommand.Executor, remotecommand.StreamOptions) error
UntilWithSync(context.Context, cache.ListerWatcher, runtime.Object, watch2.PreconditionFunc, ...watch2.ConditionFunc) (*watch.Event, error)
NewFakeNeverRateLimiter() flowcontrol.RateLimiter
NewFakeAlwaysRateLimiter() flowcontrol.RateLimiter
}
type KubeExtraData ¶ added in v1.4.5
type KubeExtraData struct {
Image string
Command string
Params string
KubeNamespace string
KubeConfig string
KubePod string
PodName string
}
kubeExtraData is the content of the ExtraData JSON field for a Kubernetes worker.
type KubePodStateHelper ¶ added in v1.6.0
type KubeUnit ¶ added in v1.4.5
type KubeUnit struct {
BaseWorkUnitForWorkUnit
KubeAPIWrapperInstance KubeAPIer
Pod *corev1.Pod
// contains filtered or unexported fields
}
KubeUnit implements the WorkUnit interface.
func (*KubeUnit) Cancel ¶ added in v1.4.5
Cancel releases resources associated with a job, including cancelling it if running.
func (*KubeUnit) GetKubeRetryCount ¶ added in v1.6.0
func (*KubeUnit) GetKubeTimeoutStart ¶ added in v1.6.0
func (*KubeUnit) GetSleepDuration ¶ added in v1.6.0
func (*KubeUnit) KubeLoggingWithReconnect ¶ added in v1.5.2
func (KubeUnit) PodContainerHealthy ¶ added in v1.6.0
PodContainerHealthy checks if the pod has successfully completed its application logic. this is called after podInfrastructureSuccess has confirmed the pod is in a terminal state.
func (KubeUnit) PodHealthy ¶ added in v1.6.0
PodHealthy checks if the pod and container are in a healthy state.
func (*KubeUnit) ProcessLogLine ¶ added in v1.6.0
func (kw *KubeUnit) ProcessLogLine(line string, sinceTime time.Time, successfulWrite bool) (msg string, newSinceTime time.Time, shouldSkip bool)
ProcessLogLine handles timestamp parsing and stripping from log lines.
func (*KubeUnit) Release ¶ added in v1.4.5
Release releases resources associated with a job. Implies Cancel.
func (*KubeUnit) Restart ¶ added in v1.4.5
Restart resumes monitoring a job after a Receptor restart.
func (*KubeUnit) RunWorkUsingLogger ¶ added in v1.6.0
func (kw *KubeUnit) RunWorkUsingLogger()
RunWorkUsingLogger orchestrates the complete workflow for running work in a Kubernetes pod using logger-based streaming. This method is exposed publicly to enable comprehensive testing of the complex pod lifecycle, stdin/stdout streaming, and error handling logic.
The method handles: - Creating new pods or resuming existing ones - Setting up SPDY executors for stdin streaming - Managing goroutines for stdin/stdout coordination - Error propagation and status transitions - Proper cleanup and resource management.
func (*KubeUnit) SetClientset ¶ added in v1.5.6
func (kw *KubeUnit) SetClientset(clientset kubernetes.Interface)
SetClientset sets the clientset for testing purposes.
func (*KubeUnit) SetFromParams ¶ added in v1.4.5
SetFromParams sets the in-memory state from parameters.
func (*KubeUnit) Status ¶ added in v1.4.5
func (kw *KubeUnit) Status() *StatusFileData
Status returns a copy of the status currently loaded in memory.
func (*KubeUnit) UnredactedStatus ¶ added in v1.4.5
func (kw *KubeUnit) UnredactedStatus() *StatusFileData
Status returns a copy of the status currently loaded in memory.
type KubeWorkerCfg ¶ added in v1.4.0
type KubeWorkerCfg struct {
WorkType string `required:"true" description:"Name for this worker type"`
Namespace string `description:"Kubernetes namespace to create pods in"`
Image string `description:"Container image to use for the worker pod"`
Command string `description:"Command to run in the container (overrides entrypoint)"`
Params string `description:"Command-line parameters to pass to the entrypoint"`
AuthMethod string `description:"One of: kubeconfig, incluster" default:"incluster"`
KubeConfig string `description:"Kubeconfig filename (for authmethod=kubeconfig)"`
Pod string `description:"Pod definition filename, in json or yaml format"`
AllowRuntimeAuth bool `description:"Allow passing API parameters at runtime" default:"false"`
AllowRuntimeCommand bool `description:"Allow specifying image & command at runtime" default:"false"`
AllowRuntimeParams bool `description:"Allow adding command parameters at runtime" default:"false"`
AllowRuntimePod bool `description:"Allow passing Pod at runtime" default:"false"`
DeletePodOnRestart bool `description:"On restart, delete the pod if in pending state" default:"true"`
StreamMethod string `description:"Method for connecting to worker pods: logger or tcp" default:"logger"`
VerifySignature bool `description:"Verify a signed work submission" default:"false"`
}
KubeWorkerCfg is the cmdline configuration object for a Kubernetes worker plugin.
func (KubeWorkerCfg) GetVerifySignature ¶ added in v1.4.0
func (cfg KubeWorkerCfg) GetVerifySignature() bool
func (KubeWorkerCfg) GetWorkType ¶ added in v1.4.0
func (cfg KubeWorkerCfg) GetWorkType() string
func (KubeWorkerCfg) NewWorker ¶ added in v1.4.0
func (cfg KubeWorkerCfg) NewWorker(bwu BaseWorkUnitForWorkUnit, w *Workceptor, unitID string, workType string) WorkUnit
NewWorker is a factory to produce worker instances.
func (KubeWorkerCfg) NewkubeWorker ¶ added in v1.4.5
func (cfg KubeWorkerCfg) NewkubeWorker(bwu BaseWorkUnitForWorkUnit, w *Workceptor, unitID string, workType string, kawi KubeAPIer) WorkUnit
func (KubeWorkerCfg) Prepare ¶ added in v1.4.0
func (cfg KubeWorkerCfg) Prepare() error
Prepare inspects the configuration for validity.
func (KubeWorkerCfg) Run ¶ added in v1.4.0
func (cfg KubeWorkerCfg) Run() error
Run runs the action.
type NetceptorForWorkceptor ¶ added in v1.4.2
type NetceptorForWorkceptor interface {
NodeID() string
AddWorkCommand(typeName string, verifySignature bool) error
GetClientTLSConfig(name string, expectedHostName string, expectedHostNameType netceptor.ExpectedHostnameType) (*tls.Config, error) // have a common pkg for types
GetLogger() *logger.ReceptorLogger
DialContext(ctx context.Context, node string, service string, tlscfg *tls.Config) (*netceptor.Conn, error) // create an interface for Conn
}
NetceptorForWorkceptor is a interface to decouple workceptor from netceptor. it includes only the functions that workceptor uses.
type NewWorkerFunc ¶
type NewWorkerFunc func(bwu BaseWorkUnitForWorkUnit, w *Workceptor, unitID string, workType string) WorkUnit
NewWorkerFunc represents a factory of WorkUnit instances.
type RemoteExtraData ¶ added in v1.4.4
type RemoteExtraData struct {
RemoteNode string
RemoteWorkType string
RemoteParams map[string]string
RemoteUnitID string
RemoteStarted bool
LocalCancelled bool
LocalReleased bool
SignWork bool
TLSClient string
Expiration time.Time
}
RemoteExtraData is the content of the ExtraData JSON field for a remote work unit.
type STDinReader ¶ added in v1.4.2
type STDinReader struct {
// contains filtered or unexported fields
}
STDinReader reads from a stdin file and provides a Done function.
func NewStdinReader ¶ added in v1.4.2
func NewStdinReader(fs FileSystemer, unitdir string) (*STDinReader, error)
NewStdinReader allocates a new stdinReader, which reads from a stdin file and provides a Done function.
func (*STDinReader) Done ¶ added in v1.4.2
func (sr *STDinReader) Done() <-chan struct{}
Done returns a channel that will be closed on error (including EOF) in the reader.
func (*STDinReader) Error ¶ added in v1.4.2
func (sr *STDinReader) Error() error
Error returns the most recent error encountered in the reader.
func (*STDinReader) Read ¶ added in v1.4.2
func (sr *STDinReader) Read(p []byte) (n int, err error)
Read reads data from the stdout file, implementing io.Reader.
func (*STDinReader) SetReader ¶ added in v1.4.2
func (sr *STDinReader) SetReader(reader FileReadCloser)
SetReader sets the reader var.
type STDoutWriter ¶ added in v1.4.2
type STDoutWriter struct {
// contains filtered or unexported fields
}
STDoutWriter writes to a stdout file while also updating the status file.
func NewStdoutWriter ¶ added in v1.4.2
func NewStdoutWriter(fs FileSystemer, unitdir string) (*STDoutWriter, error)
NewStdoutWriter allocates a new stdoutWriter, which writes to both the stdout and status files.
func (*STDoutWriter) SetWriter ¶ added in v1.4.2
func (sw *STDoutWriter) SetWriter(writer FileWriteCloser)
SetWriter sets the writer var.
func (*STDoutWriter) Size ¶ added in v1.4.2
func (sw *STDoutWriter) Size() int64
Size returns the current size of the stdout file.
type ServerForWorkceptor ¶ added in v1.4.4
type ServerForWorkceptor interface {
AddControlFunc(name string, cType controlsvc.ControlCommandType) error
ConnectionListener(ctx context.Context, listener net.Listener)
RunControlSession(conn net.Conn)
RunControlSvc(ctx context.Context, service string, tlscfg *tls.Config, unixSocket string, unixSocketPermissions fs.FileMode, tcpListen string, tcptls *tls.Config) error
SetServerNet(n controlsvc.Neter)
SetServerTLS(t controlsvc.Tlser)
SetServerUtils(u controlsvc.Utiler)
SetupConnection(conn net.Conn)
}
type SigningKeyPrivateCfg ¶ added in v1.4.0
type SigningKeyPrivateCfg struct {
PrivateKey string `description:"Private key to sign work submissions" barevalue:"yes" default:""`
TokenExpiration string `description:"Expiration of the signed json web token, e.g. 3h or 3h30m" default:""`
}
func (SigningKeyPrivateCfg) Prepare ¶ added in v1.4.0
func (cfg SigningKeyPrivateCfg) Prepare() error
func (SigningKeyPrivateCfg) PrepareSigningKeyPrivateCfg ¶ added in v1.4.0
func (cfg SigningKeyPrivateCfg) PrepareSigningKeyPrivateCfg() (*time.Duration, error)
type StatusFileData ¶
type StatusFileData struct {
State int
Detail string
StdoutSize int64
WorkType string
ExtraData interface{}
}
StatusFileData is the structure of the JSON data saved to a status file. This struct should only contain value types, except for ExtraData.
func (*StatusFileData) Load ¶
func (sfd *StatusFileData) Load(filename string) error
Load loads status from a file.
func (*StatusFileData) Save ¶
func (sfd *StatusFileData) Save(filename string) error
Save saves status to a file.
func (*StatusFileData) UpdateBasicStatus ¶
func (sfd *StatusFileData) UpdateBasicStatus(filename string, state int, detail string, stdoutSize int64) error
UpdateBasicStatus atomically updates key fields in the status metadata file. Errors are logged rather than returned. Passing -1 as stdoutSize leaves it unchanged.
func (*StatusFileData) UpdateFullStatus ¶
func (sfd *StatusFileData) UpdateFullStatus(filename string, statusFunc func(*StatusFileData)) error
UpdateFullStatus atomically updates the status metadata file. Changes should be made in the callback function. Errors are logged rather than returned.
type VerifyingKeyPublicCfg ¶ added in v1.4.0
type VerifyingKeyPublicCfg struct {
PublicKey string `description:"Public key to verify signed work submissions" barevalue:"yes" default:""`
}
func (VerifyingKeyPublicCfg) Prepare ¶ added in v1.4.0
func (cfg VerifyingKeyPublicCfg) Prepare() error
func (VerifyingKeyPublicCfg) PrepareVerifyingKeyPublicCfg ¶ added in v1.4.0
func (cfg VerifyingKeyPublicCfg) PrepareVerifyingKeyPublicCfg() error
type WorkPythonCfg ¶ added in v1.4.9
type WorkPythonCfg struct {
WorkType string `required:"true" description:"Name for this worker type"`
Plugin string `required:"true" description:"Python module name of the worker plugin"`
Function string `required:"true" description:"Receptor-exported function to call"`
Config map[string]interface{} `description:"Plugin-specific configuration"`
}
workPythonCfg is the cmdline configuration object for a Python worker plugin.
func (WorkPythonCfg) NewWorker ¶ added in v1.4.9
func (cfg WorkPythonCfg) NewWorker(_ BaseWorkUnitForWorkUnit, w *Workceptor, unitID string, workType string) WorkUnit
NewWorker is a factory to produce worker instances.
func (WorkPythonCfg) Run ¶ added in v1.4.9
func (cfg WorkPythonCfg) Run() error
Run runs the action.
type WorkUnit ¶
type WorkUnit interface {
ID() string
UnitDir() string
StatusFileName() string
StdoutFileName() string
Save() error
Load() error
SetFromParams(params map[string]string) error
UpdateBasicStatus(state int, detail string, stdoutSize int64)
UpdateFullStatus(statusFunc func(*StatusFileData))
LastUpdateError() error
Status() *StatusFileData
UnredactedStatus() *StatusFileData
Start() error
Restart() error
Cancel() error
Release(force bool) error
}
WorkUnit represents a local unit of work.
func NewRemoteWorker ¶ added in v1.4.4
func NewRemoteWorker(bwu BaseWorkUnitForWorkUnit, w *Workceptor, unitID, workType string) WorkUnit
type Workceptor ¶
type Workceptor struct {
Cancel context.CancelFunc
SigningKey string
SigningExpiration time.Duration
VerifyingKey string
// contains filtered or unexported fields
}
Workceptor is the main object that handles unit-of-work management.
var MainInstance *Workceptor
MainInstance is the global instance of Workceptor instantiated by the command-line main() function.
func New ¶
func New(ctx context.Context, nc NetceptorForWorkceptor, baseDir string) (*Workceptor, error)
New constructs a new Workceptor instance. baseDir is the parent directory where node-specific work directories will be created. A node-specific subdirectory will be created under baseDir using the node ID.
func (*Workceptor) AllocateRemoteUnit ¶
func (w *Workceptor) AllocateRemoteUnit(remoteNode, remoteWorkType, workUnitID string, tlsClient, ttl string, signWork bool, params map[string]string) (WorkUnit, error)
AllocateRemoteUnit creates a new remote work unit and generates a local identifier for it.
func (*Workceptor) AllocateUnit ¶
func (w *Workceptor) AllocateUnit(workTypeName string, workUnitID string, params map[string]string) (WorkUnit, error)
AllocateUnit creates a new local work unit and generates an identifier for it.
func (*Workceptor) CancelUnit ¶
func (w *Workceptor) CancelUnit(unitID string) error
CancelUnit cancels a unit of work, killing any processes.
func (*Workceptor) GetResults ¶
func (w *Workceptor) GetResults(ctx context.Context, unitID string, startPos int64) (chan []byte, error)
GetResults returns a live stream of the results of a unit.
func (*Workceptor) ListKnownUnitIDs ¶
func (w *Workceptor) ListKnownUnitIDs() []string
ListKnownUnitIDs returns a slice containing the known unit IDs.
func (*Workceptor) RegisterWithControlService ¶
func (w *Workceptor) RegisterWithControlService(cs ServerForWorkceptor) error
RegisterWithControlService registers this workceptor instance with a control service instance.
func (*Workceptor) RegisterWorker ¶
func (w *Workceptor) RegisterWorker(typeName string, newWorkerFunc NewWorkerFunc, verifySignature bool) error
RegisterWorker notifies the Workceptor of a new kind of work that can be done.
func (*Workceptor) ReleaseUnit ¶
func (w *Workceptor) ReleaseUnit(unitID string, force bool) error
ReleaseUnit releases (deletes) resources from a unit of work, including stdout. Release implies Cancel.
func (*Workceptor) ShouldVerifySignature ¶
func (w *Workceptor) ShouldVerifySignature(workType string, signWork bool) bool
func (*Workceptor) StartUnit ¶
func (w *Workceptor) StartUnit(unitID string) error
StartUnit starts a unit of work.
func (*Workceptor) UnitStatus ¶
func (w *Workceptor) UnitStatus(unitID string) (*StatusFileData, error)
UnitStatus returns the state of a unit.
func (*Workceptor) VerifySignature ¶
func (w *Workceptor) VerifySignature(signature string) error
type WorkerConfig ¶ added in v1.4.0
type WorkerConfig interface {
GetWorkType() string
GetVerifySignature() bool
NewWorker(bwu BaseWorkUnitForWorkUnit, w *Workceptor, unitID string, workType string) WorkUnit
}
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
Package mock_workceptor is a generated GoMock package.
|
Package mock_workceptor is a generated GoMock package. |