Skip to content

Commit 0979369

Browse files
committed
feat: add integration tests for replication
1 parent ae4d0a3 commit 0979369

6 files changed

Lines changed: 252 additions & 21 deletions

File tree

src/main/java/io/weaviate/client/v1/cluster/Cluster.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import io.weaviate.client.v1.cluster.api.NodesStatusGetter;
66
import io.weaviate.client.v1.cluster.api.Replicator;
77
import io.weaviate.client.v1.cluster.api.ShardingStateQuerier;
8+
import io.weaviate.client.v1.cluster.api.replication.Replication;
89

910
public class Cluster {
1011

@@ -16,6 +17,10 @@ public Cluster(HttpClient httpClient, Config config) {
1617
this.httpClient = httpClient;
1718
}
1819

20+
public Replication replication() {
21+
return new Replication(httpClient, config);
22+
}
23+
1924
public NodesStatusGetter nodesStatusGetter() {
2025
return new NodesStatusGetter(httpClient, config);
2126
}
@@ -27,4 +32,5 @@ public ShardingStateQuerier shardingStateQuerier() {
2732
public Replicator replicator() {
2833
return new Replicator(httpClient, config);
2934
}
35+
3036
}

src/main/java/io/weaviate/client/v1/cluster/api/replication/model/ReplicateOperation.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,11 @@
55
import com.google.gson.annotations.SerializedName;
66

77
import io.weaviate.client.v1.cluster.model.ReplicationType;
8+
import lombok.Getter;
9+
import lombok.ToString;
810

11+
@Getter
12+
@ToString
913
public class ReplicateOperation {
1014
@SerializedName("id")
1115
String uuid;

src/main/java/io/weaviate/client/v1/cluster/api/replication/model/ReplicateOperationStatus.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@
44

55
import com.google.gson.annotations.SerializedName;
66

7+
import lombok.Getter;
8+
import lombok.ToString;
9+
10+
@Getter
11+
@ToString
712
public class ReplicateOperationStatus {
813
@SerializedName("state")
914
ReplicateOperationState state;

src/test/java/io/weaviate/integration/client/WeaviateDockerComposeCluster.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.weaviate.integration.client;
22

33
import java.time.Duration;
4+
45
import org.junit.rules.TestRule;
56
import org.junit.runner.Description;
67
import org.junit.runners.model.Statement;
@@ -35,8 +36,11 @@ public Weaviate(String dockerImageName, String hostname, Boolean isJoining) {
3536
withEnv("RAFT_JOIN", "weaviate-0");
3637
if (isJoining) {
3738
withEnv("CLUSTER_JOIN", "weaviate-0:7110");
38-
waitingFor(Wait.forHttp("/v1/.well-known/ready").forPort(8080).forStatusCode(200).withStartupTimeout(Duration.ofSeconds(10)));
39+
waitingFor(Wait.forHttp("/v1/.well-known/ready").forPort(8080).forStatusCode(200)
40+
.withStartupTimeout(Duration.ofSeconds(10)));
3941
}
42+
43+
withEnv("REPLICA_MOVEMENT_ENABLED", "true");
4044
}
4145
}
4246

src/test/java/io/weaviate/integration/client/cluster/ClientClusterTest.java

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
package io.weaviate.integration.client.cluster;
22

3+
import java.util.function.Supplier;
4+
5+
import org.junit.After;
6+
import org.junit.Before;
7+
import org.junit.ClassRule;
8+
import org.junit.Test;
9+
310
import io.weaviate.client.Config;
411
import io.weaviate.client.WeaviateClient;
512
import io.weaviate.client.base.Result;
@@ -8,12 +15,6 @@
815
import io.weaviate.integration.client.WeaviateDockerCompose;
916
import io.weaviate.integration.client.WeaviateTestGenerics;
1017
import io.weaviate.integration.tests.cluster.ClusterTestSuite;
11-
import org.junit.After;
12-
import org.junit.Before;
13-
import org.junit.ClassRule;
14-
import org.junit.Test;
15-
16-
import java.util.function.Supplier;
1718

1819
public class ClientClusterTest {
1920

@@ -37,43 +38,43 @@ public void after() {
3738
@Test
3839
public void testClusterNodesEndpointWithoutDataWithOutputVerbose() {
3940
Supplier<Result<NodesStatusResponse>> resultSupplier = () -> client.cluster().nodesStatusGetter()
40-
.withOutput(NodeStatusOutput.VERBOSE)
41-
.run();
41+
.withOutput(NodeStatusOutput.VERBOSE)
42+
.run();
4243

4344
ClusterTestSuite.testNoDataOutputVerbose(resultSupplier);
4445
}
4546

4647
@Test
4748
public void testClusterNodesEndpointWithDataWithOutputVerbose() throws InterruptedException {
4849
Supplier<Result<NodesStatusResponse>> resultSupplier = () -> client.cluster().nodesStatusGetter()
49-
.withOutput(NodeStatusOutput.VERBOSE)
50-
.run();
50+
.withOutput(NodeStatusOutput.VERBOSE)
51+
.run();
5152

5253
ClusterTestSuite.testDataOutputVerbose(resultSupplier, testGenerics, client);
5354
}
5455

5556
@Test
5657
public void shouldGetNodeStatusPerClassWithOutputVerbose() throws InterruptedException {
5758
Supplier<Result<NodesStatusResponse>> resultSupplierAll = () -> client.cluster().nodesStatusGetter()
58-
.withOutput(NodeStatusOutput.VERBOSE)
59-
.run();
59+
.withOutput(NodeStatusOutput.VERBOSE)
60+
.run();
6061
Supplier<Result<NodesStatusResponse>> resultSupplierPizza = () -> client.cluster().nodesStatusGetter()
61-
.withOutput(NodeStatusOutput.VERBOSE)
62-
.withClassName("Pizza")
63-
.run();
62+
.withOutput(NodeStatusOutput.VERBOSE)
63+
.withClassName("Pizza")
64+
.run();
6465
Supplier<Result<NodesStatusResponse>> resultSupplierSoup = () -> client.cluster().nodesStatusGetter()
65-
.withOutput(NodeStatusOutput.VERBOSE)
66-
.withClassName("Soup")
67-
.run();
66+
.withOutput(NodeStatusOutput.VERBOSE)
67+
.withClassName("Soup")
68+
.run();
6869

6970
ClusterTestSuite.testDataPerClassOutputVerbose(resultSupplierAll, resultSupplierPizza, resultSupplierSoup,
70-
testGenerics, client);
71+
testGenerics, client);
7172
}
7273

7374
@Test
7475
public void testClusterNodesEndpointWithOutputMinimalImplicit() {
7576
Supplier<Result<NodesStatusResponse>> resultSupplier = () -> client.cluster().nodesStatusGetter()
76-
.run();
77+
.run();
7778

7879
ClusterTestSuite.testNoDataOutputMinimalImplicit(resultSupplier);
7980
}
Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
package io.weaviate.integration.client.cluster;
2+
3+
import static org.junit.jupiter.api.Assumptions.assumeTrue;
4+
5+
import java.util.List;
6+
import java.util.concurrent.CompletableFuture;
7+
import java.util.concurrent.ExecutionException;
8+
import java.util.concurrent.TimeUnit;
9+
import java.util.concurrent.TimeoutException;
10+
import java.util.function.Supplier;
11+
12+
import org.assertj.core.api.Assertions;
13+
import org.assertj.core.util.Arrays;
14+
import org.junit.After;
15+
import org.junit.Before;
16+
import org.junit.ClassRule;
17+
import org.junit.Test;
18+
19+
import io.weaviate.client.Config;
20+
import io.weaviate.client.WeaviateClient;
21+
import io.weaviate.client.base.Result;
22+
import io.weaviate.client.v1.cluster.api.replication.Replication;
23+
import io.weaviate.client.v1.cluster.api.replication.model.ReplicateOperation;
24+
import io.weaviate.client.v1.cluster.api.replication.model.ReplicateOperationState;
25+
import io.weaviate.client.v1.cluster.model.NodeStatusOutput;
26+
import io.weaviate.client.v1.cluster.model.NodesStatusResponse;
27+
import io.weaviate.client.v1.cluster.model.ReplicationType;
28+
import io.weaviate.client.v1.cluster.model.ShardReplicas;
29+
import io.weaviate.client.v1.cluster.model.ShardingState;
30+
import io.weaviate.client.v1.schema.model.WeaviateClass;
31+
import io.weaviate.integration.client.WeaviateDockerComposeCluster;
32+
33+
public class ClientReplicateTest {
34+
@ClassRule
35+
public static WeaviateDockerComposeCluster cluster = new WeaviateDockerComposeCluster();
36+
37+
private static WeaviateClient client;
38+
39+
@Before
40+
public void before() {
41+
Config config = new Config("http", cluster.getHttpHost0Address());
42+
client = new WeaviateClient(config);
43+
}
44+
45+
private static final String CLASSNAME = "ShardDweller";
46+
47+
@After
48+
public void afterEach() {
49+
client.schema().classDeleter().withClassName(CLASSNAME).run();
50+
}
51+
52+
@Test
53+
public void testQueryShardingState() {
54+
// Arrange
55+
Boolean created = client.schema().classCreator()
56+
.withClass(WeaviateClass.builder().className(CLASSNAME).build())
57+
.run().getResult();
58+
assumeTrue(created, "created test collection");
59+
60+
NodesStatusResponse nodes = client.cluster().nodesStatusGetter()
61+
.withClassName(CLASSNAME)
62+
.withOutput(NodeStatusOutput.VERBOSE)
63+
.run().getResult();
64+
65+
assumeTrue(nodes != null, "nodes status result is not null");
66+
assumeTrue(!Arrays.isArrayEmpty(nodes.getNodes()), "there're 1+ nodes in the cluster");
67+
String wantShard = nodes.getNodes()[0].getShards()[0].getName();
68+
69+
ShardingState shardingState;
70+
71+
// Act: query by collection name
72+
shardingState = client.cluster().shardingStateQuerier()
73+
.withClassName(CLASSNAME)
74+
.run().getResult();
75+
Assertions.assertThat(shardingState.getShards())
76+
.as("shard present in the sharding state output (by collection)")
77+
.extracting(ShardReplicas::getName).contains(wantShard);
78+
79+
// Act: query by collection + shard name
80+
shardingState = client.cluster().shardingStateQuerier()
81+
.withClassName(CLASSNAME)
82+
.withShard(wantShard)
83+
.run().getResult();
84+
Assertions.assertThat(shardingState.getShards())
85+
.as("shard present in the sharding state output (by collection+shard)")
86+
.extracting(ShardReplicas::getName).contains(wantShard);
87+
88+
ShardingState inexistent;
89+
// Act: query inexistent
90+
inexistent = client.cluster().shardingStateQuerier()
91+
.withClassName("Unknown")
92+
.run().getResult();
93+
Assertions.assertThat(inexistent).isNull();
94+
}
95+
96+
@Test
97+
/**
98+
* This test starts a replication operation between two nodes,
99+
* queries for its status, then cancels the replication and eventually deletes
100+
* it.
101+
*
102+
* Note that assertions that use {@link #eventually} helper may be flaky.
103+
*/
104+
public void testReplicateLifecycle() {
105+
// Arrange
106+
Boolean created = client.schema().classCreator()
107+
.withClass(WeaviateClass.builder().className(CLASSNAME).build())
108+
.run().getResult();
109+
assumeTrue(created, "created test collection");
110+
111+
NodesStatusResponse nodes = client.cluster().nodesStatusGetter()
112+
.withClassName(CLASSNAME)
113+
.withOutput(NodeStatusOutput.VERBOSE)
114+
.run().getResult();
115+
116+
assumeTrue(nodes != null, "nodes status result is not null");
117+
assumeTrue(nodes.getNodes().length >= 2, "there're 2+ nodes in the cluster");
118+
119+
String srcNode = nodes.getNodes()[0].getName();
120+
String tgtNode = nodes.getNodes()[1].getName();
121+
String wantShard = nodes.getNodes()[0].getShards()[0].getName();
122+
123+
deleteAllReplications(5);
124+
125+
// Act: kick-off replication
126+
String uuid = client.cluster().replicator()
127+
.withClassName(CLASSNAME)
128+
.withShard(wantShard)
129+
.withSourceNode(srcNode)
130+
.withTargetNode(tgtNode)
131+
.run().getResult();
132+
assumeTrue(uuid != null, "replication started with valid uuid");
133+
134+
// Act: get status
135+
ReplicateOperation status_1 = client.cluster().replication().getter()
136+
.withUuid(uuid).run().getResult();
137+
138+
Assertions.assertThat(status_1).isNotNull()
139+
.as("expected replication status")
140+
.returns(CLASSNAME, ReplicateOperation::getClassName)
141+
.returns(wantShard, ReplicateOperation::getShard)
142+
.returns(srcNode, ReplicateOperation::getSourceNode)
143+
.returns(tgtNode, ReplicateOperation::getTargetNode)
144+
.returns(ReplicationType.COPY, ReplicateOperation::getTransferType)
145+
.returns(null, ReplicateOperation::getStatusHistory)
146+
.extracting(ReplicateOperation::getStatus).isNotNull();
147+
148+
// Act: get status with history
149+
ReplicateOperation status_2 = client.cluster().replication().getter()
150+
.withUuid(uuid).includeHistory(true)
151+
.run().getResult();
152+
153+
Assertions.assertThat(status_2).isNotNull()
154+
.as("includes replication status history")
155+
.extracting(ReplicateOperation::getStatusHistory).isNotNull();
156+
157+
// Act: query status
158+
List<ReplicateOperation> operations = client.cluster().replication().querier()
159+
.withClassName(CLASSNAME).withShard(wantShard).withTargetNode(tgtNode)
160+
.run().getResult();
161+
162+
Assertions.assertThat(operations).as("no. replications").hasSize(1);
163+
164+
// Act: cancel
165+
Result<Boolean> cancel = client.cluster().replication().canceler().withUuid(uuid).run();
166+
Assertions.assertThat(cancel).as("cancel error").returns(null, Result::getError);
167+
168+
eventually(() -> client.cluster().replication().getter().withUuid(uuid).run().getResult()
169+
.getStatus().getState() == ReplicateOperationState.CANCELLED,
170+
25, "replication was not cancelled");
171+
172+
// Act: delete
173+
Result<Boolean> delete = client.cluster().replication().deleter().withUuid(uuid).run();
174+
Assertions.assertThat(delete).as("delete error").returns(null, Result::getError);
175+
176+
eventually(() -> client.cluster().replication().allGetter().run().getResult().isEmpty(),
177+
15, "replication was not deleted");
178+
}
179+
180+
private static void deleteAllReplications(int timeoutSeconds) {
181+
Replication replication = client.cluster().replication();
182+
replication.allDeleter().run();
183+
eventually(() -> replication.allGetter().run().getResult().isEmpty(),
184+
timeoutSeconds,
185+
"did not delete existing replications");
186+
}
187+
188+
private static void eventually(Supplier<Boolean> cond, int timeoutSeconds, String... message) {
189+
CompletableFuture<?> check = CompletableFuture.runAsync(() -> {
190+
while (!Thread.currentThread().isInterrupted() && !cond.get()) {
191+
try {
192+
Thread.sleep(500);
193+
} catch (InterruptedException ex) {
194+
Thread.currentThread().interrupt();
195+
}
196+
}
197+
});
198+
199+
try {
200+
check.get(timeoutSeconds, TimeUnit.SECONDS);
201+
} catch (TimeoutException ex) {
202+
check.cancel(true);
203+
Assertions.fail(message.length >= 0 ? message[0] : null, ex);
204+
} catch (InterruptedException ex) {
205+
Thread.currentThread().interrupt();
206+
Assertions.fail(ex);
207+
} catch (ExecutionException ex) {
208+
throw new RuntimeException(ex);
209+
}
210+
}
211+
}

0 commit comments

Comments
 (0)