Skip to content

Commit 43cc041

Browse files
authored
feat(generator): use async for long-running operations (#6830)
For long-running operations we need to generate three different RPCs: one to start the operation (e.g. `CreateDatabase()`), one to poll the operation (`GetOperation()`), and one to cancel the operation (`CancelOperation()`). With this change all the three RPCs are generated as asynchronous RPCs. The `*Connection` class can use these asynchronous RPCs in the background, using the background threads. This (1) avoids creating a detached thread to poll the operation, (2) uses code that is more easily unit tested separate from the generated libraries.
1 parent 02e449b commit 43cc041

22 files changed

Lines changed: 1332 additions & 1171 deletions

generator/integration_tests/golden/golden_thing_admin_connection.cc

Lines changed: 97 additions & 145 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@
2222
#include "generator/integration_tests/golden/internal/golden_thing_admin_stub_factory.h"
2323
#include "google/cloud/background_threads.h"
2424
#include "google/cloud/grpc_options.h"
25+
#include "google/cloud/internal/async_long_running_operation.h"
2526
#include "google/cloud/internal/pagination_range.h"
26-
#include "google/cloud/internal/polling_loop.h"
2727
#include "google/cloud/internal/retry_loop.h"
2828
#include <memory>
2929

@@ -219,23 +219,30 @@ class GoldenThingAdminConnectionImpl : public GoldenThingAdminConnection {
219219
}
220220

221221
future<StatusOr<google::test::admin::database::v1::Database>>
222-
CreateDatabase(
223-
google::test::admin::database::v1::CreateDatabaseRequest const& request) override {
224-
auto operation = google::cloud::internal::RetryLoop(
225-
retry_policy_prototype_->clone(), backoff_policy_prototype_->clone(),
226-
idempotency_policy_->CreateDatabase(request),
227-
[this](grpc::ClientContext& context,
228-
google::test::admin::database::v1::CreateDatabaseRequest const& request) {
229-
return stub_->CreateDatabase(context, request);
230-
},
231-
request, __func__);
232-
if (!operation) {
233-
return google::cloud::make_ready_future(
234-
StatusOr<google::test::admin::database::v1::Database>(operation.status()));
235-
}
236-
237-
return AwaitCreateDatabase(*std::move(operation));
238-
}
222+
CreateDatabase(google::test::admin::database::v1::CreateDatabaseRequest const& request) override {
223+
auto stub = stub_;
224+
return google::cloud::internal::AsyncLongRunningOperation<google::test::admin::database::v1::Database>(
225+
background_->cq(), request,
226+
[stub](google::cloud::CompletionQueue& cq,
227+
std::unique_ptr<grpc::ClientContext> context,
228+
google::test::admin::database::v1::CreateDatabaseRequest const& request) {
229+
return stub->AsyncCreateDatabase(cq, std::move(context), request);
230+
},
231+
[stub](google::cloud::CompletionQueue& cq,
232+
std::unique_ptr<grpc::ClientContext> context,
233+
google::longrunning::GetOperationRequest const& request) {
234+
return stub->AsyncGetOperation(cq, std::move(context), request);
235+
},
236+
[stub](google::cloud::CompletionQueue& cq,
237+
std::unique_ptr<grpc::ClientContext> context,
238+
google::longrunning::CancelOperationRequest const& request) {
239+
return stub->AsyncCancelOperation(cq, std::move(context), request);
240+
},
241+
&google::cloud::internal::ExtractLongRunningResultResponse<google::test::admin::database::v1::Database>,
242+
retry_policy_prototype_->clone(), backoff_policy_prototype_->clone(),
243+
idempotency_policy_->CreateDatabase(request),
244+
polling_policy_prototype_->clone(), __func__);
245+
}
239246

240247
StatusOr<google::test::admin::database::v1::Database>
241248
GetDatabase(
@@ -251,23 +258,30 @@ class GoldenThingAdminConnectionImpl : public GoldenThingAdminConnection {
251258
}
252259

253260
future<StatusOr<google::test::admin::database::v1::UpdateDatabaseDdlMetadata>>
254-
UpdateDatabaseDdl(
255-
google::test::admin::database::v1::UpdateDatabaseDdlRequest const& request) override {
256-
auto operation = google::cloud::internal::RetryLoop(
257-
retry_policy_prototype_->clone(), backoff_policy_prototype_->clone(),
258-
idempotency_policy_->UpdateDatabaseDdl(request),
259-
[this](grpc::ClientContext& context,
260-
google::test::admin::database::v1::UpdateDatabaseDdlRequest const& request) {
261-
return stub_->UpdateDatabaseDdl(context, request);
262-
},
263-
request, __func__);
264-
if (!operation) {
265-
return google::cloud::make_ready_future(
266-
StatusOr<google::test::admin::database::v1::UpdateDatabaseDdlMetadata>(operation.status()));
267-
}
268-
269-
return AwaitUpdateDatabaseDdl(*std::move(operation));
270-
}
261+
UpdateDatabaseDdl(google::test::admin::database::v1::UpdateDatabaseDdlRequest const& request) override {
262+
auto stub = stub_;
263+
return google::cloud::internal::AsyncLongRunningOperation<google::test::admin::database::v1::UpdateDatabaseDdlMetadata>(
264+
background_->cq(), request,
265+
[stub](google::cloud::CompletionQueue& cq,
266+
std::unique_ptr<grpc::ClientContext> context,
267+
google::test::admin::database::v1::UpdateDatabaseDdlRequest const& request) {
268+
return stub->AsyncUpdateDatabaseDdl(cq, std::move(context), request);
269+
},
270+
[stub](google::cloud::CompletionQueue& cq,
271+
std::unique_ptr<grpc::ClientContext> context,
272+
google::longrunning::GetOperationRequest const& request) {
273+
return stub->AsyncGetOperation(cq, std::move(context), request);
274+
},
275+
[stub](google::cloud::CompletionQueue& cq,
276+
std::unique_ptr<grpc::ClientContext> context,
277+
google::longrunning::CancelOperationRequest const& request) {
278+
return stub->AsyncCancelOperation(cq, std::move(context), request);
279+
},
280+
&google::cloud::internal::ExtractLongRunningResultMetadata<google::test::admin::database::v1::UpdateDatabaseDdlMetadata>,
281+
retry_policy_prototype_->clone(), backoff_policy_prototype_->clone(),
282+
idempotency_policy_->UpdateDatabaseDdl(request),
283+
polling_policy_prototype_->clone(), __func__);
284+
}
271285

272286
Status
273287
DropDatabase(
@@ -335,23 +349,30 @@ class GoldenThingAdminConnectionImpl : public GoldenThingAdminConnection {
335349
}
336350

337351
future<StatusOr<google::test::admin::database::v1::Backup>>
338-
CreateBackup(
339-
google::test::admin::database::v1::CreateBackupRequest const& request) override {
340-
auto operation = google::cloud::internal::RetryLoop(
341-
retry_policy_prototype_->clone(), backoff_policy_prototype_->clone(),
342-
idempotency_policy_->CreateBackup(request),
343-
[this](grpc::ClientContext& context,
344-
google::test::admin::database::v1::CreateBackupRequest const& request) {
345-
return stub_->CreateBackup(context, request);
346-
},
347-
request, __func__);
348-
if (!operation) {
349-
return google::cloud::make_ready_future(
350-
StatusOr<google::test::admin::database::v1::Backup>(operation.status()));
351-
}
352-
353-
return AwaitCreateBackup(*std::move(operation));
354-
}
352+
CreateBackup(google::test::admin::database::v1::CreateBackupRequest const& request) override {
353+
auto stub = stub_;
354+
return google::cloud::internal::AsyncLongRunningOperation<google::test::admin::database::v1::Backup>(
355+
background_->cq(), request,
356+
[stub](google::cloud::CompletionQueue& cq,
357+
std::unique_ptr<grpc::ClientContext> context,
358+
google::test::admin::database::v1::CreateBackupRequest const& request) {
359+
return stub->AsyncCreateBackup(cq, std::move(context), request);
360+
},
361+
[stub](google::cloud::CompletionQueue& cq,
362+
std::unique_ptr<grpc::ClientContext> context,
363+
google::longrunning::GetOperationRequest const& request) {
364+
return stub->AsyncGetOperation(cq, std::move(context), request);
365+
},
366+
[stub](google::cloud::CompletionQueue& cq,
367+
std::unique_ptr<grpc::ClientContext> context,
368+
google::longrunning::CancelOperationRequest const& request) {
369+
return stub->AsyncCancelOperation(cq, std::move(context), request);
370+
},
371+
&google::cloud::internal::ExtractLongRunningResultResponse<google::test::admin::database::v1::Backup>,
372+
retry_policy_prototype_->clone(), backoff_policy_prototype_->clone(),
373+
idempotency_policy_->CreateBackup(request),
374+
polling_policy_prototype_->clone(), __func__);
375+
}
355376

356377
StatusOr<google::test::admin::database::v1::Backup>
357378
GetBackup(
@@ -424,23 +445,30 @@ class GoldenThingAdminConnectionImpl : public GoldenThingAdminConnection {
424445
}
425446

426447
future<StatusOr<google::test::admin::database::v1::Database>>
427-
RestoreDatabase(
428-
google::test::admin::database::v1::RestoreDatabaseRequest const& request) override {
429-
auto operation = google::cloud::internal::RetryLoop(
430-
retry_policy_prototype_->clone(), backoff_policy_prototype_->clone(),
431-
idempotency_policy_->RestoreDatabase(request),
432-
[this](grpc::ClientContext& context,
433-
google::test::admin::database::v1::RestoreDatabaseRequest const& request) {
434-
return stub_->RestoreDatabase(context, request);
435-
},
436-
request, __func__);
437-
if (!operation) {
438-
return google::cloud::make_ready_future(
439-
StatusOr<google::test::admin::database::v1::Database>(operation.status()));
440-
}
441-
442-
return AwaitRestoreDatabase(*std::move(operation));
443-
}
448+
RestoreDatabase(google::test::admin::database::v1::RestoreDatabaseRequest const& request) override {
449+
auto stub = stub_;
450+
return google::cloud::internal::AsyncLongRunningOperation<google::test::admin::database::v1::Database>(
451+
background_->cq(), request,
452+
[stub](google::cloud::CompletionQueue& cq,
453+
std::unique_ptr<grpc::ClientContext> context,
454+
google::test::admin::database::v1::RestoreDatabaseRequest const& request) {
455+
return stub->AsyncRestoreDatabase(cq, std::move(context), request);
456+
},
457+
[stub](google::cloud::CompletionQueue& cq,
458+
std::unique_ptr<grpc::ClientContext> context,
459+
google::longrunning::GetOperationRequest const& request) {
460+
return stub->AsyncGetOperation(cq, std::move(context), request);
461+
},
462+
[stub](google::cloud::CompletionQueue& cq,
463+
std::unique_ptr<grpc::ClientContext> context,
464+
google::longrunning::CancelOperationRequest const& request) {
465+
return stub->AsyncCancelOperation(cq, std::move(context), request);
466+
},
467+
&google::cloud::internal::ExtractLongRunningResultResponse<google::test::admin::database::v1::Database>,
468+
retry_policy_prototype_->clone(), backoff_policy_prototype_->clone(),
469+
idempotency_policy_->RestoreDatabase(request),
470+
polling_policy_prototype_->clone(), __func__);
471+
}
444472

445473
StreamRange<google::longrunning::Operation> ListDatabaseOperations(
446474
google::test::admin::database::v1::ListDatabaseOperationsRequest request) override {
@@ -505,82 +533,6 @@ class GoldenThingAdminConnectionImpl : public GoldenThingAdminConnection {
505533
}
506534

507535
private:
508-
template <typename MethodResponse, template<typename> class Extractor,
509-
typename Stub>
510-
future<StatusOr<MethodResponse>>
511-
AwaitLongrunningOperation(google::longrunning::Operation operation) { // NOLINT
512-
using ResponseExtractor = Extractor<MethodResponse>;
513-
std::weak_ptr<Stub> cancel_stub(stub_);
514-
promise<typename ResponseExtractor::ReturnType> pr(
515-
[cancel_stub, operation]() {
516-
grpc::ClientContext context;
517-
context.set_deadline(std::chrono::system_clock::now() +
518-
std::chrono::seconds(60));
519-
google::longrunning::CancelOperationRequest request;
520-
request.set_name(operation.name());
521-
if (auto ptr = cancel_stub.lock()) {
522-
ptr->CancelOperation(context, request);
523-
}
524-
});
525-
auto f = pr.get_future();
526-
std::thread t(
527-
[](std::shared_ptr<Stub> stub,
528-
google::longrunning::Operation operation,
529-
std::unique_ptr<PollingPolicy> polling_policy,
530-
google::cloud::promise<typename ResponseExtractor::ReturnType> promise,
531-
char const* location) mutable {
532-
auto result = google::cloud::internal::PollingLoop<ResponseExtractor>(
533-
std::move(polling_policy),
534-
[stub](grpc::ClientContext& context,
535-
google::longrunning::GetOperationRequest const& request) {
536-
return stub->GetOperation(context, request);
537-
},
538-
std::move(operation), location);
539-
stub.reset();
540-
promise.set_value(std::move(result));
541-
},
542-
stub_, std::move(operation), polling_policy_prototype_->clone(),
543-
std::move(pr), __func__);
544-
t.detach();
545-
return f;
546-
}
547-
548-
future<StatusOr<google::test::admin::database::v1::Database>>
549-
AwaitCreateDatabase(
550-
google::longrunning::Operation operation) {
551-
return AwaitLongrunningOperation<
552-
google::test::admin::database::v1::Database,
553-
google::cloud::internal::PollingLoopResponseExtractor,
554-
golden_internal::GoldenThingAdminStub>(std::move(operation));
555-
}
556-
557-
future<StatusOr<google::test::admin::database::v1::UpdateDatabaseDdlMetadata>>
558-
AwaitUpdateDatabaseDdl(
559-
google::longrunning::Operation operation) {
560-
return AwaitLongrunningOperation<
561-
google::test::admin::database::v1::UpdateDatabaseDdlMetadata,
562-
google::cloud::internal::PollingLoopMetadataExtractor,
563-
golden_internal::GoldenThingAdminStub>(std::move(operation));
564-
}
565-
566-
future<StatusOr<google::test::admin::database::v1::Backup>>
567-
AwaitCreateBackup(
568-
google::longrunning::Operation operation) {
569-
return AwaitLongrunningOperation<
570-
google::test::admin::database::v1::Backup,
571-
google::cloud::internal::PollingLoopResponseExtractor,
572-
golden_internal::GoldenThingAdminStub>(std::move(operation));
573-
}
574-
575-
future<StatusOr<google::test::admin::database::v1::Database>>
576-
AwaitRestoreDatabase(
577-
google::longrunning::Operation operation) {
578-
return AwaitLongrunningOperation<
579-
google::test::admin::database::v1::Database,
580-
google::cloud::internal::PollingLoopResponseExtractor,
581-
golden_internal::GoldenThingAdminStub>(std::move(operation));
582-
}
583-
584536
std::unique_ptr<google::cloud::BackgroundThreads> background_;
585537
std::shared_ptr<golden_internal::GoldenThingAdminStub> stub_;
586538
std::unique_ptr<GoldenThingAdminRetryPolicy const> retry_policy_prototype_;

generator/integration_tests/golden/internal/golden_kitchen_sink_auth_decorator.h

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,27 +40,22 @@ class GoldenKitchenSinkAuth : public GoldenKitchenSinkStub {
4040
std::shared_ptr<google::cloud::internal::GrpcAuthenticationStrategy> auth,
4141
std::shared_ptr<GoldenKitchenSinkStub> child);
4242

43-
4443
StatusOr<google::test::admin::database::v1::GenerateAccessTokenResponse> GenerateAccessToken(
4544
grpc::ClientContext& context,
4645
google::test::admin::database::v1::GenerateAccessTokenRequest const& request) override;
4746

48-
4947
StatusOr<google::test::admin::database::v1::GenerateIdTokenResponse> GenerateIdToken(
5048
grpc::ClientContext& context,
5149
google::test::admin::database::v1::GenerateIdTokenRequest const& request) override;
5250

53-
5451
StatusOr<google::test::admin::database::v1::WriteLogEntriesResponse> WriteLogEntries(
5552
grpc::ClientContext& context,
5653
google::test::admin::database::v1::WriteLogEntriesRequest const& request) override;
5754

58-
5955
StatusOr<google::test::admin::database::v1::ListLogsResponse> ListLogs(
6056
grpc::ClientContext& context,
6157
google::test::admin::database::v1::ListLogsRequest const& request) override;
6258

63-
6459
std::unique_ptr<internal::StreamingReadRpc<google::test::admin::database::v1::TailLogEntriesResponse>>
6560
TailLogEntries(
6661
std::unique_ptr<grpc::ClientContext> context,
@@ -70,7 +65,6 @@ class GoldenKitchenSinkAuth : public GoldenKitchenSinkStub {
7065
grpc::ClientContext& context,
7166
google::test::admin::database::v1::ListServiceAccountKeysRequest const& request) override;
7267

73-
7468
private:
7569
std::shared_ptr<google::cloud::internal::GrpcAuthenticationStrategy> auth_;
7670
std::shared_ptr<GoldenKitchenSinkStub> child_;

0 commit comments

Comments
 (0)