Skip to content

Commit 637ad89

Browse files
committed
rebase and cr
1 parent 5a41365 commit 637ad89

File tree

9 files changed

+58
-40
lines changed

9 files changed

+58
-40
lines changed

pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -170,19 +170,21 @@ public String cancelQuery(
170170
boolean verbose) {
171171
try {
172172
Map<String, Integer> serverResponses = verbose ? new HashMap<>() : null;
173-
if (!_requestHandler.cancelQuery(queryId, timeoutMs, _executor, _httpConnMgr, serverResponses)) {
174-
throw new WebApplicationException(Response.status(Response.Status.NOT_FOUND)
175-
.entity(String.format("Query: %s not found on the broker", queryId)).build());
176-
}
177-
String resp = "Cancelled query: " + queryId;
178-
if (verbose) {
179-
resp += " with responses from servers: " + serverResponses;
173+
if (_requestHandler.cancelQuery(queryId, timeoutMs, _executor, _httpConnMgr, serverResponses)) {
174+
String resp = "Cancelled query: " + queryId;
175+
if (verbose) {
176+
resp += " with responses from servers: " + serverResponses;
177+
}
178+
return resp;
180179
}
181-
return resp;
182180
} catch (Exception e) {
183181
throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR)
184-
.entity(String.format("Failed to cancel query: %s due to error: %s", queryId, e.getMessage())).build());
182+
.entity(String.format("Failed to cancel query: %s on the broker due to error: %s", queryId, e.getMessage()))
183+
.build());
185184
}
185+
throw new WebApplicationException(
186+
Response.status(Response.Status.NOT_FOUND).entity(String.format("Query: %s not found on the broker", queryId))
187+
.build());
186188
}
187189

188190
@GET
@@ -198,7 +200,7 @@ public Map<Long, String> getRunningQueries() {
198200
return _requestHandler.getRunningQueries();
199201
} catch (Exception e) {
200202
throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR)
201-
.entity("Failed to get running queries due to error: " + e.getMessage()).build());
203+
.entity("Failed to get running queries on the broker due to error: " + e.getMessage()).build());
202204
}
203205
}
204206

pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -185,13 +185,13 @@ private String getDefaultBrokerId() {
185185

186186
@Override
187187
public Map<Long, String> getRunningQueries() {
188-
Preconditions.checkArgument(_enableQueryCancellation, "Query cancellation is not enabled");
188+
Preconditions.checkArgument(_enableQueryCancellation, "Query cancellation is not enabled on broker");
189189
return _queriesById.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue()._query));
190190
}
191191

192192
@VisibleForTesting
193193
Set<ServerInstance> getRunningServers(long requestId) {
194-
Preconditions.checkArgument(_enableQueryCancellation, "Query cancellation is not enabled");
194+
Preconditions.checkArgument(_enableQueryCancellation, "Query cancellation is not enabled on broker");
195195
QueryServers queryServers = _queriesById.get(requestId);
196196
return (queryServers == null) ? Collections.emptySet() : queryServers._servers;
197197
}
@@ -200,7 +200,7 @@ Set<ServerInstance> getRunningServers(long requestId) {
200200
public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpConnectionManager connMgr,
201201
Map<String, Integer> serverResponses)
202202
throws Exception {
203-
Preconditions.checkArgument(_enableQueryCancellation, "Query cancellation is not enabled");
203+
Preconditions.checkArgument(_enableQueryCancellation, "Query cancellation is not enabled on broker");
204204
QueryServers queryServers = _queriesById.get(queryId);
205205
if (queryServers == null) {
206206
return false;
@@ -225,9 +225,14 @@ public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpC
225225
// requests. The completion order is different from serverUrls, thus use uri in the response.
226226
deleteMethod = completionService.take().get();
227227
URI uri = deleteMethod.getURI();
228-
LOGGER.debug("Got response: {} to cancel query: {} via uri: {}", deleteMethod.getStatusCode(), globalId, uri);
228+
int status = deleteMethod.getStatusCode();
229+
// Unexpected server responses are collected and returned as exception.
230+
if (status != 200 && status != 404) {
231+
throw new Exception(String.format("Unexpected status=%d and response='%s' from uri='%s'", status,
232+
deleteMethod.getResponseBodyAsString(), uri));
233+
}
229234
if (serverResponses != null) {
230-
serverResponses.put(uri.getHost(), deleteMethod.getStatusCode());
235+
serverResponses.put(uri.getHost() + ":" + uri.getPort(), status);
231236
}
232237
} catch (Exception e) {
233238
LOGGER.error("Failed to cancel query: {}", globalId, e);
@@ -241,7 +246,7 @@ public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpC
241246
}
242247
}
243248
if (errMsgs.size() > 0) {
244-
throw new Exception("Failed to cancel query on servers: " + StringUtils.join(errMsgs, ","));
249+
throw new Exception("Unexpected responses from servers: " + StringUtils.join(errMsgs, ","));
245250
}
246251
return true;
247252
}

pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -121,19 +121,18 @@ private boolean useMultiStageEngine(JsonNode request, SqlNodeAndOptions sqlNodeA
121121

122122
@Override
123123
public Map<Long, String> getRunningQueries() {
124-
if (_multiStageWorkerRequestHandler != null) {
125-
return _multiStageWorkerRequestHandler.getRunningQueries();
126-
}
124+
// TODO: add support for multiStaged engine: track running queries for multiStaged engine and combine its
125+
// running queries with those from singleStaged engine. Both engines share the same request Id generator, so
126+
// the query will have unique ids across the two engines.
127127
return _singleStageBrokerRequestHandler.getRunningQueries();
128128
}
129129

130130
@Override
131131
public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpConnectionManager connMgr,
132132
Map<String, Integer> serverResponses)
133133
throws Exception {
134-
if (_multiStageWorkerRequestHandler != null) {
135-
return _multiStageWorkerRequestHandler.cancelQuery(queryId, timeoutMs, executor, connMgr, serverResponses);
136-
}
134+
// TODO: add support for multiStaged engine, basically try to cancel the query on multiStaged engine firstly; if
135+
// not found, try on the singleStaged engine.
137136
return _singleStageBrokerRequestHandler.cancelQuery(queryId, timeoutMs, executor, connMgr, serverResponses);
138137
}
139138
}

pinot-common/src/main/java/org/apache/pinot/common/utils/config/InstanceUtils.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,19 +60,18 @@ public static String getHelixInstanceId(Instance instance) {
6060
return prefix + instance.getHost() + "_" + instance.getPort();
6161
}
6262

