-
Notifications
You must be signed in to change notification settings - Fork 5.3k
Expand file tree
/
Copy pathasync_client_impl.h
More file actions
438 lines (396 loc) · 18.1 KB
/
async_client_impl.h
File metadata and controls
438 lines (396 loc) · 18.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
#pragma once
#include <chrono>
#include <cstdint>
#include <functional>
#include <list>
#include <map>
#include <memory>
#include <string>
#include <vector>
#include "envoy/buffer/buffer.h"
#include "envoy/common/random_generator.h"
#include "envoy/common/scope_tracker.h"
#include "envoy/config/core/v3/base.pb.h"
#include "envoy/config/route/v3/route_components.pb.h"
#include "envoy/config/typed_metadata.h"
#include "envoy/event/dispatcher.h"
#include "envoy/http/async_client.h"
#include "envoy/http/codec.h"
#include "envoy/http/context.h"
#include "envoy/http/filter.h"
#include "envoy/http/header_map.h"
#include "envoy/http/message.h"
#include "envoy/router/context.h"
#include "envoy/router/router.h"
#include "envoy/router/router_ratelimit.h"
#include "envoy/router/shadow_writer.h"
#include "envoy/server/filter_config.h"
#include "envoy/ssl/connection.h"
#include "envoy/tracing/tracer.h"
#include "envoy/type/v3/percent.pb.h"
#include "envoy/upstream/load_balancer.h"
#include "envoy/upstream/upstream.h"
#include "source/common/common/assert.h"
#include "source/common/common/empty_string.h"
#include "source/common/common/linked_object.h"
#include "source/common/http/message_impl.h"
#include "source/common/http/null_route_impl.h"
#include "source/common/local_reply/local_reply.h"
#include "source/common/router/config_impl.h"
#include "source/common/router/router.h"
#include "source/common/stream_info/stream_info_impl.h"
#include "source/common/tracing/http_tracer_impl.h"
#include "source/common/upstream/retry_factory.h"
#include "source/extensions/early_data/default_early_data_policy.h"
namespace Envoy {
namespace Http {
namespace {
// Limit the size of buffer for data used for retries.
// This is currently fixed to 64KB.
constexpr uint64_t kDefaultDecoderBufferLimit = 1 << 16;
// Response buffer limit 32MB.
constexpr uint64_t kBufferLimitForResponse = 32 * 1024 * 1024;
} // namespace
class AsyncStreamImpl;
class AsyncRequestSharedImpl;
class AsyncClientImpl final : public AsyncClient {
public:
AsyncClientImpl(Upstream::ClusterInfoConstSharedPtr cluster, Stats::Store& stats_store,
Event::Dispatcher& dispatcher, Upstream::ClusterManager& cm,
Server::Configuration::CommonFactoryContext& factory_context,
Router::ShadowWriterPtr&& shadow_writer, Http::Context& http_context,
Router::Context& router_context);
~AsyncClientImpl() override;
// Http::AsyncClient
Request* send(RequestMessagePtr&& request, Callbacks& callbacks,
const AsyncClient::RequestOptions& options) override;
Stream* start(StreamCallbacks& callbacks, const AsyncClient::StreamOptions& options) override;
OngoingRequest* startRequest(RequestHeaderMapPtr&& request_headers, Callbacks& callbacks,
const AsyncClient::RequestOptions& options) override;
Server::Configuration::CommonFactoryContext& factory_context_;
Upstream::ClusterInfoConstSharedPtr cluster_;
Event::Dispatcher& dispatcher() override { return dispatcher_; }
static const absl::string_view ResponseBufferLimit;
private:
template <typename T> T* internalStartRequest(T* async_request);
const Router::FilterConfigSharedPtr config_;
Event::Dispatcher& dispatcher_;
std::list<std::unique_ptr<AsyncStreamImpl>> active_streams_;
const LocalReply::LocalReplyPtr local_reply_;
friend class AsyncStreamImpl;
friend class AsyncRequestSharedImpl;
};
/**
* Implementation of AsyncRequest. This implementation is capable of sending HTTP requests to a
* ConnectionPool asynchronously.
*/
class AsyncStreamImpl : public virtual AsyncClient::Stream,
public StreamDecoderFilterCallbacks,
public Event::DeferredDeletable,
public Logger::Loggable<Logger::Id::http>,
public LinkedObject<AsyncStreamImpl>,
public ScopeTrackedObject {
public:
static absl::StatusOr<std::unique_ptr<AsyncStreamImpl>>
create(AsyncClientImpl& parent, AsyncClient::StreamCallbacks& callbacks,
const AsyncClient::StreamOptions& options) {
absl::Status creation_status = absl::OkStatus();
std::unique_ptr<AsyncStreamImpl> stream = std::unique_ptr<AsyncStreamImpl>(
new AsyncStreamImpl(parent, callbacks, options, creation_status));
RETURN_IF_NOT_OK(creation_status);
return stream;
}
~AsyncStreamImpl() override {
routerDestroy();
// UpstreamRequest::cleanUp() is guaranteed to reset the high watermark calls.
ENVOY_BUG(high_watermark_calls_ == 0, "Excess high watermark calls after async stream ended.");
if (destructor_callback_.has_value()) {
(*destructor_callback_)();
}
}
void setDestructorCallback(AsyncClient::StreamDestructorCallbacks callback) override {
ASSERT(!destructor_callback_);
destructor_callback_.emplace(callback);
}
void removeDestructorCallback() override {
ASSERT(destructor_callback_);
destructor_callback_.reset();
}
void setWatermarkCallbacks(Http::SidestreamWatermarkCallbacks& callbacks) override {
ENVOY_BUG(!watermark_callbacks_, "Watermark callbacks should not already be registered!");
watermark_callbacks_.emplace(callbacks);
for (uint32_t i = 0; i < high_watermark_calls_; ++i) {
watermark_callbacks_->get().onSidestreamAboveHighWatermark();
}
}
void removeWatermarkCallbacks() override {
ENVOY_BUG(watermark_callbacks_, "Watermark callbacks should already be registered!");
for (uint32_t i = 0; i < high_watermark_calls_; ++i) {
watermark_callbacks_->get().onSidestreamBelowLowWatermark();
}
watermark_callbacks_.reset();
}
// Http::AsyncClient::Stream
void sendHeaders(RequestHeaderMap& headers, bool end_stream) override;
void sendData(Buffer::Instance& data, bool end_stream) override;
void sendTrailers(RequestTrailerMap& trailers) override;
void reset() override;
bool isAboveWriteBufferHighWatermark() const override { return high_watermark_calls_ > 0; }
const StreamInfo::StreamInfo& streamInfo() const override { return stream_info_; }
StreamInfo::StreamInfoImpl& streamInfo() override { return stream_info_; }
protected:
AsyncStreamImpl(AsyncClientImpl& parent, AsyncClient::StreamCallbacks& callbacks,
const AsyncClient::StreamOptions& options, absl::Status& creation_status);
bool remoteClosed() { return remote_closed_; }
void closeLocal(bool end_stream);
AsyncClientImpl& parent_;
// Callback to listen for stream destruction.
absl::optional<AsyncClient::StreamDestructorCallbacks> destructor_callback_;
// Callback to listen for low/high/overflow watermark events.
absl::optional<std::reference_wrapper<SidestreamWatermarkCallbacks>> watermark_callbacks_;
bool complete_{};
const bool discard_response_body_;
const bool new_async_client_retry_logic_{};
absl::optional<uint64_t> buffer_limit_{absl::nullopt};
private:
void cleanup();
void closeRemote(bool end_stream);
bool complete() { return local_closed_ && remote_closed_; }
void routerDestroy();
// Http::StreamDecoderFilterCallbacks
OptRef<const Network::Connection> connection() override { return {}; }
Event::Dispatcher& dispatcher() override { return parent_.dispatcher_; }
void resetStream(Http::StreamResetReason reset_reason = Http::StreamResetReason::LocalReset,
absl::string_view transport_failure_reason = "") override;
OptRef<const Router::Route> route() override { return makeOptRefFromPtr(route_.get()); }
Router::RouteConstSharedPtr routeSharedPtr() override { return route_; }
OptRef<const Upstream::ClusterInfo> clusterInfo() override {
return makeOptRefFromPtr(parent_.cluster_.get());
}
Upstream::ClusterInfoConstSharedPtr clusterInfoSharedPtr() override { return parent_.cluster_; }
uint64_t streamId() const override { return stream_id_; }
// TODO(kbaichoo): Plumb account from owning request filter.
Buffer::BufferMemoryAccountSharedPtr account() const override { return account_; }
Tracing::Span& activeSpan() override { return active_span_; }
OptRef<const Tracing::Config> tracingConfig() const override {
return makeOptRef<const Tracing::Config>(tracing_config_);
}
void continueDecoding() override {}
RequestTrailerMap& addDecodedTrailers() override { PANIC("not implemented"); }
void addDecodedData(Buffer::Instance& data, bool) override {
if (!new_async_client_retry_logic_) {
// This should only be called if the user has set up buffering. The request is already fully
// buffered. Note that this is only called via the async client's internal use of the router
// filter which uses this function for buffering.
ASSERT(buffered_body_ != nullptr);
return;
}
// This will only be used by internal router filter for buffering for retries.
// If the buffer limit is reached, the router filter will ignore the retry and the following
// data will not be buffered. So, we don't need to check the buffer limit here because the
// router filter already did that.
if (buffered_body_ == nullptr) {
buffered_body_ = std::make_unique<Buffer::OwnedImpl>();
}
buffered_body_->move(data);
}
MetadataMapVector& addDecodedMetadata() override { PANIC("not implemented"); }
void injectDecodedDataToFilterChain(Buffer::Instance&, bool) override {}
const Buffer::Instance* decodingBuffer() override { return buffered_body_.get(); }
void modifyDecodingBuffer(std::function<void(Buffer::Instance&)>) override {}
void sendLocalReply(Code code, absl::string_view body,
std::function<void(ResponseHeaderMap& headers)> modify_headers,
const absl::optional<Grpc::Status::GrpcStatus> grpc_status,
absl::string_view details) override;
// The async client won't pause if sending 1xx headers so simply swallow any.
void encode1xxHeaders(ResponseHeaderMapPtr&&) override {}
void encodeHeaders(ResponseHeaderMapPtr&& headers, bool end_stream,
absl::string_view details) override;
void encodeData(Buffer::Instance& data, bool end_stream) override;
void encodeTrailers(ResponseTrailerMapPtr&& trailers) override;
void encodeMetadata(MetadataMapPtr&&) override {}
void onDecoderFilterAboveWriteBufferHighWatermark() override {
++high_watermark_calls_;
if (watermark_callbacks_.has_value()) {
watermark_callbacks_->get().onSidestreamAboveHighWatermark();
}
}
void onDecoderFilterBelowWriteBufferLowWatermark() override {
ASSERT(high_watermark_calls_ != 0);
--high_watermark_calls_;
if (watermark_callbacks_.has_value()) {
watermark_callbacks_->get().onSidestreamBelowLowWatermark();
}
}
void addDownstreamWatermarkCallbacks(DownstreamWatermarkCallbacks&) override {}
void removeDownstreamWatermarkCallbacks(DownstreamWatermarkCallbacks&) override {}
void sendGoAwayAndClose(bool graceful [[maybe_unused]] = false) override {}
void setBufferLimit(uint64_t) override {
IS_ENVOY_BUG("decoder buffer limits should not be overridden on async streams.");
}
uint64_t bufferLimit() override {
if (new_async_client_retry_logic_) {
return buffer_limit_.value_or(kDefaultDecoderBufferLimit);
} else {
return buffer_limit_.value_or(0);
}
}
bool recreateStream(const ResponseHeaderMap*) override { return false; }
const ScopeTrackedObject& scope() override { return *this; }
void restoreContextOnContinue(ScopeTrackedObjectStack& tracked_object_stack) override {
tracked_object_stack.add(*this);
}
void addUpstreamSocketOptions(const Network::Socket::OptionsSharedPtr&) override {}
Network::Socket::OptionsSharedPtr getUpstreamSocketOptions() const override { return {}; }
const Router::RouteSpecificFilterConfig* mostSpecificPerFilterConfig() const override {
return nullptr;
}
Router::RouteSpecificFilterConfigs perFilterConfigs() const override { return {}; }
Http1StreamEncoderOptionsOptRef http1StreamEncoderOptions() override { return {}; }
OptRef<DownstreamStreamFilterCallbacks> downstreamCallbacks() override { return {}; }
OptRef<UpstreamStreamFilterCallbacks> upstreamCallbacks() override { return {}; }
void resetIdleTimer() override {}
void setUpstreamOverrideHost(Upstream::LoadBalancerContext::OverrideHost host) override {
upstream_override_host_ = std::move(host);
}
OptRef<const Upstream::LoadBalancerContext::OverrideHost> upstreamOverrideHost() const override {
if (upstream_override_host_.host.empty()) {
return {};
}
return upstream_override_host_;
}
bool shouldLoadShed() const override { return false; }
absl::string_view filterConfigName() const override { return ""; }
RequestHeaderMapOptRef requestHeaders() override { return makeOptRefFromPtr(request_headers_); }
RequestTrailerMapOptRef requestTrailers() override {
return makeOptRefFromPtr(request_trailers_);
}
ResponseHeaderMapOptRef informationalHeaders() override { return {}; }
ResponseHeaderMapOptRef responseHeaders() override { return {}; }
ResponseTrailerMapOptRef responseTrailers() override { return {}; }
// ScopeTrackedObject
void dumpState(std::ostream& os, int indent_level) const override {
const char* spaces = spacesForLevel(indent_level);
os << spaces << "AsyncClient " << this << DUMP_MEMBER(stream_id_) << "\n";
DUMP_DETAILS(&stream_info_);
}
AsyncClient::StreamCallbacks& stream_callbacks_;
const uint64_t stream_id_;
Router::ProdFilter router_;
StreamInfo::StreamInfoImpl stream_info_;
Tracing::NullSpan active_span_;
const Tracing::Config& tracing_config_;
const LocalReply::LocalReply& local_reply_;
Router::RouteConstSharedPtr parent_route_;
std::shared_ptr<NullRouteImpl> route_;
uint32_t high_watermark_calls_{};
bool local_closed_{};
bool remote_closed_{};
Buffer::InstancePtr buffered_body_;
Buffer::BufferMemoryAccountSharedPtr account_{nullptr};
RequestHeaderMap* request_headers_{};
RequestTrailerMap* request_trailers_{};
bool encoded_response_headers_{};
bool is_grpc_request_{};
bool is_head_request_{false};
bool send_xff_{true};
bool send_internal_{true};
bool router_destroyed_{false};
// Upstream override host for bypassing load balancer selection
Upstream::LoadBalancerContext::OverrideHost upstream_override_host_;
friend class AsyncClientImpl;
friend class AsyncClientImplUnitTest;
};
class AsyncRequestSharedImpl : public virtual AsyncClient::Request,
protected AsyncStreamImpl,
protected AsyncClient::StreamCallbacks {
public:
void cancel() final;
protected:
AsyncRequestSharedImpl(AsyncClientImpl& parent, AsyncClient::Callbacks& callbacks,
const AsyncClient::RequestOptions& options, absl::Status& creation_status);
void onHeaders(ResponseHeaderMapPtr&& headers, bool end_stream) final;
void onData(Buffer::Instance& data, bool end_stream) final;
void onTrailers(ResponseTrailerMapPtr&& trailers) final;
void onComplete() final;
void onReset() final;
AsyncClient::Callbacks& callbacks_;
Tracing::SpanPtr child_span_;
std::unique_ptr<ResponseMessageImpl> response_;
bool cancelled_{};
bool response_buffer_overlimit_{};
const uint64_t response_buffer_limit_;
};
class AsyncOngoingRequestImpl final : public AsyncClient::OngoingRequest,
public AsyncRequestSharedImpl {
public:
static AsyncOngoingRequestImpl* create(RequestHeaderMapPtr&& request_headers,
AsyncClientImpl& parent, AsyncClient::Callbacks& callbacks,
const AsyncClient::RequestOptions& options) {
absl::Status creation_status = absl::OkStatus();
auto* ret = new AsyncOngoingRequestImpl(std::move(request_headers), parent, callbacks, options,
creation_status);
if (!creation_status.ok()) {
delete ret;
return nullptr;
}
return ret;
}
void captureAndSendTrailers(RequestTrailerMapPtr&& trailers) override {
request_trailers_ = std::move(trailers);
sendTrailers(*request_trailers_);
}
private:
AsyncOngoingRequestImpl(RequestHeaderMapPtr&& request_headers, AsyncClientImpl& parent,
AsyncClient::Callbacks& callbacks,
const AsyncClient::RequestOptions& options, absl::Status& creation_status)
: AsyncRequestSharedImpl(parent, callbacks, options, creation_status),
request_headers_(std::move(request_headers)) {
ASSERT(request_headers_);
}
void initialize();
RequestHeaderMapPtr request_headers_;
RequestTrailerMapPtr request_trailers_;
friend class AsyncClientImpl;
};
class AsyncRequestImpl final : public AsyncRequestSharedImpl {
public:
static AsyncRequestImpl* create(RequestMessagePtr&& request, AsyncClientImpl& parent,
AsyncClient::Callbacks& callbacks,
const AsyncClient::RequestOptions& options) {
absl::Status creation_status = absl::OkStatus();
auto* ret =
new AsyncRequestImpl(std::move(request), parent, callbacks, options, creation_status);
if (!creation_status.ok()) {
delete ret;
return nullptr;
}
return ret;
}
private:
AsyncRequestImpl(RequestMessagePtr&& request, AsyncClientImpl& parent,
AsyncClient::Callbacks& callbacks, const AsyncClient::RequestOptions& options,
absl::Status& creation_status)
: AsyncRequestSharedImpl(parent, callbacks, options, creation_status),
request_(std::move(request)) {}
void initialize();
// Http::StreamDecoderFilterCallbacks
void addDecodedData(Buffer::Instance&, bool) override {
// This will only be used by internal router filter for buffering for retries.
// But for AsyncRequest that all data is already buffered in request message
// and do not need to buffer again.
}
const Buffer::Instance* decodingBuffer() override { return &request_->body(); }
uint64_t bufferLimit() override {
if (new_async_client_retry_logic_) {
// 0 means no limit because the whole body is already buffered in request message.
return 0;
} else {
return buffer_limit_.value_or(0);
}
}
RequestMessagePtr request_;
friend class AsyncClientImpl;
};
} // namespace Http
} // namespace Envoy