Skip to content

Commit f552d49

Browse files
Catch non-zero exit from pipe commands
This will allow problems with piped commands to be detected. This will also allow tasks to be retried where errors are rare (such as network problems in piped commands).
1 parent df34793 commit f552d49

File tree

1 file changed

+5
-1
lines changed

1 file changed

+5
-1
lines changed

python/pyspark/rdd.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -704,7 +704,11 @@ def pipe_objs(out):
704704
out.write(s.encode('utf-8'))
705705
out.close()
706706
Thread(target=pipe_objs, args=[pipe.stdin]).start()
707-
return (x.rstrip(b'\n').decode('utf-8') for x in iter(pipe.stdout.readline, b''))
707+
result = (x.rstrip(b'\n').decode('utf-8') for x in iter(pipe.stdout.readline, b''))
708+
pipe.wait()
709+
if pipe.returncode:
710+
raise Exception("Pipe function `%s' exited with error code %d" %(command, pipe.returncode) )
711+
return result
708712
return self.mapPartitions(func)
709713

710714
def foreach(self, f):

0 commit comments

Comments
 (0)