Skip to content

Commit 8cfee7d

Browse files
authored
Merge pull request apache#25473 Add retry logic to Python boot script.
2 parents 5b4edaf + e999995 commit 8cfee7d

1 file changed

Lines changed: 31 additions & 18 deletions

File tree

sdks/python/container/boot.go

Lines changed: 31 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -30,8 +30,8 @@ import (
3030
"path/filepath"
3131
"regexp"
3232
"strings"
33-
"syscall"
3433
"sync"
34+
"syscall"
3535
"time"
3636

3737
"github.com/apache/beam/sdks/v2/go/pkg/beam/artifact"
@@ -216,10 +216,10 @@ func launchSDKProcess() error {
216216

217217
// Keep track of child PIDs for clean shutdown without zombies
218218
childPids := struct {
219-
v []int
219+
v []int
220220
canceled bool
221-
mu sync.Mutex
222-
} {v: make([]int, 0, len(workerIds))}
221+
mu sync.Mutex
222+
}{v: make([]int, 0, len(workerIds))}
223223

224224
// Forward trapped signals to child process groups in order to terminate them gracefully and avoid zombies
225225
go func() {
@@ -251,20 +251,33 @@ func launchSDKProcess() error {
251251
go func(workerId string) {
252252
defer wg.Done()
253253

254-
childPids.mu.Lock()
255-
if childPids.canceled {
254+
errorCount := 0
255+
for {
256+
childPids.mu.Lock()
257+
if childPids.canceled {
258+
childPids.mu.Unlock()
259+
return
260+
}
261+
log.Printf("Executing Python (worker %v): python %v", workerId, strings.Join(args, " "))
262+
cmd := StartCommandEnv(map[string]string{"WORKER_ID": workerId}, "python", args...)
263+
childPids.v = append(childPids.v, cmd.Process.Pid)
256264
childPids.mu.Unlock()
257-
return
258-
}
259-
log.Printf("Executing Python (worker %v): python %v", workerId, strings.Join(args, " "))
260-
cmd := StartCommandEnv(map[string]string{"WORKER_ID": workerId}, "python", args...)
261-
childPids.v = append(childPids.v, cmd.Process.Pid)
262-
childPids.mu.Unlock()
263-
264-
if err := cmd.Wait(); err != nil {
265-
log.Printf("Python (worker %v) exited: %v", workerId, err)
266-
} else {
267-
log.Printf("Python (worker %v) exited.", workerId)
265+
266+
if err := cmd.Wait(); err != nil {
267+
// Retry on fatal errors, like OOMs and segfaults, not just
268+
// DoFns throwing exceptions.
269+
errorCount += 1
270+
if errorCount < 4 {
271+
log.Printf("Python (worker %v) exited %v times: %v\nrestarting SDK process",
272+
workerId, errorCount, err)
273+
} else {
274+
log.Fatalf("Python (worker %v) exited %v times: %v\nout of retries, failing container",
275+
workerId, errorCount, err)
276+
}
277+
} else {
278+
log.Printf("Python (worker %v) exited.", workerId)
279+
break
280+
}
268281
}
269282
}(workerId)
270283
}
@@ -297,7 +310,7 @@ func StartCommandEnv(env map[string]string, prog string, args ...string) *exec.C
297310
func setupVenv(baseDir, workerId string) (string, error) {
298311
log.Printf("Initializing temporary Python venv ...")
299312

300-
dir := filepath.Join(baseDir, "beam-venv-worker-" + workerId)
313+
dir := filepath.Join(baseDir, "beam-venv-worker-"+workerId)
301314
if _, err := os.Stat(dir); !os.IsNotExist(err) {
302315
// Probably leftovers from a previous run
303316
log.Printf("Cleaning up previous venv ...")

0 commit comments

Comments
 (0)