Skip to content

Commit bae1307

Browse files
authored
[ISSUE #4166] Fix grpc AsyncPublishInstance has no push messages. (#4167)
* Create .lift.toml * Update .lift.toml * Update .lift.toml * Update .lift.toml * Update .lift.toml * Update .lift.toml * Update .lift.toml * Update .lift.toml * Update .lift.toml * delete lift * push message optimization methods and correction of topic. * push message optimization methods and correction of topic.
1 parent 525a8dd commit bae1307

File tree

8 files changed

+18
-11
lines changed

8 files changed

+18
-11
lines changed

eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/GrpcAbstractDemo.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,10 @@ protected static EventMeshGrpcClientConfig initEventMeshGrpcClientConfig(final S
5353
.build();
5454
}
5555

56-
protected static CloudEvent buildCloudEvent(final Map<String, String> content) {
56+
protected static CloudEvent buildCloudEvent(final Map<String, String> content, String topic) {
5757
return CloudEventBuilder.v1()
5858
.withId(UUID.randomUUID().toString())
59-
.withSubject(ExampleConstants.EVENTMESH_GRPC_ASYNC_TEST_TOPIC)
59+
.withSubject(topic)
6060
.withSource(URI.create("/"))
6161
.withDataContentType(ExampleConstants.CLOUDEVENT_CONTENT_TYPE)
6262
.withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME)
@@ -66,10 +66,10 @@ protected static CloudEvent buildCloudEvent(final Map<String, String> content) {
6666

6767
}
6868

69-
protected static EventMeshMessage buildEventMeshMessage(final Map<String, String> content) {
69+
protected static EventMeshMessage buildEventMeshMessage(final Map<String, String> content, String topic) {
7070
return EventMeshMessage.builder()
7171
.content(JsonUtils.toJSONString(content))
72-
.topic(ExampleConstants.EVENTMESH_GRPC_BROADCAT_TEST_TOPIC)
72+
.topic(topic)
7373
.uniqueId(RandomStringUtils.generateNum(30))
7474
.bizSeqNo(RandomStringUtils.generateNum(30))
7575
.build()

eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/cloudevents/CloudEventsBatchPublishInstance.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ public static void main(String[] args) throws Exception {
4545

4646
final List<CloudEvent> cloudEventList = new ArrayList<>();
4747
for (int i = 0; i < 5; i++) {
48-
cloudEventList.add(buildCloudEvent(content));
48+
cloudEventList.add(buildCloudEvent(content,
49+
ExampleConstants.EVENTMESH_GRPC_BROADCAT_TEST_TOPIC));
4950
}
5051
eventMeshGrpcProducer.publish(cloudEventList);
5152
ThreadUtils.sleep(10, TimeUnit.SECONDS);

eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/cloudevents/CloudEventsPublishInstance.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ public static void main(String[] args) throws Exception {
4444
content.put("content", "testAsyncMessage");
4545

4646
for (int i = 0; i < MESSAGE_SIZE; i++) {
47-
eventMeshGrpcProducer.publish(buildCloudEvent(content));
47+
eventMeshGrpcProducer.publish(buildCloudEvent(content,
48+
ExampleConstants.EVENTMESH_GRPC_ASYNC_TEST_TOPIC));
4849
ThreadUtils.sleep(1, TimeUnit.SECONDS);
4950
}
5051

eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/cloudevents/CloudEventsRequestInstance.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ public static void main(String[] args) throws Exception {
4343
content.put("content", "testRequestReplyMessage");
4444

4545
for (int i = 0; i < MESSAGE_SIZE; i++) {
46-
eventMeshGrpcProducer.requestReply(buildCloudEvent(content), EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
46+
eventMeshGrpcProducer.requestReply(buildCloudEvent(content,
47+
ExampleConstants.EVENTMESH_GRPC_RR_TEST_TOPIC), EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
4748
ThreadUtils.sleep(1, TimeUnit.SECONDS);
4849
}
4950

eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/AsyncPublishBroadcast.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ public static void main(String[] args) throws Exception {
4343
content.put("content", "testAsyncMessage");
4444

4545
for (int i = 0; i < MESSAGE_SIZE; i++) {
46-
eventMeshGrpcProducer.publish(buildEventMeshMessage(content));
46+
eventMeshGrpcProducer.publish(buildEventMeshMessage(content,
47+
ExampleConstants.EVENTMESH_GRPC_BROADCAT_TEST_TOPIC));
4748
ThreadUtils.sleep(1, TimeUnit.SECONDS);
4849
}
4950
ThreadUtils.sleep(30, TimeUnit.SECONDS);

eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/AsyncPublishInstance.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ public static void main(String[] args) throws Exception {
4343
content.put("content", "testAsyncMessage");
4444

4545
for (int i = 0; i < MESSAGE_SIZE; i++) {
46-
buildEventMeshMessage(content);
46+
eventMeshGrpcProducer.publish(buildEventMeshMessage(content,
47+
ExampleConstants.EVENTMESH_GRPC_ASYNC_TEST_TOPIC));
4748
ThreadUtils.sleep(1, TimeUnit.SECONDS);
4849
}
4950
ThreadUtils.sleep(30, TimeUnit.SECONDS);

eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/BatchPublishInstance.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ public static void main(String[] args) throws Exception {
4545

4646
List<EventMeshMessage> messageList = new ArrayList<>();
4747
for (int i = 0; i < 5; i++) {
48-
messageList.add(buildEventMeshMessage(content));
48+
messageList.add(buildEventMeshMessage(content,
49+
ExampleConstants.EVENTMESH_GRPC_BROADCAT_TEST_TOPIC));
4950
}
5051

5152
eventMeshGrpcProducer.publish(messageList);

eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/RequestReplyInstance.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ public static void main(String[] args) throws Exception {
4545
content.put("content", "testRequestReplyMessage");
4646

4747
for (int i = 0; i < MESSAGE_SIZE; i++) {
48-
eventMeshGrpcProducer.requestReply(buildEventMeshMessage(content),
48+
eventMeshGrpcProducer.requestReply(buildEventMeshMessage(content,
49+
ExampleConstants.EVENTMESH_GRPC_RR_TEST_TOPIC),
4950
EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
5051
ThreadUtils.sleep(1, TimeUnit.SECONDS);
5152
}

0 commit comments

Comments
 (0)