Skip to content

Commit 54f4b5c

Browse files
authored
feat(common): cancel long-running operations (#6831)
This time for realsies, I need to return the future created by `AsyncPollingLoop()` because this is where the cancel callback is initialized.
1 parent ddb4e25 commit 54f4b5c

3 files changed

Lines changed: 108 additions & 64 deletions

File tree

google/cloud/internal/async_long_running_operation.h

Lines changed: 7 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -129,30 +129,13 @@ future<StatusOr<ReturnType>> AsyncLongRunningOperation(
129129
AsyncRetryLoop(std::move(retry_policy), std::move(backoff_policy),
130130
idempotent, cq, std::forward<StartFunctor>(start),
131131
std::forward<RequestType>(request), location);
132-
struct MoveCapture {
133-
google::cloud::CompletionQueue cq;
134-
AsyncPollLongRunningOperation poll;
135-
AsyncCancelLongRunningOperation cancel;
136-
LongRunningOperationValueExtractor<ReturnType> value_extractor;
137-
std::unique_ptr<PollingPolicy> polling_policy;
138-
std::string location;
139-
140-
future<StatusOr<ReturnType>> operator()(future<StatusOr<Operation>> f) {
141-
auto loc = this->location;
142-
auto extractor = std::move(value_extractor);
143-
return AsyncPollingLoop(std::move(cq), std::move(f), std::move(poll),
144-
std::move(cancel), std::move(polling_policy),
145-
std::move(location))
146-
.then([extractor, loc](future<StatusOr<Operation>> g) {
147-
return extractor(g.get(), loc);
148-
});
149-
}
150-
};
151-
152-
return operation.then(
153-
MoveCapture{std::move(cq), std::move(poll), std::move(cancel),
154-
std::move(value_extractor), std::move(polling_policy),
155-
std::string{location}});
132+
auto loc = std::string{location};
133+
return AsyncPollingLoop(std::move(cq), std::move(operation), std::move(poll),
134+
std::move(cancel), std::move(polling_policy),
135+
std::move(location))
136+
.then([value_extractor, loc](future<StatusOr<Operation>> g) {
137+
return value_extractor(g.get(), loc);
138+
});
156139
}
157140

158141
} // namespace internal

google/cloud/internal/async_long_running_operation_test.cc

