Skip to content

Commit e9b7bff

Browse files
authored
feat(common): a generic long-running operation (#6804)
Long-running operations [aip/151] are used for API methods that take a significant amount of time to complete (think minutes, maybe an hour). The gRPC API returns a "promise" object, represented by the `google::longrunning::Operation` proto, and the application (or client library) should periodically poll this object until it is "done". In the C++ client libraries we represent these long-running operations by a member function that returns `future<StatusOr<ReturnType>>`. This PR introduces a generic function to implement these member functions. It first starts the operation using an asynchronous retry loop, and then starts an asynchronous loop to poll the operation until it completes. The promise can complete with an error, which is represented by a `google::cloud::Status` object, or with success and some `ReturnType` value. The application may also configure the "polling policy", which may stop the polling even though the operation has not completed. A future PR will change the generated libraries to use this function. With some work we could change the Spanner library to use it too, without the need to launch detached threads to poll the results. With substantial work we could change the Bigtable library to use it too. [aip/151]: https://google.aip.dev/151
1 parent 039c164 commit e9b7bff

10 files changed

Lines changed: 942 additions & 2 deletions

google/cloud/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ cc_library(
9898
"@com_google_absl//absl/time",
9999
"@com_google_googleapis//:googleapis_system_includes",
100100
"@com_google_googleapis//google/iam/credentials/v1:credentials_cc_grpc",
101+
"@com_google_googleapis//google/longrunning:longrunning_cc_grpc",
101102
"@com_google_googleapis//google/rpc:status_cc_proto",
102103
],
103104
)

google/cloud/CMakeLists.txt

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,10 @@ if (GOOGLE_CLOUD_CPP_ENABLE_GRPC)
390390
grpc_utils/version.h
391391
internal/async_connection_ready.cc
392392
internal/async_connection_ready.h
393+
internal/async_long_running_operation.cc
394+
internal/async_long_running_operation.h
395+
internal/async_polling_loop.cc
396+
internal/async_polling_loop.h
393397
internal/async_read_stream_impl.h
394398
internal/async_read_write_stream_impl.h
395399
internal/async_retry_loop.h
@@ -435,6 +439,7 @@ if (GOOGLE_CLOUD_CPP_ENABLE_GRPC)
435439
absl::memory
436440
absl::time
437441
google-cloud-cpp::iam_protos
442+
google-cloud-cpp::longrunning_operations_protos
438443
google-cloud-cpp::rpc_status_protos
439444
google-cloud-cpp::common
440445
gRPC::grpc++
@@ -524,6 +529,8 @@ if (GOOGLE_CLOUD_CPP_ENABLE_GRPC)
524529
grpc_error_delegate_test.cc
525530
grpc_options_test.cc
526531
internal/async_connection_ready_test.cc
532+
internal/async_long_running_operation_test.cc
533+
internal/async_polling_loop_test.cc
527534
internal/async_read_write_stream_impl_test.cc
528535
internal/async_retry_loop_test.cc
529536
internal/async_retry_unary_rpc_test.cc
@@ -610,8 +617,12 @@ if (GOOGLE_CLOUD_CPP_ENABLE_GRPC)
610617
"gRPC Utilities for the Google Cloud C++ Client Library")
611618
set(GOOGLE_CLOUD_CPP_PC_DESCRIPTION
612619
"Provides gRPC Utilities for the Google Cloud C++ Client Library.")
613-
string(CONCAT GOOGLE_CLOUD_CPP_PC_REQUIRES
614-
"google_cloud_cpp_common google_cloud_cpp_iam_protos")
620+
string(
621+
CONCAT GOOGLE_CLOUD_CPP_PC_REQUIRES
622+
"google_cloud_cpp_common"
623+
" google_cloud_cpp_iam_protos"
624+
" google_cloud_cpp_longrunning_operations_protos"
625+
" google_cloud_cpp_rpc_status_protos")
615626
string(CONCAT GOOGLE_CLOUD_CPP_PC_LIBS "-lgoogle_cloud_cpp_grpc_utils")
616627

617628
# Create and install the pkg-config files.

