Conversation

@zhaojuanmao
Contributor

Summary:
This diff adds support for Python user-defined functions over RPC, for #23110. The workflow is:

  1. pickle the Python UDF
  2. pass the pickle to C++
  3. C++ passes it over RPC from client to server
  4. the server calls the runPythonUDF() Python function to unpickle and run the Python UDF, then pickles the UDF result using the Python embedder
  5. the serialized result is passed back from server to client
  6. the client calls the loadPythonUDFResult() Python function to unpickle the result
  7. the result is returned to Python

For now, rpc_sync_builtin() and rpc_async_builtin() are temporary interfaces for builtin operator remote calls; they accept a qualified name string, and this interface can execute builtin operators in C++ land.

rpc_sync() and rpc_async() accept Python callables only right now; these can be user-defined Python functions or builtin operator Python functions, and they will be executed in Python land.

Once we can resolve builtin operator Python callables to qualified name strings, we can merge rpc_sync_builtin() into rpc_sync().
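
The round trip above can be sketched in plain Python. This is only a minimal sketch of the pickling flow; `remote_add` is an illustrative name, and none of this is the actual torch.distributed implementation:

```python
import pickle

def remote_add(x, y):
    # a user-defined function the client wants the server to run
    return x + y

# client side: pickle the python udf together with its arguments (step 1)
request = pickle.dumps((remote_add, (2, 3), {}))

# server side: unpickle, run the udf, and pickle the result (steps 4-5)
func, args, kwargs = pickle.loads(request)
response = pickle.dumps(func(*args, **kwargs))

# client side: unpickle the serialized result (steps 6-7)
result = pickle.loads(response)
print(result)  # -> 5
```

Note that pickle serializes the function by reference (module + qualified name), so the callee must be able to import the same module; that is why the server-side handler runs in Python land.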

Differential Revision: D16390764

@pytorchbot pytorchbot added caffe2 module: build Build system issues oncall: distributed Add this issue/PR to distributed oncall triage queue module: internals Related to internal abstractions in c10 and ATen module: tests Issues related to tests (not the torch.testing module) labels Jul 30, 2019
Contributor

We'd better assert here:

assert _agent is not None, "init_rpc(..) has not been called to setup rpc_agent yet."

Otherwise, the error message could look like:

Traceback (most recent call last):
  File "/usr/local/fbcode/platform007/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/usr/local/fbcode/platform007/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/data/users/shihaoxu/fbsource/fbcode/buck-out/dev/gen/caffe2/torch/fb/modelparallel/prototype/pytorch/tests/test_rpc#binary,link-tree/caffe2/torch/fb/modelparallel/prototype/pytorch/rpc.py", line 1057, in _peer_ping
    _low_level_rpc_no_result(_rpc_ping, inputs=[], worker_id=i)
  File "/data/users/shihaoxu/fbsource/fbcode/buck-out/dev/gen/caffe2/torch/fb/modelparallel/prototype/pytorch/tests/test_rpc#binary,link-tree/caffe2/torch/fb/modelparallel/prototype/pytorch/rpc.py", line 484, in _low_level_rpc_no_result
    _call_rpc(RPCRequest(func, inputs, kwargs), worker_id)
  File "/data/users/shihaoxu/fbsource/fbcode/buck-out/dev/gen/caffe2/torch/fb/modelparallel/prototype/pytorch/tests/test_rpc#binary,link-tree/caffe2/torch/fb/modelparallel/prototype/pytorch/rpc.py", line 472, in _call_rpc
    dst_name=WorkerCtx.instance().name_for_id(worker_id),
  File "/data/users/shihaoxu/fbsource/fbcode/buck-out/dev/gen/caffe2/torch/fb/modelparallel/prototype/pytorch/tests/test_rpc#binary,link-tree/caffe2/torch/fb/modelparallel/prototype/pytorch/comm.py", line 131, in send_callable
    dist.rpc_async(dst_name, _run_request, obj)
  File "/data/users/shihaoxu/fbsource/fbcode/buck-out/dev/gen/caffe2/torch/fb/modelparallel/prototype/pytorch/tests/test_rpc#binary,link-tree/torch/distributed/rpc.py", line 95, in rpc_async
    return invoke_rpc_python_udf(_agent, to, serialize(PythonUDF(func, args, kwargs)))
TypeError: invoke_rpc_python_udf(): incompatible function arguments. The following argument types are supported:
    1. (arg0: torch.distributed.RpcAgent, arg1: str, arg2: str) -> torch.distributed.FutureMessage

Invoked with: None, 'w:0', b'\x80\x03ctorch.distributed.internal_rpc_utils\nPythonUDF\nq\x00ccaffe2.torch.fb.modelparallel.prototype.pytorch.comm\n_run_request\nq\x01ccaffe2.torch.fb.modelparallel.prototype.pytorch.rpc\nRPCRequest\nq\x02)\x81q\x03}q\x04(X\x04\x00\x00\x00funcq\x05ccaffe2.torch.fb.modelparallel.prototype.pytorch.rpc\n_rpc_ping\nq\x06X\x06\x00\x00\x00callerq\x07X\x03\x00\x00\x00w:1q\x08X\x06\x00\x00\x00outputq\tNX\x06\x00\x00\x00inputsq\n]q\x0bX\x06\x00\x00\x00kwargsq\x0cNX\x0c\x00\x00\x00client_scopeq\rccaffe2.torch.fb.modelparallel.prototype.pytorch.rpc\nRRefId\nq\x0eK\x01K\x00\x86q\x0f\x81q\x10ub}q\x11\x87q\x12\x81q\x13.'
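
A minimal sketch of the suggested guard, using the `_agent`/`init_rpc` names from this thread (the body of `rpc_async` is elided and purely illustrative):

```python
_agent = None  # set by init_rpc(...); None until then

def rpc_async(to, func, *args, **kwargs):
    # fail fast with a readable message instead of the pybind11 TypeError
    assert _agent is not None, (
        "init_rpc(..) has not been called to setup rpc_agent yet.")
    # ... would call invoke_rpc_python_udf(_agent, to, ...) here

try:
    rpc_async("w:0", print)
except AssertionError as err:
    caught = str(err)

print(caught)
```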

Contributor Author

In which case is _agent None? Maybe we should fix that first.

Contributor

This has been renamed.

MessageType::PYTHON_UDF_OP -> MessageType::PYTHON_UDF_OP

Contributor

MessageType::PYTHON_UDF_RET -> MessageType::PYTHON_RET

Contributor

I think we need to discuss this line more.

It looks to me like our user starts by launching a Python interpreter (a C program) to interpret a piece of Python code. This Python interpreter loads a pybind11 module, torch.distributed, that bundles several C++ classes.
We use this special binding module, backed by C++ classes, to instantiate an RpcAgent C++ instance and assign it to a variable torch.distributed.rpc._agent.
Meanwhile, in the constructor of RpcAgent, PythonRpcHandler::init() is called and creates a second Python interpreter?? (If I understand the doc correctly.)
This second Python interpreter imports the module torch.distributed.internal_rpc_utils and uses the utilities in it.
Although it can successfully run the function sent by the client and return correct results back to the client, it's impossible for the client to change the global state in the server, which is held by the first Python interpreter.


There is a requirement in our RRef prototype implementation that we need to allow an RPC client (an RRef user) to dictate that the RPC server (RRef owner) change its global state.
The state is stored in a module's global variable, which lives in the original Python interpreter.

For example,

  1. A client could ask the server to fetch an RRef instance that matches the ref_id in the message, from the server's global RRef registry.
  2. A client could send a termination signal message to a server; the server, on receiving the signal, should set its status to termination_ongoing=True, so that it will delete the remote references it holds and notify the owners about reducing ref counts before it goes away.
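
To illustrate the requirement (purely a sketch; `_rref_registry` and `termination_ongoing` are made-up names for this discussion): both operations below only work if the RPC-delivered callable runs in the same interpreter that owns the module-level globals.

```python
# module-level state living in the original interpreter
_rref_registry = {"rref_1": "some-owned-value"}
termination_ongoing = False

def fetch_rref(ref_id):
    # case 1: a client asks the owner for an RRef by id
    return _rref_registry[ref_id]

def handle_termination_signal():
    # case 2: a client tells the server to flip its termination flag
    global termination_ongoing
    termination_ongoing = True

# simulate the server processing two incoming requests
fetched = fetch_rref("rref_1")
handle_termination_signal()
print(fetched, termination_ongoing)  # -> some-owned-value True
```

If the UDF instead ran in a second interpreter, it would see a separate copy of this module, and the mutations would be invisible to the first interpreter.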

I read this doc: https://docs.python.org/3/extending/index.html#extending-index

It looks to me like there are 2 ways for C/C++ to interact with Python:

  1. Write modules in C or C++ to extend the Python interpreter with new modules.

  2. Sometimes, rather than creating an extension that runs inside the Python interpreter as the main application, it is desirable to instead embed the CPython runtime inside a larger application.

Py_Initialize() looks to me like it belongs to the interface for the second case.

Contributor Author

I thought we only call Py_Initialize() once to import the interpreter, as init() is only called once...? Although we could enforce calling it once as well, I do not understand why it would be called twice.

Also, could you please share what kind of error you encountered during the integration? We can sync up offline.

Contributor
@xush6528 xush6528 Aug 6, 2019

@zhaojuanmao
NVM. There is a separate call for creating a new interpreter:
https://docs.python.org/3/c-api/init.html#c.Py_NewInterpreter

So I think Py_Initialize ensures a singleton and initializes only once.

Contributor Author

I added a flag to ensure we call Py_Finalize() only if Py_Initialize() was called in the embedder.

Contributor

Can we move this into the destructor of ProcessGroupAgent?

Contributor

More generally, the lifetime of PythonRpcHandler's "static" variables actually seems directly tied to ProcessGroupAgent lifetime. If this is true, can we just make PythonRpcHandler a member of ProcessGroupAgent?

Contributor Author

That was the initial thought, but it turns out we need to call an interface of PythonRpcHandler when we get the result from a future. At that point, the future does not have a processGroupAgent.

Contributor
@xush6528 xush6528 Aug 6, 2019

We could remove this line, Py_Initialize();.

As the documentation says,

This initializes the table of loaded modules (sys.modules), and creates the fundamental modules builtins, __main__ and sys. It also initializes the module search path (sys.path). It does not set sys.argv;

Since the code has run to here, the interpreter's system-level modules must already have been initialized, so there is no need to call Py_Initialize() again.

Contributor Author

We still need this if the application is a C++ application with no Python environment. I added a check for Py_IsInitialized() first, to ensure Py_Initialize() is called only once.

test/test_rpc.py Outdated
Contributor
@xush6528 xush6528 Aug 6, 2019

This could be flaky: in the worst case, a worker could have sent out its request but not yet have received one. In that case, the global var it owns is not changed.

This is the code to simulate the worst case.

        dstRank = n % self.world_size
        import time; time.sleep(self.rank)
        ret = dist.rpc_sync('worker%d' % dstRank, modify_global_var, args=(True,))
        self.assertEqual(global_var, True)

Fix:

        dstRank = n % self.world_size
        ret = dist.rpc_sync('worker%d' % dstRank, modify_global_var, args=(True,))
        dist.barrier()  # After this barrier, we know that rank[i] has processed the request to modify the global var from rank[i - 1]. 
        self.assertEqual(global_var, True)


Contributor Author

good catch!

Contributor

Thinking more about this, it seems neither dist.barrier() nor the existing sync_rpc can guarantee a fix for this problem, because they only enforce that all sends are done; there is no guarantee that all received messages are processed.

This can be fixed when we switch to a ThreadPool for recv and let sync_rpc block until both send and recv are done. But the question is what we should wait for if received messages trigger subsequent sends (and those sends can trigger additional sends).

I am thinking about just doing the following:

  1. Block the send queue, and wait till all sends are done.
  2. Block the recv queue, and wait till all recvs are done.

This means that we at least wait until existing sends are done and they are processed on the callee side (sufficient for this test), and we are not going to wait for additional sends/recvs triggered by the existing sends. Does this make sense? @xush6528 @zhaojuanmao

(this is not a change request for this PR, just discussion)
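
The two-step wait could look roughly like this, with standard-library queues standing in for the agent's send/recv machinery (illustrative only; the real ProcessGroupAgent internals differ):

```python
import queue
import threading

send_q, recv_q = queue.Queue(), queue.Queue()
processed = []

def sender():
    while True:
        msg = send_q.get()
        recv_q.put(msg)        # "deliver" the message to the callee
        send_q.task_done()

def receiver():
    while True:
        msg = recv_q.get()
        processed.append(msg)  # callee-side processing of the request
        recv_q.task_done()

threading.Thread(target=sender, daemon=True).start()
threading.Thread(target=receiver, daemon=True).start()

for i in range(3):
    send_q.put(i)

send_q.join()  # 1. wait till all pending sends are delivered
recv_q.join()  # 2. wait till all delivered messages are processed
print(processed)  # -> [0, 1, 2]
```

Because each message is put on the recv queue before its send is marked done, the second join() cannot return before every already-sent message has been processed; messages sent after the joins are simply not waited for, which matches the "existing sends only" semantics above.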

Contributor

In terms of changes, if we all agree that dist.barrier does not guarantee a fix for the problem, shall we disable the test for now?

Contributor Author

Sounds good, let me disable the test for now.

Contributor

Under what circumstances will we be sending Python UDFs to remote processes that aren't already running Python? Does it simplify matters if you can assume that Python workers always start up from Python?

Contributor
@ezyang ezyang left a comment

Beyond some of the smaller requests, at a larger level I'd like some clarity on the lifetime of the PythonRpcHandler object, and on the relationship between C++-only execution versus mixed C++ and Python execution.

Sorry about the late review; the release was this week and I kept putting this PR review off XD

@zhaojuanmao
Contributor Author

Thanks for your comments @ezyang!

  1. For PythonRpcHandler, we do not want it to have the same lifetime as processGroupAgent, because some interfaces like loadPythonUDFResult() need to be called without a processGroupAgent. Ideally, all interfaces of PythonRpcHandler except init() should be callable from anywhere. I think it makes sense to make it a singleton class, or just a namespace plus global functions/variables. What do you think?

  2. "the relationship between C++ only execution versus C++ and Python execution." -- would you please clarify this more? I do not quite understand it :(.

A builtin op can be executed remotely purely in C++, because JIT supports that.
For a pure Python function, we have to pickle it, unpickle it in Python land, and execute it in Python land as well. That is why C++ and Python execution are mixed.
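
The two execution paths might be sketched like this (hypothetical names; the real dispatch lives in C++ and uses JIT operator resolution rather than a dict):

```python
import pickle

# stand-in for the C++ builtin-operator table resolved via qualified names
BUILTIN_OPS = {"aten::add": lambda a, b: a + b}

def dispatch(kind, payload):
    if kind == "builtin":
        # C++ land: resolve the qualified name string, no Python needed
        qualified_name, args = payload
        return BUILTIN_OPS[qualified_name](*args)
    else:
        # Python land: unpickle the callable and run it in the interpreter
        func, args = pickle.loads(payload)
        return func(*args)

builtin_result = dispatch("builtin", ("aten::add", (1, 2)))
python_result = dispatch("python", pickle.dumps((max, (4, 7))))
print(builtin_result, python_result)  # -> 3 7
```

Once builtin-operator Python callables can be resolved to their qualified name strings, the first path can absorb them and the two entry points can merge, as the summary suggests.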

@gqchen gqchen self-requested a review August 9, 2019 22:48
@zhaojuanmao zhaojuanmao requested a review from ezyang August 9, 2019 22:54
@zhaojuanmao
Contributor Author

Thanks everyone for giving such great feedback!

I resolved most of the comments, including switching the CPython API to the pybind11 API, among others.

I would like to address the comment about making the binary format consistent between Python UDFs and builtin operators in a follow-up PR.

Contributor
@mrshenli mrshenli left a comment

Thanks for adding Python UDF support for RPC!

Approving, but please consider performing the following two actions before merging:

  1. Check with @gqchen to see if there are any additional blocking change requests.
  2. Make sure this works under macOS. I am not sure if the CI tests for macOS here are sufficient; #23228 ran into this problem previously.

Contributor

(This does not need to be fixed in this PR)

IIUC, this is trying to address @ezyang's comments regarding PythonRpcHandler's lifetime. It seems to me this needs to be wrapped by some macro, because we might want to run pure C++ RPC in the future, where this should not depend on Python.

Contributor

(Does not need to be addressed in this PR)

When coming up with file names, I used camel case for class files and snake case for utility files, and then ordered all the files here by ASCII order. This strategy was learned as a mix from the cuda and c10d folders. @pietern @ezyang Should we all stay with snake-case naming for files? Do we have a convention for this?

(Pointed out by @satgera.) This is probably my fault in the first place, and I will submit a fix for it in a separate PR.

Contributor

I'd aim for self-consistency, and the prevailing convention in torch/csrc is snake case.

@pritamdamania87
Contributor

Yep, that's not a problem specific to this PR, and I think it can be fixed in follow-up PRs. It's a good point to add tests for kwargs, and I think this can also be done in follow-up PRs?

Can we create github issues for things to address in followup PRs? This way we wouldn't lose track of these things.

@mrshenli
Contributor

Can we create github issues for things to address in followup PRs? This way we wouldn't lose track of these things.

Yep, see #24252 #24249 and #24247

@zhaojuanmao
Contributor Author

The Windows build failure is not related; all other checks are clean. I built on macOS locally, imported torch, and everything is fine.

Contributor
@facebook-github-bot facebook-github-bot left a comment

@zhaojuanmao is landing this pull request. If you are a Facebook employee, you can view this diff on Phabricator.

Summary:
This diff adds support for Python user-defined functions over RPC, for pytorch#23110. The workflow is:
1. pickle the Python UDF
2. pass the pickle to C++
3. C++ passes it over RPC from client to server
4. the server calls the runPythonUDF() Python function to unpickle and run the Python UDF, then pickles the UDF result using the Python embedder
5. the serialized result is passed back from server to client
6. the client calls the loadPythonUDFResult() Python function to unpickle the result
7. the result is returned to Python

For now, rpc_sync_builtin() and rpc_async_builtin() are temporary interfaces for builtin operator remote calls; they accept a qualified name string, and this interface can execute builtin operators in C++ land.

rpc_sync() and rpc_async() accept Python callables only right now; these can be user-defined Python functions or builtin operator Python functions, and they will be executed in Python land.

Once we can resolve builtin operator Python callables to qualified name strings, we can merge rpc_sync_builtin() into rpc_sync().
Pull Request resolved: pytorch#23569

Test Plan: unit tests

Differential Revision: D16390764

fbshipit-source-id: 9373024280d08bc953391464f221e8ab65e9ba10
@zhaojuanmao zhaojuanmao deleted the export-D16390764 branch August 28, 2019 07:55