@@ -79,6 +79,26 @@ public Publisher<FluxRecord> query(@Nonnull final String query, @Nonnull final S
7979 return query (Flowable .just (query ), org );
8080 }
8181
82+ @ Nonnull
83+ @ Override
84+ public Publisher <FluxRecord > query (@ Nonnull final Query query , @ Nonnull final String org ) {
85+
86+ Arguments .checkNotNull (query , "Flux query" );
87+ Arguments .checkNonEmpty (org , "org" );
88+ return queryQuery (Flowable .just (query ), org );
89+ }
90+
91+ @ Nonnull
92+
93+ @ Override
94+ public Publisher <FluxRecord > query (@ Nonnull final Query query ) {
95+
96+ Arguments .checkNotNull (options .getOrg (), "InfluxDBClientOptions.getOrg" );
97+ Arguments .checkNotNull (query , "Flux query" );
98+
99+ return queryQuery (Flowable .just (query ), options .getOrg ());
100+ }
101+
82102 @ Override
83103 public <M > Publisher <M > query (@ Nonnull final String query , @ Nonnull final Class <M > measurementType ) {
84104
@@ -99,6 +119,19 @@ public <M> Publisher<M> query(@Nonnull final String query,
99119 return query (Flowable .just (query ), org , measurementType );
100120 }
101121
122+ @ Override
123+ public <M > Publisher <M > query (@ Nonnull final Query query , @ Nonnull final String org ,
124+ @ Nonnull final Class <M > measurementType ) {
125+ Arguments .checkNotNull (query , "Flux query" );
126+ Arguments .checkNotNull (measurementType , "Measurement type" );
127+ Arguments .checkNonEmpty (org , "org" );
128+
129+ return Flowable
130+ .fromPublisher (queryQuery (Flowable .just (query ), org ))
131+ .map (fluxRecord -> resultMapper .toPOJO (fluxRecord , measurementType ));
132+ }
133+
134+
102135 @ Nonnull
103136 @ Override
104137 public Publisher <FluxRecord > query (@ Nonnull final Publisher <String > queryStream ) {
@@ -112,14 +145,22 @@ public Publisher<FluxRecord> query(@Nonnull final Publisher<String> queryStream)
112145 @ Override
113146 public Publisher <FluxRecord > query (@ Nonnull final Publisher <String > queryStream ,
114147 @ Nonnull final String org ) {
148+ return queryQuery (Flowable .fromPublisher (queryStream )
149+ .map (query -> new Query ().query (query ).dialect (AbstractInfluxDBClient .DEFAULT_DIALECT )), org );
150+ }
151+
152+ @ Nonnull
153+ @ Override
154+ public Publisher <FluxRecord > queryQuery (@ Nonnull final Publisher <Query > queryStream ,
155+ @ Nonnull final String org ) {
115156
116157 Arguments .checkNotNull (queryStream , "queryStream" );
117158 Arguments .checkNonEmpty (org , "org" );
118159
119160 return Flowable
120161 .fromPublisher (queryStream )
121162 .map (it -> service .postQueryResponseBody (null , null ,
122- null , org , null , new Query (). query ( it ) .dialect (AbstractInfluxDBClient .DEFAULT_DIALECT )))
163+ null , org , null , it .dialect (AbstractInfluxDBClient .DEFAULT_DIALECT )))
123164 .flatMap (queryCall -> {
124165
125166 Observable <FluxRecord > observable = Observable .create (subscriber -> {
@@ -187,6 +228,15 @@ public Publisher<String> queryRaw(@Nonnull final String query) {
187228 return queryRaw (query , options .getOrg ());
188229 }
189230
231+ @ Nonnull
232+ @ Override
233+ public Publisher <String > queryRaw (@ Nonnull final Query query ) {
234+
235+ Arguments .checkNotNull (options .getOrg (), "InfluxDBClientOptions.getOrg" );
236+
237+ return queryRawQuery (Flowable .just (query ), null , options .getOrg ());
238+ }
239+
190240 @ Nonnull
191241 @ Override
192242 public Publisher <String > queryRaw (@ Nonnull final String query , @ Nonnull final String org ) {
@@ -249,34 +299,55 @@ public Publisher<String> queryRaw(@Nonnull final Publisher<String> queryStream,
249299
250300 @ Nonnull
251301 @ Override
252- public Publisher <String > queryRaw (@ Nonnull final Publisher < String > queryStream ,
302+ public Publisher <String > queryRaw (@ Nonnull final Query query ,
253303 @ Nullable final Dialect dialect ,
254304 @ Nonnull final String org ) {
305+ return queryRawQuery (Flowable .just (query ), dialect , org );
306+ }
307+
308+ @ Nonnull
309+ @ Override
310+ public Publisher <String > queryRawQuery (@ Nonnull final Publisher <Query > queryStream ,
311+ @ Nullable final Dialect dialect ,
312+ @ Nonnull final String org ) {
255313
256314 Arguments .checkNotNull (queryStream , "queryStream" );
257315 Arguments .checkNonEmpty (org , "org" );
258316
259317 return Flowable
260- .fromPublisher (queryStream )
261- .map (it -> service .postQueryResponseBody (null , null ,
262- null , org , null , new Query (). query ( it ) .dialect (dialect )))
263- .flatMap (queryCall -> {
318+ .fromPublisher (queryStream )
319+ .map (it -> service .postQueryResponseBody (null , null ,
320+ null , org , null , it .dialect (dialect )))
321+ .flatMap (queryCall -> {
264322
265- Observable <String > observable = Observable .create (subscriber -> {
323+ Observable <String > observable = Observable .create (subscriber -> {
266324
267325
268- BiConsumer <Cancellable , String > consumer = (cancellable , line ) -> {
269- if (subscriber .isDisposed ()) {
270- cancellable .cancel ();
271- } else {
272- subscriber .onNext (line );
273- }
274- };
326+ BiConsumer <Cancellable , String > consumer = (cancellable , line ) -> {
327+ if (subscriber .isDisposed ()) {
328+ cancellable .cancel ();
329+ } else {
330+ subscriber .onNext (line );
331+ }
332+ };
275333
276- queryRaw (queryCall , consumer , subscriber ::onError , subscriber ::onComplete , false );
277- });
278-
279- return observable .toFlowable (BackpressureStrategy .BUFFER );
334+ queryRaw (queryCall , consumer , subscriber ::onError , subscriber ::onComplete , false );
280335 });
336+
337+ return observable .toFlowable (BackpressureStrategy .BUFFER );
338+ });
339+ }
340+
341+ @ Nonnull
342+ @ Override
343+ public Publisher <String > queryRaw (@ Nonnull final Publisher <String > queryStream ,
344+ @ Nullable final Dialect dialect ,
345+ @ Nonnull final String org ) {
346+
347+ Arguments .checkNotNull (queryStream , "queryStream" );
348+ Arguments .checkNonEmpty (org , "org" );
349+
350+ return queryRawQuery (Flowable .fromPublisher (queryStream )
351+ .map (q -> new Query ().query (q ).dialect (dialect )), dialect , org );
281352 }
282353}
0 commit comments