Lines changed: 97 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
#include "google/cloud/internal/async_long_running_operation.h"
16+
#include "google/cloud/testing_util/async_sequencer.h"
1617
#include "google/cloud/testing_util/is_proto_equal.h"
1718
#include "google/cloud/testing_util/mock_completion_queue_impl.h"
1819
#include "google/cloud/testing_util/status_matchers.h"
@@ -27,10 +28,13 @@ namespace {
2728

2829
using ::google::bigtable::admin::v2::CreateInstanceRequest;
2930
using ::google::bigtable::admin::v2::Instance;
31+
using ::google::cloud::testing_util::AsyncSequencer;
3032
using ::google::cloud::testing_util::IsOk;
3133
using ::google::cloud::testing_util::IsProtoEqual;
3234
using ::google::cloud::testing_util::MockCompletionQueueImpl;
35+
using ::google::cloud::testing_util::StatusIs;
3336
using ::google::longrunning::Operation;
37+
using ::testing::AtLeast;
3438
using ::testing::Return;
3539

3640
class MockStub {
@@ -72,6 +76,32 @@ std::unique_ptr<BackoffPolicy> TestBackoffPolicy() {
7276
return ExponentialBackoffPolicy(us(100), us(100), 2.0).clone();
7377
}
7478

79+
using StartOperation =
80+
std::function<future<StatusOr<google::longrunning::Operation>>(
81+
CompletionQueue&, std::unique_ptr<grpc::ClientContext>,
82+
CreateInstanceRequest const&)>;
83+
84+
StartOperation MakeStart(std::shared_ptr<MockStub> const& m) {
85+
return [m](CompletionQueue& cq, std::unique_ptr<grpc::ClientContext> context,
86+
CreateInstanceRequest const& request) {
87+
return m->AsyncCreateInstance(cq, std::move(context), request);
88+
};
89+
}
90+
91+
AsyncPollLongRunningOperation MakePoll(std::shared_ptr<MockStub> const& m) {
92+
return [m](CompletionQueue& cq, std::unique_ptr<grpc::ClientContext> context,
93+
google::longrunning::GetOperationRequest const& request) {
94+
return m->AsyncGetOperation(cq, std::move(context), request);
95+
};
96+
}
97+
98+
AsyncCancelLongRunningOperation MakeCancel(std::shared_ptr<MockStub> const& m) {
99+
return [m](CompletionQueue& cq, std::unique_ptr<grpc::ClientContext> context,
100+
google::longrunning::CancelOperationRequest const& request) {
101+
return m->AsyncCancelOperation(cq, std::move(context), request);
102+
};
103+
}
104+
75105
TEST(AsyncLongRunningTest, RequestPollThenSuccessMetadata) {
76106
Instance expected;
77107
expected.set_name("test-instance-name");
@@ -110,25 +140,10 @@ TEST(AsyncLongRunningTest, RequestPollThenSuccessMetadata) {
110140
request.set_instance_id("test-instance-id");
111141
auto actual =
112142
AsyncLongRunningOperation<Instance>(
113-
cq, std::move(request),
114-
[mock](CompletionQueue& cq,
115-
std::unique_ptr<grpc::ClientContext> context,
116-
CreateInstanceRequest const& request) {
117-
return mock->AsyncCreateInstance(cq, std::move(context), request);
118-
},
119-
[mock](CompletionQueue& cq,
120-
std::unique_ptr<grpc::ClientContext> context,
121-
google::longrunning::GetOperationRequest const& request) {
122-
return mock->AsyncGetOperation(cq, std::move(context), request);
123-
},
124-
[mock](CompletionQueue& cq,
125-
std::unique_ptr<grpc::ClientContext> context,
126-
google::longrunning::CancelOperationRequest const& request) {
127-
return mock->AsyncCancelOperation(cq, std::move(context), request);
128-
},
129-
&ExtractLongRunningResultMetadata<Instance>, TestRetryPolicy(),
130-
TestBackoffPolicy(), Idempotency::kIdempotent, std::move(policy),
131-
"test-function")
143+
cq, std::move(request), MakeStart(mock), MakePoll(mock),
144+
MakeCancel(mock), &ExtractLongRunningResultMetadata<Instance>,
145+
TestRetryPolicy(), TestBackoffPolicy(), Idempotency::kIdempotent,
146+
std::move(policy), "test-function")
132147
.get();
133148
ASSERT_THAT(actual, IsOk());
134149
EXPECT_THAT(*actual, IsProtoEqual(expected));
@@ -172,30 +187,74 @@ TEST(AsyncLongRunningTest, RequestPollThenSuccessResponse) {
172187
request.set_instance_id("test-instance-id");
173188
auto actual =
174189
AsyncLongRunningOperation<Instance>(
175-
cq, std::move(request),
176-
[mock](CompletionQueue& cq,
177-
std::unique_ptr<grpc::ClientContext> context,
178-
CreateInstanceRequest const& request) {
179-
return mock->AsyncCreateInstance(cq, std::move(context), request);
180-
},
181-
[mock](CompletionQueue& cq,
182-
std::unique_ptr<grpc::ClientContext> context,
183-
google::longrunning::GetOperationRequest const& request) {
184-
return mock->AsyncGetOperation(cq, std::move(context), request);
185-
},
186-
[mock](CompletionQueue& cq,
187-
std::unique_ptr<grpc::ClientContext> context,
188-
google::longrunning::CancelOperationRequest const& request) {
189-
return mock->AsyncCancelOperation(cq, std::move(context), request);
190-
},
191-
&ExtractLongRunningResultResponse<Instance>, TestRetryPolicy(),
192-
TestBackoffPolicy(), Idempotency::kIdempotent, std::move(policy),
193-
"test-function")
190+
cq, std::move(request), MakeStart(mock), MakePoll(mock),
191+
MakeCancel(mock), &ExtractLongRunningResultResponse<Instance>,
192+
TestRetryPolicy(), TestBackoffPolicy(), Idempotency::kIdempotent,
193+
std::move(policy), "test-function")
194194
.get();
195195
ASSERT_THAT(actual, IsOk());
196196
EXPECT_THAT(*actual, IsProtoEqual(expected));
197197
}
198198

199+
TEST(AsyncLongRunningTest, RequestPollThenCancel) {
200+
Instance expected;
201+
expected.set_name("test-instance-name");
202+
google::longrunning::Operation starting_op;
203+
starting_op.set_name("test-op-name");
204+
205+
auto mock_cq = std::make_shared<MockCompletionQueueImpl>();
206+
AsyncSequencer<void> timer;
207+
EXPECT_CALL(*mock_cq, MakeRelativeTimer)
208+
.WillRepeatedly([&timer](std::chrono::nanoseconds) {
209+
return timer.PushBack().then([](future<void>) {
210+
return make_status_or(std::chrono::system_clock::now());
211+
});
212+
});
213+
CompletionQueue cq(mock_cq);
214+
215+
auto mock = std::make_shared<MockStub>();
216+
EXPECT_CALL(*mock, AsyncCreateInstance)
217+
.WillOnce([&](CompletionQueue&, std::unique_ptr<grpc::ClientContext>,
218+
CreateInstanceRequest const&) {
219+
return make_ready_future(make_status_or(starting_op));
220+
});
221+
EXPECT_CALL(*mock, AsyncGetOperation)
222+
.Times(AtLeast(1))
223+
.WillRepeatedly([&](CompletionQueue&,
224+
std::unique_ptr<grpc::ClientContext>,
225+
google::longrunning::GetOperationRequest const&) {
226+
return make_ready_future(make_status_or(starting_op));
227+
});
228+
EXPECT_CALL(*mock, AsyncCancelOperation)
229+
.WillOnce([&](CompletionQueue&, std::unique_ptr<grpc::ClientContext>,
230+
google::longrunning::CancelOperationRequest const&) {
231+
return make_ready_future(Status{});
232+
});
233+
auto policy = absl::make_unique<MockPollingPolicy>();
234+
EXPECT_CALL(*policy, clone()).Times(0);
235+
EXPECT_CALL(*policy, OnFailure).WillRepeatedly(Return(true));
236+
EXPECT_CALL(*policy, WaitPeriod)
237+
.WillRepeatedly(Return(std::chrono::milliseconds(1)));
238+
CreateInstanceRequest request;
239+
request.set_parent("test-parent");
240+
request.set_instance_id("test-instance-id");
241+
auto pending = AsyncLongRunningOperation<Instance>(
242+
cq, std::move(request), MakeStart(mock), MakePoll(mock), MakeCancel(mock),
243+
&ExtractLongRunningResultMetadata<Instance>, TestRetryPolicy(),
244+
TestBackoffPolicy(), Idempotency::kIdempotent, std::move(policy),
245+
"test-function");
246+
247+
// Wait until the polling loop is backing off for a second time.
248+
timer.PopFront().set_value();
249+
auto t = timer.PopFront();
250+
// cancel the long running operation
251+
pending.cancel();
252+
// release timer
253+
t.set_value();
254+
auto actual = pending.get();
255+
EXPECT_THAT(actual, StatusIs(StatusCode::kCancelled));
256+
}
257+
199258
} // namespace
200259
} // namespace internal
201260
} // namespace GOOGLE_CLOUD_CPP_NS

