Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
1d14813
initial commit for discussion
Jun 1, 2022
9a93e04
Merge branch 'main' of https://github.com/envoyproxy/envoy into dev-p…
Jun 8, 2022
7e9af36
add filter name and custom name by the helper
Jun 9, 2022
f7bf240
make the passthrough filter inherit correctly
Jun 9, 2022
aed108a
fix build of async client
Jun 9, 2022
a5b8da3
minor fix
Jun 10, 2022
7064a02
fix build error
Jun 10, 2022
834002f
add test to ensure the name can be set correctly and the new method c…
Jun 10, 2022
99bd886
fix config discovery impl test
Jun 10, 2022
ab1dfb5
fix clang tidy
Jun 10, 2022
18000df
fix warning of virtual-move-assign
Jun 12, 2022
14def65
add more explicit move assignment
Jun 12, 2022
eff8e67
fix clang tidy
Jun 13, 2022
2d23437
Merge branch 'main' of https://github.com/envoyproxy/envoy into dev-p…
Jun 13, 2022
fc2d687
fix clang-tidy
Jun 13, 2022
76fb97a
update passthroughfilter
Jun 14, 2022
af2b15a
add new interface for filter manager
Jun 19, 2022
ac8adf2
new implementation and clear old code
Jun 20, 2022
d831041
revert dirty changes
Jun 20, 2022
d19f087
minor update
Jun 21, 2022
95a9870
fix the unexpected removing of new interface
Jun 21, 2022
0e90132
fix unused helper in the HCM
Jun 21, 2022
d3ce0d9
use struct of named http factory cb
Jun 21, 2022
68df359
fix log
Jun 21, 2022
05919dc
avoid repeated searching if filter name is same as custom name
Jun 21, 2022
58ad9dc
fix admin
Jun 21, 2022
c6cfaf7
add more comments and rename the filed to make it more accurate
Jun 22, 2022
0239eff
minor update of comment
Jun 23, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 52 additions & 6 deletions envoy/http/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,21 @@ class StreamFilterCallbacks {
* Called when filter activity indicates that the stream idle timeout should be reset.
*/
virtual void resetIdleTimer() PURE;

/**
* This is a helper to get the route's per-filter config if it exists, otherwise the virtual
* host's. Or nullptr if none of them exist.
*/
virtual const Router::RouteSpecificFilterConfig* mostSpecificPerFilterConfig() const PURE;

/**
* Fold all the available per route filter configs, invoking the callback with each config (if
* it is present). Iteration of the configs is in order of specificity. That means that the
* callback will be called first for a config on a Virtual host, then a route, and finally a route
* entry (weighted cluster). If a config is not present, the callback will not be invoked.
*/
virtual void traversePerFilterConfig(
std::function<void(const Router::RouteSpecificFilterConfig&)> cb) const PURE;
};

/**
Expand Down Expand Up @@ -1063,6 +1078,37 @@ class FilterChainFactoryCallbacks {
*/
using FilterFactoryCb = std::function<void(FilterChainFactoryCallbacks& callbacks)>;

/**
* Simple struct of additional contextual information of HTTP filter, e.g. filter config name
* from configuration, canonical filter name, etc.
*/
struct FilterContext {
// The name of the filter configuration that used to create related filter factory function.
// This could be any legitimate non-empty string.
std::string config_name;
// Filter extension qualified name. This is used as a fallback of `config_name`. E.g.,
// "envoy.filters.http.buffer" for the HTTP buffer filter.
std::string filter_name;
};

/**
* The filter chain manager is provided by the connection manager to the filter chain factory.
* The filter chain factory will post the filter factory context and filter factory to the
* filter chain manager to create filter and construct HTTP stream filter chain.
*/
class FilterChainManager {
public:
virtual ~FilterChainManager() = default;

/**
* Post filter factory context and filter factory to the filter chain manager. The filter
* chain manager will create filter instance based on the context and factory internally.
* @param context supplies additional contextual information of filter factory.
* @param factory factory function used to create filter instances.
*/
virtual void applyFilterFactoryCb(FilterContext context, FilterFactoryCb& factory) PURE;
};

