[c10d] Make allreduce as a custom op #79582
alanwaketan wants to merge 3 commits into gh/alanwaketan/34/base
Conversation
Summary: This patch makes allreduce a custom op so that it is dispatcher passable. It is one part of the effort to route comm ops through the dispatcher so that tracing mechanisms that rely on the dispatcher can trace them, e.g., LazyTensor and AOTAutograd.

Test Plan:
python test/distributed/test_c10d_nccl.py -k test_allreduce_ops
python test/distributed/test_c10d_gloo.py -k test_allreduce_basics
...and other existing distributed tests.

[ghstack-poisoned]
❌ 1 new failure as of commit 771024f (more details on the Dr. CI page). The failure was recognized by patterns and does not appear to be due to upstream breakages.
```cpp
auto allreduce_fut =
    ops::allreduce(
        c10::intrusive_ptr<ProcessGroup>::unsafe_reclaim_from_nonowning(
            state_),
```
I'm trying to convert a raw pointer to an intrusive_ptr. Is this the way to do so?
Curious: who owns this PG instance? I assume it is owned by the Python PG object? If that's the case, will this mess up the refcount? What happens when this temporary intrusive_ptr exits scope?
I think PGs are normally owned by a Python object. AllReduceCommHook somehow holds a ProcessGroup* instead of an intrusive_ptr, so I need to convert the raw pointer to an intrusive_ptr.
I don't believe this will mess up the refcount. However, I actually think it's better to replace class AllReduceCommHook : public CppCommHookInterface<ProcessGroup*> with class AllReduceCommHook : public CppCommHookInterface<intrusive_ptr<ProcessGroup>>. What do you think?
> I don't believe this will mess up the refcnt.

If we create an intrusive_ptr from the raw pointer, does this mean we have two separate entities tracking the refcount for the same raw pointer? One is the Python object, and the other is this intrusive_ptr?

> I actually think it's better to replace class AllReduceCommHook : public CppCommHookInterface<ProcessGroup*> with class AllReduceCommHook : public CppCommHookInterface<intrusive_ptr<ProcessGroup>>. What do you think?

Yep, this does sound better to me.
I think for intrusive_ptr the refcount is stored in the object (ProcessGroup) itself; intrusive_ptr is just a way to increment/decrement that refcount. So it shouldn't matter.
Let me make a follow-up patch changing class AllReduceCommHook : public CppCommHookInterface<ProcessGroup*> to class AllReduceCommHook : public CppCommHookInterface<intrusive_ptr<ProcessGroup>>.
Got it. Can we also add a comment for this in the code? Thank you!
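To illustrate the refcount point above, here is a minimal Python sketch of intrusive refcounting (hypothetical names; this is not PyTorch's actual c10::intrusive_ptr implementation): because the counter is embedded in the pointee itself, a handle reclaimed from a raw pointer shares the same counter as every other handle, so there is no second, conflicting bookkeeping entity.

```python
class IntrusiveTarget:
    """Object that embeds its own refcount, analogous to c10::intrusive_ptr_target."""
    def __init__(self):
        self.refcount = 0

class IntrusivePtr:
    """Minimal handle that bumps the embedded refcount on acquire/release."""
    def __init__(self, target):
        self.target = target
        target.refcount += 1

    def release(self):
        self.target.refcount -= 1

pg = IntrusiveTarget()
owner = IntrusivePtr(pg)   # e.g. the Python-side owner
assert pg.refcount == 1

# "Reclaiming" a second handle from the raw object increments the SAME
# embedded counter.
temp = IntrusivePtr(pg)
assert pg.refcount == 2

temp.release()             # the temporary handle exits scope
assert pg.refcount == 1    # the owner still keeps the object alive
```

This is why the temporary intrusive_ptr in the hook does not corrupt the count: only one counter exists, inside the ProcessGroup itself.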
wanchaol left a comment:
> I guess I can add tests to have a python tensor that override torch_dispatch to directly verify that.

I see, I guess we can add those tests later in a separate PR when we actually need them. There are two things off the top of my head that need some input from @mrshenli; since these might affect the actual nodes that appear in the IR, we should get some clarity and make them consistent:
- About operator suffix and argument ordering: should we make the ATen operator follow our Python-level API, or should we follow the ATen operator naming convention?
- Should we let wait() appear in the IR? This might be related to how the CUDA stream sync works in our current tracer.
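For the first question, the ATen naming convention under discussion is roughly `namespace::name.overload(args) -> returns`. A small hypothetical Python sketch of how a tracer might split such a schema string (the schema examples are illustrative, not the ones this PR registers):

```python
def parse_schema_name(schema: str):
    """Split 'ns::name.overload(...)' into (namespace, name, overload)."""
    qualified = schema.split("(", 1)[0]        # drop the argument list
    ns, rest = qualified.split("::", 1)        # namespace prefix
    name, _, overload = rest.partition(".")    # optional overload suffix
    return ns, name, overload

# An ATen-style overloaded op carries an overload suffix:
assert parse_schema_name(
    "aten::add.Tensor(Tensor self, Tensor other) -> Tensor"
) == ("aten", "add", "Tensor")

# A comm op in the c10d namespace with no overload suffix:
assert parse_schema_name(
    "c10d::allreduce(Tensor[] tensors) -> Tensor[]"
) == ("c10d", "allreduce", "")
```

Following this convention uniformly is what lets a generic tracer interpret the comm ops the same way as every other ATen op.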
```cpp
        root_rank, root_tensor, std::chrono::milliseconds(timeout)});
}

c10::intrusive_ptr<ProcessGroup::Work> allreduce_(
```
Got it, thanks! One thing that caught my eye: this TorchBind Work object does not have methods like wait() bound. I guess this is fine initially, as this PR is more about making it a dispatcher-level op.
But I am wondering what this would look like in our traced IR. Should we have wait() in the graph? How does the traced graph look if we need async execution on a different CUDA stream, where the user usually needs to manually wait for the stream? cc @mrshenli
I think we should follow the ATen convention for the function schema, as that will be easier for any tracer to interpret the ops. At least AOTAutograd would assume the ATen convention.
Please see my other comments for the short-term solution. Long term, yes, we need a way to represent CUDA streams in the graph. We don't know how yet.
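As a hedged sketch of the wait()-in-the-graph question (all names here are hypothetical stand-ins, not the actual c10d bindings): if a tracer records both the collective launch and the wait() call as separate nodes, the async boundary stays visible in the IR, which is what a stream-aware backend would need.

```python
trace = []  # the "IR": an ordered list of recorded op names

class Work:
    """Stand-in for the Work handle a collective returns."""
    def __init__(self, op_name):
        self.op_name = op_name

    def wait(self):
        # wait() shows up as its own node, marking the sync point
        trace.append(f"{self.op_name}.wait")

def allreduce(tensors):
    """Stand-in for a traced dispatcher-level collective."""
    trace.append("c10d::allreduce")
    return Work("c10d::allreduce")

work = allreduce([1.0, 2.0])
# ...other traced ops could overlap with the in-flight collective here...
work.wait()

assert trace == ["c10d::allreduce", "c10d::allreduce.wait"]
```

Everything between the two nodes is, by construction, overlappable with the collective, so the graph itself encodes where synchronization is required.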
wanchaol left a comment:
Looks good to me, but it looks like the CI failure is real:

```
Broken ops: [
    c10d::broadcast(__torch__.torch.classes.c10d.ProcessGroup _0, Tensor[] _1, int _2, int _3, int _4) -> __torch__.torch.classes.c10d.Work _0
]
```

Could you fix the CI issue before landing? Thanks!
Thanks, Wanchao. I believe it's intended to break the schema. Do you know how to update the test expectation of the backward_compat test?
```python
("aten::segment_reduce", datetime.date(2022, 6, 30)),
("aten::_segment_reduce_backward", datetime.date(2022, 6, 30)),
("aten::empty.SymInt", datetime.date(9999, 1, 1)),
("c10d::broadcast", datetime.date(2022, 6, 25)),
```
Yeah, this is the correct way :) Although I am not sure which namespace broadcast got bound to; it looks like it's c10d, so we can see if this gets CI passing.
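For context, here is a hypothetical sketch of the expiry-date allow-list mechanism behind entries like the ones above (illustrative helper, not the actual implementation of the backward-compat test): a deliberately broken schema is tolerated only while its allow-list entry is unexpired.

```python
import datetime

# Entries mirror the allow list above: (op name, expiry date).
ALLOW_LIST = [
    ("c10d::broadcast", datetime.date(2022, 6, 25)),
]

def is_allowed(op_name, today):
    """A broken schema is permitted while its allow-list entry is unexpired."""
    return any(
        name == op_name and today < expiry
        for name, expiry in ALLOW_LIST
    )

# Before the expiry date, the intentional break is accepted:
assert is_allowed("c10d::broadcast", datetime.date(2022, 6, 20))
# After the expiry date, the same break fails the test again:
assert not is_allowed("c10d::broadcast", datetime.date(2022, 7, 1))
```

This is why an intentional schema change like this PR's is unblocked by adding a short-lived dated entry rather than by editing the test itself.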
The XLA failure doesn't seem to be related.

@pytorchbot merge -f

@pytorchbot successfully started a merge job. Check the current status here.

Hey @alanwaketan.
Summary: This patch makes allreduce a custom op so that it is dispatcher passable. It is one part of the effort to route comm ops through the dispatcher so that tracing mechanisms that rely on the dispatcher can trace them, e.g., LazyTensor and AOTAutograd.

Pull Request resolved: #79582
Approved by: https://github.com/wanchaol
Test Plan: contbuild & OSS CI, see https://hud.pytorch.org/commit/pytorch/pytorch/e5841bafbd2868eaf6eb7b89b4caf3a6261dcfa6
Test plan from GitHub:
python test/distributed/test_c10d_nccl.py -k test_allreduce_ops
python test/distributed/test_c10d_gloo.py -k test_allreduce_basics
...and other existing distributed tests.
Reviewed By: atalman
Differential Revision: D37382098
Pulled By: alanwaketan
fbshipit-source-id: 068fd6d8f2c3fa3998431dcf878e14bd41890693
Can this be represented as edges in the graph?
I think we need more discussion on this. Let me try to organize a follow-up meeting.
Signed-off-by: Masaki Kozuki <[email protected]>
Co-authored-by: ptrblck <[email protected]>
Co-authored-by: Michael Carilli <[email protected]>

Patch for pytorch#79582. Apparently #79582 is newer than #34 and the commit below, so the PR assumes `ReduceOp` to be an `enum`, not a `struct` including an `enum` inside it.
Stack from ghstack (oldest at bottom):
Summary:
This patch makes allreduce a custom op so that it is dispatcher
passable. It is one part of the effort to route comm ops through the
dispatcher so that tracing mechanisms that rely on the dispatcher can
trace them, e.g., LazyTensor and AOTAutograd.
Test Plan:
python test/distributed/test_c10d_nccl.py -k test_allreduce_ops
python test/distributed/test_c10d_gloo.py -k test_allreduce_basics
...and other existing distributed tests.