Skip to content

Commit 0b90f64

Browse files
authored
ext_proc: Ext proc half close on destroy and defer reset till trailers received. (#37083)
<!-- !!!ATTENTION!!! If you are fixing *any* crash or *any* potential security issue, *do not* open a pull request in this repo. Please report the issue via emailing [email protected] where the issue will be triaged appropriately. Thank you in advance for helping to keep Envoy secure. !!!ATTENTION!!! For an explanation of how to fill out the fields, please see the relevant section in [PULL_REQUESTS.md](https://github.com/envoyproxy/envoy/blob/main/PULL_REQUESTS.md) --> Commit Message: [ext_proc] Ext proc half close on destroy and defer reset till trailers received. Additional Description: In Grpc, the trailers carries the grpc-status header terminates a rpc stream. Our current flow ignores the trailers by closeStream and resetStream all together, the later resetStream call would signal the remote server a CANCEL, while clean the sidestream and ignore any possible trailers that might have been sent by ext_proc server. This PR would defer the cleanup of the side stream till a trailers been received, or till a cleanup timer fires. Risk Level: medium Testing: unit tests Docs Changes: Release Notes: Platform Specific Features: --------- Signed-off-by: Xin Zhuang <[email protected]>
1 parent 60038ff commit 0b90f64

13 files changed

Lines changed: 201 additions & 83 deletions

File tree

source/extensions/filters/http/ext_proc/client.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,13 @@ class ExternalProcessorStream : public StreamBase {
2323
virtual void send(envoy::service::ext_proc::v3::ProcessingRequest&& request,
2424
bool end_stream) PURE;
2525
// Idempotent close. Return true if it actually closed.
26-
virtual bool close() PURE;
26+
// Sends a half-close from the client side.
27+
// No further messages can be sent after this, but gRPC server may still send
28+
// messages back.
29+
virtual bool closeLocalStream() PURE;
30+
virtual bool remoteClosed() const PURE;
31+
virtual bool localClosed() const PURE;
32+
virtual void resetStream() PURE;
2733
virtual const StreamInfo::StreamInfo& streamInfo() const PURE;
2834
virtual StreamInfo::StreamInfo& streamInfo() PURE;
2935
virtual void notifyFilterDestroy() PURE;

source/extensions/filters/http/ext_proc/client_impl.cc

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,16 +65,15 @@ void ExternalProcessorStreamImpl::send(envoy::service::ext_proc::v3::ProcessingR
6565
stream_.sendMessage(std::move(request), end_stream);
6666
}
6767

68-
bool ExternalProcessorStreamImpl::close() {
69-
if (!stream_closed_) {
68+
bool ExternalProcessorStreamImpl::closeLocalStream() {
69+
if (!local_closed_) {
7070
ENVOY_LOG(debug, "Closing gRPC stream");
7171
// Unregister the watermark callbacks, if any exist (e.g., filter is not destroyed yet)
7272
if (grpc_side_stream_flow_control_ && callbacks_.has_value()) {
7373
stream_.removeWatermarkCallbacks();
7474
}
7575
stream_.closeStream();
76-
stream_closed_ = true;
77-
stream_.resetStream();
76+
local_closed_ = true;
7877
return true;
7978
}
8079
return false;
@@ -95,7 +94,10 @@ void ExternalProcessorStreamImpl::onReceiveTrailingMetadata(Http::ResponseTraile
9594
void ExternalProcessorStreamImpl::onRemoteClose(Grpc::Status::GrpcStatus status,
9695
const std::string& message) {
9796
ENVOY_LOG(debug, "gRPC stream closed remotely with status {}: {}", status, message);
98-
stream_closed_ = true;
97+
// Also mark the client stream as closed: the underlying http async stream is
98+
// closed and cleaned up already after processing trailers.
99+
local_closed_ = true;
100+
remote_closed_ = true;
99101

100102
if (!callbacks_.has_value()) {
101103
ENVOY_LOG(debug, "Underlying filter object has been destroyed.");

source/extensions/filters/http/ext_proc/client_impl.h

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ class ExternalProcessorStreamImpl : public ExternalProcessorStream,
5555
void send(ProcessingRequest&& request, bool end_stream) override;
5656
// Close the stream. This is idempotent and will return true if we
5757
// actually closed it.
58-
bool close() override;
58+
bool closeLocalStream() override;
5959

6060
void notifyFilterDestroy() override {
6161
// When the filter object is being destroyed, `callbacks_` (which is a OptRef to filter object)
@@ -64,7 +64,7 @@ class ExternalProcessorStreamImpl : public ExternalProcessorStream,
6464

6565
// Unregister the watermark callbacks(if any) to prevent access of filter callbacks after
6666
// the filter object is destroyed.
67-
if (!stream_closed_) {
67+
if (!local_closed_) {
6868
// Remove the parent stream info to avoid a dangling reference.
6969
stream_.streamInfo().clearParentStreamInfo();
7070
if (grpc_side_stream_flow_control_) {
@@ -73,9 +73,23 @@ class ExternalProcessorStreamImpl : public ExternalProcessorStream,
7373
}
7474
}
7575

76+
// Returns true if the stream is closed from the Server/remote side.
77+
bool remoteClosed() const override { return remote_closed_; }
78+
79+
// Returns true if the stream is closed from the Envoy/client side.
80+
bool localClosed() const override { return local_closed_; }
81+
7682
// AsyncStreamCallbacks
7783
void onReceiveMessage(ProcessingResponsePtr&& message) override;
7884

85+
void resetStream() override {
86+
if (!remoteClosed()) {
87+
stream_.resetStream();
88+
// Mark the stream as closed from the Server/remote side.
89+
remote_closed_ = true;
90+
}
91+
}
92+
7993
// RawAsyncStreamCallbacks
8094
void onCreateInitialMetadata(Http::RequestHeaderMap& metadata) override;
8195
void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&& metadata) override;
@@ -101,7 +115,10 @@ class ExternalProcessorStreamImpl : public ExternalProcessorStream,
101115
Grpc::AsyncClient<ProcessingRequest, ProcessingResponse> client_;
102116
Grpc::AsyncStream<ProcessingRequest> stream_;
103117
Http::AsyncClient::ParentContext grpc_context_;
104-
bool stream_closed_ = false;
118+
// Whether stream is closed from Envoy/client side.
119+
bool local_closed_ = false;
120+
// Whether the stream is closed from the Server/remote side.
121+
bool remote_closed_ = false;
105122
// Boolean flag initiated by runtime flag.
106123
const bool grpc_side_stream_flow_control_;
107124
};

source/extensions/filters/http/ext_proc/ext_proc.cc

Lines changed: 59 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -468,22 +468,37 @@ void Filter::closeStream() {
468468
if (!config_->grpcService().has_value()) {
469469
return;
470470
}
471+
// Stream not opened or already cleaned up.
472+
if (stream_ == nullptr) {
473+
ENVOY_LOG(debug, "Stream already closed or not opened yet.");
474+
return;
475+
}
471476

472-
if (stream_) {
477+
if (!stream_->localClosed()) {
473478
ENVOY_LOG(debug, "Calling close on stream");
474-
if (stream_->close()) {
479+
if (stream_->closeLocalStream()) {
475480
stats_.streams_closed_.inc();
476481
}
477-
config_->threadLocalStreamManager().erase(stream_);
478-
stream_ = nullptr;
479-
} else {
480-
ENVOY_LOG(debug, "Stream already closed");
481482
}
482483
}
483484

484-
void Filter::deferredCloseStream() {
485-
ENVOY_LOG(debug, "Calling deferred close on stream");
485+
void Filter::deferredResetStream() {
486+
ENVOY_LOG(debug, "Calling deferred cleanup on stream");
487+
if (stream_ == nullptr) {
488+
ENVOY_LOG(debug, "Stream already reset or not opened yet.");
489+
return;
490+
}
486491
config_->threadLocalStreamManager().deferredErase(stream_, filter_callbacks_->dispatcher());
492+
stream_ = nullptr;
493+
}
494+
495+
void Filter::cleanupStream() {
496+
ENVOY_LOG(debug, "Calling cleanup stream");
497+
if (stream_ != nullptr) {
498+
stream_->resetStream();
499+
config_->threadLocalStreamManager().erase(stream_);
500+
stream_ = nullptr;
501+
}
487502
}
488503

489504
void Filter::onDestroy() {
@@ -498,23 +513,30 @@ void Filter::onDestroy() {
498513
client_->cancel();
499514
return;
500515
}
501-
502-
if (config_->observabilityMode()) {
503-
// In observability mode where the main stream processing and side stream processing are
504-
// asynchronous, it is possible that filter instance is destroyed before the side stream request
505-
// arrives at ext_proc server. In order to prevent the data loss in this case, side stream
506-
// closure is deferred upon filter destruction with a timer.
507-
508-
// First, release the referenced filter resource.
509-
if (stream_ != nullptr) {
516+
if (stream_ != nullptr) {
517+
if (!stream_->localClosed()) {
518+
// Perform immediate close on the stream otherwise.
519+
closeStream();
520+
}
521+
// Defer the resource cleanup if the remote is not closed (no trailers
522+
// received).
523+
// This means essentially not sending a reset to the server, which
524+
// translates into a client CANCELED error on the server side.
525+
if (config_->observabilityMode() || !stream_->remoteClosed()) {
526+
// In observability mode where the main stream processing and side stream
527+
// processing are asynchronous, it is possible that filter instance is
528+
// destroyed before the side stream request arrives at ext_proc server. In
529+
// order to prevent the data loss in this case, side stream resetting is
530+
// deferred upon filter destruction with a timer.
531+
532+
// First, release the referenced filter resource.
510533
stream_->notifyFilterDestroy();
534+
// Second, perform stream deferred closure.
535+
deferredResetStream();
536+
} else {
537+
// Server closed the stream, so we can cleanup the stream immediately.
538+
cleanupStream();
511539
}
512-
513-
// Second, perform stream deferred closure.
514-
deferredCloseStream();
515-
} else {
516-
// Perform immediate close on the stream otherwise.
517-
closeStream();
518540
}
519541
}
520542

@@ -1360,6 +1382,7 @@ void Filter::onGrpcError(Grpc::Status::GrpcStatus status) {
13601382
// make sure that they do not fire now.
13611383
onFinishProcessorCalls(status);
13621384
closeStream();
1385+
cleanupStream();
13631386
ImmediateResponse errorResponse;
13641387
errorResponse.mutable_status()->set_code(StatusCode::InternalServerError);
13651388
errorResponse.set_details(absl::StrFormat("%s_gRPC_error_%i", ErrorPrefix, status));
@@ -1371,10 +1394,14 @@ void Filter::onGrpcClose() {
13711394
ENVOY_LOG(debug, "Received gRPC stream close");
13721395

13731396
processing_complete_ = true;
1374-
stats_.streams_closed_.inc();
13751397
// Successful close. We can ignore the stream for the rest of our request
13761398
// and response processing.
13771399
closeStream();
1400+
// ExternalProcessorStreamImpl::onRemoteClose will call onGrpcClose in either
1401+
// happy path or error path.
1402+
// This will mark remote_closed_ as closed.
1403+
cleanupStream();
1404+
13781405
clearAsyncState();
13791406
}
13801407

@@ -1389,13 +1416,15 @@ void Filter::onMessageTimeout() {
13891416
// the external processor for the rest of the request.
13901417
processing_complete_ = true;
13911418
closeStream();
1419+
cleanupStream();
13921420
stats_.failure_mode_allowed_.inc();
13931421
clearAsyncState();
13941422

13951423
} else {
13961424
// Return an error and stop processing the current stream.
13971425
processing_complete_ = true;
13981426
closeStream();
1427+
cleanupStream();
13991428
decoding_state_.onFinishProcessorCall(Grpc::Status::DeadlineExceeded);
14001429
encoding_state_.onFinishProcessorCall(Grpc::Status::DeadlineExceeded);
14011430
ImmediateResponse errorResponse;
@@ -1537,13 +1566,12 @@ void Filter::mergePerRouteConfig() {
15371566
}
15381567
}
15391568

1540-
void DeferredDeletableStream::closeStreamOnTimer() {
1569+
void DeferredDeletableStream::cleanupStreamOnTimer() {
15411570
// Close the stream.
1542-
if (stream_) {
1543-
ENVOY_LOG(debug, "Closing the stream");
1544-
if (stream_->close()) {
1545-
stats.streams_closed_.inc();
1546-
}
1571+
if (stream_ != nullptr) {
1572+
ENVOY_LOG(debug, "Resetting the stream.");
1573+
// After timeout if still no trailers received, reset the stream.
1574+
stream_->resetStream();
15471575
// Erase this entry from the map; this will also reset the stream_ pointer.
15481576
parent.erase(stream_.get());
15491577
} else {
@@ -1554,7 +1582,7 @@ void DeferredDeletableStream::closeStreamOnTimer() {
15541582
// In the deferred closure mode, stream closure is deferred upon filter destruction, with a timer
15551583
// to prevent unbounded resource usage growth.
15561584
void DeferredDeletableStream::deferredClose(Envoy::Event::Dispatcher& dispatcher) {
1557-
derferred_close_timer = dispatcher.createTimer([this] { closeStreamOnTimer(); });
1585+
derferred_close_timer = dispatcher.createTimer([this] { cleanupStreamOnTimer(); });
15581586
derferred_close_timer->enableTimer(std::chrono::milliseconds(deferred_close_timeout));
15591587
}
15601588

source/extensions/filters/http/ext_proc/ext_proc.h

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,10 @@ struct DeferredDeletableStream : public Logger::Loggable<Logger::Id::ext_proc> {
161161
deferred_close_timeout(timeout) {}
162162

163163
void deferredClose(Envoy::Event::Dispatcher& dispatcher);
164-
void closeStreamOnTimer();
164+
// After a timer timeouts, reset the stream, this essentially reset the
165+
// underlying gRPC stream. Gives remote grpc server a CANCELED signal, and
166+
// ignored any further messages/callbacks from the server.
167+
void cleanupStreamOnTimer();
165168

166169
ExternalProcessorStreamPtr stream_;
167170
ThreadLocalStreamManager& parent;
@@ -191,7 +194,6 @@ class ThreadLocalStreamManager : public Envoy::ThreadLocal::ThreadLocalObject {
191194
if (it == stream_manager_.end()) {
192195
return;
193196
}
194-
195197
it->second->deferredClose(dispatcher);
196198
}
197199

@@ -459,6 +461,11 @@ class Filter : public Logger::Loggable<Logger::Id::ext_proc>,
459461
void mergePerRouteConfig();
460462
StreamOpenState openStream();
461463
void closeStream();
464+
// Erases the stream from the threadLocalStreamManager, and reset the
465+
// stream_ pointer and the underlying gRPC stream.
466+
// This is called when the stream needs to be cleaned up, due to remote close
467+
// event, or local stream timeouts.
468+
void cleanupStream();
462469

463470
void onFinishProcessorCalls(Grpc::Status::GrpcStatus call_status);
464471
void clearAsyncState();
@@ -496,7 +503,7 @@ class Filter : public Logger::Loggable<Logger::Id::ext_proc>,
496503
bool end_stream);
497504
Http::FilterDataStatus sendDataInObservabilityMode(Buffer::Instance& data, ProcessorState& state,
498505
bool end_stream);
499-
void deferredCloseStream();
506+
void deferredResetStream();
500507

501508
envoy::service::ext_proc::v3::ProcessingRequest
502509
buildHeaderRequest(ProcessorState& state, Http::RequestOrResponseHeaderMap& headers,

test/extensions/filters/http/ext_proc/BUILD

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ envoy_extension_cc_test(
2929

3030
envoy_extension_cc_test(
3131
name = "filter_test",
32-
size = "small",
32+
size = "medium",
3333
srcs = ["filter_test.cc"],
3434
copts = select({
3535
"//bazel:windows_x86_64": [],

test/extensions/filters/http/ext_proc/client_test.cc

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,7 @@ TEST_F(ExtProcStreamTest, OpenCloseStream) {
9393
auto options = Http::AsyncClient::StreamOptions().setParentContext(parent_context);
9494
auto stream = client_->start(*this, config_with_hash_key_, options, watermark_callbacks_);
9595
EXPECT_CALL(stream_, closeStream());
96-
EXPECT_CALL(stream_, resetStream());
97-
stream->close();
96+
stream->closeLocalStream();
9897
}
9998

10099
TEST_F(ExtProcStreamTest, SendToStream) {
@@ -107,8 +106,8 @@ TEST_F(ExtProcStreamTest, SendToStream) {
107106
ProcessingRequest req;
108107
stream->send(std::move(req), false);
109108
EXPECT_CALL(stream_, closeStream());
110-
EXPECT_CALL(stream_, resetStream());
111-
stream->close();
109+
110+
stream->closeLocalStream();
112111
}
113112

114113
TEST_F(ExtProcStreamTest, SendAndClose) {
@@ -150,8 +149,7 @@ TEST_F(ExtProcStreamTest, ReceiveFromStream) {
150149
stream_callbacks_->onReceiveTrailingMetadata(std::move(empty_response_trailers));
151150

152151
EXPECT_CALL(stream_, closeStream());
153-
EXPECT_CALL(stream_, resetStream());
154-
stream->close();
152+
stream->closeLocalStream();
155153
}
156154

157155
TEST_F(ExtProcStreamTest, StreamClosed) {
@@ -161,13 +159,15 @@ TEST_F(ExtProcStreamTest, StreamClosed) {
161159
auto stream = client_->start(*this, config_with_hash_key_, options, watermark_callbacks_);
162160
ASSERT_NE(stream_callbacks_, nullptr);
163161
EXPECT_FALSE(last_response_);
162+
EXPECT_FALSE(stream->remoteClosed());
164163
EXPECT_FALSE(grpc_closed_);
165164
EXPECT_EQ(grpc_status_, 0);
166165
stream_callbacks_->onRemoteClose(0, "");
167166
EXPECT_FALSE(last_response_);
168167
EXPECT_TRUE(grpc_closed_);
168+
EXPECT_TRUE(stream->remoteClosed());
169169
EXPECT_EQ(grpc_status_, 0);
170-
stream->close();
170+
stream->closeLocalStream();
171171
}
172172

173173
TEST_F(ExtProcStreamTest, StreamError) {
@@ -183,7 +183,8 @@ TEST_F(ExtProcStreamTest, StreamError) {
183183
EXPECT_FALSE(last_response_);
184184
EXPECT_FALSE(grpc_closed_);
185185
EXPECT_EQ(grpc_status_, 123);
186-
stream->close();
186+
stream->closeLocalStream();
187+
EXPECT_TRUE(stream->localClosed());
187188
}
188189

189190
} // namespace

test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -638,6 +638,16 @@ class ExtProcIntegrationTest : public HttpIntegrationTest,
638638

639639
handleUpstreamRequest();
640640
verifyDownstreamResponse(*response, 200);
641+
// Also verify that grpc-status sent via trailers are correctly propagated
642+
// and recorded.
643+
if (clientType() == Grpc::ClientType::EnvoyGrpc) {
644+
const auto& ext_cluster =
645+
test_server_->server().clusterManager().clusters().getCluster("ext_proc_server_0");
646+
const auto& ext_server_host =
647+
ext_cluster.value().get().prioritySet().hostSetsPerPriority()[0]->hosts()[0];
648+
EXPECT_EQ(ext_server_host->stats().rq_success_.value(), 1);
649+
EXPECT_EQ(ext_server_host->stats().rq_total_.value(), 1);
650+
}
641651
}
642652

643653
void testSendDyanmicMetadata() {

0 commit comments

Comments
 (0)