Skip to content

Commit e1aa252

Browse files
committed
feat: implemented invoking scrip
1 parent 141a595 commit e1aa252

9 files changed

Lines changed: 550 additions & 5 deletions

File tree

client-core/src/main/java/com/influxdb/internal/AbstractQueryApi.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public abstract class AbstractQueryApi extends AbstractRestClient {
6464

6565
private static final Logger LOG = Logger.getLogger(AbstractQueryApi.class.getName());
6666

67-
protected final FluxCsvParser fluxCsvParser = new FluxCsvParser();
67+
protected final FluxCsvParser fluxCsvParser;
6868
protected final FluxResultMapper resultMapper = new FluxResultMapper();
6969

7070
protected static final Runnable EMPTY_ACTION = () -> {
@@ -89,6 +89,14 @@ public abstract class AbstractQueryApi extends AbstractRestClient {
8989
}
9090
};
9191

92+
protected AbstractQueryApi() {
93+
this(new FluxCsvParser());
94+
}
95+
96+
protected AbstractQueryApi(@Nonnull final FluxCsvParser fluxCsvParser) {
97+
this.fluxCsvParser = fluxCsvParser;
98+
}
99+
92100
@Nonnull
93101
protected RequestBody createBody(@Nullable final String dialect, @Nonnull final String query) {
94102

client-core/src/main/java/com/influxdb/query/internal/FluxCsvParser.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ private enum ParsingState {
7070
IN_ERROR
7171
}
7272

73-
enum ResponseMetadataMode {
73+
public enum ResponseMetadataMode {
7474
FULL,
7575

7676
// useful for Invocable scripts
@@ -123,7 +123,7 @@ public FluxCsvParser() {
123123
this(ResponseMetadataMode.FULL);
124124
}
125125

126-
FluxCsvParser(@Nonnull final ResponseMetadataMode responseMetadataMode) {
126+
public FluxCsvParser(@Nonnull final ResponseMetadataMode responseMetadataMode) {
127127
Arguments.checkNotNull(responseMetadataMode, "ResponseMetadataMode");
128128
this.responseMetadataMode = responseMetadataMode;
129129
}

client/src/generated/java/com/influxdb/client/service/InvocableScriptsService.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,4 +97,19 @@ Call<String> postScriptsIDInvoke(
9797
@retrofit2.http.Path("scriptID") String scriptID, @retrofit2.http.Body ScriptInvocationParams scriptInvocationParams
9898
);
9999

100+
/**
101+
* Invoke a script
102+
* Invokes a script and substitutes &#x60;params&#x60; keys referenced in the script with &#x60;params&#x60; key-values sent in the request body.
103+
* @param scriptID (required)
104+
* @param scriptInvocationParams (optional)
105+
* @return Call&lt;ResponseBody&gt;
106+
*/
107+
@Headers({
108+
"Content-Type:application/json"
109+
})
110+
@POST("api/v2/scripts/{scriptID}/invoke")
111+
Call<ResponseBody> postScriptsIDInvokeResponseBody(
112+
@retrofit2.http.Path("scriptID") String scriptID, @retrofit2.http.Body ScriptInvocationParams scriptInvocationParams
113+
);
114+
100115
}

client/src/main/java/com/influxdb/client/InvocableScriptsApi.java

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,19 @@
2222
package com.influxdb.client;
2323

2424
import java.util.List;
25+
import java.util.Map;
26+
import java.util.function.BiConsumer;
27+
import java.util.function.Consumer;
2528
import javax.annotation.Nonnull;
29+
import javax.annotation.Nullable;
2630
import javax.annotation.concurrent.ThreadSafe;
2731

32+
import com.influxdb.Cancellable;
2833
import com.influxdb.client.domain.Script;
2934
import com.influxdb.client.domain.ScriptCreateRequest;
3035
import com.influxdb.client.domain.ScriptUpdateRequest;
36+
import com.influxdb.query.FluxRecord;
37+
import com.influxdb.query.FluxTable;
3138

3239
/**
3340
* Use API invokable scripts to create custom InfluxDB API endpoints that query, process, and shape data.
@@ -61,13 +68,15 @@ public interface InvocableScriptsApi {
6168

6269
/**
6370
* List scripts.
71+
*
6472
* @return scripts
6573
*/
6674
@Nonnull
6775
List<Script> findScripts();
6876

6977
/**
7078
* List scripts.
79+
*
7180
* @param query to filtering results
7281
* @return scripts
7382
*/
@@ -80,4 +89,139 @@ public interface InvocableScriptsApi {
8089
* @param scriptId The ID of the script to delete. (required)
8190
*/
8291
void deleteScript(@Nonnull final String scriptId);
92+
93+
/**
94+
* Executes the script and synchronously map whole response to {@code List<FluxTable>}.
95+
* <p>
96+
* NOTE: This method is not intended for large query results.
97+
* Use {@link InvocableScriptsApi#invokeScript(String, Map, BiConsumer, Consumer, Runnable)}
98+
* for large data streaming.
99+
*
100+
* @param scriptId The ID of the script to invoke. (required)
101+
* @param params bind parameters
102+
* @return {@code List<FluxTable>}
103+
*/
104+
@Nonnull
105+
List<FluxTable> invokeScript(@Nonnull final String scriptId, @Nullable final Map<String, Object> params);
106+
107+
/**
108+
* Executes the script and asynchronously stream {@link FluxRecord}s to {@code onNext} consumer.
109+
*
110+
* @param scriptId The ID of the script to invoke. (required)
111+
* @param params bind parameters
112+
* @param onNext the callback to consume the FluxRecord result with capability
113+
* to discontinue a streaming invocation
114+
*/
115+
void invokeScript(@Nonnull final String scriptId,
116+
@Nullable final Map<String, Object> params,
117+
@Nonnull final BiConsumer<Cancellable, FluxRecord> onNext);
118+
119+
/**
120+
* Executes the script and asynchronously stream {@link FluxRecord}s to {@code onNext} consumer.
121+
*
122+
* @param scriptId The ID of the script to invoke. (required)
123+
* @param params bind parameters
124+
* @param onNext the callback to consume the FluxRecord result with capability
125+
* to discontinue a streaming invocation
126+
* @param onError the callback to consume any error notification
127+
* @param onComplete the callback to consume a notification about successfully end of stream
128+
*/
129+
void invokeScript(@Nonnull final String scriptId,
130+
@Nullable final Map<String, Object> params,
131+
@Nonnull final BiConsumer<Cancellable, FluxRecord> onNext,
132+
@Nonnull final Consumer<? super Throwable> onError,
133+
@Nonnull final Runnable onComplete);
134+
135+
/**
136+
* Executes the script and synchronously map whole response to list of object with given type. <p>
137+
* <p>
138+
* NOTE: This method is not intended for large query results.
139+
* Use {@link InvocableScriptsApi#invokeScript(String, Map, Class, BiConsumer, Consumer, Runnable)}
140+
* for large data streaming.
141+
*
142+
* @param scriptId The ID of the script to invoke. (required)
143+
* @param params bind parameters
144+
* @param measurementType the type of measurement
145+
* @param <M> the type of the measurement (POJO)
146+
* @return {@code List<T>}
147+
*/
148+
@Nonnull
149+
<M> List<M> invokeScript(@Nonnull final String scriptId,
150+
@Nullable final Map<String, Object> params,
151+
@Nonnull final Class<M> measurementType);
152+
153+
/**
154+
* Executes the script and asynchronously stream POJO classes to {@code onNext} consumer.
155+
*
156+
* @param scriptId The ID of the script to invoke. (required)
157+
* @param params bind parameters
158+
* @param measurementType the type of measurement
159+
* @param onNext the callback to consume the mapped Measurements with capability to discontinue
160+
* a streaming invocation
161+
* @param <M> the type of the measurement (POJO)
162+
*/
163+
<M> void invokeScript(@Nonnull final String scriptId,
164+
@Nullable final Map<String, Object> params,
165+
@Nonnull final Class<M> measurementType,
166+
@Nonnull final BiConsumer<Cancellable, M> onNext);
167+
168+
/**
169+
* Executes the script and asynchronously stream POJO classes to {@code onNext} consumer.
170+
*
171+
* @param scriptId The ID of the script to invoke. (required)
172+
* @param params bind parameters
173+
* @param measurementType the type of measurement
174+
* @param onNext the callback to consume the mapped Measurements with capability to discontinue
175+
* a streaming invocation
176+
* @param onError the callback to consume any error notification
177+
* @param onComplete the callback to consume a notification about successfully end of stream
178+
* @param <M> the type of the measurement (POJO)
179+
*/
180+
<M> void invokeScript(@Nonnull final String scriptId,
181+
@Nullable final Map<String, Object> params,
182+
@Nonnull final Class<M> measurementType,
183+
@Nonnull final BiConsumer<Cancellable, M> onNext,
184+
@Nonnull final Consumer<? super Throwable> onError,
185+
@Nonnull final Runnable onComplete);
186+
187+
/**
188+
* Executes the script and synchronously map whole response to {@link String} result.
189+
* <p>
190+
* NOTE: This method is not intended for large query results.
191+
* Use {@link InvocableScriptsApi#invokeScriptRaw(String, Map, BiConsumer, Consumer, Runnable)}
192+
* for large data streaming.
193+
*
194+
* @param scriptId The ID of the script to invoke. (required)
195+
* @param params bind parameters
196+
* @return the raw response that matched the invocation
197+
*/
198+
@Nonnull
199+
String invokeScriptRaw(@Nonnull final String scriptId,
200+
@Nullable final Map<String, Object> params);
201+
202+
/**
203+
* Executes the script and asynchronously stream response (line by line) to {@code onResponse}.
204+
*
205+
* @param scriptId The ID of the script to invoke. (required)
206+
* @param params bind parameters
207+
* @param onResponse callback to consume the response line by line with capability to discontinue a streaming query
208+
*/
209+
void invokeScriptRaw(@Nonnull final String scriptId,
210+
@Nullable final Map<String, Object> params,
211+
@Nonnull final BiConsumer<Cancellable, String> onResponse);
212+
213+
/**
214+
* Executes the script and asynchronously stream response (line by line) to {@code onResponse}.
215+
*
216+
* @param scriptId The ID of the script to invoke. (required)
217+
* @param params bind parameters
218+
* @param onResponse callback to consume the response line by line with capability to discontinue a streaming query
219+
* @param onError the callback to consume any error notification
220+
* @param onComplete the callback to consume a notification about successfully end of stream
221+
*/
222+
void invokeScriptRaw(@Nonnull final String scriptId,
223+
@Nullable final Map<String, Object> params,
224+
@Nonnull final BiConsumer<Cancellable, String> onResponse,
225+
@Nonnull final Consumer<? super Throwable> onError,
226+
@Nonnull final Runnable onComplete);
83227
}

client/src/main/java/com/influxdb/client/QueryApi.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ public interface QueryApi {
107107
* @see <a href="https://docs.influxdata.com/influxdb/cloud/query-data/parameterized-queries/">InfluxDB Cloud
108108
* Parametrized Queries</a>
109109
*/
110+
@Nonnull
110111
List<FluxTable> query(@Nonnull final String query, @Nonnull final String org,
111112
@Nullable Map<String, Object> params);
112113

0 commit comments

Comments
 (0)