Skip to content

Commit 5ca93a5

Browse files
authored
feat: parameterized queries support (#286)
1 parent 43eb46b commit 5ca93a5

13 files changed

Lines changed: 753 additions & 20 deletions

File tree

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
## 4.1.0 [unreleased]
22

3+
### Features
4+
1. [#286](https://github.com/influxdata/influxdb-client-java/pull/286): Add support for Parameterized Queries
5+
36
### Bug Fixes
47
1. [#283](https://github.com/influxdata/influxdb-client-java/pull/283): Serialization `null` tag's value into LineProtocol
58
1. [#285](https://github.com/influxdata/influxdb-client-java/pull/285): Default dialect for Query APIs

client-kotlin/src/main/kotlin/com/influxdb/client/kotlin/QueryKotlinApi.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ import kotlinx.coroutines.channels.Channel
3030
/**
3131
* The client that allows perform Flux queries against the InfluxDB /api/v2/query endpoint.
3232
*
33+
* For parametrized queries use [Query] object, see [com.influxdb.client.QueryApi] in Java module
34+
* for more details.
35+
*
3336
* @author Jakub Bednar (bednar@github) (29/10/2018 10:45)
3437
*/
3538
interface QueryKotlinApi {

client-reactive/src/main/java/com/influxdb/client/reactive/QueryReactiveApi.java

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,16 @@
2727

2828
import com.influxdb.client.InfluxDBClientOptions;
2929
import com.influxdb.client.domain.Dialect;
30+
import com.influxdb.client.domain.Query;
3031
import com.influxdb.query.FluxRecord;
3132

3233
import org.reactivestreams.Publisher;
3334

3435
/**
35-
* The client that allow perform Flux query against theInfluxDB 2.0by a reactive way.
36+
* The client that allow perform Flux query against the InfluxDB 2.0 by a reactive way.
37+
*
38+
* For parametrized queries use {@link Query} object, see <code>com.influxdb.client.QueryApi</code> in Java module
39+
* for more details.
3640
*
3741
* @author Jakub Bednar (bednar@github) (21/11/2018 07:19)
3842
*/
@@ -51,6 +55,18 @@ public interface QueryReactiveApi {
5155
@Nonnull
5256
Publisher<FluxRecord> query(@Nonnull final String query);
5357

58+
/**
59+
* Returns {@link Publisher} emitting {@link FluxRecord}s which are matched the query.
60+
* If none found than return empty sequence.
61+
*
62+
* <p>The {@link InfluxDBClientOptions#getOrg()} will be used as source organization.</p>
63+
*
64+
* @param query the Flux query to execute
65+
* @return {@link Publisher} of {@link FluxRecord}s
66+
*/
67+
@Nonnull
68+
Publisher<FluxRecord> query(@Nonnull Query query);
69+
5470
/**
5571
* Returns {@link Publisher} emitting {@link FluxRecord}s which are matched the query.
5672
* If none found than return empty sequence.
@@ -62,6 +78,17 @@ public interface QueryReactiveApi {
6278
@Nonnull
6379
Publisher<FluxRecord> query(@Nonnull final String query, @Nonnull final String org);
6480

81+
/**
82+
* Returns {@link Publisher} emitting {@link FluxRecord}s which are matched the query.
83+
* If none found than return empty sequence.
84+
*
85+
* @param query the Flux query to execute
86+
* @param org specifies the source organization
87+
* @return {@link Publisher} of {@link FluxRecord}s
88+
*/
89+
@Nonnull
90+
Publisher<FluxRecord> query(@Nonnull final Query query, @Nonnull final String org);
91+
6592
/**
6693
* Execute a Flux against the Flux service.
6794
*
@@ -90,6 +117,20 @@ <M> Publisher<M> query(@Nonnull final String query,
90117
@Nonnull final String org,
91118
@Nonnull final Class<M> measurementType);
92119

120+
/**
121+
* Execute a Flux against the Flux service.
122+
*
123+
* @param query the flux query to execute
124+
* @param org specifies the source organization
125+
* @param measurementType the class type used to which will be result mapped
126+
* @param <M> the type of the measurement (POJO)
127+
* @return {@link Publisher} emitting a POJO mapped to {@code measurementType} which are matched
128+
* the query or empty sequence if none found.
129+
*/
130+
<M> Publisher<M> query(@Nonnull final Query query,
131+
@Nonnull final String org,
132+
@Nonnull final Class<M> measurementType);
133+
93134
/**
94135
* Returns {@link Publisher} emitting {@link FluxRecord}s which are matched the query.
95136
* If none found than return empty sequence.
@@ -113,6 +154,18 @@ <M> Publisher<M> query(@Nonnull final String query,
113154
@Nonnull
114155
Publisher<FluxRecord> query(@Nonnull final Publisher<String> queryStream, @Nonnull final String org);
115156

157+
/**
158+
* Returns {@link Publisher} emitting {@link FluxRecord}s which are matched the query.
159+
* If none found than return empty sequence.
160+
*
161+
* @param queryStream the Flux query publisher
162+
* @param org specifies the source organization
163+
* @return {@link Publisher} of {@link FluxRecord}s
164+
*/
165+
@Nonnull
166+
Publisher<FluxRecord> queryQuery(@Nonnull Publisher<Query> queryStream,
167+
@Nonnull String org);
168+
116169
/**
117170
* Returns the {@link Publisher} emitting POJO stream.
118171
* <p>
@@ -156,6 +209,17 @@ <M> Publisher<M> query(@Nonnull final Publisher<String> queryStream,
156209
@Nonnull
157210
Publisher<String> queryRaw(@Nonnull final String query);
158211

212+
/**
213+
* Returns {@link Publisher} emitting raw response fromInfluxDB 2.0server line by line.
214+
*
215+
* <p>The {@link InfluxDBClientOptions#getOrg()} will be used as source organization.</p>
216+
*
217+
* @param query the Flux query to execute
218+
* @return {@link Publisher} of response lines
219+
*/
220+
@Nonnull
221+
Publisher<String> queryRaw(@Nonnull Query query);
222+
159223
/**
160224
* Returns {@link Publisher} emitting raw response fromInfluxDB 2.0server line by line.
161225
*
@@ -215,6 +279,20 @@ Publisher<String> queryRaw(@Nonnull final String query,
215279
@Nullable final Dialect dialect,
216280
@Nonnull final String org);
217281

282+
/**
283+
* Returns {@link Publisher} emitting queryRaw response fromInfluxDB 2.0server line by line.
284+
*
285+
* @param dialect Dialect is an object defining the options to use when encoding the response.
286+
* <a href="http://bit.ly/flux-dialect">See dialect SPEC.</a>.
287+
* @param query the Flux query to execute
288+
* @param org specifies the source organization
289+
* @return {@link Publisher} of response lines
290+
*/
291+
@Nonnull
292+
Publisher<String> queryRaw(@Nonnull final Query query,
293+
@Nullable final Dialect dialect,
294+
@Nonnull final String org);
295+
218296
/**
219297
* Returns {@link Publisher} emitting queryRaw response fromInfluxDB 2.0server line by line.
220298
*
@@ -242,4 +320,18 @@ Publisher<String> queryRaw(@Nonnull final Publisher<String> queryStream,
242320
Publisher<String> queryRaw(@Nonnull final Publisher<String> queryStream,
243321
@Nullable final Dialect dialect,
244322
@Nonnull final String org);
323+
324+
/**
325+
* Returns {@link Publisher} emitting queryRaw response fromInfluxDB 2.0server line by line.
326+
*
327+
* @param dialect Dialect is an object defining the options to use when encoding the response.
328+
* <a href="http://bit.ly/flux-dialect">See dialect SPEC.</a>.
329+
* @param queryStream the Flux query publisher
330+
* @param org specifies the source organization
331+
* @return {@link Publisher} of response lines
332+
*/
333+
Publisher<String> queryRawQuery(@Nonnull final Publisher<Query> queryStream,
334+
@Nullable final Dialect dialect,
335+
@Nonnull final String org);
336+
245337
}

client-reactive/src/main/java/com/influxdb/client/reactive/internal/QueryReactiveApiImpl.java

Lines changed: 89 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,26 @@ public Publisher<FluxRecord> query(@Nonnull final String query, @Nonnull final S
7979
return query(Flowable.just(query), org);
8080
}
8181

82+
@Nonnull
83+
@Override
84+
public Publisher<FluxRecord> query(@Nonnull final Query query, @Nonnull final String org) {
85+
86+
Arguments.checkNotNull(query, "Flux query");
87+
Arguments.checkNonEmpty(org, "org");
88+
return queryQuery(Flowable.just(query), org);
89+
}
90+
91+
@Nonnull
92+
93+
@Override
94+
public Publisher<FluxRecord> query(@Nonnull final Query query) {
95+
96+
Arguments.checkNotNull(options.getOrg(), "InfluxDBClientOptions.getOrg");
97+
Arguments.checkNotNull(query, "Flux query");
98+
99+
return queryQuery(Flowable.just(query), options.getOrg());
100+
}
101+
82102
@Override
83103
public <M> Publisher<M> query(@Nonnull final String query, @Nonnull final Class<M> measurementType) {
84104

@@ -99,6 +119,19 @@ public <M> Publisher<M> query(@Nonnull final String query,
99119
return query(Flowable.just(query), org, measurementType);
100120
}
101121

122+
@Override
123+
public <M> Publisher<M> query(@Nonnull final Query query, @Nonnull final String org,
124+
@Nonnull final Class<M> measurementType) {
125+
Arguments.checkNotNull(query, "Flux query");
126+
Arguments.checkNotNull(measurementType, "Measurement type");
127+
Arguments.checkNonEmpty(org, "org");
128+
129+
return Flowable
130+
.fromPublisher(queryQuery(Flowable.just(query), org))
131+
.map(fluxRecord -> resultMapper.toPOJO(fluxRecord, measurementType));
132+
}
133+
134+
102135
@Nonnull
103136
@Override
104137
public Publisher<FluxRecord> query(@Nonnull final Publisher<String> queryStream) {
@@ -112,14 +145,22 @@ public Publisher<FluxRecord> query(@Nonnull final Publisher<String> queryStream)
112145
@Override
113146
public Publisher<FluxRecord> query(@Nonnull final Publisher<String> queryStream,
114147
@Nonnull final String org) {
148+
return queryQuery(Flowable.fromPublisher(queryStream)
149+
.map(query -> new Query().query(query).dialect(AbstractInfluxDBClient.DEFAULT_DIALECT)), org);
150+
}
151+
152+
@Nonnull
153+
@Override
154+
public Publisher<FluxRecord> queryQuery(@Nonnull final Publisher<Query> queryStream,
155+
@Nonnull final String org) {
115156

116157
Arguments.checkNotNull(queryStream, "queryStream");
117158
Arguments.checkNonEmpty(org, "org");
118159

119160
return Flowable
120161
.fromPublisher(queryStream)
121162
.map(it -> service.postQueryResponseBody(null, null,
122-
null, org, null, new Query().query(it).dialect(AbstractInfluxDBClient.DEFAULT_DIALECT)))
163+
null, org, null, it.dialect(AbstractInfluxDBClient.DEFAULT_DIALECT)))
123164
.flatMap(queryCall -> {
124165

125166
Observable<FluxRecord> observable = Observable.create(subscriber -> {
@@ -187,6 +228,15 @@ public Publisher<String> queryRaw(@Nonnull final String query) {
187228
return queryRaw(query, options.getOrg());
188229
}
189230

231+
@Nonnull
232+
@Override
233+
public Publisher<String> queryRaw(@Nonnull final Query query) {
234+
235+
Arguments.checkNotNull(options.getOrg(), "InfluxDBClientOptions.getOrg");
236+
237+
return queryRawQuery(Flowable.just(query), null, options.getOrg());
238+
}
239+
190240
@Nonnull
191241
@Override
192242
public Publisher<String> queryRaw(@Nonnull final String query, @Nonnull final String org) {
@@ -249,34 +299,55 @@ public Publisher<String> queryRaw(@Nonnull final Publisher<String> queryStream,
249299

250300
@Nonnull
251301
@Override
252-
public Publisher<String> queryRaw(@Nonnull final Publisher<String> queryStream,
302+
public Publisher<String> queryRaw(@Nonnull final Query query,
253303
@Nullable final Dialect dialect,
254304
@Nonnull final String org) {
305+
return queryRawQuery(Flowable.just(query), dialect, org);
306+
}
307+
308+
@Nonnull
309+
@Override
310+
public Publisher<String> queryRawQuery(@Nonnull final Publisher<Query> queryStream,
311+
@Nullable final Dialect dialect,
312+
@Nonnull final String org) {
255313

256314
Arguments.checkNotNull(queryStream, "queryStream");
257315
Arguments.checkNonEmpty(org, "org");
258316

259317
return Flowable
260-
.fromPublisher(queryStream)
261-
.map(it -> service.postQueryResponseBody(null, null,
262-
null, org, null, new Query().query(it).dialect(dialect)))
263-
.flatMap(queryCall -> {
318+
.fromPublisher(queryStream)
319+
.map(it -> service.postQueryResponseBody(null, null,
320+
null, org, null, it.dialect(dialect)))
321+
.flatMap(queryCall -> {
264322

265-
Observable<String> observable = Observable.create(subscriber -> {
323+
Observable<String> observable = Observable.create(subscriber -> {
266324

267325

268-
BiConsumer<Cancellable, String> consumer = (cancellable, line) -> {
269-
if (subscriber.isDisposed()) {
270-
cancellable.cancel();
271-
} else {
272-
subscriber.onNext(line);
273-
}
274-
};
326+
BiConsumer<Cancellable, String> consumer = (cancellable, line) -> {
327+
if (subscriber.isDisposed()) {
328+
cancellable.cancel();
329+
} else {
330+
subscriber.onNext(line);
331+
}
332+
};
275333

276-
queryRaw(queryCall, consumer, subscriber::onError, subscriber::onComplete, false);
277-
});
278-
279-
return observable.toFlowable(BackpressureStrategy.BUFFER);
334+
queryRaw(queryCall, consumer, subscriber::onError, subscriber::onComplete, false);
280335
});
336+
337+
return observable.toFlowable(BackpressureStrategy.BUFFER);
338+
});
339+
}
340+
341+
@Nonnull
342+
@Override
343+
public Publisher<String> queryRaw(@Nonnull final Publisher<String> queryStream,
344+
@Nullable final Dialect dialect,
345+
@Nonnull final String org) {
346+
347+
Arguments.checkNotNull(queryStream, "queryStream");
348+
Arguments.checkNonEmpty(org, "org");
349+
350+
return queryRawQuery(Flowable.fromPublisher(queryStream)
351+
.map(q -> new Query().query(q).dialect(dialect)), dialect, org);
281352
}
282353
}

client-reactive/src/test/java/com/influxdb/client/reactive/ITWriteQueryReactiveApi.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import com.influxdb.client.domain.Dialect;
3434
import com.influxdb.client.domain.Permission;
3535
import com.influxdb.client.domain.PermissionResource;
36+
import com.influxdb.client.domain.Query;
3637
import com.influxdb.client.domain.User;
3738
import com.influxdb.client.domain.WritePrecision;
3839
import com.influxdb.client.internal.AbstractInfluxDBClient;
@@ -45,6 +46,7 @@
4546
import org.assertj.core.api.Assertions;
4647
import org.junit.jupiter.api.AfterEach;
4748
import org.junit.jupiter.api.BeforeEach;
49+
import org.junit.jupiter.api.Disabled;
4850
import org.junit.jupiter.api.Test;
4951
import org.junit.platform.runner.JUnitPlatform;
5052
import org.junit.runner.RunWith;
@@ -357,6 +359,35 @@ void queryRawDialect() {
357359
.assertValueAt(2, "");
358360
}
359361

362+
@Test
363+
@Disabled("https://github.com/influxdata/influxdb/issues/16109")
364+
void queryRawParams() {
365+
366+
Publisher<WriteReactiveApi.Success> success = writeClient.writeRecord(WritePrecision.NS,
367+
"h2o_feet,location=coyote_creek level\\ water_level=1.0 1");
368+
369+
Flowable.fromPublisher(success)
370+
.test()
371+
.assertValueCount(1);
372+
373+
Publisher<String> result = queryClient.queryRaw(
374+
new Query().query("from(bucket: params.bucketParam) |> range(start: 1970-01-01T00:00:00.000000001Z)")
375+
.dialect(null).putParamsItem("bucketParam", bucket.getName()));
376+
377+
Flowable.fromPublisher(result)
378+
.test()
379+
.assertValueCount(3)
380+
.assertValueAt(0, ",result,table,_start,_stop,_time,_value,_field,_measurement,location")
381+
.assertValueAt(1, value -> {
382+
383+
Assertions.assertThat(value).endsWith("1970-01-01T00:00:00.000000001Z,1,level water_level,h2o_feet,coyote_creek");
384+
385+
return true;
386+
})
387+
.assertValueAt(2, "");
388+
}
389+
390+
360391
@Test
361392
public void subscribeOn() {
362393

client-scala/src/main/scala/com/influxdb/client/scala/QueryScalaApi.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ import javax.annotation.Nonnull
3030
/**
3131
* The client that allows perform Flux queries against the InfluxDB /api/v2/query endpoint.
3232
*
33+
* For parametrized queries use [[Query]] object, see [[com.influxdb.client.QueryApi]] in Java module
34+
* for more details.
35+
*
3336
* @author Jakub Bednar (bednar@github) (02/11/2018 09:48)
3437
*/
3538
trait QueryScalaApi {

0 commit comments

Comments
 (0)