Skip to content

Commit 94392a4

Browse files
authored
Remove duplicate filter instances in Broker, Proxy and Function worker web server (apache#15637)
- There were filter instances for each context path which made maxConcurrentHttpRequests and httpRequestsMaxPerSecond not work as expected. - Fixes the backpressure solution that is dependent on maxConcurrentHttpRequests and httpRequestsMaxPerSecond working properly - Fix invalid Jersey api usage
1 parent 38e5bba commit 94392a4

File tree

9 files changed

+216
-182
lines changed

9 files changed

+216
-182
lines changed

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/Filters.java

Lines changed: 0 additions & 60 deletions
This file was deleted.

pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -91,13 +91,15 @@
9191
import org.apache.pulsar.broker.loadbalance.LoadResourceQuotaUpdaterTask;
9292
import org.apache.pulsar.broker.loadbalance.LoadSheddingTask;
9393
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
94+
import org.apache.pulsar.broker.lookup.v1.TopicLookup;
9495
import org.apache.pulsar.broker.namespace.NamespaceService;
9596
import org.apache.pulsar.broker.protocol.ProtocolHandlers;
9697
import org.apache.pulsar.broker.resourcegroup.ResourceGroupService;
9798
import org.apache.pulsar.broker.resourcegroup.ResourceUsageTopicTransportManager;
9899
import org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager;
99100
import org.apache.pulsar.broker.resources.ClusterResources;
100101
import org.apache.pulsar.broker.resources.PulsarResources;
102+
import org.apache.pulsar.broker.rest.Topics;
101103
import org.apache.pulsar.broker.service.BrokerService;
102104
import org.apache.pulsar.broker.service.GracefulExecutorServicesShutdown;
103105
import org.apache.pulsar.broker.service.SystemTopicBaseTxnBufferSnapshotService;
@@ -882,21 +884,21 @@ private void addWebServerHandlers(WebService webService,
882884
// Ensure the VIP status is only visible when the broker is fully initialized
883885
return state == State.Started;
884886
});
887+
885888
// Add admin rest resources
886-
webService.addRestResources("/",
887-
VipStatus.class.getPackage().getName(), false, vipAttributeMap);
888-
webService.addRestResources("/",
889-
"org.apache.pulsar.broker.web", false, attributeMap);
889+
webService.addRestResource("/",
890+
false, vipAttributeMap, VipStatus.class);
890891
webService.addRestResources("/admin",
891-
"org.apache.pulsar.broker.admin.v1", true, attributeMap);
892+
true, attributeMap, "org.apache.pulsar.broker.admin.v1");
892893
webService.addRestResources("/admin/v2",
893-
"org.apache.pulsar.broker.admin.v2", true, attributeMap);
894+
true, attributeMap, "org.apache.pulsar.broker.admin.v2");
894895
webService.addRestResources("/admin/v3",
895-
"org.apache.pulsar.broker.admin.v3", true, attributeMap);
896-
webService.addRestResources("/lookup",
897-
"org.apache.pulsar.broker.lookup", true, attributeMap);
898-
webService.addRestResources("/topics",
899-
"org.apache.pulsar.broker.rest", true, attributeMap);
896+
true, attributeMap, "org.apache.pulsar.broker.admin.v3");
897+
webService.addRestResource("/lookup",
898+
true, attributeMap, TopicLookup.class,
899+
org.apache.pulsar.broker.lookup.v2.TopicLookup.class);
900+
webService.addRestResource("/topics",
901+
true, attributeMap, Topics.class);
900902

