Skip to content

Commit 318e5c2

Browse files
Support CDC api (#1623)
Signed-off-by: huanghaoyuanhhy <[email protected]>
1 parent c4029da commit 318e5c2

8 files changed

Lines changed: 598 additions & 0 deletions

File tree

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package io.milvus.v2;
21+
22+
import io.milvus.v2.client.ConnectConfig;
23+
import io.milvus.v2.client.MilvusClientV2;
24+
import io.milvus.v2.service.cdc.request.CrossClusterTopology;
25+
import io.milvus.v2.service.cdc.request.MilvusCluster;
26+
import io.milvus.v2.service.cdc.request.ReplicateConfiguration;
27+
import io.milvus.v2.service.cdc.request.UpdateReplicateConfigurationReq;
28+
29+
import java.util.ArrayList;
30+
import java.util.List;
31+
32+
public class CDCExample {
33+
private static final String clusterAURI = "http://192.168.1.1:19530";
34+
private static final String clusterBURI = "http://192.168.1.1:19500";
35+
36+
private static final String clusterAId = "cdc-test-upstream";
37+
private static final String clusterBId = "cdc-test-downstream";
38+
39+
private static final Integer pchannelNum = 16;
40+
41+
private static List<String> generatePChannels(String clusterId) {
42+
List<String> pchannels = new ArrayList<>(pchannelNum);
43+
for (int i = 0; i < pchannelNum; i++) {
44+
pchannels.add(clusterId + "-rootcoord-dml_" + i);
45+
}
46+
return pchannels;
47+
}
48+
49+
public static void main(String[] args) {
50+
ConnectConfig clusterA = ConnectConfig.builder()
51+
.uri(clusterAURI)
52+
.build();
53+
MilvusClientV2 clusterAClient = new MilvusClientV2(clusterA);
54+
55+
ConnectConfig clusterB = ConnectConfig.builder()
56+
.uri(clusterBURI)
57+
.build();
58+
MilvusClientV2 clusterBClient = new MilvusClientV2(clusterB);
59+
60+
MilvusCluster milvusClusterA = MilvusCluster.builder()
61+
.clusterId(clusterAId)
62+
.uri(clusterAURI)
63+
.pchannels(generatePChannels(clusterAId))
64+
.build();
65+
MilvusCluster milvusClusterB = MilvusCluster.builder()
66+
.clusterId(clusterBId)
67+
.uri(clusterBURI)
68+
.pchannels(generatePChannels(clusterBId))
69+
.build();
70+
71+
CrossClusterTopology topology = CrossClusterTopology.builder()
72+
.sourceClusterId(clusterAId)
73+
.targetClusterId(clusterBId)
74+
.build();
75+
76+
ReplicateConfiguration configuration = ReplicateConfiguration.builder()
77+
.clusters(new ArrayList<MilvusCluster>(){{ add(milvusClusterA); add(milvusClusterB); }})
78+
.crossClusterTopologies(new ArrayList<CrossClusterTopology>(){{ add(topology); }} )
79+
.build();
80+
81+
UpdateReplicateConfigurationReq updateReq = UpdateReplicateConfigurationReq.builder()
82+
.replicateConfiguration(configuration)
83+
.build();
84+
85+
clusterAClient.updateReplicateConfiguration(updateReq);
86+
clusterBClient.updateReplicateConfiguration(updateReq);
87+
}
88+
}

sdk-core/src/main/java/io/milvus/v2/client/MilvusClientV2.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@
2525
import io.milvus.orm.iterator.SearchIterator;
2626
import io.milvus.orm.iterator.SearchIteratorV2;
2727

28+
import io.milvus.v2.service.cdc.CDCService;
29+
import io.milvus.v2.service.cdc.request.*;
30+
import io.milvus.v2.service.cdc.response.*;
2831
import io.milvus.v2.service.database.DatabaseService;
2932
import io.milvus.v2.service.database.request.*;
3033
import io.milvus.v2.service.database.response.*;
@@ -74,6 +77,7 @@ public class MilvusClientV2 {
7477
private final RBACService rbacService = new RBACService();
7578
private final ResourceGroupService rgroupService = new ResourceGroupService();
7679
private final UtilityService utilityService = new UtilityService();
80+
private final CDCService cdcService = new CDCService();
7781
private RpcUtils rpcUtils = new RpcUtils();
7882
private ConnectConfig connectConfig;
7983

@@ -1053,6 +1057,10 @@ public CheckHealthResp checkHealth() {
10531057
return rpcUtils.retry(()->utilityService.checkHealth(this.getRpcStub()));
10541058
}
10551059

1060+
public UpdateReplicateConfigurationResp updateReplicateConfiguration(UpdateReplicateConfigurationReq request) {
1061+
return rpcUtils.retry(()->cdcService.updateReplicateConfiguration(this.getRpcStub(), request));
1062+
}
1063+
10561064
/**
10571065
* Disconnects from a Milvus server with configurable timeout
10581066
*
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package io.milvus.v2.service.cdc;
21+
22+
import io.milvus.grpc.MilvusServiceGrpc;
23+
import io.milvus.grpc.Status;
24+
import io.milvus.grpc.UpdateReplicateConfigurationRequest;
25+
import io.milvus.v2.service.BaseService;
26+
import io.milvus.v2.service.cdc.request.UpdateReplicateConfigurationReq;
27+
import io.milvus.v2.service.cdc.response.UpdateReplicateConfigurationResp;
28+
29+
public class CDCService extends BaseService {
30+
public UpdateReplicateConfigurationResp updateReplicateConfiguration(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, UpdateReplicateConfigurationReq requestParam) {
31+
UpdateReplicateConfigurationRequest request = UpdateReplicateConfigurationRequest.newBuilder()
32+
.setReplicateConfiguration(requestParam.getReplicateConfiguration().toGRPC())
33+
.build();
34+
35+
String title = "UpdateReplicateConfiguration";
36+
37+
Status response = blockingStub.updateReplicateConfiguration(request);
38+
rpcUtils.handleResponse(title, response);
39+
return UpdateReplicateConfigurationResp.builder().build();
40+
}
41+
}
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package io.milvus.v2.service.cdc.request;
21+
22+
23+
import java.util.Objects;
24+
25+
26+
public class CrossClusterTopology {
27+
private String sourceClusterId;
28+
private String targetClusterId;
29+
30+
public io.milvus.grpc.CrossClusterTopology toGRPC() {
31+
return io.milvus.grpc.CrossClusterTopology.newBuilder()
32+
.setSourceClusterId(this.sourceClusterId)
33+
.setTargetClusterId(this.targetClusterId)
34+
.build();
35+
}
36+
37+
public String getSourceClusterId() {
38+
return sourceClusterId;
39+
}
40+
41+
public void setSourceClusterId(String sourceClusterId) {
42+
this.sourceClusterId = sourceClusterId;
43+
}
44+
45+
public String getTargetClusterId() {
46+
return targetClusterId;
47+
}
48+
49+
public void setTargetClusterId(String targetClusterId) {
50+
this.targetClusterId = targetClusterId;
51+
}
52+
53+
@Override
54+
public boolean equals(Object o) {
55+
if (o == null || getClass() != o.getClass()) return false;
56+
CrossClusterTopology topology = (CrossClusterTopology) o;
57+
return Objects.equals(sourceClusterId, topology.sourceClusterId) && Objects.equals(targetClusterId, topology.targetClusterId);
58+
}
59+
60+
@Override
61+
public int hashCode() {
62+
return Objects.hash(sourceClusterId, targetClusterId);
63+
}
64+
65+
@Override
66+
public String toString() {
67+
return "CrossClusterTopology{" +
68+
"sourceClusterId='" + sourceClusterId + '\'' +
69+
", targetClusterId='" + targetClusterId + '\'' +
70+
'}';
71+
}
72+
73+
private CrossClusterTopology(Builder builder) {
74+
this.sourceClusterId = builder.sourceClusterId;
75+
this.targetClusterId = builder.targetClusterId;
76+
}
77+
78+
public static class Builder {
79+
private String sourceClusterId;
80+
private String targetClusterId;
81+
82+
public Builder sourceClusterId(String sourceClusterId) {
83+
this.sourceClusterId = sourceClusterId;
84+
return this;
85+
}
86+
87+
public Builder targetClusterId(String targetClusterId) {
88+
this.targetClusterId = targetClusterId;
89+
return this;
90+
}
91+
92+
public CrossClusterTopology build() {
93+
return new CrossClusterTopology(this);
94+
}
95+
}
96+
97+
public static Builder builder() {
98+
return new Builder();
99+
}
100+
}

0 commit comments

Comments
 (0)