/**
* A FilterChainFactory is used by a connection manager to create an HTTP level filter chain when a
* new stream is created on the connection (either locally or remotely). Typically it would be
Expand All @@ -1075,24 +1121,24 @@ class FilterChainFactory {

/**
* Called when a new HTTP stream is created on the connection.
* @param callbacks supplies the "sink" that is used for actually creating the filter chain. @see
* FilterChainFactoryCallbacks.
* @param manager supplies the "sink" that is used for actually creating the filter chain. @see
* FilterChainManager.
*/
virtual void createFilterChain(FilterChainFactoryCallbacks& callbacks) PURE;
virtual void createFilterChain(FilterChainManager& manager) PURE;

/**
* Called when a new upgrade stream is created on the connection.
* @param upgrade supplies the upgrade header from downstream
* @param per_route_upgrade_map supplies the upgrade map, if any, for this route.
* @param callbacks supplies the "sink" that is used for actually creating the filter chain. @see
* FilterChainFactoryCallbacks.
* @param manager supplies the "sink" that is used for actually creating the filter chain. @see
* FilterChainManager.
* @return true if upgrades of this type are allowed and the filter chain has been created.
* returns false if this upgrade type is not configured, and no filter chain is created.
*/
using UpgradeMap = std::map<std::string, bool>;
virtual bool createUpgradeFilterChain(absl::string_view upgrade,
const UpgradeMap* per_route_upgrade_map,
FilterChainFactoryCallbacks& callbacks) PURE;
FilterChainManager& manager) PURE;
};

} // namespace Http
Expand Down
18 changes: 14 additions & 4 deletions source/common/filter/config_discovery_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,17 @@ class DynamicFilterConfigProviderImpl : public DynamicFilterConfigProviderImplBa
ThreadLocal::TypedSlot<ThreadLocalConfig> tls_;
};

// Struct of canonical filter name and HTTP stream filter factory callback.
struct NamedHttpFilterFactoryCb {
// Canonical filter name.
std::string name;
Copy link
Copy Markdown
Member

@soulxu soulxu Jun 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to put this name to the FilterConfigProvider also?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to merge this in some way. I'm not familiar with this code but it seems like there is a bunch of duplication with the FilterFactoryContext also? Can this be shared somehow?

Copy link
Copy Markdown
Member Author

@wbpcode wbpcode Jun 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have just tried to add a method to expose this name in the FilterConfigProvider. But it just make the code more complex and no better than current implementation.

Because it's supported to used different type filters by the ECDS, so the canonical filter name of current provider may be changed when the filter config is updateing by the ECDS. We must make the canonical filter name thread local and update it just as we update factory function in the FilterConfigProvider.
So it's hard to used FilterFactoryContext to store these name in the FilterConfigProvider.

cc @mattklein123 @soulxu

Copy link
Copy Markdown
Member Author

@wbpcode wbpcode Jun 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

message ExtensionConfigSource {
  ConfigSource config_source = 1 [(validate.rules).any = {required: true}];

  // Optional default configuration to use as the initial configuration if
  // there is a failure to receive the initial extension configuration or if
  // `apply_default_config_without_warming` flag is set.
  google.protobuf.Any default_config = 2;

  // Use the default config as the initial configuration without warming and
  // waiting for the first discovery response. Requires the default configuration
  // to be supplied.
  bool apply_default_config_without_warming = 3;
  
  // **Here the repeated type_urls make the filter type can be changed by the ECDS. A ratelimit filter can be** 
  // **updated to ext_authz.**
  // A set of permitted extension type URLs. Extension configuration updates are rejected
  // if they do not match any type URL in the set.
  repeated string type_urls = 4 [(validate.rules).repeated = {min_items: 1}];
}

IMO, it would be better to add some restrictions here. For example, only multiple version APIs of same type filter can be used here.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, it would be better to add some restrictions here. For example, only multiple version APIs of same type filter can be used here.

I guess even though there can be multiple extension type URLs, they should pointed to the same factory, or they should point to the same canonical filter name?

for (const auto& type_url : config_discovery.type_urls()) {
auto factory_type_url = TypeUtil::typeUrlToDescriptorFullName(type_url);
auto* factory = Registry::FactoryRegistry<
Server::Configuration::NamedHttpFilterConfigFactory>::getFactoryByType(factory_type_url);
if (factory == nullptr) {
throw EnvoyException(
fmt::format("Error: no factory found for a required type URL {}.", factory_type_url));
}
}

Then you can get factory->name(), and pass that to the dynamic filter config provider.

auto filter_config_provider = filter_config_provider_manager_.createDynamicFilterConfigProvider(
config_discovery, name, context_, stats_prefix_, last_filter_in_current_config,
filter_chain_type, nullptr);

@adisuissa probably knows the usecase of multiple extension type urls.

Copy link
Copy Markdown
Member Author

@wbpcode wbpcode Jun 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess even though there can be multiple extension type URLs, they should pointed to the same factory, or they should point to the same canonical filter name?

It seems that there is no any restriction for now. It would be great if we can add this restriction. But it would be a break change.
So, I think we can keep this PR's implemantation and after this restriction is shipped someday, we can update the implemantation with a new minor patch.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, it seems like this is the best we have now. Thanks!

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kyessenov Can probably shed some light on multiple extensions type-urls

// Factory function used to create filter instances.
Http::FilterFactoryCb factory_cb;
};