901903
// Add metrics servlet
902904
webService.addServlet("/metrics",

pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java

Lines changed: 84 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,15 @@
1818
*/
1919
package org.apache.pulsar.broker.web;
2020

21-
import static org.apache.pulsar.broker.web.Filters.addFilter;
22-
import static org.apache.pulsar.broker.web.Filters.addFilterClass;
2321
import com.google.common.collect.Lists;
2422
import io.prometheus.client.CollectorRegistry;
2523
import io.prometheus.client.jetty.JettyStatisticsCollector;
2624
import java.util.ArrayList;
27-
import java.util.Collections;
25+
import java.util.EnumSet;
2826
import java.util.List;
2927
import java.util.Map;
3028
import java.util.Optional;
29+
import javax.servlet.DispatcherType;
3130
import org.apache.pulsar.broker.PulsarServerException;
3231
import org.apache.pulsar.broker.PulsarService;
3332
import org.apache.pulsar.broker.ServiceConfiguration;
@@ -43,6 +42,7 @@
4342
import org.eclipse.jetty.server.handler.RequestLogHandler;
4443
import org.eclipse.jetty.server.handler.ResourceHandler;
4544
import org.eclipse.jetty.server.handler.StatisticsHandler;
45+
import org.eclipse.jetty.servlet.FilterHolder;
4646
import org.eclipse.jetty.servlet.ServletContextHandler;
4747
import org.eclipse.jetty.servlet.ServletHolder;
4848
import org.eclipse.jetty.servlets.QoSFilter;
@@ -72,6 +72,7 @@ public class WebService implements AutoCloseable {
7272

7373
private final ServerConnector httpConnector;
7474
private final ServerConnector httpsConnector;
75+
private final FilterInitializer filterInitializer;
7576
private JettyStatisticsCollector jettyStatisticsCollector;
7677

7778
public WebService(PulsarService pulsar) throws PulsarServerException {
@@ -144,66 +145,106 @@ public WebService(PulsarService pulsar) throws PulsarServerException {
144145
// Limit number of concurrent HTTP connections to avoid getting out of file descriptors
145146
connectors.forEach(c -> c.setAcceptQueueSize(config.getHttpServerAcceptQueueSize()));
146147
server.setConnectors(connectors.toArray(new ServerConnector[connectors.size()]));
148+
149+
filterInitializer = new FilterInitializer(pulsar);
150+
}
151+
152+
public void addRestResources(String basePath, boolean requiresAuthentication, Map<String, Object> attributeMap,
153+
String... javaPackages) {
154+
ResourceConfig config = new ResourceConfig();
155+
for (String javaPackage : javaPackages) {
156+
config.packages(false, javaPackage);
157+
}
158+
addResourceServlet(basePath, requiresAuthentication, attributeMap, config);
147159
}
148160

149-
public void addRestResources(String basePath, String javaPackages, boolean requiresAuthentication,
150-
Map<String, Object> attributeMap) {
161+
public void addRestResource(String basePath, boolean requiresAuthentication, Map<String, Object> attributeMap,
162+
Class<?>... resourceClasses) {
151163
ResourceConfig config = new ResourceConfig();
152-
config.packages("jersey.config.server.provider.packages", javaPackages);
164+
for (Class<?> resourceClass : resourceClasses) {
165+
config.register(resourceClass);
166+
}
167+
addResourceServlet(basePath, requiresAuthentication, attributeMap, config);
168+
}
169+
170+
private void addResourceServlet(String basePath, boolean requiresAuthentication, Map<String, Object> attributeMap,
171+
ResourceConfig config) {
153172
config.register(JsonMapperProvider.class);
154173
config.register(MultiPartFeature.class);
155174
ServletHolder servletHolder = new ServletHolder(new ServletContainer(config));
156175
servletHolder.setAsyncSupported(true);
157176
addServlet(basePath, servletHolder, requiresAuthentication, attributeMap);
158177
}
159178

160-
public void addServlet(String path, ServletHolder servletHolder, boolean requiresAuthentication,
161-
Map<String, Object> attributeMap) {
162-
ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
163-
context.setContextPath(path);
164-
context.addServlet(servletHolder, MATCH_ALL);
165-
if (attributeMap != null) {
166-
attributeMap.forEach((key, value) -> {
167-
context.setAttribute(key, value);
168-
});
169-
}
179+
private static class FilterInitializer {
180+
private final List<FilterHolder> filterHolders = new ArrayList<>();
181+
private final FilterHolder authenticationFilterHolder;
182+
FilterInitializer(PulsarService pulsarService) {
183+
ServiceConfiguration config = pulsarService.getConfiguration();
184+
if (config.getMaxConcurrentHttpRequests() > 0) {
185+
FilterHolder filterHolder = new FilterHolder(QoSFilter.class);
186+
filterHolder.setInitParameter("maxRequests", String.valueOf(config.getMaxConcurrentHttpRequests()));
187+
filterHolders.add(filterHolder);
188+
}
170189

171-
ServiceConfiguration config = pulsar.getConfig();
190+
if (config.isHttpRequestsLimitEnabled()) {
191+
filterHolders.add(new FilterHolder(
192+
new RateLimitingFilter(config.getHttpRequestsMaxPerSecond())));
193+
}
172194

173-
if (config.getMaxConcurrentHttpRequests() > 0) {
174-
addFilterClass(context, QoSFilter.class, Collections.singletonMap("maxRequests",
175-
String.valueOf(config.getMaxConcurrentHttpRequests())));
176-
}
195+
if (!config.getBrokerInterceptors().isEmpty()
196+
|| !config.isDisableBrokerInterceptors()) {
197+
ExceptionHandler handler = new ExceptionHandler();
198+
// Enable PreInterceptFilter only when interceptors are enabled
199+
filterHolders.add(
200+
new FilterHolder(new PreInterceptFilter(pulsarService.getBrokerInterceptor(), handler)));
201+
filterHolders.add(new FilterHolder(new ProcessHandlerFilter(pulsarService)));
202+
}
177203

178-
if (pulsar.getConfiguration().isHttpRequestsLimitEnabled()) {
179-
addFilter(context,
180-
new RateLimitingFilter(pulsar.getConfiguration().getHttpRequestsMaxPerSecond()));
181-
}
204+
if (config.isAuthenticationEnabled()) {
205+
authenticationFilterHolder = new FilterHolder(new AuthenticationFilter(
206+
pulsarService.getBrokerService().getAuthenticationService()));
207+
filterHolders.add(authenticationFilterHolder);
208+
} else {
209+
authenticationFilterHolder = null;
210+
}
182211

183-
if (!config.getBrokerInterceptors().isEmpty()
184-
|| !config.isDisableBrokerInterceptors()) {
185-
ExceptionHandler handler = new ExceptionHandler();
186-
// Enable PreInterceptFilter only when interceptors are enabled
187-
addFilter(context, new PreInterceptFilter(pulsar.getBrokerInterceptor(), handler));
188-
addFilter(context, new ProcessHandlerFilter(pulsar));
189-
}
212+
if (config.isDisableHttpDebugMethods()) {
213+
filterHolders.add(new FilterHolder(new DisableDebugHttpMethodFilter(config)));
214+
}
190215

191-
if (requiresAuthentication && pulsar.getConfiguration().isAuthenticationEnabled()) {
192-
addFilter(context, new AuthenticationFilter(
193-
pulsar.getBrokerService().getAuthenticationService()));
194-
}
216+
if (config.getHttpMaxRequestSize() > 0) {
217+
filterHolders.add(new FilterHolder(
218+
new MaxRequestSizeFilter(
219+
config.getHttpMaxRequestSize())));
220+
}
195221

196-
if (config.isDisableHttpDebugMethods()) {
197-
addFilter(context, new DisableDebugHttpMethodFilter(config));
222+
filterHolders.add(new FilterHolder(new ResponseHandlerFilter(pulsarService)));
198223
}
199224

200-
if (config.getHttpMaxRequestSize() > 0) {
201-
addFilter(context,
202-
new MaxRequestSizeFilter(
203-
config.getHttpMaxRequestSize()));
225+
public void addFilters(ServletContextHandler context, boolean requiresAuthentication) {
226+
for (FilterHolder filterHolder : filterHolders) {
227+
if (requiresAuthentication || filterHolder != authenticationFilterHolder) {
228+
context.addFilter(filterHolder,
229+
MATCH_ALL, EnumSet.allOf(DispatcherType.class));
230+
}
231+
}
204232
}
205233

206-
addFilter(context, new ResponseHandlerFilter(pulsar));
234+
}
235+
236+
public void addServlet(String path, ServletHolder servletHolder, boolean requiresAuthentication,
237+
Map<String, Object> attributeMap) {
238+
ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
239+
// Notice: each context path should be unique, but there's nothing here to verify that
240+
context.setContextPath(path);
241+
context.addServlet(servletHolder, MATCH_ALL);
242+
if (attributeMap != null) {
243+
attributeMap.forEach((key, value) -> {
244+
context.setAttribute(key, value);
245+
});
246+
}
247+
filterInitializer.addFilters(context, requiresAuthentication);
207248
handlers.add(context);
208249
}
209250

0 commit comments

Comments
 (0)