Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ removed_config_or_runtime:
removed runtime key ``envoy.reloadable_features.delta_xds_subscription_state_tracking_fix`` and legacy code paths.

new_features:
- area: tcp_proxy
change: |
added :ref:`an option <config_network_filters_tcp_proxy_receive_before_connect>` to allow filters to read from the
downstream connection before TCP proxy has opened the upstream connection, by setting a filter state object for the key
``envoy.tcp_proxy.receive_before_connect``.
- area: access_log
change: |
added %ACCESS_LOG_TYPE% substitution string, to help distinguishing between access log records and when they are being
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,21 @@ To define metadata that a suitable upstream host must match, use one of the foll
In addition, dynamic metadata can be set by earlier network filters on the ``StreamInfo``. Setting the dynamic metadata
must happen before ``onNewConnection()`` is called on the ``TcpProxy`` filter to affect load balancing.

.. _config_network_filters_tcp_proxy_receive_before_connect:

Early reception and delayed upstream connection establishment
-------------------------------------------------------------

``TcpProxy`` filter normally disables reading on the downstream connection until the upstream connection has been established. In some situations earlier filters in the filter chain may need to read data from the downstream connection before allowing the upstream connection to be established. This can be done by setting the ``StreamInfo`` filter state object for the key ``envoy.tcp_proxy.receive_before_connect``. Setting this dynamic metadata must happen in ``initializeReadFilterCallbacks()`` callback of the network filter so that is done before ``TcpProxy`` filter is initialized.

Network filters can also pass data upto the ``TcpProxy`` filter before the upstream connection has been established, as ``TcpProxy`` filter now buffers data it receives before the upstream connection has been established to be sent when the upstream connection is established. Filters can also delay the upstream connection setup by returning ``StopIteration`` from their ``onNewConnection`` and ``onData`` callbacks.

.. note::

``TcpProxy`` filter does not limit the size of the pre-connection data buffer. Filters using the
``envoy.tcp_proxy.receive_before_connect`` option must take care to not pass unlimited amount to
data to the TcpProxy before the upstream connection has been set up.

.. _config_network_filters_tcp_proxy_tunneling_over_http:

Tunneling TCP over HTTP
Expand Down
1 change: 1 addition & 0 deletions source/common/tcp_proxy/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ envoy_cc_library(
"//envoy/stats:stats_interface",
"//envoy/stats:stats_macros",
"//envoy/stats:timespan_interface",
"//envoy/stream_info:bool_accessor_interface",
"//envoy/stream_info:filter_state_interface",
"//envoy/tcp:conn_pool_interface",
"//envoy/tcp:upstream_interface",
Expand Down
34 changes: 30 additions & 4 deletions source/common/tcp_proxy/tcp_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "envoy/extensions/filters/network/tcp_proxy/v3/tcp_proxy.pb.h"
#include "envoy/extensions/filters/network/tcp_proxy/v3/tcp_proxy.pb.validate.h"
#include "envoy/stats/scope.h"
#include "envoy/stream_info/bool_accessor.h"
#include "envoy/upstream/cluster_manager.h"
#include "envoy/upstream/upstream.h"

Expand Down Expand Up @@ -244,7 +245,17 @@ void Filter::initialize(Network::ReadFilterCallbacks& callbacks, bool set_connec
// Need to disable reads so that we don't write to an upstream that might fail
// in onData(). This will get re-enabled when the upstream connection is
// established.
read_callbacks_->connection().readDisable(true);
auto receive_before_connect =
read_callbacks_->connection()
.streamInfo()
.filterState()
->getDataReadOnly<StreamInfo::BoolAccessor>(ReceiveBeforeConnectKey);
if (receive_before_connect && receive_before_connect->value()) {
receive_before_connect_ = true;
} else {
read_callbacks_->connection().readDisable(true);
}

getStreamInfo().setDownstreamBytesMeter(std::make_shared<StreamInfo::BytesMeter>());
getStreamInfo().setUpstreamInfo(std::make_shared<StreamInfo::UpstreamInfoImpl>());

Expand Down Expand Up @@ -462,8 +473,11 @@ Network::FilterStatus Filter::establishUpstreamConnection() {
// cluster->trafficStats()->upstream_cx_none_healthy in the latter case.
getStreamInfo().setResponseFlag(StreamInfo::ResponseFlag::NoHealthyUpstream);
onInitFailure(UpstreamFailureReason::NoHealthyUpstream);
return Network::FilterStatus::StopIteration;
}
return Network::FilterStatus::StopIteration;
// Allow OnData() to receive data before connect if so configured
return receive_before_connect_ ? Network::FilterStatus::Continue
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the case of receive_before_connect_, now that earlier filters have seen whatever data they need and allowed the filter chain to continue, should tcp_proxy readDisable(true) to prevent additional data from getting buffered while the upstream connection is established?

If not, then I think there needs to be some limit to the data enforced. One option is to leave it in the Connection buffer so that flow control can be triggered.

: Network::FilterStatus::StopIteration;
}

void Filter::onClusterDiscoveryCompletion(Upstream::ClusterDiscoveryStatus cluster_status) {
Expand Down Expand Up @@ -671,12 +685,18 @@ Network::FilterStatus Filter::onData(Buffer::Instance& data, bool end_stream) {
if (upstream_) {
getStreamInfo().getUpstreamBytesMeter()->addWireBytesSent(data.length());
upstream_->encodeData(data, end_stream);
resetIdleTimer(); // TODO(ggreenway) PERF: do we need to reset timer on both send and receive?
} else if (receive_before_connect_) {
// Buffer data received before upstream connection exists
early_data_buffer_.move(data);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should there be a limit on how much data can be buffered here? Or do you depend on the earlier network filter to readDisable() if there's too much data?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did not consider this, so maybe the latter? Of should I change the boolean metadata to be an integral maximum buffer size instead?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could probably use one of the existing connection buffer size settings for the value. But if you want the other filter to specify how much data to buffer, that's fine too.

if (!early_data_end_stream_) {
early_data_end_stream_ = end_stream;
}
}
// The upstream should consume all of the data.
// Before there is an upstream the connection should be readDisabled. If the upstream is
// destroyed, there should be no further reads as well.
ASSERT(0 == data.length());
resetIdleTimer(); // TODO(ggreenway) PERF: do we need to reset timer on both send and receive?
return Network::FilterStatus::StopIteration;
}

Expand Down Expand Up @@ -791,7 +811,13 @@ void Filter::onUpstreamConnection() {
connecting_ = false;
// Re-enable downstream reads now that the upstream connection is established
// so we have a place to send downstream data to.
read_callbacks_->connection().readDisable(false);
if (!receive_before_connect_) {
read_callbacks_->connection().readDisable(false);
} else if (early_data_buffer_.length() > 0) {
getStreamInfo().getUpstreamBytesMeter()->addWireBytesSent(early_data_buffer_.length());
upstream_->encodeData(early_data_buffer_, early_data_end_stream_);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a test that causes a high watermark on the upstream here, to make sure that any callbacks that re-renter this class have the correct state?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you point to a test that is close enough to use for inspiration to what you mean here?

ASSERT(0 == early_data_buffer_.length());
}

read_callbacks_->upstreamHost()->outlierDetector().putResult(
Upstream::Outlier::Result::LocalOriginConnectSuccessFinal);
Expand Down
6 changes: 6 additions & 0 deletions source/common/tcp_proxy/tcp_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "envoy/upstream/cluster_manager.h"
#include "envoy/upstream/upstream.h"

#include "source/common/buffer/buffer_impl.h"
#include "source/common/common/logger.h"
#include "source/common/formatter/substitution_format_string.h"
#include "source/common/http/header_map_impl.h"
Expand All @@ -38,6 +39,8 @@
namespace Envoy {
namespace TcpProxy {

constexpr absl::string_view ReceiveBeforeConnectKey = "envoy.tcp_proxy.receive_before_connect";

/**
* All tcp proxy stats. @see stats_macros.h
*/
Expand Down Expand Up @@ -540,6 +543,9 @@ class Filter : public Network::ReadFilter,
uint32_t connect_attempts_{};
bool connecting_{};
bool downstream_closed_{};
bool receive_before_connect_{false};
bool early_data_end_stream_{false};
Buffer::OwnedImpl early_data_buffer_{};
};

// This class deals with an upstream connection that needs to finish flushing, when the downstream
Expand Down
1 change: 1 addition & 0 deletions test/integration/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -1661,6 +1661,7 @@ envoy_cc_test(
"//source/common/event:dispatcher_includes",
"//source/common/event:dispatcher_lib",
"//source/common/network:utility_lib",
"//source/common/stream_info:bool_accessor_lib",
"//source/extensions/access_loggers/file:config",
"//source/extensions/filters/network/common:factory_base_lib",
"//source/extensions/filters/network/tcp_proxy:config",
Expand Down
67 changes: 45 additions & 22 deletions test/integration/tcp_proxy_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

#include "source/common/config/api_version.h"
#include "source/common/network/utility.h"
#include "source/common/stream_info/bool_accessor_impl.h"
#include "source/common/tcp_proxy/tcp_proxy.h"
#include "source/extensions/filters/network/common/factory_base.h"
#include "source/extensions/transport_sockets/tls/context_manager_impl.h"

Expand Down Expand Up @@ -1259,18 +1261,27 @@ class InjectDynamicMetadata : public Network::ReadFilter {
Network::FilterStatus onData(Buffer::Instance& data, bool) override {
if (!metadata_set_) {
// To allow testing a write that returns `StopIteration`, only proceed
// when more than 1 byte is received.
if (data.length() < 2) {
ASSERT(data.length() == 1);

// Echo data back to test can verify it was received.
Buffer::OwnedImpl copy(data);
read_callbacks_->connection().write(copy, false);
// when more than 1 word is received.

// locate the first space in data
char space = ' ';
ssize_t index = data.search(&space, sizeof(space), 0);
if (index < 0) {
// When returning StopIteration the received data remains in the buffer
// so that we can get to it later when enough data has been received.
return Network::FilterStatus::StopIteration;
}

void* p = data.linearize(index);
std::string first_word(static_cast<char*>(p), index);

// Echo first word back so tests can verify it was received
Buffer::OwnedImpl copy(first_word);
read_callbacks_->connection().write(copy, false);

// Use the first word as dynamic metadata value
ProtobufWkt::Value val;
val.set_string_value(data.toString());
val.set_string_value(first_word);

ProtobufWkt::Struct& map =
(*read_callbacks_->connection()
Expand All @@ -1279,24 +1290,25 @@ class InjectDynamicMetadata : public Network::ReadFilter {
.mutable_filter_metadata())[Envoy::Config::MetadataFilters::get().ENVOY_LB];
(*map.mutable_fields())[key_] = val;

// Put this back in the state that TcpProxy expects.
read_callbacks_->connection().readDisable(true);

metadata_set_ = true;
}
return Network::FilterStatus::Continue;
}

Network::FilterStatus onNewConnection() override {
// TcpProxy disables read; must re-enable so we can read headers.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually we can read-enable here, seems this PR will only buy some time back between the initialization and onNewConnection?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR does two things:

  1. asks the (TCP) proxy to not read-disable in the first place, so that other filters do not need to know the exact spots where to read-enable and disable again. This enables filters receiving data before and while the upstream connection setup takes place.

  2. adds early data buffering to the tcp_proxy filter, so that filters can write (early) data after tcp_proxy has started connect() system call, but the connection has not yet been established. As of now tcp_proxy will assert-fail if it receives data before the connection has been established. This allows filters that read data enabled by point (1) to pass it on without buffering and waiting for connection setup to complete.

read_callbacks_->connection().readDisable(false);

// Stop until we read the value and can set the metadata for TcpProxy.
// TcpProxy proceeds with upstream connection once onData() returns FilterStatus::Continue.
return Network::FilterStatus::StopIteration;
}

void initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) override {
read_callbacks_ = &callbacks;

// Tell TcpProxy to not disable read so that we can read headers
read_callbacks_->connection().streamInfo().filterState()->setData(
TcpProxy::ReceiveBeforeConnectKey, std::make_unique<StreamInfo::BoolAccessorImpl>(true),
StreamInfo::FilterState::StateType::ReadOnly,
StreamInfo::FilterState::LifeSpan::Connection);
}

const std::string key_;
Expand Down Expand Up @@ -1354,14 +1366,25 @@ TEST_P(TcpProxyDynamicMetadataMatchIntegrationTest, DynamicMetadataMatch) {
initialize();

expectEndpointToMatchRoute([](IntegrationTcpClient& tcp_client) -> std::string {
// Break the write into two; validate that the first is received before sending the second. This
// validates that a downstream filter can use this functionality, even if it can't make a
// decision after the first `onData()`.
EXPECT_TRUE(tcp_client.write("p", false));
tcp_client.waitForData("p");
// Break the write into multiple chunks; validate that the first word is received before sending
// the rest. This validates that a downstream filter can use this functionality, even if it
// can't make a decision after the first `onData()`.
EXPECT_TRUE(tcp_client.write("pri", false));
// Writing additional data in multiple chunks to show that buffering of early data in tcp_buffer
// works properly
EXPECT_TRUE(tcp_client.write("mary is ", false));
Comment on lines 1372 to 1375
Copy link
Copy Markdown
Member

@botengyao botengyao Mar 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not the early data buffer you create in Tcp proxy, right? It is a little bit confusing for the comment, like we stop the filter chain iteration at InjectDynamicMetadata::onData() when just receives pri. Tcp proxy cannot get the onData call before the InjectDynamicMetadata continues the filter chain.

And writing a new set of integration tests maybe more clear here to make sure the original behavior is also tested. We should also add some situations that trigger early_data_buffer_ when connection is not created, but seems not easy to test.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jrajahalme please address this.

EXPECT_TRUE(tcp_client.write("selected ", false));
EXPECT_TRUE(tcp_client.write("before ", false));
// check that the 1st word is returned
tcp_client.waitForData("primary");
tcp_client.clearData();
EXPECT_TRUE(tcp_client.write("rimary", false));
return "primary";
// some more data, most likely the upstream connection has already been established as we waited
// for the return data above.
EXPECT_TRUE(tcp_client.write("upstream connection ", false));
EXPECT_TRUE(tcp_client.write("exists", false));

// All data expected at the destination
return "primary is selected before upstream connection exists";
});
}

Expand All @@ -1378,7 +1401,7 @@ TEST_P(TcpProxyDynamicMetadataMatchIntegrationTest, DynamicMetadataNonMatch) {

initialize();

expectEndpointNotToMatchRoute("does_not_match_role_primary");
expectEndpointNotToMatchRoute("does not match role primary");
}

INSTANTIATE_TEST_SUITE_P(TcpProxyIntegrationTestParams, TcpProxySslIntegrationTest,
Expand Down