Skip to content

Commit 329e231

Browse files
authored
[proxy] Fix proxy routing to functions worker (#6486)
### Motivation Currently, the proxy only works to proxy v1/v2 functions routes to the function worker. ### Modifications This changes this code to proxy all routes for the function worker when those routes match. At the moment this is still a static list of prefixes, but in the future it may be possible to have this list of prefixes be dynamically fetched from the REST routes. ### Verifying this change - added some tests to ensure the routing works as expected
1 parent f9ada10 commit 329e231

File tree

2 files changed

+89
-3
lines changed

2 files changed

+89
-3
lines changed

pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,12 @@
2626
import java.net.URI;
2727
import java.nio.ByteBuffer;
2828
import java.security.cert.X509Certificate;
29+
import java.util.Arrays;
2930
import java.util.Collections;
31+
import java.util.HashSet;
3032
import java.util.Iterator;
3133
import java.util.Objects;
34+
import java.util.Set;
3235
import java.util.concurrent.Executor;
3336

3437
import javax.net.ssl.SSLContext;
@@ -60,6 +63,21 @@
6063

6164
class AdminProxyHandler extends ProxyServlet {
6265
private static final Logger LOG = LoggerFactory.getLogger(AdminProxyHandler.class);
66+
private static final Set<String> functionRoutes = new HashSet<>(Arrays.asList(
67+
"/admin/v3/function",
68+
"/admin/v2/function",
69+
"/admin/function",
70+
"/admin/v3/source",
71+
"/admin/v2/source",
72+
"/admin/source",
73+
"/admin/v3/sink",
74+
"/admin/v2/sink",
75+
"/admin/sink",
76+
"/admin/v2/worker",
77+
"/admin/v2/worker-stats",
78+
"/admin/worker",
79+
"/admin/worker-stats"
80+
));
6381

6482
private final ProxyConfiguration config;
6583
private final BrokerDiscoveryProvider discoveryProvider;
@@ -260,9 +278,11 @@ protected String rewriteTarget(HttpServletRequest request) {
260278

261279
boolean isFunctionsRestRequest = false;
262280
String requestUri = request.getRequestURI();
263-
if (requestUri.startsWith("/admin/v2/functions")
264-
|| requestUri.startsWith("/admin/functions")) {
265-
isFunctionsRestRequest = true;
281+
for (String routePrefix: functionRoutes) {
282+
if (requestUri.startsWith(routePrefix)) {
283+
isFunctionsRestRequest = true;
284+
break;
285+
}
266286
}
267287

268288
if (isFunctionsRestRequest && !isBlank(functionWorkerWebServiceUrl)) {
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.proxy.server;
20+
21+
import org.testng.Assert;
22+
import org.testng.annotations.Test;
23+
24+
import javax.servlet.http.HttpServletRequest;
25+
26+
import static org.mockito.Mockito.mock;
27+
import static org.mockito.Mockito.when;
28+
29+
public class FunctionWorkerRoutingTest {
30+
31+
@Test
32+
public void testFunctionWorkerRedirect() throws Exception {
33+
String functionWorkerUrl = "http://function";
34+
String brokerUrl = "http://broker";
35+
36+
ProxyConfiguration proxyConfig = new ProxyConfiguration();
37+
proxyConfig.setBrokerWebServiceURL(brokerUrl);
38+
proxyConfig.setFunctionWorkerWebServiceURL(functionWorkerUrl);
39+
40+
BrokerDiscoveryProvider discoveryProvider = mock(BrokerDiscoveryProvider.class);
41+
AdminProxyHandler handler = new AdminProxyHandler(proxyConfig, discoveryProvider);
42+
43+
String funcUrl = handler.rewriteTarget(buildRequest("/admin/v3/functions/test/test"));
44+
Assert.assertEquals(funcUrl, String.format("%s/admin/v3/functions/%s/%s",
45+
functionWorkerUrl, "test", "test"));
46+
47+
String sourceUrl = handler.rewriteTarget(buildRequest("/admin/v3/sources/test/test"));
48+
Assert.assertEquals(sourceUrl, String.format("%s/admin/v3/sources/%s/%s",
49+
functionWorkerUrl, "test", "test"));
50+
51+
String sinkUrl = handler.rewriteTarget(buildRequest("/admin/v3/sinks/test/test"));
52+
Assert.assertEquals(sinkUrl, String.format("%s/admin/v3/sinks/%s/%s",
53+
functionWorkerUrl, "test", "test"));
54+
55+
String tenantUrl = handler.rewriteTarget(buildRequest("/admin/v2/tenants/test"));
56+
Assert.assertEquals(tenantUrl, String.format("%s/admin/v2/tenants/%s",
57+
brokerUrl, "test"));
58+
}
59+
60+
static HttpServletRequest buildRequest(String url) {
61+
HttpServletRequest mockReq = mock(HttpServletRequest.class);
62+
when(mockReq.getRequestURI()).thenReturn(url);
63+
return mockReq;
64+
}
65+
66+
}

0 commit comments

Comments
 (0)