// Implementation of a HTTP dynamic filter config provider.
class HttpDynamicFilterConfigProviderImpl
: public DynamicFilterConfigProviderImpl<Http::FilterFactoryCb> {
: public DynamicFilterConfigProviderImpl<NamedHttpFilterFactoryCb> {
public:
HttpDynamicFilterConfigProviderImpl(
FilterConfigSubscriptionSharedPtr& subscription,
Expand All @@ -190,10 +198,12 @@ class HttpDynamicFilterConfigProviderImpl
}

private:
Http::FilterFactoryCb instantiateFilterFactory(const Protobuf::Message& message) const override {
NamedHttpFilterFactoryCb
instantiateFilterFactory(const Protobuf::Message& message) const override {
auto* factory = Registry::FactoryRegistry<Server::Configuration::NamedHttpFilterConfigFactory>::
getFactoryByType(message.GetTypeName());
return factory->createFilterFactoryFromProto(message, getStatPrefix(), factory_context_);
return {factory->name(),
factory->createFilterFactoryFromProto(message, getStatPrefix(), factory_context_)};
}

Server::Configuration::FactoryContext& factory_context_;
Expand Down Expand Up @@ -495,7 +505,7 @@ class FilterConfigProviderManagerImpl : public FilterConfigProviderManagerImplBa
// HTTP filter
class HttpFilterConfigProviderManagerImpl
: public FilterConfigProviderManagerImpl<
Server::Configuration::NamedHttpFilterConfigFactory, Http::FilterFactoryCb,
Server::Configuration::NamedHttpFilterConfigFactory, NamedHttpFilterFactoryCb,
Server::Configuration::FactoryContext, HttpDynamicFilterConfigProviderImpl> {
public:
absl::string_view statPrefix() const override { return "http_filter."; }
Expand Down
5 changes: 5 additions & 0 deletions source/common/http/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,11 @@ class AsyncStreamImpl : public AsyncClient::Stream,
}
void addUpstreamSocketOptions(const Network::Socket::OptionsSharedPtr&) override {}
Network::Socket::OptionsSharedPtr getUpstreamSocketOptions() const override { return {}; }
const Router::RouteSpecificFilterConfig* mostSpecificPerFilterConfig() const override {
return nullptr;
}
void traversePerFilterConfig(
std::function<void(const Router::RouteSpecificFilterConfig&)>) const override {}
void requestRouteConfigUpdate(Http::RouteConfigUpdatedCallbackSharedPtr) override {}
void resetIdleTimer() override {}
void setUpstreamOverrideHost(absl::string_view) override {}
Expand Down
104 changes: 55 additions & 49 deletions source/common/http/filter_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,43 @@ void ActiveStreamFilterBase::resetIdleTimer() {
parent_.filter_manager_callbacks_.resetIdleTimer();
}

const Router::RouteSpecificFilterConfig*
ActiveStreamFilterBase::mostSpecificPerFilterConfig() const {
auto route = parent_.filter_manager_callbacks_.route(nullptr);
if (route == nullptr) {
return nullptr;
}

auto* result = route->mostSpecificPerFilterConfig(filter_context_.config_name);

if (result == nullptr && filter_context_.filter_name != filter_context_.config_name) {
// Fallback to use filter name.
result = route->mostSpecificPerFilterConfig(filter_context_.filter_name);
}
return result;
}

void ActiveStreamFilterBase::traversePerFilterConfig(
std::function<void(const Router::RouteSpecificFilterConfig&)> cb) const {
auto route = parent_.filter_manager_callbacks_.route(nullptr);
if (route == nullptr) {
return;
}

bool handled = false;
route->traversePerFilterConfig(filter_context_.config_name,
[&handled, &cb](const Router::RouteSpecificFilterConfig& config) {
handled = true;
cb(config);
});

if (handled || filter_context_.filter_name == filter_context_.config_name) {
return;
}

route->traversePerFilterConfig(filter_context_.filter_name, cb);
}

bool ActiveStreamDecoderFilter::canContinue() {
// It is possible for the connection manager to respond directly to a request even while
// a filter is trying to continue. If a response has already happened, we should not
Expand Down Expand Up @@ -442,40 +479,9 @@ void ActiveStreamDecoderFilter::requestDataTooLarge() {
}
}

void FilterManager::addStreamDecoderFilterWorker(StreamDecoderFilterSharedPtr filter,
bool dual_filter) {
ActiveStreamDecoderFilterPtr wrapper(new ActiveStreamDecoderFilter(*this, filter, dual_filter));

filter->setDecoderFilterCallbacks(*wrapper);
// Note: configured decoder filters are appended to decoder_filters_.
// This means that if filters are configured in the following order (assume all three filters are
// both decoder/encoder filters):
// http_filters:
// - A
// - B
// - C
// The decoder filter chain will iterate through filters A, B, C.
LinkedList::moveIntoListBack(std::move(wrapper), decoder_filters_);
}

void FilterManager::addStreamEncoderFilterWorker(StreamEncoderFilterSharedPtr filter,
bool dual_filter) {
ActiveStreamEncoderFilterPtr wrapper(new ActiveStreamEncoderFilter(*this, filter, dual_filter));

filter->setEncoderFilterCallbacks(*wrapper);
// Note: configured encoder filters are prepended to encoder_filters_.
// This means that if filters are configured in the following order (assume all three filters are
// both decoder/encoder filters):
// http_filters:
// - A
// - B
// - C
// The encoder filter chain will iterate through filters C, B, A.
LinkedList::moveIntoList(std::move(wrapper), encoder_filters_);
}

void FilterManager::addAccessLogHandler(AccessLog::InstanceSharedPtr handler) {
access_log_handlers_.push_back(handler);
void FilterManager::applyFilterFactoryCb(FilterContext context, FilterFactoryCb& factory) {
FilterChainFactoryCallbacksImpl callbacks(*this, context);
factory(callbacks);
}

void FilterManager::maybeContinueDecoding(
Expand Down Expand Up @@ -506,7 +512,7 @@ void FilterManager::decodeHeaders(ActiveStreamDecoderFilter* filter, RequestHead
if (state_.decoder_filter_chain_aborted_) {
ENVOY_STREAM_LOG(trace,
"decodeHeaders filter iteration aborted due to local reply: filter={}",
*this, static_cast<const void*>((*entry).get()));
*this, (*entry)->filter_context_.config_name);
status = FilterHeadersStatus::StopIteration;
}

Expand All @@ -516,7 +522,7 @@ void FilterManager::decodeHeaders(ActiveStreamDecoderFilter* filter, RequestHead

state_.filter_call_state_ &= ~FilterCallState::DecodeHeaders;
ENVOY_STREAM_LOG(trace, "decode headers called: filter={} status={}", *this,
static_cast<const void*>((*entry).get()), static_cast<uint64_t>(status));
(*entry)->filter_context_.config_name, static_cast<uint64_t>(status));

(*entry)->decode_headers_called_ = true;

Expand Down Expand Up @@ -648,10 +654,10 @@ void FilterManager::decodeData(ActiveStreamDecoderFilter* filter, Buffer::Instan
state_.filter_call_state_ &= ~FilterCallState::LastDataFrame;
}
ENVOY_STREAM_LOG(trace, "decode data called: filter={} status={}", *this,
static_cast<const void*>((*entry).get()), static_cast<uint64_t>(status));
(*entry)->filter_context_.config_name, static_cast<uint64_t>(status));
if (state_.decoder_filter_chain_aborted_) {
ENVOY_STREAM_LOG(trace, "decodeData filter iteration aborted due to local reply: filter={}",
*this, static_cast<const void*>((*entry).get()));
*this, (*entry)->filter_context_.config_name);
return;
}

Expand Down Expand Up @@ -737,11 +743,11 @@ void FilterManager::decodeTrailers(ActiveStreamDecoderFilter* filter, RequestTra
(*entry)->end_stream_ = true;
state_.filter_call_state_ &= ~FilterCallState::DecodeTrailers;
ENVOY_STREAM_LOG(trace, "decode trailers called: filter={} status={}", *this,
static_cast<const void*>((*entry).get()), static_cast<uint64_t>(status));
(*entry)->filter_context_.config_name, static_cast<uint64_t>(status));
if (state_.decoder_filter_chain_aborted_) {
ENVOY_STREAM_LOG(trace,
"decodeTrailers filter iteration aborted due to local reply: filter={}",
*this, static_cast<const void*>((*entry).get()));
*this, (*entry)->filter_context_.config_name);
status = FilterTrailersStatus::StopIteration;
}

Expand Down Expand Up @@ -772,15 +778,15 @@ void FilterManager::decodeMetadata(ActiveStreamDecoderFilter* filter, MetadataMa

FilterMetadataStatus status = (*entry)->handle_->decodeMetadata(metadata_map);
ENVOY_STREAM_LOG(trace, "decode metadata called: filter={} status={}, metadata: {}", *this,
static_cast<const void*>((*entry).get()), static_cast<uint64_t>(status),
(*entry)->filter_context_.config_name, static_cast<uint64_t>(status),
metadata_map);
}
}

void FilterManager::maybeEndDecode(bool end_stream) {
// If recreateStream is called, the HCM rewinds state and may send more encodeData calls.
if (end_stream && !remoteDecodeComplete()) {
stream_info_.downstreamTiming().onLastDownstreamRxByteReceived(dispatcher().timeSource());
stream_info_.downstreamTiming().onLastDownstreamRxByteReceived(dispatcher_.timeSource());
ENVOY_STREAM_LOG(debug, "request end stream", *this);
}
}
Expand Down Expand Up @@ -999,7 +1005,7 @@ void FilterManager::encode1xxHeaders(ActiveStreamEncoderFilter* filter,
FilterHeadersStatus status = (*entry)->handle_->encode1xxHeaders(headers);
state_.filter_call_state_ &= ~FilterCallState::Encode1xxHeaders;
ENVOY_STREAM_LOG(trace, "encode 1xx continue headers called: filter={} status={}", *this,
static_cast<const void*>((*entry).get()), static_cast<uint64_t>(status));
(*entry)->filter_context_.config_name, static_cast<uint64_t>(status));
if (!(*entry)->commonHandleAfter1xxHeadersCallback(status)) {
return;
}
Expand Down Expand Up @@ -1042,7 +1048,7 @@ void FilterManager::encodeHeaders(ActiveStreamEncoderFilter* filter, ResponseHea
if (state_.encoder_filter_chain_aborted_) {
ENVOY_STREAM_LOG(trace,
"encodeHeaders filter iteration aborted due to local reply: filter={}",
*this, static_cast<const void*>((*entry).get()));
*this, (*entry)->filter_context_.config_name);
status = FilterHeadersStatus::StopIteration;
}

Expand All @@ -1052,7 +1058,7 @@ void FilterManager::encodeHeaders(ActiveStreamEncoderFilter* filter, ResponseHea

state_.filter_call_state_ &= ~FilterCallState::EncodeHeaders;
ENVOY_STREAM_LOG(trace, "encode headers called: filter={} status={}", *this,
static_cast<const void*>((*entry).get()), static_cast<uint64_t>(status));
(*entry)->filter_context_.config_name, static_cast<uint64_t>(status));

(*entry)->encode_headers_called_ = true;

Expand Down Expand Up @@ -1124,7 +1130,7 @@ void FilterManager::encodeMetadata(ActiveStreamEncoderFilter* filter,

FilterMetadataStatus status = (*entry)->handle_->encodeMetadata(*metadata_map_ptr);
ENVOY_STREAM_LOG(trace, "encode metadata called: filter={} status={}", *this,
static_cast<const void*>((*entry).get()), static_cast<uint64_t>(status));
(*entry)->filter_context_.config_name, static_cast<uint64_t>(status));
}
// TODO(soya3129): update stats with metadata.

