Skip to content

Commit b6bf1ee

Browse files
committed
chore: use async HTTP call
1 parent 798ff29 commit b6bf1ee

2 files changed

Lines changed: 23 additions & 10 deletions

File tree

client/src/main/java/com/influxdb/client/internal/AbstractWriteClient.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import io.reactivex.subjects.PublishSubject;
5858
import org.reactivestreams.Publisher;
5959
import retrofit2.Call;
60+
import retrofit2.Callback;
6061
import retrofit2.HttpException;
6162
import retrofit2.Response;
6263

@@ -450,12 +451,21 @@ public Maybe<Notification<Response>> apply(final BatchWriteItem batchWrite) {
450451
String bucket = batchWrite.batchWriteOptions.bucket;
451452
WritePrecision precision = batchWrite.batchWriteOptions.precision;
452453

453-
Maybe<Response<Void>> requestSource = Maybe
454-
.fromCallable(() -> service
455-
.postWrite(organization, bucket, content, null,
456-
"identity", "text/plain; charset=utf-8", null,
457-
"application/json", null, precision))
458-
.map(Call::execute);
454+
Maybe<Response<Void>> requestSource = Maybe.create(emitter -> service
455+
.postWrite(organization, bucket, content, null,
456+
"identity", "text/plain; charset=utf-8", null,
457+
"application/json", null, precision)
458+
.enqueue(new Callback<Void>() {
459+
@Override
460+
public void onResponse(@Nonnull final Call<Void> call, @Nonnull final Response<Void> response) {
461+
emitter.onSuccess(response);
462+
}
463+
464+
@Override
465+
public void onFailure(@Nonnull final Call<Void> call, @Nonnull final Throwable throwable) {
466+
emitter.onError(throwable);
467+
}
468+
}));
459469

460470
return requestSource
461471
//

client/src/test/java/com/influxdb/client/ITWriteQueryApi.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -461,12 +461,15 @@ void queryDataFromNewOrganization() {
461461
}
462462

463463
@Test
464-
void flush() throws InterruptedException {
464+
void flush() {
465465

466466
String bucketName = bucket.getName();
467467

468-
writeApi = influxDBClient.makeWriteApi(WriteOptions.builder().batchSize(100_000)
469-
.flushInterval(100_000).build());
468+
writeApi = influxDBClient.makeWriteApi(WriteOptions.builder().batchSize(1_000_000)
469+
.flushInterval(1_000_000).build());
470+
471+
WriteEventListener<WriteSuccessEvent> successListener = new WriteEventListener<>();
472+
writeApi.listenEvents(WriteSuccessEvent.class, successListener);
470473

471474
String record = "h2o_feet,location=coyote_creek level\\ water_level=1.0 1";
472475

@@ -476,7 +479,7 @@ void flush() throws InterruptedException {
476479
Assertions.assertThat(query).hasSize(0);
477480

478481
writeApi.flush();
479-
Thread.sleep(10);
482+
waitToCallback(successListener.countDownLatch, 10);
480483

481484
query = queryApi.query("from(bucket:\"" + bucketName + "\") |> range(start: 1970-01-01T00:00:00.000000001Z)", organization.getId());
482485

0 commit comments

Comments
 (0)