Skip to content

Commit 46debe7

Browse files
kennyhorrorfacebook-github-bot
authored andcommitted
[DPER] Introduce barrier operation to force synchronization of threads in async execution (#49322)
Summary: Pull Request resolved: #49322 In some cases async execution might loose dependencies (Alias like ops) or produce suboptimal scheduling when there is an option which parts to schedule first. Example of the later behavior can happen in ModelParallel training where copy can get lower priority compared to the rest of the execution on the given GPU, which will caused other GPUs to starve. This operator allows to address these issues by introducing extra explicit dependencies between ops. Test Plan: Unit-test/ E2E testing in the future diffs. Reviewed By: xianjiec Differential Revision: D24933471 fbshipit-source-id: 1668994c7856d73926cde022378a99e1e8db3567
1 parent 7518f54 commit 46debe7

File tree

4 files changed

+119
-0
lines changed

4 files changed

+119
-0
lines changed
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
#include "caffe2/operators/async_net_barrier_op.h"
2+
3+
namespace caffe2 {
4+
5+
namespace {
6+
std::pair<std::vector<DeviceOption>, std::vector<DeviceOption>>
7+
asyncBarrierOpDevInfer(const OperatorDef& def) {
8+
auto op_device =
9+
def.has_device_option() ? def.device_option() : DeviceOption();
10+
ArgumentHelper helper(def);
11+
auto cross_device = helper.GetSingleArgument<int>("cross_device", 0);
12+
std::vector<DeviceOption> opt;
13+
for (int i = 0; i < def.input().size(); ++i) {
14+
if (cross_device == 1) {
15+
DeviceOption dev;
16+
dev.set_device_type(op_device.device_type());
17+
dev.set_device_id(i);
18+
opt.push_back(dev);
19+
} else {
20+
opt.push_back(op_device);
21+
}
22+
}
23+
return std::make_pair(opt, opt);
24+
}
25+
}
26+
27+
OPERATOR_SCHEMA(AsyncNetBarrier)
28+
.NumInputs(1, INT_MAX)
29+
.NumOutputs(1, INT_MAX)
30+
.IdenticalTypeAndShape()
31+
.InputsCanCrossDevices()
32+
.AllowOneToOneInplace()
33+
.DeviceInferenceFunction(asyncBarrierOpDevInfer)
34+
.SetDoc(R"DOC(
35+
This is a pretty much no-op operator, since it's only purposes is make sure that
36+
async_scheduling will schedule certian operations earlier than others.
37+
38+
Exaple where this operator can work well - mixture of data-parallel and model-
39+
parallel training, where one wants to force that all copies are started before
40+
data-parallel part starts.
41+
)DOC")
42+
.Arg(
43+
"cross_device",
44+
"Specifies either inputs should be across different devices in dev inference options");
45+
46+
SHOULD_NOT_DO_GRADIENT(AsyncNetBarrier);
47+
REGISTER_CPU_OPERATOR(AsyncNetBarrier, AsyncNetBarrierOp<CPUContext>);
48+
49+
50+
} // namespace caffe2
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
#include "caffe2/core/context_gpu.h"
2+
#include "caffe2/operators/async_net_barrier_op.h"
3+
4+
namespace caffe2 {
5+
6+
REGISTER_CUDA_OPERATOR(AsyncNetBarrier, AsyncNetBarrierOp<CUDAContext>);
7+
8+
} // namespace caffe2
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
#ifndef CAFFE2_OPERATORS_ASYNC_BARRIER_OP_H_
2+
#define CAFFE2_OPERATORS_ASYNC_BARRIER_OP_H_
3+
4+
#include "caffe2/core/context.h"
5+
#include "caffe2/core/export_caffe2_op_to_c10.h"
6+
#include "caffe2/core/operator.h"
7+
8+
namespace caffe2 {
9+
10+
template <class Context>
11+
class AsyncNetBarrierOp : public Operator<Context> {
12+
public:
13+
USE_OPERATOR_CONTEXT_FUNCTIONS;
14+
USE_SIMPLE_CTOR_DTOR(AsyncNetBarrierOp)
15+
16+
bool RunOnDevice() override {
17+
// This is a pretty much no-op operator, since it's only purposes is make
18+
// sure that async_scheduling will schedule certian operations earlier than
19+
// others.
20+
//
21+
// Exaple where this operator can work well - mixture of data-parallel and
22+
// model parallel training, where one wants to force that all copies are
23+
// started before data-parallel part starts.
24+
return true;
25+
}
26+
};
27+
28+
} // namespace caffe2
29+
30+
#endif // CAFFE2_OPERATORS_ASYNC_BARRIER_OP_H_
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
#!/usr/bin/env python3
2+
3+
import caffe2.python.hypothesis_test_util as hu
4+
import hypothesis.strategies as st
5+
import numpy as np
6+
from caffe2.python import core
7+
from hypothesis import given
8+
9+
10+
class TestAsyncNetBarrierOp(hu.HypothesisTestCase):
11+
@given(
12+
n=st.integers(1, 5),
13+
shape=st.lists(st.integers(0, 5), min_size=1, max_size=3),
14+
**hu.gcs
15+
)
16+
def test_async_net_barrier_op(self, n, shape, dc, gc):
17+
test_inputs = [(100 * np.random.random(shape)).astype(np.float32) for _ in range(n)]
18+
test_input_blobs = ["x_{}".format(i) for i in range(n)]
19+
20+
barrier_op = core.CreateOperator(
21+
"AsyncNetBarrier",
22+
test_input_blobs,
23+
test_input_blobs,
24+
device_option=gc,
25+
)
26+
27+
def reference_func(*args):
28+
self.assertEquals(len(args), n)
29+
return args
30+
31+
self.assertReferenceChecks(gc, barrier_op, test_inputs, reference_func)

0 commit comments

Comments
 (0)