63-
public static String getInstanceAdminEndpoint(InstanceConfig instanceConfig, int defaultPort) {
63+
public static String getServerAdminEndpoint(InstanceConfig instanceConfig) {
6464
// Backward-compatible with legacy hostname of format 'Server_<hostname>'
6565
String hostname = instanceConfig.getHostName();
6666
if (hostname.startsWith(Helix.PREFIX_OF_SERVER_INSTANCE)) {
6767
hostname = hostname.substring(Helix.SERVER_INSTANCE_PREFIX_LENGTH);
6868
}
69-
return getInstanceAdminEndpoint(instanceConfig, CommonConstants.HTTP_PROTOCOL, hostname, defaultPort);
69+
return getServerAdminEndpoint(instanceConfig, hostname, CommonConstants.HTTP_PROTOCOL);
7070
}
7171

72-
public static String getInstanceAdminEndpoint(InstanceConfig instanceConfig, String defaultProtocol, String hostname,
73-
int defaultPort) {
72+
public static String getServerAdminEndpoint(InstanceConfig instanceConfig, String hostname, String defaultProtocol) {
7473
String protocol = defaultProtocol;
75-
int port = defaultPort;
74+
int port = CommonConstants.Server.DEFAULT_ADMIN_API_PORT;
7675
int adminPort = instanceConfig.getRecord().getIntField(Helix.Instance.ADMIN_PORT_KEY, -1);
7776
int adminHttpsPort = instanceConfig.getRecord().getIntField(Helix.Instance.ADMIN_HTTPS_PORT_KEY, -1);
7877
// NOTE: preference for insecure is sub-optimal, but required for incremental upgrade scenarios

pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,6 @@
158158
import org.apache.pinot.spi.utils.CommonConstants.Helix;
159159
import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.BrokerResourceStateModel;
160160
import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
161-
import org.apache.pinot.spi.utils.CommonConstants.Server;
162161
import org.apache.pinot.spi.utils.IngestionConfigUtils;
163162
import org.apache.pinot.spi.utils.InstanceTypeUtils;
164163
import org.apache.pinot.spi.utils.TimeUtils;
@@ -226,7 +225,7 @@ public PinotHelixResourceManager(String zkURL, String helixClusterName, @Nullabl
226225
public String load(String instanceId) {
227226
InstanceConfig instanceConfig = getHelixInstanceConfig(instanceId);
228227
Preconditions.checkNotNull(instanceConfig, "Failed to find instance config for: %s", instanceId);
229-
return InstanceUtils.getInstanceAdminEndpoint(instanceConfig, Server.DEFAULT_ADMIN_API_PORT);
228+
return InstanceUtils.getServerAdminEndpoint(instanceConfig);
230229
}
231230
});
232231
_tableUpdaterLocks = new Object[DEFAULT_TABLE_UPDATER_LOCKERS_SIZE];

pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,11 +157,13 @@ public void onFailure(Throwable ignored) {
157157
/**
158158
* Cancel a query as identified by the queryId. This method is non-blocking and the query may still run for a while
159159
* after calling this method. This method can be called multiple times.
160+
* TODO: refine the errmsg when query is cancelled, instead of bubbling up the executor's CancellationException.
160161
*
161162
* @param queryId a unique Id to find the query
162163
* @return true if a running query exists for the given queryId.
163164
*/
164165
public boolean cancelQuery(String queryId) {
166+
Preconditions.checkArgument(_enableQueryCancellation, "Query cancellation is not enabled on server");
165167
// Keep the future as it'll be cleaned up by the thread executing the query.
166168
Future<byte[]> future = _queryFuturesById.get(queryId);
167169
if (future == null) {
@@ -180,7 +182,8 @@ public boolean cancelQuery(String queryId) {
180182
/**
181183
* @return list of ids of the queries currently running on the server.
182184
*/
183-
public Set<String> getRunningQueries() {
185+
public Set<String> getRunningQueryIds() {
186+
Preconditions.checkArgument(_enableQueryCancellation, "Query cancellation is not enabled on server");
184187
return new HashSet<>(_queryFuturesById.keySet());
185188
}
186189

pinot-core/src/main/java/org/apache/pinot/core/transport/ServerInstance.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,7 @@ public ServerInstance(InstanceConfig instanceConfig) {
7878
INVALID_PORT);
7979
_queryMailboxPort = instanceConfig.getRecord().getIntField(Helix.Instance.MULTI_STAGE_QUERY_ENGINE_MAILBOX_PORT_KEY,
8080
INVALID_PORT);
81-
_adminEndpoint = InstanceUtils.getInstanceAdminEndpoint(instanceConfig, CommonConstants.HTTP_PROTOCOL, _hostname,
82-
CommonConstants.Server.DEFAULT_ADMIN_API_PORT);
81+
_adminEndpoint = InstanceUtils.getServerAdminEndpoint(instanceConfig, _hostname, CommonConstants.HTTP_PROTOCOL);
8382
}
8483

8584
@VisibleForTesting

pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/QuerySchedulerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,11 @@ public void testCancelQuery() {
5050
when(query.getQueryId()).thenReturn(id);
5151
qs.submitQuery(query);
5252
}
53-
Assert.assertEquals(qs.getRunningQueries(), queryIds);
53+
Assert.assertEquals(qs.getRunningQueryIds(), queryIds);
5454
for (String id : queryIds) {
5555
qs.cancelQuery(id);
5656
}
57-
Assert.assertTrue(qs.getRunningQueries().isEmpty());
57+
Assert.assertTrue(qs.getRunningQueryIds().isEmpty());
5858
Assert.assertFalse(qs.cancelQuery("unknown"));
5959
}
6060

pinot-server/src/main/java/org/apache/pinot/server/api/resources/QueryResource.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,22 +67,34 @@ public class QueryResource {
6767
public String cancelQuery(
6868
@ApiParam(value = "QueryId as in the format of <brokerId>_<requestId>", required = true) @PathParam("queryId")
6969
String queryId) {
70-
if (_serverInstance.getQueryScheduler().cancelQuery(queryId)) {
71-
return "Cancelled query: " + queryId;
70+
try {
71+
if (_serverInstance.getQueryScheduler().cancelQuery(queryId)) {
72+
return "Cancelled query: " + queryId;
73+
}
74+
} catch (Exception e) {
75+
throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR)
76+
.entity(String.format("Failed to cancel query: %s on the server due to error: %s", queryId, e.getMessage()))
77+
.build());
7278
}
7379
throw new WebApplicationException(
74-
Response.status(Response.Status.NOT_FOUND).entity("Query: " + queryId + " not found on the server").build());
80+
Response.status(Response.Status.NOT_FOUND).entity(String.format("Query: %s not found on the server", queryId))
81+
.build());
7582
}
7683

7784
@GET
78-
@Path("/query/id")
85+
@Path("/queries/id")
7986
@Produces(MediaType.APPLICATION_JSON)
8087
@ApiOperation(value = "Get queryIds of running queries on the server", notes = "QueryIds are in the format of "
8188
+ "<brokerId>_<requestId>")
8289
@ApiResponses(value = {
8390
@ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error")
8491
})
85-
public Set<String> getRunningQueries() {
86-
return _serverInstance.getQueryScheduler().getRunningQueries();
92+
public Set<String> getRunningQueryIds() {
93+
try {
94+
return _serverInstance.getQueryScheduler().getRunningQueryIds();
95+
} catch (Exception e) {
96+
throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR)
97+
.entity("Failed to get queryIds of running queries on the server due to error: " + e.getMessage()).build());
98+
}
8799
}
88100
}

0 commit comments

Comments
 (0)