-
Notifications
You must be signed in to change notification settings - Fork 5.3k
tcp_proxy: Add filter state envoy.tcp_proxy.receive_before_connect #25804
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
25c2d8a
79bc1fb
fff172e
6f52664
df45a3c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,6 +12,7 @@ | |
| #include "envoy/extensions/filters/network/tcp_proxy/v3/tcp_proxy.pb.h" | ||
| #include "envoy/extensions/filters/network/tcp_proxy/v3/tcp_proxy.pb.validate.h" | ||
| #include "envoy/stats/scope.h" | ||
| #include "envoy/stream_info/bool_accessor.h" | ||
| #include "envoy/upstream/cluster_manager.h" | ||
| #include "envoy/upstream/upstream.h" | ||
|
|
||
|
|
@@ -244,7 +245,17 @@ void Filter::initialize(Network::ReadFilterCallbacks& callbacks, bool set_connec | |
| // Need to disable reads so that we don't write to an upstream that might fail | ||
| // in onData(). This will get re-enabled when the upstream connection is | ||
| // established. | ||
| read_callbacks_->connection().readDisable(true); | ||
| auto receive_before_connect = | ||
| read_callbacks_->connection() | ||
| .streamInfo() | ||
| .filterState() | ||
| ->getDataReadOnly<StreamInfo::BoolAccessor>(ReceiveBeforeConnectKey); | ||
| if (receive_before_connect && receive_before_connect->value()) { | ||
| receive_before_connect_ = true; | ||
| } else { | ||
| read_callbacks_->connection().readDisable(true); | ||
| } | ||
|
|
||
| getStreamInfo().setDownstreamBytesMeter(std::make_shared<StreamInfo::BytesMeter>()); | ||
| getStreamInfo().setUpstreamInfo(std::make_shared<StreamInfo::UpstreamInfoImpl>()); | ||
|
|
||
|
|
@@ -462,8 +473,11 @@ Network::FilterStatus Filter::establishUpstreamConnection() { | |
| // cluster->trafficStats()->upstream_cx_none_healthy in the latter case. | ||
| getStreamInfo().setResponseFlag(StreamInfo::ResponseFlag::NoHealthyUpstream); | ||
| onInitFailure(UpstreamFailureReason::NoHealthyUpstream); | ||
| return Network::FilterStatus::StopIteration; | ||
| } | ||
| return Network::FilterStatus::StopIteration; | ||
| // Allow OnData() to receive data before connect if so configured | ||
| return receive_before_connect_ ? Network::FilterStatus::Continue | ||
| : Network::FilterStatus::StopIteration; | ||
| } | ||
|
|
||
| void Filter::onClusterDiscoveryCompletion(Upstream::ClusterDiscoveryStatus cluster_status) { | ||
|
|
@@ -671,12 +685,18 @@ Network::FilterStatus Filter::onData(Buffer::Instance& data, bool end_stream) { | |
| if (upstream_) { | ||
| getStreamInfo().getUpstreamBytesMeter()->addWireBytesSent(data.length()); | ||
| upstream_->encodeData(data, end_stream); | ||
| resetIdleTimer(); // TODO(ggreenway) PERF: do we need to reset timer on both send and receive? | ||
| } else if (receive_before_connect_) { | ||
| // Buffer data received before upstream connection exists | ||
| early_data_buffer_.move(data); | ||
|
||
| if (!early_data_end_stream_) { | ||
| early_data_end_stream_ = end_stream; | ||
| } | ||
| } | ||
| // The upstream should consume all of the data. | ||
| // Before there is an upstream the connection should be readDisabled. If the upstream is | ||
| // destroyed, there should be no further reads as well. | ||
| ASSERT(0 == data.length()); | ||
| resetIdleTimer(); // TODO(ggreenway) PERF: do we need to reset timer on both send and receive? | ||
| return Network::FilterStatus::StopIteration; | ||
| } | ||
|
|
||
|
|
@@ -791,7 +811,13 @@ void Filter::onUpstreamConnection() { | |
| connecting_ = false; | ||
| // Re-enable downstream reads now that the upstream connection is established | ||
| // so we have a place to send downstream data to. | ||
| read_callbacks_->connection().readDisable(false); | ||
| if (!receive_before_connect_) { | ||
| read_callbacks_->connection().readDisable(false); | ||
| } else if (early_data_buffer_.length() > 0) { | ||
| getStreamInfo().getUpstreamBytesMeter()->addWireBytesSent(early_data_buffer_.length()); | ||
| upstream_->encodeData(early_data_buffer_, early_data_end_stream_); | ||
|
||
| ASSERT(0 == early_data_buffer_.length()); | ||
| } | ||
|
|
||
| read_callbacks_->upstreamHost()->outlierDetector().putResult( | ||
| Upstream::Outlier::Result::LocalOriginConnectSuccessFinal); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,6 +12,8 @@ | |
|
|
||
| #include "source/common/config/api_version.h" | ||
| #include "source/common/network/utility.h" | ||
| #include "source/common/stream_info/bool_accessor_impl.h" | ||
| #include "source/common/tcp_proxy/tcp_proxy.h" | ||
| #include "source/extensions/filters/network/common/factory_base.h" | ||
| #include "source/extensions/transport_sockets/tls/context_manager_impl.h" | ||
|
|
||
|
|
@@ -1259,18 +1261,27 @@ class InjectDynamicMetadata : public Network::ReadFilter { | |
| Network::FilterStatus onData(Buffer::Instance& data, bool) override { | ||
| if (!metadata_set_) { | ||
| // To allow testing a write that returns `StopIteration`, only proceed | ||
| // when more than 1 byte is received. | ||
| if (data.length() < 2) { | ||
| ASSERT(data.length() == 1); | ||
|
|
||
| // Echo data back to test can verify it was received. | ||
| Buffer::OwnedImpl copy(data); | ||
| read_callbacks_->connection().write(copy, false); | ||
| // when more than 1 word is received. | ||
|
|
||
| // locate the first space in data | ||
| char space = ' '; | ||
| ssize_t index = data.search(&space, sizeof(space), 0); | ||
| if (index < 0) { | ||
| // When returning StopIteration the received data remains in the buffer | ||
| // so that we can get to it later when enough data has been received. | ||
| return Network::FilterStatus::StopIteration; | ||
| } | ||
|
|
||
| void* p = data.linearize(index); | ||
| std::string first_word(static_cast<char*>(p), index); | ||
|
|
||
| // Echo first word back so tests can verify it was received | ||
| Buffer::OwnedImpl copy(first_word); | ||
| read_callbacks_->connection().write(copy, false); | ||
|
|
||
| // Use the first word as dynamic metadata value | ||
| ProtobufWkt::Value val; | ||
| val.set_string_value(data.toString()); | ||
| val.set_string_value(first_word); | ||
|
|
||
| ProtobufWkt::Struct& map = | ||
| (*read_callbacks_->connection() | ||
|
|
@@ -1279,24 +1290,25 @@ class InjectDynamicMetadata : public Network::ReadFilter { | |
| .mutable_filter_metadata())[Envoy::Config::MetadataFilters::get().ENVOY_LB]; | ||
| (*map.mutable_fields())[key_] = val; | ||
|
|
||
| // Put this back in the state that TcpProxy expects. | ||
| read_callbacks_->connection().readDisable(true); | ||
|
|
||
| metadata_set_ = true; | ||
| } | ||
| return Network::FilterStatus::Continue; | ||
| } | ||
|
|
||
| Network::FilterStatus onNewConnection() override { | ||
| // TcpProxy disables read; must re-enable so we can read headers. | ||
|
||
| read_callbacks_->connection().readDisable(false); | ||
|
|
||
| // Stop until we read the value and can set the metadata for TcpProxy. | ||
| // TcpProxy proceeds with upstream connection once onData() returns FilterStatus::Continue. | ||
| return Network::FilterStatus::StopIteration; | ||
| } | ||
|
|
||
| void initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) override { | ||
| read_callbacks_ = &callbacks; | ||
|
|
||
| // Tell TcpProxy to not disable read so that we can read headers | ||
| read_callbacks_->connection().streamInfo().filterState()->setData( | ||
| TcpProxy::ReceiveBeforeConnectKey, std::make_unique<StreamInfo::BoolAccessorImpl>(true), | ||
| StreamInfo::FilterState::StateType::ReadOnly, | ||
| StreamInfo::FilterState::LifeSpan::Connection); | ||
| } | ||
|
|
||
| const std::string key_; | ||
|
|
@@ -1354,14 +1366,25 @@ TEST_P(TcpProxyDynamicMetadataMatchIntegrationTest, DynamicMetadataMatch) { | |
| initialize(); | ||
|
|
||
| expectEndpointToMatchRoute([](IntegrationTcpClient& tcp_client) -> std::string { | ||
| // Break the write into two; validate that the first is received before sending the second. This | ||
| // validates that a downstream filter can use this functionality, even if it can't make a | ||
| // decision after the first `onData()`. | ||
| EXPECT_TRUE(tcp_client.write("p", false)); | ||
| tcp_client.waitForData("p"); | ||
| // Break the write into multiple chunks; validate that the first word is received before sending | ||
| // the rest. This validates that a downstream filter can use this functionality, even if it | ||
| // can't make a decision after the first `onData()`. | ||
| EXPECT_TRUE(tcp_client.write("pri", false)); | ||
| // Writing additional data in multiple chunks to show that buffering of early data in tcp_buffer | ||
| // works properly | ||
| EXPECT_TRUE(tcp_client.write("mary is ", false)); | ||
|
||
| EXPECT_TRUE(tcp_client.write("selected ", false)); | ||
| EXPECT_TRUE(tcp_client.write("before ", false)); | ||
| // check that the 1st word is returned | ||
| tcp_client.waitForData("primary"); | ||
| tcp_client.clearData(); | ||
| EXPECT_TRUE(tcp_client.write("rimary", false)); | ||
| return "primary"; | ||
| // some more data, most likely the upstream connection has already been established as we waited | ||
| // for the return data above. | ||
| EXPECT_TRUE(tcp_client.write("upstream connection ", false)); | ||
| EXPECT_TRUE(tcp_client.write("exists", false)); | ||
|
|
||
| // All data expected at the destination | ||
| return "primary is selected before upstream connection exists"; | ||
| }); | ||
| } | ||
|
|
||
|
|
@@ -1378,7 +1401,7 @@ TEST_P(TcpProxyDynamicMetadataMatchIntegrationTest, DynamicMetadataNonMatch) { | |
|
|
||
| initialize(); | ||
|
|
||
| expectEndpointNotToMatchRoute("does_not_match_role_primary"); | ||
| expectEndpointNotToMatchRoute("does not match role primary"); | ||
| } | ||
|
|
||
| INSTANTIATE_TEST_SUITE_P(TcpProxyIntegrationTestParams, TcpProxySslIntegrationTest, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the case of
receive_before_connect_, now that earlier filters have seen whatever data they need and allowed the filter chain to continue, should tcp_proxyreadDisable(true)to prevent additional data from getting buffered while the upstream connection is established?If not, then I think there needs to be some limit to the data enforced. One option is to leave it in the Connection buffer so that flow control can be triggered.