|
18 | 18 | */ |
19 | 19 | package org.apache.pulsar.broker.web; |
20 | 20 |
|
21 | | -import static org.apache.pulsar.broker.web.Filters.addFilter; |
22 | | -import static org.apache.pulsar.broker.web.Filters.addFilterClass; |
23 | 21 | import com.google.common.collect.Lists; |
24 | 22 | import io.prometheus.client.CollectorRegistry; |
25 | 23 | import io.prometheus.client.jetty.JettyStatisticsCollector; |
26 | 24 | import java.util.ArrayList; |
27 | | -import java.util.Collections; |
| 25 | +import java.util.EnumSet; |
28 | 26 | import java.util.List; |
29 | 27 | import java.util.Map; |
30 | 28 | import java.util.Optional; |
| 29 | +import javax.servlet.DispatcherType; |
31 | 30 | import org.apache.pulsar.broker.PulsarServerException; |
32 | 31 | import org.apache.pulsar.broker.PulsarService; |
33 | 32 | import org.apache.pulsar.broker.ServiceConfiguration; |
|
43 | 42 | import org.eclipse.jetty.server.handler.RequestLogHandler; |
44 | 43 | import org.eclipse.jetty.server.handler.ResourceHandler; |
45 | 44 | import org.eclipse.jetty.server.handler.StatisticsHandler; |
| 45 | +import org.eclipse.jetty.servlet.FilterHolder; |
46 | 46 | import org.eclipse.jetty.servlet.ServletContextHandler; |
47 | 47 | import org.eclipse.jetty.servlet.ServletHolder; |
48 | 48 | import org.eclipse.jetty.servlets.QoSFilter; |
@@ -72,6 +72,7 @@ public class WebService implements AutoCloseable { |
72 | 72 |
|
73 | 73 | private final ServerConnector httpConnector; |
74 | 74 | private final ServerConnector httpsConnector; |
| 75 | + private final FilterInitializer filterInitializer; |
75 | 76 | private JettyStatisticsCollector jettyStatisticsCollector; |
76 | 77 |
|
77 | 78 | public WebService(PulsarService pulsar) throws PulsarServerException { |
@@ -144,66 +145,106 @@ public WebService(PulsarService pulsar) throws PulsarServerException { |
144 | 145 | // Limit number of concurrent HTTP connections to avoid getting out of file descriptors |
145 | 146 | connectors.forEach(c -> c.setAcceptQueueSize(config.getHttpServerAcceptQueueSize())); |
146 | 147 | server.setConnectors(connectors.toArray(new ServerConnector[connectors.size()])); |
| 148 | + |
| 149 | + filterInitializer = new FilterInitializer(pulsar); |
| 150 | + } |
| 151 | + |
| 152 | + public void addRestResources(String basePath, boolean requiresAuthentication, Map<String, Object> attributeMap, |
| 153 | + String... javaPackages) { |
| 154 | + ResourceConfig config = new ResourceConfig(); |
| 155 | + for (String javaPackage : javaPackages) { |
| 156 | + config.packages(false, javaPackage); |
| 157 | + } |
| 158 | + addResourceServlet(basePath, requiresAuthentication, attributeMap, config); |
147 | 159 | } |
148 | 160 |
|
149 | | - public void addRestResources(String basePath, String javaPackages, boolean requiresAuthentication, |
150 | | - Map<String, Object> attributeMap) { |
| 161 | + public void addRestResource(String basePath, boolean requiresAuthentication, Map<String, Object> attributeMap, |
| 162 | + Class<?>... resourceClasses) { |
151 | 163 | ResourceConfig config = new ResourceConfig(); |
152 | | - config.packages("jersey.config.server.provider.packages", javaPackages); |
| 164 | + for (Class<?> resourceClass : resourceClasses) { |
| 165 | + config.register(resourceClass); |
| 166 | + } |
| 167 | + addResourceServlet(basePath, requiresAuthentication, attributeMap, config); |
| 168 | + } |
| 169 | + |
| 170 | + private void addResourceServlet(String basePath, boolean requiresAuthentication, Map<String, Object> attributeMap, |
| 171 | + ResourceConfig config) { |
153 | 172 | config.register(JsonMapperProvider.class); |
154 | 173 | config.register(MultiPartFeature.class); |
155 | 174 | ServletHolder servletHolder = new ServletHolder(new ServletContainer(config)); |
156 | 175 | servletHolder.setAsyncSupported(true); |
157 | 176 | addServlet(basePath, servletHolder, requiresAuthentication, attributeMap); |
158 | 177 | } |
159 | 178 |
|
160 | | - public void addServlet(String path, ServletHolder servletHolder, boolean requiresAuthentication, |
161 | | - Map<String, Object> attributeMap) { |
162 | | - ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); |
163 | | - context.setContextPath(path); |
164 | | - context.addServlet(servletHolder, MATCH_ALL); |
165 | | - if (attributeMap != null) { |
166 | | - attributeMap.forEach((key, value) -> { |
167 | | - context.setAttribute(key, value); |
168 | | - }); |
169 | | - } |
| 179 | + private static class FilterInitializer { |
| 180 | + private final List<FilterHolder> filterHolders = new ArrayList<>(); |
| 181 | + private final FilterHolder authenticationFilterHolder; |
| 182 | + FilterInitializer(PulsarService pulsarService) { |
| 183 | + ServiceConfiguration config = pulsarService.getConfiguration(); |
| 184 | + if (config.getMaxConcurrentHttpRequests() > 0) { |
| 185 | + FilterHolder filterHolder = new FilterHolder(QoSFilter.class); |
| 186 | + filterHolder.setInitParameter("maxRequests", String.valueOf(config.getMaxConcurrentHttpRequests())); |
| 187 | + filterHolders.add(filterHolder); |
| 188 | + } |
170 | 189 |
|
171 | | - ServiceConfiguration config = pulsar.getConfig(); |
| 190 | + if (config.isHttpRequestsLimitEnabled()) { |
| 191 | + filterHolders.add(new FilterHolder( |
| 192 | + new RateLimitingFilter(config.getHttpRequestsMaxPerSecond()))); |
| 193 | + } |
172 | 194 |
|
173 | | - if (config.getMaxConcurrentHttpRequests() > 0) { |
174 | | - addFilterClass(context, QoSFilter.class, Collections.singletonMap("maxRequests", |
175 | | - String.valueOf(config.getMaxConcurrentHttpRequests()))); |
176 | | - } |
| 195 | + if (!config.getBrokerInterceptors().isEmpty() |
| 196 | + || !config.isDisableBrokerInterceptors()) { |
| 197 | + ExceptionHandler handler = new ExceptionHandler(); |
| 198 | + // Enable PreInterceptFilter only when interceptors are enabled |
| 199 | + filterHolders.add( |
| 200 | + new FilterHolder(new PreInterceptFilter(pulsarService.getBrokerInterceptor(), handler))); |
| 201 | + filterHolders.add(new FilterHolder(new ProcessHandlerFilter(pulsarService))); |
| 202 | + } |
177 | 203 |
|
178 | | - if (pulsar.getConfiguration().isHttpRequestsLimitEnabled()) { |
179 | | - addFilter(context, |
180 | | - new RateLimitingFilter(pulsar.getConfiguration().getHttpRequestsMaxPerSecond())); |
181 | | - } |
| 204 | + if (config.isAuthenticationEnabled()) { |
| 205 | + authenticationFilterHolder = new FilterHolder(new AuthenticationFilter( |
| 206 | + pulsarService.getBrokerService().getAuthenticationService())); |
| 207 | + filterHolders.add(authenticationFilterHolder); |
| 208 | + } else { |
| 209 | + authenticationFilterHolder = null; |
| 210 | + } |
182 | 211 |
|
183 | | - if (!config.getBrokerInterceptors().isEmpty() |
184 | | - || !config.isDisableBrokerInterceptors()) { |
185 | | - ExceptionHandler handler = new ExceptionHandler(); |
186 | | - // Enable PreInterceptFilter only when interceptors are enabled |
187 | | - addFilter(context, new PreInterceptFilter(pulsar.getBrokerInterceptor(), handler)); |
188 | | - addFilter(context, new ProcessHandlerFilter(pulsar)); |
189 | | - } |
| 212 | + if (config.isDisableHttpDebugMethods()) { |
| 213 | + filterHolders.add(new FilterHolder(new DisableDebugHttpMethodFilter(config))); |
| 214 | + } |
190 | 215 |
|
191 | | - if (requiresAuthentication && pulsar.getConfiguration().isAuthenticationEnabled()) { |
192 | | - addFilter(context, new AuthenticationFilter( |
193 | | - pulsar.getBrokerService().getAuthenticationService())); |
194 | | - } |
| 216 | + if (config.getHttpMaxRequestSize() > 0) { |
| 217 | + filterHolders.add(new FilterHolder( |
| 218 | + new MaxRequestSizeFilter( |
| 219 | + config.getHttpMaxRequestSize()))); |
| 220 | + } |
195 | 221 |
|
196 | | - if (config.isDisableHttpDebugMethods()) { |
197 | | - addFilter(context, new DisableDebugHttpMethodFilter(config)); |
| 222 | + filterHolders.add(new FilterHolder(new ResponseHandlerFilter(pulsarService))); |
198 | 223 | } |
199 | 224 |
|
200 | | - if (config.getHttpMaxRequestSize() > 0) { |
201 | | - addFilter(context, |
202 | | - new MaxRequestSizeFilter( |
203 | | - config.getHttpMaxRequestSize())); |
| 225 | + public void addFilters(ServletContextHandler context, boolean requiresAuthentication) { |
| 226 | + for (FilterHolder filterHolder : filterHolders) { |
| 227 | + if (requiresAuthentication || filterHolder != authenticationFilterHolder) { |
| 228 | + context.addFilter(filterHolder, |
| 229 | + MATCH_ALL, EnumSet.allOf(DispatcherType.class)); |
| 230 | + } |
| 231 | + } |
204 | 232 | } |
205 | 233 |
|
206 | | - addFilter(context, new ResponseHandlerFilter(pulsar)); |
| 234 | + } |
| 235 | + |
| 236 | + public void addServlet(String path, ServletHolder servletHolder, boolean requiresAuthentication, |
| 237 | + Map<String, Object> attributeMap) { |
| 238 | + ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); |
| 239 | + // Notice: each context path should be unique, but there's nothing here to verify that |
| 240 | + context.setContextPath(path); |
| 241 | + context.addServlet(servletHolder, MATCH_ALL); |
| 242 | + if (attributeMap != null) { |
| 243 | + attributeMap.forEach((key, value) -> { |
| 244 | + context.setAttribute(key, value); |
| 245 | + }); |
| 246 | + } |
| 247 | + filterInitializer.addFilters(context, requiresAuthentication); |
207 | 248 | handlers.add(context); |
208 | 249 | } |
209 | 250 |
|
|
0 commit comments