Expand Down Expand Up @@ -1206,7 +1212,7 @@ void FilterManager::encodeData(ActiveStreamEncoderFilter* filter, Buffer::Instan
FilterDataStatus status = (*entry)->handle_->encodeData(data, (*entry)->end_stream_);
if (state_.encoder_filter_chain_aborted_) {
ENVOY_STREAM_LOG(trace, "encodeData filter iteration aborted due to local reply: filter={}",
*this, static_cast<const void*>((*entry).get()));
*this, (*entry)->filter_context_.config_name);
status = FilterDataStatus::StopIterationNoBuffer;
}
if ((*entry)->end_stream_) {
Expand All @@ -1217,7 +1223,7 @@ void FilterManager::encodeData(ActiveStreamEncoderFilter* filter, Buffer::Instan
state_.filter_call_state_ &= ~FilterCallState::LastDataFrame;
}
ENVOY_STREAM_LOG(trace, "encode data called: filter={} status={}", *this,
static_cast<const void*>((*entry).get()), static_cast<uint64_t>(status));
(*entry)->filter_context_.config_name, static_cast<uint64_t>(status));

if (!trailers_exists_at_start && filter_manager_callbacks_.responseTrailers() &&
trailers_added_entry == encoder_filters_.end()) {
Expand Down Expand Up @@ -1262,7 +1268,7 @@ void FilterManager::encodeTrailers(ActiveStreamEncoderFilter* filter,
(*entry)->end_stream_ = true;
state_.filter_call_state_ &= ~FilterCallState::EncodeTrailers;
ENVOY_STREAM_LOG(trace, "encode trailers called: filter={} status={}", *this,
static_cast<const void*>((*entry).get()), static_cast<uint64_t>(status));
(*entry)->filter_context_.config_name, static_cast<uint64_t>(status));
if (!(*entry)->commonHandleAfterTrailersCallback(status)) {
return;
}
Expand Down
Loading