Skip to content

Commit d0f250f

Browse files
committed
feat: add replication ops to async client
1 parent 0979369 commit d0f250f

13 files changed

Lines changed: 690 additions & 3 deletions

File tree

src/main/java/io/weaviate/client/base/Result.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import java.util.List;
77
import java.util.Objects;
88
import java.util.Optional;
9+
import java.util.concurrent.Future;
910
import java.util.function.Function;
1011
import java.util.stream.Collectors;
1112

@@ -90,6 +91,33 @@ public static <T, R> Result<R> map(Response<T> response, Function<T, R> map) {
9091
return new Result<>(response, body);
9192
}
9293

94+
/**
95+
* Apply {@code map} function to {@code Response::getBody} and return
96+
* {@link Future} with the transformed body.
97+
*
98+
* <p>
99+
* A {@code null}-body is passed as-is.
100+
*
101+
* <p>
102+
* Usage:
103+
*
104+
* <pre>{@code @Override
105+
* public Future<String> run(FutureCallback<Result<String>> callback) {
106+
* // Deserializes into Person.class but returns Person's firstName or null.
107+
* return sendGetRequest("/person", callback, Result.mapParser(Person.class, Person::getFirstName));
108+
* }
109+
* }</pre>
110+
*/
111+
public static <T, R> ResponseParser<R> mapParser(Class<T> cls, Function<T, R> map) {
112+
return new ResponseParser<R>() {
113+
@Override
114+
public Result<R> parse(HttpResponse response, String body, ContentType contentType) {
115+
Response<T> resp = this.serializer.toResponse(response.getCode(), body, cls);
116+
return Result.map(resp, map);
117+
}
118+
};
119+
}
120+
93121
/**
94122
* Convert {@code T[]} response to a {@code List<T>} response.
95123
* This is handy for all request handlers which returns lists,
@@ -109,6 +137,32 @@ public static <T> Result<List<T>> toList(Response<T[]> response) {
109137
return new Result<>(response, Arrays.asList(response.getBody()));
110138
}
111139

140+
/**
141+
* Convert {@code T[]} response to a {@code List<T>} response.
142+
* This is handy for all request handlers which returns lists,
143+
* as the current client does not support deserializing into a parametrized
144+
* {@code List.class}.
145+
*
146+
* <p>
147+
* Usage:
148+
*
149+
* <pre>{@code @Override
150+
* public Future<List<String>> run(FutureCallback<List<String>> callback) {
151+
* return sendGetRequest("/names", callback, Result.toListParser(String[].class));
152+
* }
153+
* }</pre>
154+
*/
155+
public static <T> ResponseParser<List<T>> toListParser(Class<T[]> cls) {
156+
return new ResponseParser<List<T>>() {
157+
158+
@Override
159+
public Result<List<T>> parse(HttpResponse response, String body, ContentType contentType) {
160+
Response<T[]> resp = this.serializer.toResponse(response.getCode(), body, cls);
161+
return Result.toList(resp);
162+
}
163+
};
164+
}
165+
112166
/**
113167
* Convert {@code Result<Void>} response to a {@code Result<Boolean>}.
114168
* The result contains true if status code is in 100-299 range.
Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
package io.weaviate.client.v1.async.cluster;
22

3+
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
4+
35
import io.weaviate.client.Config;
46
import io.weaviate.client.v1.async.cluster.api.NodesStatusGetter;
7+
import io.weaviate.client.v1.async.cluster.api.Replicator;
8+
import io.weaviate.client.v1.async.cluster.api.ShardingStateQuerier;
9+
import io.weaviate.client.v1.async.cluster.api.replication.Replication;
510
import io.weaviate.client.v1.auth.provider.AccessTokenProvider;
611
import lombok.RequiredArgsConstructor;
7-
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
812

913
@RequiredArgsConstructor
1014
public class Cluster {
@@ -13,8 +17,19 @@ public class Cluster {
1317
private final Config config;
1418
private final AccessTokenProvider tokenProvider;
1519

20+
public Replication replication() {
21+
return new Replication(client, config, tokenProvider);
22+
}
1623

1724
public NodesStatusGetter nodesStatusGetter() {
1825
return new NodesStatusGetter(client, config, tokenProvider);
1926
}
27+
28+
public Replicator replicator() {
29+
return new Replicator(client, config, tokenProvider);
30+
}
31+
32+
public ShardingStateQuerier shardingStateQuerier() {
33+
return new ShardingStateQuerier(client, config, tokenProvider);
34+
}
2035
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package io.weaviate.client.v1.async.cluster.api;
2+
3+
import java.util.concurrent.Future;
4+
5+
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
6+
import org.apache.hc.core5.concurrent.FutureCallback;
7+
8+
import com.google.gson.annotations.SerializedName;
9+
10+
import io.weaviate.client.Config;
11+
import io.weaviate.client.base.AsyncBaseClient;
12+
import io.weaviate.client.base.AsyncClientResult;
13+
import io.weaviate.client.base.Result;
14+
import io.weaviate.client.v1.auth.provider.AccessTokenProvider;
15+
import io.weaviate.client.v1.cluster.model.ReplicationType;
16+
import lombok.Getter;
17+
18+
public class Replicator extends AsyncBaseClient<String> implements AsyncClientResult<String> {
19+
private String className;
20+
private String shard;
21+
private String sourceNode;
22+
private String targetNode;
23+
private ReplicationType replicationType;
24+
25+
public Replicator(CloseableHttpAsyncClient httpClient, Config config, AccessTokenProvider tokenProvider) {
26+
super(httpClient, config, tokenProvider);
27+
}
28+
29+
public Replicator withClassName(String className) {
30+
this.className = className;
31+
return this;
32+
}
33+
34+
public Replicator withShard(String shard) {
35+
this.shard = shard;
36+
return this;
37+
}
38+
39+
public Replicator withSourceNode(String sourceNode) {
40+
this.sourceNode = sourceNode;
41+
return this;
42+
}
43+
44+
public Replicator withTargetNode(String targetNode) {
45+
this.targetNode = targetNode;
46+
return this;
47+
}
48+
49+
public Replicator withReplicationType(ReplicationType replicationType) {
50+
this.replicationType = replicationType;
51+
return this;
52+
}
53+
54+
class RequestBody {
55+
@SerializedName("collection")
56+
String className = Replicator.this.className;
57+
@SerializedName("shard")
58+
String shard = Replicator.this.shard;
59+
@SerializedName("sourceNode")
60+
String sourceNode = Replicator.this.sourceNode;
61+
@SerializedName("targetNode")
62+
String targetNode = Replicator.this.targetNode;
63+
@SerializedName("type")
64+
ReplicationType replicationType = Replicator.this.replicationType;
65+
}
66+
67+
@Getter
68+
static class ResponseBody {
69+
@SerializedName("id")
70+
String replicationId;
71+
}
72+
73+
@Override
74+
public Future<Result<String>> run(FutureCallback<Result<String>> callback) {
75+
return sendPostRequest("/replication/replicate", new RequestBody(),
76+
callback, Result.mapParser(ResponseBody.class, ResponseBody::getReplicationId));
77+
78+
}
79+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package io.weaviate.client.v1.async.cluster.api;
2+
3+
import java.util.concurrent.Future;
4+
5+
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
6+
import org.apache.hc.core5.concurrent.FutureCallback;
7+
8+
import com.google.gson.annotations.SerializedName;
9+
10+
import io.weaviate.client.Config;
11+
import io.weaviate.client.base.AsyncBaseClient;
12+
import io.weaviate.client.base.AsyncClientResult;
13+
import io.weaviate.client.base.Result;
14+
import io.weaviate.client.base.util.UrlEncoder;
15+
import io.weaviate.client.v1.auth.provider.AccessTokenProvider;
16+
import io.weaviate.client.v1.cluster.model.ShardingState;
17+
import lombok.Getter;
18+
19+
public class ShardingStateQuerier extends AsyncBaseClient<ShardingState> implements AsyncClientResult<ShardingState> {
20+
private String className;
21+
private String shard;
22+
23+
public ShardingStateQuerier(CloseableHttpAsyncClient httpClient, Config config, AccessTokenProvider tokenProvider) {
24+
super(httpClient, config, tokenProvider);
25+
}
26+
27+
public ShardingStateQuerier withClassName(String className) {
28+
this.className = className;
29+
return this;
30+
}
31+
32+
public ShardingStateQuerier withShard(String shard) {
33+
this.shard = shard;
34+
return this;
35+
}
36+
37+
@Getter
38+
static class ResponseBody {
39+
@SerializedName("shardingState")
40+
ShardingState state;
41+
}
42+
43+
@Override
44+
public Future<Result<ShardingState>> run(FutureCallback<Result<ShardingState>> callback) {
45+
String path = "/replication/sharding-state?" + UrlEncoder.encodeQueryParam("collection", className);
46+
if (shard != null) {
47+
path += "&" + UrlEncoder.encodeQueryParam("shard", shard);
48+
}
49+
return sendGetRequest(path, callback, Result.mapParser(ResponseBody.class, ResponseBody::getState));
50+
}
51+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package io.weaviate.client.v1.async.cluster.api.replication;
2+
3+
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
4+
5+
import io.weaviate.client.Config;
6+
import io.weaviate.client.v1.async.cluster.api.replication.api.ReplicationAllDeleter;
7+
import io.weaviate.client.v1.async.cluster.api.replication.api.ReplicationAllGetter;
8+
import io.weaviate.client.v1.async.cluster.api.replication.api.ReplicationCanceler;
9+
import io.weaviate.client.v1.async.cluster.api.replication.api.ReplicationDeleter;
10+
import io.weaviate.client.v1.async.cluster.api.replication.api.ReplicationGetter;
11+
import io.weaviate.client.v1.async.cluster.api.replication.api.ReplicationQuerier;
12+
import io.weaviate.client.v1.auth.provider.AccessTokenProvider;
13+
import lombok.RequiredArgsConstructor;
14+
15+
@RequiredArgsConstructor
16+
public class Replication {
17+
18+
private final CloseableHttpAsyncClient client;
19+
private final Config config;
20+
private final AccessTokenProvider tokenProvider;
21+
22+
public ReplicationGetter getter() {
23+
return new ReplicationGetter(client, config, tokenProvider);
24+
}
25+
26+
public ReplicationAllGetter allGetter() {
27+
return new ReplicationAllGetter(client, config, tokenProvider);
28+
}
29+
30+
public ReplicationQuerier querier() {
31+
return new ReplicationQuerier(client, config, tokenProvider);
32+
}
33+
34+
public ReplicationCanceler canceler() {
35+
return new ReplicationCanceler(client, config, tokenProvider);
36+
}
37+
38+
public ReplicationDeleter deleter() {
39+
return new ReplicationDeleter(client, config, tokenProvider);
40+
}
41+
42+
public ReplicationAllDeleter allDeleter() {
43+
return new ReplicationAllDeleter(client, config, tokenProvider);
44+
}
45+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package io.weaviate.client.v1.async.cluster.api.replication.api;
2+
3+
import java.util.concurrent.Future;
4+
5+
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
6+
import org.apache.hc.core5.concurrent.FutureCallback;
7+
8+
import io.weaviate.client.Config;
9+
import io.weaviate.client.base.AsyncBaseClient;
10+
import io.weaviate.client.base.AsyncClientResult;
11+
import io.weaviate.client.base.Result;
12+
import io.weaviate.client.v1.auth.provider.AccessTokenProvider;
13+
14+
public class ReplicationAllDeleter extends AsyncBaseClient<Boolean> implements AsyncClientResult<Boolean> {
15+
16+
public ReplicationAllDeleter(CloseableHttpAsyncClient httpClient, Config config, AccessTokenProvider tokenProvider) {
17+
super(httpClient, config, tokenProvider);
18+
}
19+
20+
@Override
21+
public Future<Result<Boolean>> run(FutureCallback<Result<Boolean>> callback) {
22+
return sendDeleteRequest("/replication/replicate", null, callback, Result.voidToBooleanParser());
23+
24+
}
25+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package io.weaviate.client.v1.async.cluster.api.replication.api;
2+
3+
import java.util.List;
4+
import java.util.concurrent.Future;
5+
6+
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
7+
import org.apache.hc.core5.concurrent.FutureCallback;
8+
9+
import io.weaviate.client.Config;
10+
import io.weaviate.client.base.AsyncBaseClient;
11+
import io.weaviate.client.base.AsyncClientResult;
12+
import io.weaviate.client.base.Result;
13+
import io.weaviate.client.v1.auth.provider.AccessTokenProvider;
14+
import io.weaviate.client.v1.cluster.api.replication.model.ReplicateOperation;
15+
16+
public class ReplicationAllGetter extends AsyncBaseClient<List<ReplicateOperation>>
17+
implements AsyncClientResult<List<ReplicateOperation>> {
18+
19+
public ReplicationAllGetter(CloseableHttpAsyncClient httpClient, Config config, AccessTokenProvider tokenProvider) {
20+
super(httpClient, config, tokenProvider);
21+
}
22+
23+
@Override
24+
public Future<Result<List<ReplicateOperation>>> run(FutureCallback<Result<List<ReplicateOperation>>> callback) {
25+
return sendGetRequest("/replication/replicate/list?includeHistory=true", callback,
26+
Result.toListParser(ReplicateOperation[].class));
27+
}
28+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package io.weaviate.client.v1.async.cluster.api.replication.api;
2+
3+
import java.util.concurrent.Future;
4+
5+
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
6+
import org.apache.hc.core5.concurrent.FutureCallback;
7+
8+
import io.weaviate.client.Config;
9+
import io.weaviate.client.base.AsyncBaseClient;
10+
import io.weaviate.client.base.AsyncClientResult;
11+
import io.weaviate.client.base.Result;
12+
import io.weaviate.client.v1.auth.provider.AccessTokenProvider;
13+
14+
public class ReplicationCanceler extends AsyncBaseClient<Boolean> implements AsyncClientResult<Boolean> {
15+
private String uuid;
16+
17+
public ReplicationCanceler(CloseableHttpAsyncClient httpClient, Config config, AccessTokenProvider tokenProvider) {
18+
super(httpClient, config, tokenProvider);
19+
}
20+
21+
public ReplicationCanceler withUuid(String uuid) {
22+
this.uuid = uuid;
23+
return this;
24+
}
25+
26+
@Override
27+
public Future<Result<Boolean>> run(FutureCallback<Result<Boolean>> callback) {
28+
return sendPostRequest("/replication/replicate/" + uuid + "/cancel", null,
29+
callback, Result.voidToBooleanParser());
30+
}
31+
}

0 commit comments

Comments
 (0)