Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 28 additions & 29 deletions agent/agent_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,35 +8,34 @@ import (
// AgentConfiguration is the run-time configuration for an agent that
// has been loaded from the config file and command-line params
type AgentConfiguration struct {
ConfigPath string
BootstrapScript string
BuildPath string
HooksPath string
AdditionalHooksPaths []string
SocketsPath string
GitMirrorsPath string
GitMirrorsLockTimeout int
GitMirrorsSkipUpdate bool
PluginsPath string
GitCheckoutFlags string
GitCloneFlags string
GitCloneMirrorFlags string
GitCleanFlags string
GitFetchFlags string
GitSubmodules bool
AllowedRepositories []*regexp.Regexp
AllowedPlugins []*regexp.Regexp
AllowedEnvironmentVariables []*regexp.Regexp
SSHKeyscan bool
CommandEval bool
PluginsEnabled bool
PluginValidation bool
PluginsAlwaysCloneFresh bool
LocalHooksEnabled bool
StrictSingleHooks bool
RunInPty bool
KubernetesExec bool
KubernetesLogCollectionGracePeriod time.Duration
ConfigPath string
BootstrapScript string
BuildPath string
HooksPath string
AdditionalHooksPaths []string
SocketsPath string
GitMirrorsPath string
GitMirrorsLockTimeout int
GitMirrorsSkipUpdate bool
PluginsPath string
GitCheckoutFlags string
GitCloneFlags string
GitCloneMirrorFlags string
GitCleanFlags string
GitFetchFlags string
GitSubmodules bool
AllowedRepositories []*regexp.Regexp
AllowedPlugins []*regexp.Regexp
AllowedEnvironmentVariables []*regexp.Regexp
SSHKeyscan bool
CommandEval bool
PluginsEnabled bool
PluginValidation bool
PluginsAlwaysCloneFresh bool
LocalHooksEnabled bool
StrictSingleHooks bool
RunInPty bool
KubernetesExec bool

SigningJWKSFile string // Where to find the key to sign pipeline uploads with (passed through to jobs, they might be uploading pipelines)
SigningJWKSKeyID string // The key ID to sign pipeline uploads with
Expand Down
2 changes: 1 addition & 1 deletion agent/agent_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ func (a *AgentWorker) Stop(graceful bool) {
a.logger.Info("Gracefully stopping agent. Since there is no job running, the agent will disconnect immediately")
}
}
} else {
} else { // ungraceful
// If there's a job running, kill it, then disconnect
if a.jobRunner != nil {
a.logger.Info("Forcefully stopping agent. The current job will be canceled before disconnecting...")
Expand Down
66 changes: 12 additions & 54 deletions agent/run_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,11 +373,6 @@ One or more containers connected to the agent, but then stopped communicating wi
func (r *JobRunner) cleanup(ctx context.Context, wg *sync.WaitGroup, exit core.ProcessExit, ignoreAgentInDispatches *bool) {
finishedAt := time.Now()

// In Kubernetes mode, wait for command containers to finish before stopping log collection
if r.conf.KubernetesExec {
r.waitForKubernetesProcessesToComplete(ctx)
}

// Flush the job logs. If the process is never started, then logs from prior to the attempt to
// start the process will still be buffered. Also, there may still be logs in the buffer that
// were left behind because the uploader goroutine exited before it could flush them.
Expand Down Expand Up @@ -432,50 +427,6 @@ func (r *JobRunner) cleanup(ctx context.Context, wg *sync.WaitGroup, exit core.P
r.agentLogger.Info("Finished job %s for build at %s", r.conf.Job.ID, r.conf.Job.Env["BUILDKITE_BUILD_URL"])
}

// waitForKubernetesProcessesToComplete holds up the caller until r.process
// reports Done, the configured log collection grace period runs out, or ctx
// is cancelled - whichever comes first. It exists so that log collection is
// not stopped while Kubernetes command containers may still be producing
// output (for example from post-command hooks).
//
// A negative KubernetesLogCollectionGracePeriod means "no timeout": the wait
// is then bounded only by ctx. If r.process is nil, or ctx is cancelled before
// the process has started, the method returns without waiting.
func (r *JobRunner) waitForKubernetesProcessesToComplete(ctx context.Context) {
	r.agentLogger.Debug("[JobRunner] Waiting for Kubernetes processes to complete before stopping log collection")

	// Without a process there is nothing whose completion we could await.
	if r.process == nil {
		r.agentLogger.Debug("[JobRunner] No process to wait for")
		return
	}

	// Completion can only follow a start, so first block until the process
	// has started (or give up if the context ends beforehand).
	select {
	case <-ctx.Done():
		r.agentLogger.Debug("[JobRunner] Context cancelled before process started, skipping wait")
		return
	case <-r.process.Started():
	}

	// Derive the context that bounds the wait: the parent context as-is when
	// the grace period is negative, otherwise the parent plus a timeout.
	waitCtx := ctx
	if timeout := r.conf.AgentConfiguration.KubernetesLogCollectionGracePeriod; timeout < 0 {
		r.agentLogger.Debug("[JobRunner] No log collection grace period configured, waiting until process completes")
	} else {
		r.agentLogger.Debug("[JobRunner] Using log collection grace period: %v", timeout)
		bounded, release := context.WithTimeout(ctx, timeout)
		defer release()
		waitCtx = bounded
	}

	select {
	case <-waitCtx.Done():
		// Tell a plain timeout apart from cancellation of the parent context.
		if ctx.Err() == nil {
			r.agentLogger.Info("[JobRunner] Timeout waiting for Kubernetes processes, stopping log collection")
			return
		}
		r.agentLogger.Info("[JobRunner] Parent context cancelled while waiting for Kubernetes processes")
	case <-r.process.Done():
		r.agentLogger.Debug("[JobRunner] Kubernetes processes completed, stopping log collection")
	}
}

// streamJobLogsAfterProcessStart waits for the process to start, then grabs the job output
// every few seconds and sends it back to Buildkite.
func (r *JobRunner) streamJobLogsAfterProcessStart(ctx context.Context, wg *sync.WaitGroup) {
Expand Down Expand Up @@ -575,9 +526,9 @@ func (r *JobRunner) Cancel() error {
}

r.agentLogger.Info(
"Canceling job %s with a grace period of %ds %s",
"Canceling job %s with a signal grace period of %v %s",
r.conf.Job.ID,
r.conf.AgentConfiguration.CancelGracePeriod,
r.conf.AgentConfiguration.SignalGracePeriod,
reason,
)

Expand All @@ -589,9 +540,16 @@ func (r *JobRunner) Cancel() error {
}

select {
// Grace period for cancelling
case <-time.After(time.Second * time.Duration(r.conf.AgentConfiguration.CancelGracePeriod)):
r.agentLogger.Info("Job %s hasn't stopped in time, terminating", r.conf.Job.ID)
// Grace period between Interrupt and Terminate = the signal grace period.
// Extra time between the end of the signal grace period and the end of the
// cancel grace period is the time we (agent side) need to upload logs and
// disconnect (if the agent is exiting).
case <-time.After(r.conf.AgentConfiguration.SignalGracePeriod):
r.agentLogger.Info(
"Job %s hasn't stopped within %v, terminating",
r.conf.Job.ID,
r.conf.AgentConfiguration.SignalGracePeriod,
)

// Terminate the process as we've exceeded our context
return r.process.Terminate()
Expand Down
Loading