google/cloud/internal/async_polling_loop_test.cc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -395,8 +395,9 @@ TEST(AsyncPollingLoopTest, PollThenCancelDuringTimer) {
395395

396396
timer_sequencer.PopFront().set_value(std::chrono::system_clock::now());
397397
get_sequencer.PopFront().set_value(starting_op);
398+
auto t = timer_sequencer.PopFront();
398399
pending.cancel();
399-
timer_sequencer.PopFront().set_value(std::chrono::system_clock::now());
400+
t.set_value(std::chrono::system_clock::now());
400401

401402
auto actual = pending.get();
402403
EXPECT_THAT(actual, StatusIs(Not(Eq(StatusCode::kOk)),
@@ -449,8 +450,9 @@ TEST(AsyncPollingLoopTest, PollThenCancelDuringPoll) {
449450
timer_sequencer.PopFront().set_value(std::chrono::system_clock::now());
450451
get_sequencer.PopFront().set_value(starting_op);
451452
timer_sequencer.PopFront().set_value(std::chrono::system_clock::now());
453+
auto g = get_sequencer.PopFront();
452454
pending.cancel();
453-
get_sequencer.PopFront().set_value(starting_op);
455+
g.set_value(starting_op);
454456

455457
auto actual = pending.get();
456458
EXPECT_THAT(actual, StatusIs(Not(Eq(StatusCode::kOk)),

0 commit comments

Comments
 (0)