Skip to content

Commit 5060e10

Browse files
authored
Allows upsert and autoid=true for MilvusClientV1 (#1505)
Signed-off-by: yhmo <[email protected]>
1 parent eb54f27 commit 5060e10

7 files changed

Lines changed: 388 additions & 20 deletions

File tree

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package io.milvus.v1;
21+
22+
import com.google.gson.Gson;
23+
import com.google.gson.JsonObject;
24+
import io.milvus.client.MilvusClient;
25+
import io.milvus.client.MilvusServiceClient;
26+
import io.milvus.common.clientenum.ConsistencyLevelEnum;
27+
import io.milvus.grpc.DataType;
28+
import io.milvus.grpc.MutationResult;
29+
import io.milvus.grpc.QueryResults;
30+
import io.milvus.param.*;
31+
import io.milvus.param.collection.*;
32+
import io.milvus.param.dml.InsertParam;
33+
import io.milvus.param.dml.QueryParam;
34+
import io.milvus.param.dml.UpsertParam;
35+
import io.milvus.param.index.CreateIndexParam;
36+
import io.milvus.response.QueryResultsWrapper;
37+
38+
import java.util.ArrayList;
39+
import java.util.Arrays;
40+
import java.util.Collections;
41+
import java.util.List;
42+
43+
public class UpsertExample {
44+
private static final MilvusClient client;
45+
46+
static {
47+
ConnectParam connectParam = ConnectParam.newBuilder()
48+
.withHost("localhost")
49+
.withPort(19530)
50+
.build();
51+
client = new MilvusServiceClient(connectParam);
52+
}
53+
private static final String COLLECTION_NAME = "java_sdk_example_upsert_v1";
54+
private static final String ID_FIELD = "pk";
55+
private static final String VECTOR_FIELD = "vector";
56+
private static final String TEXT_FIELD = "text";
57+
private static final Integer VECTOR_DIM = 128;
58+
59+
private static void queryWithExpr(String expr) {
60+
R<QueryResults> queryRet = client.query(QueryParam.newBuilder()
61+
.withCollectionName(COLLECTION_NAME)
62+
.withExpr(expr)
63+
.withOutFields(Arrays.asList(ID_FIELD, TEXT_FIELD))
64+
.withConsistencyLevel(ConsistencyLevelEnum.STRONG)
65+
.build());
66+
QueryResultsWrapper queryWrapper = new QueryResultsWrapper(queryRet.getData());
67+
System.out.println("\nQuery with expression: " + expr);
68+
List<QueryResultsWrapper.RowRecord> records = queryWrapper.getRowRecords();
69+
for (QueryResultsWrapper.RowRecord record : records) {
70+
System.out.println(record);
71+
}
72+
}
73+
74+
private static List<Long> createCollection(boolean autoID) {
75+
// Define fields
76+
List<FieldType> fieldsSchema = Arrays.asList(
77+
FieldType.newBuilder()
78+
.withName(ID_FIELD)
79+
.withDataType(DataType.Int64)
80+
.withPrimaryKey(true)
81+
.withAutoID(autoID)
82+
.build(),
83+
FieldType.newBuilder()
84+
.withName(VECTOR_FIELD)
85+
.withDataType(DataType.FloatVector)
86+
.withDimension(VECTOR_DIM)
87+
.build(),
88+
FieldType.newBuilder()
89+
.withName(TEXT_FIELD)
90+
.withDataType(DataType.VarChar)
91+
.withMaxLength(100)
92+
.build()
93+
);
94+
95+
CollectionSchemaParam collectionSchemaParam = CollectionSchemaParam.newBuilder()
96+
.withFieldTypes(fieldsSchema)
97+
.build();
98+
99+
// Drop the collection if exists
100+
client.dropCollection(DropCollectionParam.newBuilder()
101+
.withCollectionName(COLLECTION_NAME)
102+
.build());
103+
104+
// Create the collection with 3 fields
105+
R<RpcStatus> ret = client.createCollection(CreateCollectionParam.newBuilder()
106+
.withCollectionName(COLLECTION_NAME)
107+
.withSchema(collectionSchemaParam)
108+
.build());
109+
CommonUtils.handleResponseStatus(ret);
110+
111+
// Specify an index type on the vector field.
112+
ret = client.createIndex(CreateIndexParam.newBuilder()
113+
.withCollectionName(COLLECTION_NAME)
114+
.withFieldName(VECTOR_FIELD)
115+
.withIndexType(IndexType.FLAT)
116+
.withMetricType(MetricType.L2)
117+
.build());
118+
CommonUtils.handleResponseStatus(ret);
119+
120+
// Call loadCollection() to enable automatically loading data into memory for searching
121+
client.loadCollection(LoadCollectionParam.newBuilder()
122+
.withCollectionName(COLLECTION_NAME)
123+
.build());
124+
System.out.println("\nCollection created with autoID = " + autoID);
125+
126+
// insert rows
127+
Gson gson = new Gson();
128+
List<JsonObject> rows = new ArrayList<>();
129+
for (int i = 0; i < 100; i++) {
130+
JsonObject row = new JsonObject();
131+
if (!autoID) {
132+
row.addProperty(ID_FIELD, i);
133+
}
134+
List<Float> vector = CommonUtils.generateFloatVector(VECTOR_DIM);
135+
row.add(VECTOR_FIELD, gson.toJsonTree(vector));
136+
row.addProperty(TEXT_FIELD, String.format("text_%d", i));
137+
rows.add(row);
138+
}
139+
R<MutationResult> resp = client.insert(InsertParam.newBuilder()
140+
.withCollectionName(COLLECTION_NAME)
141+
.withRows(rows)
142+
.build());
143+
CommonUtils.handleResponseStatus(resp);
144+
return resp.getData().getIDs().getIntId().getDataList();
145+
}
146+
147+
private static void doUpsert(boolean autoID) {
148+
// if autoID is true, the collection primary key is auto-generated by server
149+
List<Long> ids = createCollection(autoID);
150+
151+
// query before upsert
152+
Long testID = ids.get(1);
153+
String filter = String.format("%s == %d", ID_FIELD, testID);
154+
queryWithExpr(filter);
155+
156+
// upsert
157+
// the server will return a new primary key, the old entity is deleted,
158+
// and a new entity is created with the new primary key
159+
Gson gson = new Gson();
160+
JsonObject row = new JsonObject();
161+
row.addProperty(ID_FIELD, testID);
162+
List<Float> vector = CommonUtils.generateFloatVector(VECTOR_DIM);
163+
row.add(VECTOR_FIELD, gson.toJsonTree(vector));
164+
row.addProperty(TEXT_FIELD, "this field has been updated");
165+
R<MutationResult> upsertResp = client.upsert(UpsertParam.newBuilder()
166+
.withCollectionName(COLLECTION_NAME)
167+
.withRows(Collections.singletonList(row))
168+
.build());
169+
CommonUtils.handleResponseStatus(upsertResp);
170+
List<Long> newIds = upsertResp.getData().getIDs().getIntId().getDataList();
171+
Long newID = newIds.get(0);
172+
System.out.println("\nUpsert done");
173+
174+
// query after upsert
175+
filter = String.format("%s == %d", ID_FIELD, newID);
176+
queryWithExpr(filter);
177+
}
178+
179+
public static void main(String[] args) {
180+
doUpsert(true);
181+
doUpsert(false);
182+
183+
client.close();
184+
}
185+
}
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package io.milvus.v2;
21+
22+
import com.google.gson.Gson;
23+
import com.google.gson.JsonObject;
24+
import io.milvus.v1.CommonUtils;
25+
import io.milvus.v2.client.ConnectConfig;
26+
import io.milvus.v2.client.MilvusClientV2;
27+
import io.milvus.v2.common.ConsistencyLevel;
28+
import io.milvus.v2.common.DataType;
29+
import io.milvus.v2.common.IndexParam;
30+
import io.milvus.v2.service.collection.request.AddFieldReq;
31+
import io.milvus.v2.service.collection.request.CreateCollectionReq;
32+
import io.milvus.v2.service.collection.request.DropCollectionReq;
33+
import io.milvus.v2.service.vector.request.InsertReq;
34+
import io.milvus.v2.service.vector.request.QueryReq;
35+
import io.milvus.v2.service.vector.request.UpsertReq;
36+
import io.milvus.v2.service.vector.response.InsertResp;
37+
import io.milvus.v2.service.vector.response.QueryResp;
38+
import io.milvus.v2.service.vector.response.UpsertResp;
39+
40+
import java.util.*;
41+
42+
public class UpsertExample {
43+
private static final MilvusClientV2 client;
44+
static {
45+
client = new MilvusClientV2(ConnectConfig.builder()
46+
.uri("http://localhost:19530")
47+
.build());
48+
}
49+
50+
private static final String COLLECTION_NAME = "java_sdk_example_upsert_v2";
51+
private static final String ID_FIELD = "pk";
52+
private static final String VECTOR_FIELD = "vector";
53+
private static final String TEXT_FIELD = "text";
54+
private static final Integer VECTOR_DIM = 128;
55+
56+
private static List<Object> createCollection(boolean autoID) {
57+
// Drop collection if exists
58+
client.dropCollection(DropCollectionReq.builder()
59+
.collectionName(COLLECTION_NAME)
60+
.build());
61+
62+
// Create collection
63+
CreateCollectionReq.CollectionSchema collectionSchema = CreateCollectionReq.CollectionSchema.builder()
64+
.build();
65+
collectionSchema.addField(AddFieldReq.builder()
66+
.fieldName(ID_FIELD)
67+
.dataType(DataType.Int64)
68+
.isPrimaryKey(Boolean.TRUE)
69+
.autoID(autoID)
70+
.build());
71+
collectionSchema.addField(AddFieldReq.builder()
72+
.fieldName(VECTOR_FIELD)
73+
.dataType(DataType.FloatVector)
74+
.dimension(VECTOR_DIM)
75+
.build());
76+
collectionSchema.addField(AddFieldReq.builder()
77+
.fieldName(TEXT_FIELD)
78+
.dataType(DataType.VarChar)
79+
.maxLength(100)
80+
.build());
81+
82+
List<IndexParam> indexes = new ArrayList<>();
83+
indexes.add(IndexParam.builder()
84+
.fieldName(VECTOR_FIELD)
85+
.indexType(IndexParam.IndexType.FLAT)
86+
.metricType(IndexParam.MetricType.COSINE)
87+
.build());
88+
89+
CreateCollectionReq requestCreate = CreateCollectionReq.builder()
90+
.collectionName(COLLECTION_NAME)
91+
.collectionSchema(collectionSchema)
92+
.indexParams(indexes)
93+
.consistencyLevel(ConsistencyLevel.BOUNDED)
94+
.build();
95+
client.createCollection(requestCreate);
96+
System.out.println("\nCollection created with autoID = " + autoID);
97+
98+
// Insert rows
99+
Gson gson = new Gson();
100+
List<JsonObject> rows = new ArrayList<>();
101+
for (int i = 0; i < 100; i++) {
102+
JsonObject row = new JsonObject();
103+
if (!autoID) {
104+
row.addProperty(ID_FIELD, i);
105+
}
106+
List<Float> vector = CommonUtils.generateFloatVector(VECTOR_DIM);
107+
row.add(VECTOR_FIELD, gson.toJsonTree(vector));
108+
row.addProperty(TEXT_FIELD, String.format("text_%d", i));
109+
rows.add(row);
110+
}
111+
InsertResp resp = client.insert(InsertReq.builder()
112+
.collectionName(COLLECTION_NAME)
113+
.data(rows)
114+
.build());
115+
return resp.getPrimaryKeys();
116+
}
117+
118+
private static void queryWithExpr(String expr) {
119+
QueryResp queryRet = client.query(QueryReq.builder()
120+
.collectionName(COLLECTION_NAME)
121+
.filter(expr)
122+
.outputFields(Arrays.asList(ID_FIELD, TEXT_FIELD))
123+
.consistencyLevel(ConsistencyLevel.STRONG)
124+
.build());
125+
System.out.println("\nQuery with expression: " + expr);
126+
List<QueryResp.QueryResult> records = queryRet.getQueryResults();
127+
for (QueryResp.QueryResult record : records) {
128+
System.out.println(record.getEntity());
129+
}
130+
}
131+
132+
private static void doUpsert(boolean autoID) {
133+
// if autoID is true, the collection primary key is auto-generated by server
134+
List<Object> ids = createCollection(autoID);
135+
136+
// query before upsert
137+
Long testID = (Long)ids.get(1);
138+
String filter = String.format("%s == %d", ID_FIELD, testID);
139+
queryWithExpr(filter);
140+
141+
// upsert
142+
// the server will return a new primary key, the old entity is deleted,
143+
// and a new entity is created with the new primary key
144+
Gson gson = new Gson();
145+
JsonObject row = new JsonObject();
146+
row.addProperty(ID_FIELD, testID);
147+
List<Float> vector = CommonUtils.generateFloatVector(VECTOR_DIM);
148+
row.add(VECTOR_FIELD, gson.toJsonTree(vector));
149+
row.addProperty(TEXT_FIELD, "this field has been updated");
150+
UpsertResp upsertResp = client.upsert(UpsertReq.builder()
151+
.collectionName(COLLECTION_NAME)
152+
.data(Collections.singletonList(row))
153+
.build());
154+
List<Object> newIds = upsertResp.getPrimaryKeys();
155+
Long newID = (Long)newIds.get(0);
156+
System.out.println("\nUpsert done");
157+
158+
// query after upsert
159+
filter = String.format("%s == %d", ID_FIELD, newID);
160+
queryWithExpr(filter);
161+
}
162+
163+
public static void main(String[] args) {
164+
doUpsert(true);
165+
doUpsert(false);
166+
167+
client.close();
168+
}
169+
}

sdk-core/src/main/java/io/milvus/param/ParamUtils.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -526,13 +526,6 @@ public InsertBuilderWrapper(@NonNull UpsertParam requestParam,
526526
DescCollResponseWrapper wrapper) {
527527
String collectionName = requestParam.getCollectionName();
528528

529-
// currently, not allow to upsert for collection whose primary key is auto-generated
530-
FieldType pk = wrapper.getPrimaryField();
531-
if (pk.isAutoID()) {
532-
throw new ParamException(String.format("Upsert don't support autoID==True, collection: %s",
533-
requestParam.getCollectionName()));
534-
}
535-
536529
// generate upsert request builder
537530
MsgBase msgBase = MsgBase.newBuilder().setMsgType(MsgType.Insert).build();
538531
upsertBuilder = UpsertRequest.newBuilder()
@@ -601,7 +594,8 @@ private void checkAndSetColumnData(DescCollResponseWrapper wrapper, List<InsertP
601594
boolean found = false;
602595
for (InsertParam.Field field : fields) {
603596
if (field.getName().equals(fieldType.getName())) {
604-
if (fieldType.isAutoID()) {
597+
// from v2.4.10, milvus allows upsert for auto-id pk, no need to check for upsert action
598+
if (fieldType.isAutoID() && insertBuilder != null) {
605599
String msg = String.format("The primary key: %s is auto generated, no need to input.",
606600
fieldType.getName());
607601
throw new ParamException(msg);
@@ -669,7 +663,8 @@ private void checkAndSetRowData(DescCollResponseWrapper wrapper, List<JsonObject
669663
rowFieldData = JsonNull.INSTANCE;
670664
}
671665

672-
if (fieldType.isAutoID()) {
666+
// from v2.4.10, milvus allows upsert for auto-id pk, no need to check for upsert action
667+
if (fieldType.isAutoID() && insertBuilder != null) {
673668
String msg = String.format("The primary key: %s is auto generated, no need to input.", fieldName);
674669
throw new ParamException(msg);
675670
}

0 commit comments

Comments
 (0)