python udf over rpc #23569
Conversation
e27639c to fc8d69a Compare
fc8d69a to d5d498f Compare
torch/distributed/rpc.py
Outdated
We'd better assert here:
assert _agent is not None, "init_rpc(..) has not been called to set up rpc_agent yet."
Otherwise, the error message could look like:
Traceback (most recent call last):
File "/usr/local/fbcode/platform007/lib/python3.6/threading.py", line 916, in _bootstrap_inner
self.run()
File "/usr/local/fbcode/platform007/lib/python3.6/threading.py", line 864, in run
self._target(*self._args, **self._kwargs)
File "/data/users/shihaoxu/fbsource/fbcode/buck-out/dev/gen/caffe2/torch/fb/modelparallel/prototype/pytorch/tests/test_rpc#binary,link
-tree/caffe2/torch/fb/modelparallel/prototype/pytorch/rpc.py", line 1057, in _peer_ping
_low_level_rpc_no_result(_rpc_ping, inputs=[], worker_id=i)
File "/data/users/shihaoxu/fbsource/fbcode/buck-out/dev/gen/caffe2/torch/fb/modelparallel/prototype/pytorch/tests/test_rpc#binary,link
-tree/caffe2/torch/fb/modelparallel/prototype/pytorch/rpc.py", line 484, in _low_level_rpc_no_result
_call_rpc(RPCRequest(func, inputs, kwargs), worker_id)
File "/data/users/shihaoxu/fbsource/fbcode/buck-out/dev/gen/caffe2/torch/fb/modelparallel/prototype/pytorch/tests/test_rpc#binary,link
-tree/caffe2/torch/fb/modelparallel/prototype/pytorch/rpc.py", line 472, in _call_rpc
dst_name=WorkerCtx.instance().name_for_id(worker_id),
File "/data/users/shihaoxu/fbsource/fbcode/buck-out/dev/gen/caffe2/torch/fb/modelparallel/prototype/pytorch/tests/test_rpc#binary,link
-tree/caffe2/torch/fb/modelparallel/prototype/pytorch/comm.py", line 131, in send_callable
dist.rpc_async(dst_name, _run_request, obj)
File "/data/users/shihaoxu/fbsource/fbcode/buck-out/dev/gen/caffe2/torch/fb/modelparallel/prototype/pytorch/tests/test_rpc#binary,link
-tree/torch/distributed/rpc.py", line 95, in rpc_async
return invoke_rpc_python_udf(_agent, to, serialize(PythonUDF(func, args, kwargs)))
TypeError: invoke_rpc_python_udf(): incompatible function arguments. The following argument types are supported:
1. (arg0: torch.distributed.RpcAgent, arg1: str, arg2: str) -> torch.distributed.FutureMessage
Invoked with: None, 'w:0', b'\x80\x03ctorch.distributed.internal_rpc_utils\nPythonUDF\nq\x00ccaffe2.torch.fb.modelparallel.prototype.pyt
orch.comm\n_run_request\nq\x01ccaffe2.torch.fb.modelparallel.prototype.pytorch.rpc\nRPCRequest\nq\x02)\x81q\x03}q\x04(X\x04\x00\x00\x00f
uncq\x05ccaffe2.torch.fb.modelparallel.prototype.pytorch.rpc\n_rpc_ping\nq\x06X\x06\x00\x00\x00callerq\x07X\x03\x00\x00\x00w:1q\x08X\x06
\x00\x00\x00outputq\tNX\x06\x00\x00\x00inputsq\n]q\x0bX\x06\x00\x00\x00kwargsq\x0cNX\x0c\x00\x00\x00client_scopeq\rccaffe2.torch.fb.mode
lparallel.prototype.pytorch.rpc\nRRefId\nq\x0eK\x01K\x00\x86q\x0f\x81q\x10ub}q\x11\x87q\x12\x81q\x13.'
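For illustration, here is a minimal, self-contained sketch of the suggested fail-fast check. The names _agent, init_rpc, and rpc_async come from the traceback above; the body below is only a stand-in, not the real torch/distributed/rpc.py implementation.

```python
_agent = None  # in the real module this is set by init_rpc(..)

def rpc_async(to, func, args=None, kwargs=None):
    # Fail fast with a readable message instead of passing None into the
    # pybind11 binding, which produces the TypeError shown above.
    assert _agent is not None, (
        "init_rpc(..) has not been called to set up rpc_agent yet."
    )
    # The real implementation would then call something like:
    #   invoke_rpc_python_udf(_agent, to, serialize(PythonUDF(func, args, kwargs)))
```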
In which case is _agent None? Maybe we should fix that first.
This has been renamed.
MessageType::PYTHON_UDF_OP -> MessageType::PYTHON_UDF_OP
MessageType::PYTHON_UDF_RET -> MessageType::PYTHON_RET
I think we need to discuss this line more.
It looks to me like our user starts by launching a Python interpreter (a C program) to interpret a piece of Python code. This Python interpreter loads a pybind11 module, torch.distributed, that bundles several C++ classes.
We use this binding module, backed by C++ classes, to instantiate an RpcAgent C++ instance and assign it to a variable, torch.distributed.rpc._agent.
Meanwhile, in the constructor of RpcAgent, PythonRpcHandler::init() is called and creates a second Python interpreter?? (If I understand the doc correctly.)
This second Python interpreter imports the module torch.distributed.internal_rpc_utils and uses the utilities in it.
Although it can successfully run the function sent by the client and return correct results back to the client, it is impossible for the client to change the global state on the server, which is held by the first Python interpreter.
There is a requirement in our RRef prototype implementation that we need to allow an RPC client (an RRef user) to direct the RPC server (RRef owner) to change its global state.
The state is stored in a module's global variable, which lives in the original Python interpreter.
For example:
- A client could ask the server to fetch the RRef instance that matches the ref_id in the message from the server's global RRef registry.
- A client could send a termination signal message to a server; on receiving the signal, the server should set its status to termination_ongoing=True, so that it will delete the remote references it holds and notify the owners about reducing ref counts before it goes away.
I read this doc: https://docs.python.org/3/extending/index.html#extending-index
It looks to me like there are two ways for C/C++ to interact with Python:
- Write modules in C or C++ to extend the Python interpreter with new modules.
- Sometimes, rather than creating an extension that runs inside the Python interpreter as the main application, it is desirable to instead embed the CPython runtime inside a larger application.
It looks to me like Py_Initialize() belongs to the interface for the second case.
I thought we only call Py_Initialize() once to set up the interpreter, since init() is only called once...? Although we could enforce calling it only once as well, I do not understand why it is called twice.
Also, could you please share what kind of error you encountered during the integration? We can sync up offline.
@zhaojuanmao
NVM. There is a call to create a new interpreter:
https://docs.python.org/3/c-api/init.html#c.Py_NewInterpreter
So I think Py_Initialize() ensures a singleton and initializes only once.
d5d498f to b45d978 Compare
b45d978 to 3f77936 Compare
3f77936 to f7bd37f Compare
This "kill"s Python interpreter and fails unit tests.
I added a flag to ensure we call Py_Finalize() only if Py_Initialize() was called by the embedder.
Can we move this into the destructor of ProcessGroupAgent?
More generally, the lifetime of PythonRpcHandler's "static" variables actually seems directly tied to ProcessGroupAgent lifetime. If this is true, can we just make PythonRpcHandler a member of ProcessGroupAgent?
That was the initial thought, but it turns out we need to call the PythonRpcHandler interface when we get a result from a future. At that point, the future does not have a ProcessGroupAgent.
We could remove this line, Py_Initialize();.
As the documentation says:
This initializes the table of loaded modules (sys.modules), and creates the fundamental modules builtins, __main__ and sys. It also initializes the module search path (sys.path). It does not set sys.argv;
Since the code has run to this point, the interpreter's system-level modules must already have been initialized, so there is no need to call it again.
We still need this if the application is a pure C++ application with no Python environment. I added a check of Py_IsInitialized() first, to ensure Py_Initialize() is called only once.
f7bd37f to c85b516 Compare
test/test_rpc.py
Outdated
This could be flaky: in the worst case, a worker could have sent out its request but not have received a request yet. In that case, the global var it owns is not changed.
This is code to simulate the worst case:
dstRank = n % self.world_size
import time; time.sleep(self.rank)
ret = dist.rpc_sync('worker%d' % dstRank, modify_global_var, args=(True,))
self.assertEqual(global_var, True)
Fix:
dstRank = n % self.world_size
ret = dist.rpc_sync('worker%d' % dstRank, modify_global_var, args=(True,))
dist.barrier()  # After this barrier, we know that rank[i] has processed the request to modify the global var from rank[i - 1].
self.assertEqual(global_var, True)
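For completeness, a minimal sketch of the helpers the snippet above assumes (the real test defines its own versions; these definitions are illustrative only):

```python
# Hypothetical module-level state and UDF referenced by the snippet above.
global_var = False

def modify_global_var(value):
    # Runs on the callee worker and mutates that worker's module-level state.
    global global_var
    global_var = value
    return global_var
```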
good catch!
Thinking more about this: it seems neither dist.barrier() nor the existing sync_rpc can guarantee a fix for this problem, because they only enforce that all sends are done; there is no guarantee that all received messages have been processed.
This can be fixed when we switch to a ThreadPool for recv and let sync_rpc block until both send and recv are done. But the question is what we should wait for if received messages trigger subsequent sends (and those sends can trigger additional sends).
I am thinking about just doing the following (see the sketch below):
- Block the send queue, and wait till all sends are done.
- Block the recv queue, and wait till all recvs are done.
This means that we at least wait until existing sends are done and they are processed on the callee side (sufficient for this test), and we are not going to wait for additional sends/recvs triggered by existing sends. Does this make sense? @xush6528 @zhaojuanmao
(This is not a change request for this PR, just a discussion.)
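A rough sketch of the two-phase wait proposed above, assuming an agent that tracks pending work in explicit send and recv queues (purely illustrative; this is not the ProcessGroupAgent implementation):

```python
import queue

def sync(send_queue: queue.Queue, recv_queue: queue.Queue) -> None:
    # Phase 1: wait until every enqueued send has been handed off and marked done.
    send_queue.join()
    # Phase 2: wait until every message received so far has been processed.
    recv_queue.join()
```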
In terms of changes, if we all agree that dist.barrier does not guarantee to fix the problem, shall we disable the test for now?
sounds good, let me disable the test for now
c85b516 to 933712d Compare
Under what circumstances will we be sending Python UDFs to remote processes that aren't already running Python? Does it simplify matters if you can assume that Python workers always start up from Python?
ezyang
left a comment
Beyond some of the smaller requests, at a larger level I'd like some clarity on the lifetime of the PythonRpcHandler object, and the relationship between C++-only execution versus mixed C++ and Python execution.
Sorry about the late review; release was this week and I kept putting this PR review off XD
Thanks for your comments @ezyang!
Built-in ops can be executed remotely purely in C++ because the JIT supports that.
80a8f0b to 6d05028 Compare
Thanks everyone for giving such great feedback! I resolved most of the comments, including switching from the CPython API to the pybind11 API, among others. I would like to address the comment about making the binary format consistent between Python UDFs and builtin operators in a follow-up PR.
mrshenli
left a comment
Thanks for adding Python UDF support for RPC!
Approving, but please consider performing the following two actions before merging:
(This does not need to be fixed in this PR)
IIUC, this is trying to address @ezyang's comments regarding PythonRpcHandler's lifetime. It seems to me this needs to be wrapped by some macro, because we might want to run pure C++ RPC in the future, where this should not depend on Python.
torch/CMakeLists.txt
Outdated
(Does not need to be addressed in this PR)
When coming up with file names, I used camel case for class files and snake case for utility files, and then ordered all the files here in ASCII order. This strategy was learned as a mix from the cuda and c10d folders. @pietern @ezyang Should we all stay with snake naming for files? Do we have a convention for this?
(Pointed out by @satgera.) This is probably my fault in the first place, and I will submit a fix for it in a separate PR.
I'd aim for self-consistency, and the prevailing convention in torch/csrc is snake case.
Can we create GitHub issues for the things to address in follow-up PRs? That way we won't lose track of them.
6d05028 to c347426 Compare
c347426 to d9832df Compare
The Windows build failure is not related; all other checks are clean. I built on macOS locally, imported torch, and everything is fine.
facebook-github-bot
left a comment
@zhaojuanmao is landing this pull request. If you are a Facebook employee, you can view this diff on Phabricator.
d9832df to 6f02e54 Compare
Summary: This diff is to support Python user-defined functions over RPC for pytorch#23110. The workflow is:
1. Pickle the Python UDF.
2. Pass the pickle to C++.
3. C++ passes it over RPC from the client to the server.
4. The server calls the runPythonUDF() Python function to unpickle and run the Python UDF, and pickles the UDF result using the Python embedder.
5. The serialized result is passed back from the server to the client.
6. The client calls the loadPythonUDFResult() Python function to unpickle the result.
7. The result is returned to Python.
Right now, rpc_sync_builtin() and rpc_async_builtin() are temporary interfaces for builtin operator remote calls; they accept a qualified name string, and this interface can execute builtin operators in C++ land. rpc_sync() and rpc_async() accept Python callables only right now, which can be user-defined Python functions or builtin operator Python functions; the Python functions will be executed in Python land. Once we can resolve builtin operator Python callables to qualified name strings, we can merge rpc_sync_builtin() into rpc_sync().
Pull Request resolved: pytorch#23569
Test Plan: unit tests
Differential Revision: D16390764
fbshipit-source-id: 9373024280d08bc953391464f221e8ab65e9ba10
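To make the workflow above concrete, here is a rough, self-contained sketch of the pickle round trip. The helper names runPythonUDF and loadPythonUDFResult mirror the summary; serialize_udf, the payload layout, and all signatures are assumptions for illustration, not the actual implementation.

```python
import pickle

# Steps 1-2: the client pickles the UDF and its arguments into bytes that the
# C++ layer can ship over RPC.
def serialize_udf(func, args, kwargs):
    return pickle.dumps((func, args, kwargs))

# Step 4: the server unpickles, runs the UDF, and pickles the result so the
# C++ layer can send it back.
def runPythonUDF(payload):
    func, args, kwargs = pickle.loads(payload)
    return pickle.dumps(func(*args, **(kwargs or {})))

# Step 6: the client unpickles the returned bytes into a Python object.
def loadPythonUDFResult(payload):
    return pickle.loads(payload)

# Local round trip, standing in for the client -> server -> client hop.
def add(x, y):
    return x + y

request = serialize_udf(add, (2, 3), None)
response = runPythonUDF(request)  # would run on the remote worker
assert loadPythonUDFResult(response) == 5
```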
6f02e54 to 694a4de Compare

Summary:
This diff is to support Python user-defined functions over RPC for #23110. The workflow is:
1. Pickle the Python UDF.
2. Pass the pickle to C++.
3. C++ passes it over RPC from the client to the server.
4. The server calls the runPythonUDF() Python function to unpickle and run the Python UDF, and pickles the UDF result using the Python embedder.
5. The serialized result is passed back from the server to the client.
6. The client calls the loadPythonUDFResult() Python function to unpickle the result.
7. The result is returned to Python.
Right now, rpc_sync_builtin() and rpc_async_builtin() are temporary interfaces for builtin operator remote calls; they accept a qualified name string, and this interface can execute builtin operators in C++ land.
rpc_sync() and rpc_async() accept Python callables only right now, which can be user-defined Python functions or builtin operator Python functions; the Python functions will be executed in Python land.
Once we can resolve builtin operator Python callables to qualified name strings, we can merge rpc_sync_builtin() into rpc_sync().
Differential Revision: D16390764
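For reference, a rough usage sketch of the two call paths described above. The rpc_sync() call pattern follows the test snippet earlier in this thread; the exact rpc_sync_builtin() signature and the 'aten::add' qualified name are assumptions, not the final API.

```python
import torch
import torch.distributed as dist

# Python-callable path: the UDF is pickled, sent over RPC, and executed in
# Python land on the destination worker.
def my_udf(x, y):
    return x + y

ret = dist.rpc_sync('worker1', my_udf, args=(torch.ones(2), torch.ones(2)))

# Builtin-operator path (temporary interface): a qualified name string is
# dispatched and executed purely in C++ land on the destination worker.
ret = dist.rpc_sync_builtin('worker1', 'aten::add',
                            args=(torch.ones(2), torch.ones(2)))
```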