google/cloud/google_cloud_cpp_grpc_utils.bzl

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ google_cloud_cpp_grpc_utils_hdrs = [
2828
"grpc_utils/grpc_error_delegate.h",
2929
"grpc_utils/version.h",
3030
"internal/async_connection_ready.h",
31+
"internal/async_long_running_operation.h",
32+
"internal/async_polling_loop.h",
3133
"internal/async_read_stream_impl.h",
3234
"internal/async_read_write_stream_impl.h",
3335
"internal/async_retry_loop.h",
@@ -61,6 +63,8 @@ google_cloud_cpp_grpc_utils_srcs = [
6163
"grpc_error_delegate.cc",
6264
"grpc_options.cc",
6365
"internal/async_connection_ready.cc",
66+
"internal/async_long_running_operation.cc",
67+
"internal/async_polling_loop.cc",
6468
"internal/background_threads_impl.cc",
6569
"internal/default_completion_queue_impl.cc",
6670
"internal/grpc_access_token_authentication.cc",

google/cloud/google_cloud_cpp_grpc_utils_unit_tests.bzl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ google_cloud_cpp_grpc_utils_unit_tests = [
2222
"grpc_error_delegate_test.cc",
2323
"grpc_options_test.cc",
2424
"internal/async_connection_ready_test.cc",
25+
"internal/async_long_running_operation_test.cc",
26+
"internal/async_polling_loop_test.cc",
2527
"internal/async_read_write_stream_impl_test.cc",
2628
"internal/async_retry_loop_test.cc",
2729
"internal/async_retry_unary_rpc_test.cc",
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
// Copyright 2021 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "google/cloud/internal/async_long_running_operation.h"
16+
#include "google/cloud/grpc_error_delegate.h"
17+
18+
namespace google {
19+
namespace cloud {
20+
inline namespace GOOGLE_CLOUD_CPP_NS {
21+
namespace internal {
22+
23+
Status ExtractOperationResultImpl(
24+
StatusOr<google::longrunning::Operation> op,
25+
google::protobuf::Message& result,
26+
absl::FunctionRef<bool(google::protobuf::Any const&)> validate_any,
27+
std::string const& location) {
28+
if (!op) return std::move(op).status();
29+
if (op->has_error()) return MakeStatusFromRpcError(op->error());
30+
if (!op->has_metadata()) {
31+
return Status(StatusCode::kInternal,
32+
location +
33+
"() cannot extract value from operation without error or "
34+
"metadata, name=" +
35+
op->name());
36+
}
37+
google::protobuf::Any const& any = op->metadata();
38+
if (!validate_any(any)) {
39+
return Status(
40+
StatusCode::kInternal,
41+
location +
42+
"() operation completed with an invalid metadata type, name=" +
43+
op->name());
44+
}
45+
any.UnpackTo(&result);
46+
return Status{};
47+
}
48+
49+
} // namespace internal
50+
} // namespace GOOGLE_CLOUD_CPP_NS
51+
} // namespace cloud
52+
} // namespace google
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
// Copyright 2021 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_INTERNAL_ASYNC_LONG_RUNNING_OPERATION_H
16+
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_INTERNAL_ASYNC_LONG_RUNNING_OPERATION_H
17+
18+
#include "google/cloud/backoff_policy.h"
19+
#include "google/cloud/completion_queue.h"
20+
#include "google/cloud/future.h"
21+
#include "google/cloud/internal/async_polling_loop.h"
22+
#include "google/cloud/internal/async_retry_loop.h"
23+
#include "google/cloud/polling_policy.h"
24+
#include "google/cloud/status_or.h"
25+
#include "google/cloud/version.h"
26+
#include "absl/functional/function_ref.h"
27+
#include <google/longrunning/operations.pb.h>
28+
#include <grpcpp/grpcpp.h>
29+
#include <functional>
30+
#include <memory>
31+
#include <string>
32+
33+
namespace google {
34+
namespace cloud {
35+
inline namespace GOOGLE_CLOUD_CPP_NS {
36+
namespace internal {
37+
38+
/// Extracts the value (or error) from a completed long-running operation
39+
Status ExtractOperationResultImpl(
40+
StatusOr<google::longrunning::Operation> op,
41+
google::protobuf::Message& result,
42+
absl::FunctionRef<bool(google::protobuf::Any const&)> validate_any,
43+
std::string const& location);
44+
45+
/**
46+
* Extracts the value from a completed long-running operation.
47+
*
48+
* This helper is used in `AsyncLongRunningOperation()` to extract the value (or
49+
* error) from a completed long-running operation.
50+
*/
51+
template <typename ReturnType>
52+
StatusOr<ReturnType> ExtractLongRunningResult(
53+
StatusOr<google::longrunning::Operation> op, std::string const& location) {
54+
ReturnType result;
55+
auto status = ExtractOperationResultImpl(
56+
std::move(op), result,
57+
[](google::protobuf::Any const& any) { return any.Is<ReturnType>(); },
58+
location);
59+
if (!status.ok()) return status;
60+
return result;
61+
}
62+
63+
/**
64+
* Asynchronously starts and polls a long-running operation.
65+
*
66+
* Long-running operations [aip/151] are used for API methods that take a
67+
* significant amount of time to complete (think minutes, maybe an hour). The
68+
* gRPC API returns a "promise" object, represented by the
69+
* `google::longrunning::Operation` proto, and the application (or client
70+
* library) should periodically poll this object until it is "done".
71+
*
72+
* In the C++ client libraries we represent these long-running operations by
73+
* a member function that returns `future<StatusOr<ReturnType>>`. This function
74+
* is a helper to implement these member functions. It first starts the
75+
* operation using an asynchronous retry loop, and then starts an asynchronous
76+
* loop to poll the operation until it completes.
77+
*
78+
* The promise can complete with an error, which is represented by a
79+
* `google::cloud::Status` object, or with success and some `ReturnType` value.
80+
* The application may also configure the "polling policy", which may stop the
81+
* polling even though the operation has not completed.
82+
*
83+
* Library developers would use this function as follows:
84+
*
85+
* @code
86+
* class BarStub {
87+
* public:
88+
* virtual future<StatusOr<google::longrunning::Operation>> AsyncFoo(
89+
* google::cloud::CompletionQueue& cq,
90+
* std::unique_ptr<grpc::ClientContext> context,
91+
* FooRequest const& request) = 0;
92+
*
93+
* virtual future<StatusOr<google::longrunning::Operation>> AsyncGetOperation(
94+
* google::cloud::CompletionQueue& cq,
95+
* std::unique_ptr<grpc::ClientContext> context,
96+
* google::longrunning::GetOperationRequest const& request) = 0;
97+
* };
98+
* @endcode
99+
*
100+
* The corresponding `*ConnectionImpl` class would look as follows:
101+
*
102+
* @code
103+
* class BarConnectionImpl : public BarConnection {
104+
* public:
105+
* // Using C++14 for exposition purposes. The implementation supports C++11.
106+
* future<StatusOr<FooResponse>> Foo(FooRequest const& request) override {
107+
* return google::cloud::internal::AsyncLongRunningOperation(
108+
* cq_, request,
109+
* [stub = stub_](auto& cq, auto context, auto const& request) {
110+
* return stub->AsyncFoo(cq, std::move(context), request);
111+
* },
112+
* [stub = stub_](auto& cq, auto context, auto const& request) {
113+
* return stub->AsyncGetOperation(cq, std::move(context), request);
114+
* },
115+
* retry_policy_->clone(), backoff_policy_->clone(),
116+
* IdempotencyPolicy::kIdempotent,
117+
* polling_policy_->clone(),
118+
* __func__ // for debugging
119+
* );
120+
* }
121+
*
122+
* private:
123+
* google::cloud::CompletionQueue cq_;
124+
* std::shared_ptr<BarStub> stub_;
125+
* };
126+
* @endcode
127+
*
128+
* [aip/151]: https://google.aip.dev/151
129+
*/
130+
template <typename ReturnType, typename RequestType, typename StartFunctor,
131+
typename RetryPolicyType>
132+
future<StatusOr<ReturnType>> AsyncLongRunningOperation(
133+
google::cloud::CompletionQueue cq, RequestType&& request,
134+
StartFunctor&& start, AsyncPollLongRunningOperation poll,
135+
std::unique_ptr<RetryPolicyType> retry_policy,
136+
std::unique_ptr<BackoffPolicy> backoff_policy, Idempotency idempotent,
137+
std::unique_ptr<PollingPolicy> polling_policy, char const* location) {
138+
auto operation =
139+
AsyncRetryLoop(std::move(retry_policy), std::move(backoff_policy),
140+
idempotent, cq, std::forward<StartFunctor>(start),
141+
std::forward<RequestType>(request), location);
142+
struct MoveCapture {
143+
google::cloud::CompletionQueue cq;
144+
AsyncPollLongRunningOperation poll;
145+
std::unique_ptr<PollingPolicy> polling_policy;
146+
std::string location;
147+
148+
future<StatusOr<ReturnType>> operator()(
149+
future<StatusOr<google::longrunning::Operation>> f) {
150+
auto op = f.get();
151+
if (!op) {
152+
return make_ready_future(StatusOr<ReturnType>(std::move(op).status()));
153+
}
154+
auto loc = this->location;
155+
return AsyncPollingLoop(std::move(cq), *std::move(op), std::move(poll),
156+
std::move(polling_policy), std::move(location))
157+
.then([loc](future<StatusOr<google::longrunning::Operation>> g) {
158+
return ExtractLongRunningResult<ReturnType>(g.get(), loc);
159+
});
160+
}
161+
};
162+
163+
return operation.then(MoveCapture{std::move(cq), std::move(poll),
164+
std::move(polling_policy),
165+
std::string{location}});
166+
}
167+
168+
} // namespace internal
169+
} // namespace GOOGLE_CLOUD_CPP_NS
170+
} // namespace cloud
171+
} // namespace google
172+
173+
#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_INTERNAL_ASYNC_LONG_RUNNING_OPERATION_H

0 commit comments

Comments
 (0)