@@ -30,8 +30,8 @@ import (
3030 "path/filepath"
3131 "regexp"
3232 "strings"
33- "syscall"
3433 "sync"
34+ "syscall"
3535 "time"
3636
3737 "github.com/apache/beam/sdks/v2/go/pkg/beam/artifact"
@@ -216,10 +216,10 @@ func launchSDKProcess() error {
216216
217217 // Keep track of child PIDs for clean shutdown without zombies
218218 childPids := struct {
219- v []int
219+ v []int
220220 canceled bool
221- mu sync.Mutex
222- } {v : make ([]int , 0 , len (workerIds ))}
221+ mu sync.Mutex
222+ }{v : make ([]int , 0 , len (workerIds ))}
223223
224224 // Forward trapped signals to child process groups in order to terminate them gracefully and avoid zombies
225225 go func () {
@@ -251,20 +251,33 @@ func launchSDKProcess() error {
251251 go func (workerId string ) {
252252 defer wg .Done ()
253253
254- childPids .mu .Lock ()
255- if childPids .canceled {
254+ errorCount := 0
255+ for {
256+ childPids .mu .Lock ()
257+ if childPids .canceled {
258+ childPids .mu .Unlock ()
259+ return
260+ }
261+ log .Printf ("Executing Python (worker %v): python %v" , workerId , strings .Join (args , " " ))
262+ cmd := StartCommandEnv (map [string ]string {"WORKER_ID" : workerId }, "python" , args ... )
263+ childPids .v = append (childPids .v , cmd .Process .Pid )
256264 childPids .mu .Unlock ()
257- return
258- }
259- log .Printf ("Executing Python (worker %v): python %v" , workerId , strings .Join (args , " " ))
260- cmd := StartCommandEnv (map [string ]string {"WORKER_ID" : workerId }, "python" , args ... )
261- childPids .v = append (childPids .v , cmd .Process .Pid )
262- childPids .mu .Unlock ()
263-
264- if err := cmd .Wait (); err != nil {
265- log .Printf ("Python (worker %v) exited: %v" , workerId , err )
266- } else {
267- log .Printf ("Python (worker %v) exited." , workerId )
265+
266+ if err := cmd .Wait (); err != nil {
267+ // Retry on fatal errors, like OOMs and segfaults, not just
268+ // DoFns throwing exceptions.
269+ errorCount += 1
270+ if errorCount < 4 {
271+ log .Printf ("Python (worker %v) exited %v times: %v\n restarting SDK process" ,
272+ workerId , errorCount , err )
273+ } else {
274+ log .Fatalf ("Python (worker %v) exited %v times: %v\n out of retries, failing container" ,
275+ workerId , errorCount , err )
276+ }
277+ } else {
278+ log .Printf ("Python (worker %v) exited." , workerId )
279+ break
280+ }
268281 }
269282 }(workerId )
270283 }
@@ -297,7 +310,7 @@ func StartCommandEnv(env map[string]string, prog string, args ...string) *exec.C
297310func setupVenv (baseDir , workerId string ) (string , error ) {
298311 log .Printf ("Initializing temporary Python venv ..." )
299312
300- dir := filepath .Join (baseDir , "beam-venv-worker-" + workerId )
313+ dir := filepath .Join (baseDir , "beam-venv-worker-" + workerId )
301314 if _ , err := os .Stat (dir ); ! os .IsNotExist (err ) {
302315 // Probably leftovers from a previous run
303316 log .Printf ("Cleaning up previous venv ..." )
0 commit comments