Skip to content

Commit 7b545ea

Browse files
author
Rong Rong
committed
populate queryOption down to leaf
1 parent c7e05a7 commit 7b545ea

File tree

3 files changed

+46
-9
lines changed

3 files changed

+46
-9
lines changed

pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@
1919
package org.apache.pinot.query.runtime.plan;
2020

2121
import com.google.common.collect.ImmutableList;
22-
import com.google.common.collect.ImmutableMap;
2322
import java.util.ArrayList;
2423
import java.util.Arrays;
24+
import java.util.HashMap;
2525
import java.util.List;
2626
import java.util.Map;
2727
import java.util.stream.Collectors;
@@ -126,8 +126,7 @@ public static ServerPlanRequestContext build(MailboxService<TransferableBlock> m
126126
QUERY_OPTIMIZER.optimize(pinotQuery, tableConfig, schema);
127127

128128
// 2. set pinot query options according to requestMetadataMap
129-
pinotQuery.setQueryOptions(
130-
ImmutableMap.of(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS, String.valueOf(timeoutMs)));
129+
updateQueryOptions(pinotQuery, requestMetadataMap, timeoutMs, traceEnabled);
131130

132131
// 3. wrapped around in broker request
133132
BrokerRequest brokerRequest = new BrokerRequest();
@@ -151,6 +150,19 @@ public static ServerPlanRequestContext build(MailboxService<TransferableBlock> m
151150
return context;
152151
}
153152

153+
private static void updateQueryOptions(PinotQuery pinotQuery, Map<String, String> requestMetadataMap, long timeoutMs,
154+
boolean traceEnabled) {
155+
Map<String, String> queryOptions = new HashMap<>();
156+
// put default timeout and trace options
157+
queryOptions.put(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS, String.valueOf(timeoutMs));
158+
if (traceEnabled) {
159+
queryOptions.put(CommonConstants.Broker.Request.TRACE, "true");
160+
}
161+
// overwrite with requestMetadataMap to carry query options from request:
162+
queryOptions.putAll(requestMetadataMap);
163+
pinotQuery.setQueryOptions(queryOptions);
164+
}
165+
154166
private static void walkStageNode(StageNode node, ServerPlanRequestContext context) {
155167
node.visit(INSTANCE, context);
156168
}

pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,8 @@
5454

5555

5656
/**
57-
* all legacy tests.
58-
*
59-
* @deprecated do not add to this test set. this class will be broken down and clean up.
60-
* add your test to appropraite files in {@link org.apache.pinot.query.runtime.queries} instead.
57+
* all special tests that doesn't fit into {@link org.apache.pinot.query.runtime.queries.ResourceBasedQueriesTest}
58+
* pattern goes here.
6159
*/
6260
public class QueryRunnerTest extends QueryRunnerTestBase {
6361
public static final Object[][] ROWS = new Object[][]{
@@ -156,13 +154,23 @@ public void tearDown() {
156154
_mailboxService.shutdown();
157155
}
158156

159-
@Test(dataProvider = "testDataWithSqlToFinalRowCount")
157+
158+
/**
159+
* Test compares with expected row count only.
160+
*/
161+
@Test(dataProvider = "testDataWithSqlToFinalRowCount")
160162
public void testSqlWithFinalRowCountChecker(String sql, int expectedRows)
161163
throws Exception {
162164
List<Object[]> resultRows = queryRunner(sql, null);
163165
Assert.assertEquals(resultRows.size(), expectedRows);
164166
}
165167

168+
/**
169+
* Test automatically compares against H2.
170+
*
171+
* @deprecated do not add to this test set. this class will be broken down and clean up.
172+
* add your test to the appropriate files in {@link org.apache.pinot.query.runtime.queries} instead.
173+
*/
166174
@Test(dataProvider = "testSql")
167175
public void testSqlWithH2Checker(String sql)
168176
throws Exception {
@@ -172,6 +180,9 @@ public void testSqlWithH2Checker(String sql)
172180
compareRowEquals(resultRows, expectedRows);
173181
}
174182

183+
/**
184+
* Test compares against its desired exceptions.
185+
*/
175186
@Test(dataProvider = "testDataWithSqlExecutionExceptions")
176187
public void testSqlWithExceptionMsgChecker(String sql, String exceptionMsg) {
177188
long requestId = RANDOM_REQUEST_ID_GEN.nextLong();
@@ -235,6 +246,10 @@ private Object[][] provideTestSqlAndRowCount() {
235246
new Object[]{"SELECT round_decimal(col3) FROM a", 15},
236247
new Object[]{"SELECT col1, roundDecimal(COUNT(*)) FROM a GROUP BY col1", 5},
237248
new Object[]{"SELECT col1, round_decimal(COUNT(*)) FROM a GROUP BY col1", 5},
249+
250+
// test queries with special query options attached
251+
// - when leaf limit is set, each server returns multiStageLeafLimit number of rows only.
252+
new Object[]{"SET multiStageLeafLimit = 1; SELECT * FROM a", 2},
238253
};
239254
}
240255

pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@
6161
import org.apache.pinot.spi.utils.BytesUtils;
6262
import org.apache.pinot.spi.utils.CommonConstants;
6363
import org.apache.pinot.spi.utils.StringUtil;
64+
import org.apache.pinot.sql.parsers.CalciteSqlParser;
65+
import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
6466
import org.h2.jdbc.JdbcArray;
6567
import org.testng.Assert;
6668

@@ -84,14 +86,22 @@ public abstract class QueryRunnerTestBase extends QueryTestSet {
8486
// --------------------------------------------------------------------------
8587
// QUERY UTILS
8688
// --------------------------------------------------------------------------
89+
90+
/**
91+
* Dispatch query to each pinot-server. The logic should mimic
92+
* {@link QueryDispatcher#submit(long, QueryPlan, long, Map)} but does not actually make ser/de dispatches.
93+
*/
8794
protected List<Object[]> queryRunner(String sql, Map<Integer, ExecutionStatsAggregator> executionStatsAggregatorMap) {
88-
QueryPlan queryPlan = _queryEnvironment.planQuery(sql);
8995
long requestId = RANDOM_REQUEST_ID_GEN.nextLong();
96+
SqlNodeAndOptions sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(sql);
97+
QueryPlan queryPlan = _queryEnvironment.planQuery(sql, sqlNodeAndOptions, requestId).getQueryPlan();
9098
Map<String, String> requestMetadataMap = new HashMap<>();
9199
requestMetadataMap.put(QueryConfig.KEY_OF_BROKER_REQUEST_ID, String.valueOf(requestId));
92100
requestMetadataMap.put(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS,
93101
String.valueOf(CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS));
102+
requestMetadataMap.putAll(sqlNodeAndOptions.getOptions());
94103

104+
// Putting trace testing here as extra options as it doesn't go along with the rest of the items.
95105
if (executionStatsAggregatorMap != null) {
96106
requestMetadataMap.put(CommonConstants.Broker.Request.TRACE, "true");
97107
}

0 commit comments

Comments
 (0)