Skip to content

Commit fea9c4b

Browse files
[BEAM-13386] Add RLock support for cloudpickle (#16250)
1 parent a9e40c2 commit fea9c4b

3 files changed

Lines changed: 23 additions & 0 deletions

File tree

sdks/python/apache_beam/internal/cloudpickle_pickler.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
# Pickling, especially unpickling, causes broken module imports on Python 3
4444
# if executed concurrently, see: BEAM-8651, http://bugs.python.org/issue38884.
4545
_pickle_lock = threading.RLock()
46+
RLOCK_TYPE = type(_pickle_lock)
4647

4748

4849
def dumps(o, enable_trace=True, use_zlib=False):
@@ -56,6 +57,10 @@ def dumps(o, enable_trace=True, use_zlib=False):
5657
pickler.dispatch_table[type(flags.FLAGS)] = _pickle_absl_flags
5758
except NameError:
5859
pass
60+
try:
61+
pickler.dispatch_table[RLOCK_TYPE] = _pickle_rlock
62+
except NameError:
63+
pass
5964
pickler.dump(o)
6065
s = file.getvalue()
6166

@@ -99,6 +104,10 @@ def _create_absl_flags():
99104
return flags.FLAGS
100105

101106

107+
def _pickle_rlock(obj):
108+
return RLOCK_TYPE, tuple([])
109+
110+
102111
def dump_session(file_path):
103112
# It is possible to dump session with cloudpickle. However, since references
104113
# are saved it should not be necessary. See https://s.apache.org/beam-picklers

sdks/python/apache_beam/internal/cloudpickle_pickler_test.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
# pytype: skip-file
2121

2222
import sys
23+
import threading
2324
import types
2425
import unittest
2526

@@ -91,6 +92,12 @@ def foo():
9192

9293
self.assertEqual('expected_value', loads(dumps(foo))())
9394

95+
def test_pickle_rlock(self):
96+
rlock_instance = threading.RLock()
97+
rlock_type = type(rlock_instance)
98+
99+
self.assertIsInstance(loads(dumps(rlock_instance)), rlock_type)
100+
94101
@unittest.skipIf(NO_MAPPINGPROXYTYPE, 'test if MappingProxyType introduced')
95102
def test_dump_and_load_mapping_proxy(self):
96103
self.assertEqual(

sdks/python/apache_beam/internal/pickler_test.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
# pytype: skip-file
2121

2222
import sys
23+
import threading
2324
import types
2425
import unittest
2526

@@ -87,6 +88,12 @@ def test_recursive_class(self):
8788
'RecursiveClass:abc',
8889
loads(dumps(module_test.RecursiveClass('abc').datum)))
8990

91+
def test_pickle_rlock(self):
92+
rlock_instance = threading.RLock()
93+
rlock_type = type(rlock_instance)
94+
95+
self.assertIsInstance(loads(dumps(rlock_instance)), rlock_type)
96+
9097
@unittest.skipIf(NO_MAPPINGPROXYTYPE, 'test if MappingProxyType introduced')
9198
def test_dump_and_load_mapping_proxy(self):
9299
self.assertEqual(

0 commit comments

Comments
 (0)