1818 */
1919package org .apache .pinot .queries ;
2020
21+ import java .util .Collections ;
2122import java .util .HashMap ;
2223import java .util .List ;
2324import java .util .Map ;
2425import java .util .concurrent .ExecutorService ;
2526import java .util .concurrent .Executors ;
2627import javax .annotation .Nullable ;
27- import org .apache .pinot .common .Utils ;
2828import org .apache .pinot .common .request .BrokerRequest ;
29+ import org .apache .pinot .common .request .PinotQuery ;
2930import org .apache .pinot .common .response .broker .BrokerResponseNative ;
3031import org .apache .pinot .common .utils .DataTable ;
3132import org .apache .pinot .core .common .Operator ;
3839import org .apache .pinot .core .query .reduce .BrokerReduceService ;
3940import org .apache .pinot .core .query .request .context .QueryContext ;
4041import org .apache .pinot .core .query .request .context .utils .BrokerRequestToQueryContextConverter ;
41- import org .apache .pinot .core .query .request .context .utils .QueryContextConverterUtils ;
4242import org .apache .pinot .core .transport .ServerRoutingInstance ;
4343import org .apache .pinot .core .util .GapfillUtils ;
44- import org .apache .pinot .pql .parsers .Pql2Compiler ;
4544import org .apache .pinot .segment .spi .IndexSegment ;
4645import org .apache .pinot .spi .config .table .TableConfig ;
4746import org .apache .pinot .spi .config .table .TableType ;
5756 * Base class for queries tests.
5857 */
5958public abstract class BaseQueriesTest {
60- protected static final Pql2Compiler PQL_COMPILER = new Pql2Compiler ();
61- protected static final CalciteSqlCompiler SQL_COMPILER = new CalciteSqlCompiler ();
59+ protected static final CalciteSqlCompiler COMPILER = new CalciteSqlCompiler ();
6260 protected static final PlanMaker PLAN_MAKER = new InstancePlanMakerImplV2 ();
6361 protected static final QueryOptimizer OPTIMIZER = new QueryOptimizer ();
6462
@@ -71,141 +69,93 @@ public abstract class BaseQueriesTest {
7169 protected abstract List <IndexSegment > getIndexSegments ();
7270
7371 /**
74- * Run PQL query on single index segment.
72+ * Run query on single index segment.
7573 * <p>Use this to test a single operator.
7674 */
7775 @ SuppressWarnings ({"rawtypes" , "unchecked" })
78- protected <T extends Operator > T getOperatorForPqlQuery (String pqlQuery ) {
79- QueryContext queryContext = QueryContextConverterUtils .getQueryContextFromPQL (pqlQuery );
80- return (T ) PLAN_MAKER .makeSegmentPlanNode (getIndexSegment (), queryContext ).run ();
81- }
82-
83- /**
84- * Run PQL query with hard-coded filter on single index segment.
85- * <p>Use this to test a single operator.
86- */
87- @ SuppressWarnings ("rawtypes" )
88- protected <T extends Operator > T getOperatorForPqlQueryWithFilter (String pqlQuery ) {
89- return getOperatorForPqlQuery (pqlQuery + getFilter ());
90- }
91-
92- /**
93- * Run SQL query on single index segment.
94- * <p>Use this to test a single operator.
95- */
96- @ SuppressWarnings ({"rawtypes" , "unchecked" })
97- protected <T extends Operator > T getOperatorForSqlQuery (String sqlQuery ) {
98- QueryContext queryContext = QueryContextConverterUtils .getQueryContextFromSQL (sqlQuery );
76+ protected <T extends Operator > T getOperator (String query ) {
77+ BrokerRequest brokerRequest = COMPILER .compileToBrokerRequest (query );
78+ PinotQuery pinotQuery = brokerRequest .getPinotQuery ();
79+ Map <String , String > queryOptions = pinotQuery .getQueryOptions ();
80+ if (queryOptions == null ) {
81+ queryOptions = new HashMap <>();
82+ pinotQuery .setQueryOptions (queryOptions );
83+ }
84+ queryOptions .put (Request .QueryOptionKey .GROUP_BY_MODE , Request .SQL );
85+ queryOptions .put (Request .QueryOptionKey .RESPONSE_FORMAT , Request .SQL );
86+ BrokerRequest serverBrokerRequest = GapfillUtils .stripGapfill (brokerRequest );
87+ QueryContext queryContext = BrokerRequestToQueryContextConverter .convert (serverBrokerRequest );
9988 return (T ) PLAN_MAKER .makeSegmentPlanNode (getIndexSegment (), queryContext ).run ();
10089 }
10190
10291 /**
103- * Run SQL query with hard-coded filter on single index segment.
92+ * Run query with hard-coded filter on single index segment.
10493 * <p>Use this to test a single operator.
10594 */
10695 @ SuppressWarnings ("rawtypes" )
107- protected <T extends Operator > T getOperatorForSqlQueryWithFilter (String sqlQuery ) {
108- return getOperatorForSqlQuery ( sqlQuery + getFilter ());
96+ protected <T extends Operator > T getOperatorWithFilter (String query ) {
97+ return getOperator ( query + getFilter ());
10998 }
11099
111100 /**
112- * Run PQL query on multiple index segments.
101+ * Run query on multiple index segments.
113102 * <p>Use this to test the whole flow from server to broker.
114103 * <p>The result should be equivalent to querying 4 identical index segments.
115104 */
116- protected BrokerResponseNative getBrokerResponseForPqlQuery (String pqlQuery ) {
117- return getBrokerResponseForPqlQuery ( pqlQuery , PLAN_MAKER );
105+ protected BrokerResponseNative getBrokerResponse (String query ) {
106+ return getBrokerResponse ( query , PLAN_MAKER );
118107 }
119108
120109 /**
121- * Run PQL query with hard-coded filter on multiple index segments.
110+ * Run query with hard-coded filter on multiple index segments.
122111 * <p>Use this to test the whole flow from server to broker.
123112 * <p>The result should be equivalent to querying 4 identical index segments.
124113 */
125- protected BrokerResponseNative getBrokerResponseForPqlQueryWithFilter (String pqlQuery ) {
126- return getBrokerResponseForPqlQuery ( pqlQuery + getFilter ());
114+ protected BrokerResponseNative getBrokerResponseWithFilter (String query ) {
115+ return getBrokerResponse ( query + getFilter ());
127116 }
128117
129118 /**
130- * Run PQL query on multiple index segments with custom plan maker.
119+ * Run query on multiple index segments with custom plan maker.
131120 * <p>Use this to test the whole flow from server to broker.
132121 * <p>The result should be equivalent to querying 4 identical index segments.
133122 */
134- protected BrokerResponseNative getBrokerResponseForPqlQuery (String pqlQuery , PlanMaker planMaker ) {
135- return getBrokerResponseForPqlQuery ( pqlQuery , planMaker , null );
123+ protected BrokerResponseNative getBrokerResponse (String query , PlanMaker planMaker ) {
124+ return getBrokerResponse ( query , planMaker , null );
136125 }
137126
138127 /**
139- * Run PQL query on multiple index segments.
128+ * Run query on multiple index segments.
140129 * <p>Use this to test the whole flow from server to broker.
141130 * <p>The result should be equivalent to querying 4 identical index segments.
142131 */
143- protected BrokerResponseNative getBrokerResponseForPqlQuery (String pqlQuery ,
144- @ Nullable Map <String , String > extraQueryOptions ) {
145- return getBrokerResponseForPqlQuery (pqlQuery , PLAN_MAKER , extraQueryOptions );
132+ protected BrokerResponseNative getBrokerResponse (String query , @ Nullable Map <String , String > extraQueryOptions ) {
133+ return getBrokerResponse (query , PLAN_MAKER , extraQueryOptions );
146134 }
147135
148136 /**
149- * Run PQL query on multiple index segments with custom plan maker and queryOptions.
137+ * Run query on multiple index segments with custom plan maker and queryOptions.
150138 * <p>Use this to test the whole flow from server to broker.
151139 * <p>The result should be equivalent to querying 4 identical index segments.
152140 */
153- private BrokerResponseNative getBrokerResponseForPqlQuery (String pqlQuery , PlanMaker planMaker ,
141+ private BrokerResponseNative getBrokerResponse (String query , PlanMaker planMaker ,
154142 @ Nullable Map <String , String > extraQueryOptions ) {
155- BrokerRequest brokerRequest = PQL_COMPILER .compileToBrokerRequest (pqlQuery );
156- if (extraQueryOptions != null ) {
157- Map <String , String > queryOptions = brokerRequest .getQueryOptions ();
158- if (queryOptions != null ) {
159- queryOptions .putAll (extraQueryOptions );
160- } else {
161- brokerRequest .setQueryOptions (extraQueryOptions );
162- }
163- }
164- QueryContext queryContext = BrokerRequestToQueryContextConverter .convert (brokerRequest );
165- return getBrokerResponse (queryContext , queryContext , planMaker );
166- }
167-
168- /**
169- * Run SQL query on multiple index segments.
170- * <p>Use this to test the whole flow from server to broker.
171- * <p>The result should be equivalent to querying 4 identical index segments.
172- */
173- protected BrokerResponseNative getBrokerResponseForSqlQuery (String sqlQuery ) {
174- return getBrokerResponseForSqlQuery (sqlQuery , PLAN_MAKER );
175- }
176-
177- /**
178- * Run SQL query with hard-coded filter on multiple index segments.
179- * <p>Use this to test the whole flow from server to broker.
180- * <p>The result should be equivalent to querying 4 identical index segments.
181- */
182- protected BrokerResponseNative getBrokerResponseForSqlQueryWithFilter (String sqlQuery ) {
183- return getBrokerResponseForSqlQuery (sqlQuery + getFilter ());
184- }
185-
186- /**
187- * Run SQL query on multiple index segments with custom plan maker.
188- * <p>Use this to test the whole flow from server to broker.
189- * <p>The result should be equivalent to querying 4 identical index segments.
190- */
191- @ SuppressWarnings ("SameParameterValue" )
192- protected BrokerResponseNative getBrokerResponseForSqlQuery (String sqlQuery , PlanMaker planMaker ) {
193- BrokerRequest brokerRequest = SQL_COMPILER .compileToBrokerRequest (sqlQuery );
194- Map <String , String > queryOptions = brokerRequest .getPinotQuery ().getQueryOptions ();
143+ BrokerRequest brokerRequest = COMPILER .compileToBrokerRequest (query );
144+ PinotQuery pinotQuery = brokerRequest .getPinotQuery ();
145+ Map <String , String > queryOptions = pinotQuery .getQueryOptions ();
195146 if (queryOptions == null ) {
196147 queryOptions = new HashMap <>();
197- brokerRequest . getPinotQuery () .setQueryOptions (queryOptions );
148+ pinotQuery .setQueryOptions (queryOptions );
198149 }
199150 queryOptions .put (Request .QueryOptionKey .GROUP_BY_MODE , Request .SQL );
200151 queryOptions .put (Request .QueryOptionKey .RESPONSE_FORMAT , Request .SQL );
152+ if (extraQueryOptions != null ) {
153+ queryOptions .putAll (extraQueryOptions );
154+ }
201155 BrokerRequest serverBrokerRequest = GapfillUtils .stripGapfill (brokerRequest );
202156 QueryContext queryContext = BrokerRequestToQueryContextConverter .convert (brokerRequest );
203- QueryContext serverQueryContext ;
204- if (brokerRequest == serverBrokerRequest ) {
205- serverQueryContext = queryContext ;
206- } else {
207- serverQueryContext = BrokerRequestToQueryContextConverter .convert (serverBrokerRequest );
208- }
157+ QueryContext serverQueryContext = brokerRequest == serverBrokerRequest ? queryContext
158+ : BrokerRequestToQueryContextConverter .convert (serverBrokerRequest );
209159 return getBrokerResponse (queryContext , serverQueryContext , planMaker );
210160 }
211161
@@ -214,66 +164,58 @@ protected BrokerResponseNative getBrokerResponseForSqlQuery(String sqlQuery, Pla
214164 * <p>Use this to test the whole flow from server to broker.
215165 * <p>The result should be equivalent to querying 4 identical index segments.
216166 */
217- private BrokerResponseNative getBrokerResponse (
218- QueryContext queryContext , QueryContext serverQueryContext , PlanMaker planMaker ) {
219- // Server side.
167+ private BrokerResponseNative getBrokerResponse (QueryContext queryContext , QueryContext serverQueryContext ,
168+ PlanMaker planMaker ) {
169+ // Server side
220170 serverQueryContext .setEndTimeMs (System .currentTimeMillis () + Server .DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS );
221171 Plan plan = planMaker .makeInstancePlan (getIndexSegments (), serverQueryContext , EXECUTOR_SERVICE );
222-
223- BrokerRequest brokerRequest = serverQueryContext .getBrokerRequest ();
172+ PinotQuery pinotQuery = serverQueryContext .getBrokerRequest ().getPinotQuery ();
224173 DataTable instanceResponse =
225- brokerRequest != null && brokerRequest .getPinotQuery () != null && brokerRequest .getPinotQuery ().isExplain ()
226- ? ServerQueryExecutorV1Impl .processExplainPlanQueries (plan ) : plan .execute ();
174+ pinotQuery .isExplain () ? ServerQueryExecutorV1Impl .processExplainPlanQueries (plan ) : plan .execute ();
227175
228- // Broker side.
229- Map < String , Object > properties = new HashMap <>();
230- properties . put ( CommonConstants . Broker . CONFIG_OF_MAX_REDUCE_THREADS_PER_QUERY , 2 ); // 2 Threads for 2 Data-tables.
231- BrokerReduceService brokerReduceService = new BrokerReduceService ( new PinotConfiguration ( properties ));
176+ // Broker side
177+ // Use 2 Threads for 2 data-tables
178+ BrokerReduceService brokerReduceService = new BrokerReduceService ( new PinotConfiguration (
179+ Collections . singletonMap ( CommonConstants . Broker . CONFIG_OF_MAX_REDUCE_THREADS_PER_QUERY , 2 ) ));
232180 Map <ServerRoutingInstance , DataTable > dataTableMap = new HashMap <>();
233-
234181 try {
235-
236- // For multi-threaded BrokerReduceService, we cannot reuse the same data-table.
182+ // For multi-threaded BrokerReduceService, we cannot reuse the same data-table
237183 byte [] serializedResponse = instanceResponse .toBytes ();
238184 dataTableMap .put (new ServerRoutingInstance ("localhost" , 1234 , TableType .OFFLINE ),
239185 DataTableFactory .getDataTable (serializedResponse ));
240186 dataTableMap .put (new ServerRoutingInstance ("localhost" , 1234 , TableType .REALTIME ),
241187 DataTableFactory .getDataTable (serializedResponse ));
242188 } catch (Exception e ) {
243- Utils . rethrowException (e );
189+ throw new RuntimeException (e );
244190 }
245-
246191 BrokerResponseNative brokerResponse =
247192 brokerReduceService .reduceOnDataTable (queryContext .getBrokerRequest (), serverQueryContext .getBrokerRequest (),
248193 dataTableMap , CommonConstants .Broker .DEFAULT_BROKER_TIMEOUT_MS , null );
249194 brokerReduceService .shutDown ();
250- return brokerResponse ;
251- }
252195
253- protected BrokerResponseNative getBrokerResponseForOptimizedSqlQuery (String sqlQuery , @ Nullable TableConfig config ,
254- @ Nullable Schema schema ) {
255- return getBrokerResponseForOptimizedSqlQuery (sqlQuery , config , schema , PLAN_MAKER );
196+ return brokerResponse ;
256197 }
257198
258199 /**
259- * Run optimized SQL query on multiple index segments with custom plan maker .
200+ * Run optimized query on multiple index segments.
260201 * <p>Use this to test the whole flow from server to broker.
261202 * <p>The result should be equivalent to querying 4 identical index segments.
262203 */
263- protected BrokerResponseNative getBrokerResponseForOptimizedSqlQuery (String sqlQuery , @ Nullable TableConfig config ,
264- @ Nullable Schema schema , PlanMaker planMaker ) {
265- BrokerRequest brokerRequest = SQL_COMPILER .compileToBrokerRequest (sqlQuery );
266- OPTIMIZER . optimize ( brokerRequest .getPinotQuery (), config , schema );
267- Map <String , String > queryOptions = brokerRequest . getPinotQuery () .getQueryOptions ();
204+ protected BrokerResponseNative getBrokerResponseForOptimizedQuery (String query , @ Nullable TableConfig config ,
205+ @ Nullable Schema schema ) {
206+ BrokerRequest brokerRequest = COMPILER .compileToBrokerRequest (query );
207+ PinotQuery pinotQuery = brokerRequest .getPinotQuery ();
208+ Map <String , String > queryOptions = pinotQuery .getQueryOptions ();
268209 if (queryOptions == null ) {
269210 queryOptions = new HashMap <>();
270- brokerRequest . getPinotQuery () .setQueryOptions (queryOptions );
211+ pinotQuery .setQueryOptions (queryOptions );
271212 }
272213 queryOptions .put (Request .QueryOptionKey .GROUP_BY_MODE , Request .SQL );
273214 queryOptions .put (Request .QueryOptionKey .RESPONSE_FORMAT , Request .SQL );
215+ OPTIMIZER .optimize (pinotQuery , config , schema );
274216 BrokerRequest serverBrokerRequest = GapfillUtils .stripGapfill (brokerRequest );
275217 QueryContext queryContext = BrokerRequestToQueryContextConverter .convert (brokerRequest );
276218 QueryContext serverQueryContext = BrokerRequestToQueryContextConverter .convert (serverBrokerRequest );
277- return getBrokerResponse (queryContext , serverQueryContext , planMaker );
219+ return getBrokerResponse (queryContext , serverQueryContext , PLAN_MAKER );
278220 }
279221}
0 commit comments