-
Notifications
You must be signed in to change notification settings - Fork 26.3k
[RFC] make python udf serialization format to be binary plus tensor tables #26314
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…ables make python udf serialization format to be binary plus tensor tables, so that tensors can be attached to autograd graph, handled in the same way as builtin operators Differential Revision: [D17405686](https://our.internmc.facebook.com/intern/diff/D17405686/) [ghstack-poisoned]
…ables make python udf serialization format to be binary plus tensor tables, so that tensors can be attached to autograd graph, handled in the same way as builtin operators Differential Revision: [D17405686](https://our.internmc.facebook.com/intern/diff/D17405686/) ghstack-source-id: 90192887 Pull Request resolved: #26314
|
initial diff to proof of concept, one basic unit test passed; planning to add more unit tests and optimize codes a little bit |
…us tensor tables" make python udf serialization format to be binary plus tensor tables, so that tensors can be attached to autograd graph, handled in the same way as builtin operators Differential Revision: [D17405686](https://our.internmc.facebook.com/intern/diff/D17405686/) [ghstack-poisoned]
…us tensor tables" make python udf serialization format to be binary plus tensor tables, so that tensors can be attached to autograd graph, handled in the same way as builtin operators Differential Revision: [D17405686](https://our.internmc.facebook.com/intern/diff/D17405686/) [ghstack-poisoned]
…us tensor tables" make python udf serialization format to be binary plus tensor tables, so that tensors can be attached to autograd graph, handled in the same way as builtin operators Differential Revision: [D17405686](https://our.internmc.facebook.com/intern/diff/D17405686/) [ghstack-poisoned]
…ables Pull Request resolved: #26314 make python udf serialization format to be binary plus tensor tables, so that tensors can be attached to autograd graph, handled in the same way as builtin operators ghstack-source-id: 90311519 Differential Revision: [D17405686](https://our.internmc.facebook.com/intern/diff/D17405686/)
…us tensor tables" make python udf serialization format to be binary plus tensor tables, so that tensors can be attached to autograd graph, handled in the same way as builtin operators Differential Revision: [D17405686](https://our.internmc.facebook.com/intern/diff/D17405686/) [ghstack-poisoned]
…ables Pull Request resolved: #26314 make python udf serialization format to be binary plus tensor tables, so that tensors can be attached to autograd graph, handled in the same way as builtin operators ghstack-source-id: 90345144 Differential Revision: [D17405686](https://our.internmc.facebook.com/intern/diff/D17405686/)
pritamdamania87
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great! Just a few comments inline.
|
|
||
| # it is used to test python user defined function with tensor args over rpc | ||
| def my_tensor_function(a, b): | ||
| return a + b |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: empty line after this function.
| return a + b + c | ||
|
|
||
| # it is used to test python user defined function with tensor args over rpc | ||
| def my_tensor_function(a, b): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use type annotations? def my_tensor_function(torch.Tensor a, torch.Tensor b)?
test/test_rpc.py
Outdated
| return a + b | ||
| # it is used to test python user defined function with args that contain | ||
| # tensors over rpc | ||
| # a: list, b: my_class, c: dict |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use type annotations?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
type annotation failed in CI python 2 build, although we skipped python 2 tests, but the helper function still need to build in python 2
test/test_rpc.py
Outdated
| n = self.rank + 1 | ||
| dst_rank = n % self.world_size | ||
| a = [torch.ones(n, n), torch.ones(n, n)] | ||
| b = my_class(torch.ones(n, n)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we make my_class hold a really complex object with tensors inside? This would be a nice test case. One example would be something like: Tuple[Tensor, List[List[Tensor]], Dict[Tuple[Tensor, List[Tensor]], List[Tensor]]
| global _thread_local_tensor_tables | ||
| return _thread_local_tensor_tables.recv_tables[tensor_index] | ||
|
|
||
| def tensor_reducer(obj): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should add some python docs to all public methods in this file (ones without _).
| tensor_index = len(_thread_local_tensor_tables.send_tables) - 1 | ||
| return (_tensor_receiver, (tensor_index, )) | ||
|
|
||
| if sys.version_info >= (3, 0): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Can we use six.PY3 instead? Also, IIRC, we only support py3 right? Do we still need this check?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, because when internal_rpc_util module is imported by other files, the global variables will be checked at build time. it will fail to build in python2 if not adding the check
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why six.PY3 is better? trying to be consistent with other rpc files that are using sys.version_info
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just find six.PY3 to be nicer since its much more concise that doing comparisons on sys.version_info.
| _dispatch_table = copyreg.dispatch_table.copy() | ||
| _dispatch_table[torch.Tensor] = tensor_reducer | ||
|
|
||
| def serialize(obj): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can tensor_reducer and serialize be private?
| f = io.BytesIO() | ||
| p = pickle.Pickler(f) | ||
| p.dispatch_table = copyreg.dispatch_table.copy() | ||
| p.dispatch_table = _dispatch_table |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't _dispatch_table be global here?
| p.dispatch_table = copyreg.dispatch_table.copy() | ||
| p.dispatch_table = _dispatch_table | ||
| global _thread_local_tensor_tables | ||
| _thread_local_tensor_tables.send_tables = [] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not immediately clear why this is needed. I see it's related to the reducer in the dispatch table. Would it be possible to turn the dispatch table into a first class object and associate some methods with it? E.g. _dispatch_table.clear_send_tables() or something? Then we can keep all the logic related to pickling/unpickling in one place vs. mucking with globals.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pietern in order to achieve that we'd need a per-serialization dispatch table (or at least a thread-local dispatch table). I don't know what's the relative cost of copying the dispatch table, but I'd rather have a global dispatch table and a thread-local pickling context object instead.
===
On a separate note, I believe that there is a corner case not covered here: it is possible for a custom reducer to call an RPC. In that case you'll have nested calls to serialize on the same thread. Example:
class MyCustomClass:
def __getstate__(self):
dist.rpc(my_random_remote_function_called_inside_of_serialize, ..., to='w1')
return (...)
m = MyCustomClass()
rpc(my_function_taking_a_custom_class_instance, args=(m, ), 'w1')
In order to cover this corner case you'll need to save a possible current _thread_local_tensor_tables.send_tables and restore it after the call. Even if we go with alternative solutions such as suggested by Pieter we'll have to deal with this case.
…us tensor tables" make python udf serialization format to be binary plus tensor tables, so that tensors can be attached to autograd graph, handled in the same way as builtin operators Differential Revision: [D17405686](https://our.internmc.facebook.com/intern/diff/D17405686/) [ghstack-poisoned]
…ables Pull Request resolved: #26314 make python udf serialization format to be binary plus tensor tables, so that tensors can be attached to autograd graph, handled in the same way as builtin operators ghstack-source-id: 90539986 Differential Revision: [D17405686](https://our.internmc.facebook.com/intern/diff/D17405686/)
|
wrapped everything into InternalRPCPickler class, added complex class test case, and addressed some nits, tests passed locally |
aazzolini
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See inline comment for a potential corner case.
test/test_rpc.py
Outdated
| return a + b | ||
|
|
||
|
|
||
| def my_complex_tensor_function( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One thing that you didn't seem to have tested is UDFs that return a complex value containing tensors inside. We should test that as well as this will exercise a different serialization / deserialization codepath.
test/test_rpc.py
Outdated
| futs = [] | ||
| n = self.rank + 1 | ||
| dst_rank = n % self.world_size | ||
| for i in range(10): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any reason to make this range(10) instead of say range(100)? just making it more likely to exercise race conditions.
| p.dispatch_table = copyreg.dispatch_table.copy() | ||
| p.dispatch_table = _dispatch_table | ||
| global _thread_local_tensor_tables | ||
| _thread_local_tensor_tables.send_tables = [] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pietern in order to achieve that we'd need a per-serialization dispatch table (or at least a thread-local dispatch table). I don't know what's the relative cost of copying the dispatch table, but I'd rather have a global dispatch table and a thread-local pickling context object instead.
===
On a separate note, I believe that there is a corner case not covered here: it is possible for a custom reducer to call an RPC. In that case you'll have nested calls to serialize on the same thread. Example:
class MyCustomClass:
def __getstate__(self):
dist.rpc(my_random_remote_function_called_inside_of_serialize, ..., to='w1')
return (...)
m = MyCustomClass()
rpc(my_function_taking_a_custom_class_instance, args=(m, ), 'w1')
In order to cover this corner case you'll need to save a possible current _thread_local_tensor_tables.send_tables and restore it after the call. Even if we go with alternative solutions such as suggested by Pieter we'll have to deal with this case.
| # udf, return serialized result and tensor tables | ||
| def run_python_udf_internal(pickled_python_udf, tensors): | ||
| global _thread_local_tensor_tables | ||
| _thread_local_tensor_tables.recv_tables = tensors |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe nested deserialization could happen in the same thread too -- if for example setstate in a class actually calls into rpc -- see comment above.
…us tensor tables" make python udf serialization format to be binary plus tensor tables, so that tensors can be attached to autograd graph, handled in the same way as builtin operators Differential Revision: [D17405686](https://our.internmc.facebook.com/intern/diff/D17405686/) [ghstack-poisoned]
…us tensor tables" make python udf serialization format to be binary plus tensor tables, so that tensors can be attached to autograd graph, handled in the same way as builtin operators Differential Revision: [D17405686](https://our.internmc.facebook.com/intern/diff/D17405686/) [ghstack-poisoned]
…ables Pull Request resolved: #26314 make python udf serialization format to be binary plus tensor tables, so that tensors can be attached to autograd graph, handled in the same way as builtin operators ghstack-source-id: 90705251 Differential Revision: [D17405686](https://our.internmc.facebook.com/intern/diff/D17405686/)
…us tensor tables" make python udf serialization format to be binary plus tensor tables, so that tensors can be attached to autograd graph, handled in the same way as builtin operators Differential Revision: [D17405686](https://our.internmc.facebook.com/intern/diff/D17405686/) [ghstack-poisoned]
…ables Pull Request resolved: #26314 make python udf serialization format to be binary plus tensor tables, so that tensors can be attached to autograd graph, handled in the same way as builtin operators ghstack-source-id: 90731927 Differential Revision: [D17405686](https://our.internmc.facebook.com/intern/diff/D17405686/)
|
added changes and test cases to handle nested serialization and deserialization, also addressed other comments, tests are green so far |
|
@pytorchbot retest this please |
aazzolini
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks good from my side, i'll leave for someone else to accept in case there's still pending requests.
|
|
||
| # save _thread_local_tensor_tables.send_tables if it is in nested call | ||
| global _thread_local_tensor_tables | ||
| if hasattr(_thread_local_tensor_tables, "send_tables"): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe you can simplify this with :
old_send_tables = _thread_local_tensor_tables.send_tables if hasattr('send_tables', _thread_local_tensor_tqbles) else None
no need for should_restore.
…us tensor tables" make python udf serialization format to be binary plus tensor tables, so that tensors can be attached to autograd graph, handled in the same way as builtin operators Differential Revision: [D17405686](https://our.internmc.facebook.com/intern/diff/D17405686/) [ghstack-poisoned]
…ables Pull Request resolved: #26314 make python udf serialization format to be binary plus tensor tables, so that tensors can be attached to autograd graph, handled in the same way as builtin operators ghstack-source-id: 90843251 Differential Revision: [D17405686](https://our.internmc.facebook.com/intern/diff/D17405686/)
| _thread_local_tensor_tables.recv_tables = [] | ||
| _thread_local_tensor_tables.send_tables = [] | ||
|
|
||
| # This class provides serialize() and deserialize() interfaces to serialize |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use python docstrings for the class and member functions instead of multiple lines starting with '#'.
| if sys.version_info >= (3, 0): | ||
| _dispatch_table = copyreg.dispatch_table.copy() | ||
| _dispatch_table[torch.Tensor] = tensor_reducer | ||
| # Serialize non tensor data into binary string, tensor data into |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
python docstring
| # tables, this serialization format is consistent with builtin operator and args | ||
| # using JIT pickler. This format will make tensor handling in C++ much easier, | ||
| # e.g. attach tensor to distributed autograd graph in C++ | ||
| class InternalRPCPickler: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: make this _InternalRPCPickler to indicate its private.
| tensor_index = len(_thread_local_tensor_tables.send_tables) - 1 | ||
| return (_tensor_receiver, (tensor_index, )) | ||
|
|
||
| if sys.version_info >= (3, 0): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just find six.PY3 to be nicer since its much more concise that doing comparisons on sys.version_info.
pritamdamania87
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me! I have a few small comments inline, could you address those before landing? Also, looks like there are a few CI failures, lets make sure we check those before landing.
…us tensor tables" make python udf serialization format to be binary plus tensor tables, so that tensors can be attached to autograd graph, handled in the same way as builtin operators Differential Revision: [D17405686](https://our.internmc.facebook.com/intern/diff/D17405686/) [ghstack-poisoned]
…us tensor tables" make python udf serialization format to be binary plus tensor tables, so that tensors can be attached to autograd graph, handled in the same way as builtin operators Differential Revision: [D17405686](https://our.internmc.facebook.com/intern/diff/D17405686/) [ghstack-poisoned]
…ables Pull Request resolved: #26314 make python udf serialization format to be binary plus tensor tables, so that tensors can be attached to autograd graph, handled in the same way as builtin operators ghstack-source-id: 91035501 Differential Revision: [D17405686](https://our.internmc.facebook.com/intern/diff/D17405686/)
|
Hi @zhaojuanmao, as the implementation is finalized and approved, shall we drop the |
mrshenli
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! Thanks for adding this!
| # python2 does not have dispatch_table, add "if six.PY3" condition, | ||
| # as _InternalRPCPickler still got build in python2 even | ||
| # we skipped python 2 tests for rpc_test | ||
| if six.PY3: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question: IIUC, OSS does not build Python files. Do you see errors in internal tests?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see errors in OSS tests
| import pickle | ||
| import traceback | ||
| import threading | ||
| import six |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: sort imports in ascii order
Pull Request resolved: #26314 make python udf serialization format to be binary plus tensor tables, so that tensors can be attached to autograd graph, handled in the same way as builtin operators ghstack-source-id: 91086305 Differential Revision: [D17405686](https://our.internmc.facebook.com/intern/diff/D17405686/)
|
that is weird, why the PR status is "Merged", I just run ghexport |
Pull Request resolved: #26314 make python udf serialization format to be binary plus tensor tables, so that tensors can be attached to autograd graph, handled in the same way as builtin operators ghstack-source-id: 91097229 Differential Revision: [D17405686](https://our.internmc.facebook.com/intern/diff/D17405686/)
|
since this PR can not be merged to trunk somehow, I will export another PR |
|
Hi @ezyang, landing hits merge conflicts on the corresponding, but the OSS PR was closed. Is this behavior expected? |
Stack from ghstack:
make python udf serialization format to be binary plus tensor tables, so that tensors can be attached to autograd graph, handled in the same way as builtin operators
Differential Revision: D17405686