Skip to content

Commit 8f72bf5

Browse files
committed
[ISSUE #4472]Fix MeshMessageProtocolAdaptor#fromCloudEvent throw NPE
1 parent b6f9ca5 commit 8f72bf5

File tree

4 files changed

+15
-13
lines changed

4 files changed

+15
-13
lines changed

eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,13 @@ public class Constants {
169169

170170
public static final int SUCCESS_CODE = 200;
171171

172+
// protocol desc
173+
public static final String PROTOCOL_DESC_GRPC_CLOUD_EVENT = "grpc-cloud-event";
174+
175+
public static final String PROTOCOL_DESC_HTTP = "http";
176+
177+
public static final String PROTOCOL_DESC_TCP = "tcp";
178+
172179
/**
173180
* GRPC PROTOCOL
174181
*/

eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/MeshMessageProtocolAdaptor.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.eventmesh.protocol.meshmessage;
1919

2020
import org.apache.eventmesh.common.Constants;
21+
import org.apache.eventmesh.common.enums.EventMeshProtocolType;
2122
import org.apache.eventmesh.common.protocol.ProtocolTransportObject;
2223
import org.apache.eventmesh.common.protocol.grpc.cloudevents.CloudEventBatch;
2324
import org.apache.eventmesh.common.protocol.grpc.common.BatchEventMeshCloudEventWrapper;
@@ -109,7 +110,7 @@ public ProtocolTransportObject fromCloudEvent(CloudEvent cloudEvent) throws Prot
109110
cloudEvent.getExtension(Constants.PROTOCOL_DESC) == null ? null : cloudEvent.getExtension(Constants.PROTOCOL_DESC).toString();
110111

111112
switch (Objects.requireNonNull(protocolDesc)) {
112-
case MeshMessageProtocolConstant.PROTOCOL_DESC_HTTP:
113+
case Constants.PROTOCOL_DESC_HTTP:
113114
HttpCommand httpCommand = new HttpCommand();
114115
Body body = new Body() {
115116

@@ -128,9 +129,9 @@ public Map<String, Object> toMap() {
128129
body.toMap();
129130
httpCommand.setBody(body);
130131
return httpCommand;
131-
case MeshMessageProtocolConstant.PROTOCOL_DESC_TCP:
132+
case Constants.PROTOCOL_DESC_TCP:
132133
return TcpMessageProtocolResolver.buildEventMeshMessage(cloudEvent);
133-
case MeshMessageProtocolConstant.PROTOCOL_DESC_GRPC_CLOUD_EVENT:
134+
case Constants.PROTOCOL_DESC_GRPC_CLOUD_EVENT:
134135
return GrpcEventMeshCloudEventProtocolResolver.buildEventMeshCloudEvent(cloudEvent);
135136
default:
136137
throw new ProtocolHandleException(String.format("Unsupported protocolDesc: %s", protocolDesc));
@@ -139,7 +140,7 @@ public Map<String, Object> toMap() {
139140

140141
@Override
141142
public String getProtocolType() {
142-
return MeshMessageProtocolConstant.PROTOCOL_NAME;
143+
return EventMeshProtocolType.EVENT_MESH_MESSAGE.protocolTypeName();
143144
}
144145

145146
private void validateCloudEvent(CloudEvent cloudEvent) {

eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/MeshMessageProtocolConstant.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,5 @@ public enum MeshMessageProtocolConstant {
2222

2323
public static final String PROTOCOL_NAME = "eventmeshmessage";
2424

25-
public static final String PROTOCOL_DESC_HTTP = "http";
26-
27-
public static final String PROTOCOL_DESC_GRPC = "grpc";
28-
29-
public static final String PROTOCOL_DESC_GRPC_CLOUD_EVENT = "grpc-cloud-event";
30-
31-
public static final String PROTOCOL_DESC_TCP = "tcp";
32-
3325
public static final String PROTOCOL_KEY_CONTENT = "content";
3426
}

eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/util/EventMeshCloudEventBuilder.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,8 @@ private static CloudEvent switchEventMeshMessage2EventMeshCloudEvent(EventMeshMe
153153
attributeValueMap.put(ProtocolKey.TTL, CloudEventAttributeValue.newBuilder().setCeString(ttl).build());
154154
attributeValueMap.put(ProtocolKey.SEQ_NUM, CloudEventAttributeValue.newBuilder().setCeString(seqNum).build());
155155
attributeValueMap.put(ProtocolKey.UNIQUE_ID, CloudEventAttributeValue.newBuilder().setCeString(uniqueId).build());
156+
attributeValueMap.put(ProtocolKey.PROTOCOL_DESC,
157+
CloudEventAttributeValue.newBuilder().setCeString(Constants.PROTOCOL_DESC_GRPC_CLOUD_EVENT).build());
156158
attributeValueMap.put(ProtocolKey.PRODUCERGROUP,
157159
CloudEventAttributeValue.newBuilder().setCeString(clientConfig.getProducerGroup()).build());
158160
if (null != message.getTopic()) {
@@ -197,7 +199,7 @@ private static CloudEvent switchCloudEvent2EventMeshCloudEvent(io.cloudevents.Cl
197199
buildCloudEventIfAbsent(message, cloudEventBuilder, ProtocolKey.SYS, clientConfig.getSys());
198200
buildCloudEventIfAbsent(message, cloudEventBuilder, ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA);
199201
buildCloudEventIfAbsent(message, cloudEventBuilder, ProtocolKey.PROTOCOL_TYPE, protocolType.protocolTypeName());
200-
buildCloudEventIfAbsent(message, cloudEventBuilder, ProtocolKey.PROTOCOL_DESC, "grpc-cloud-event");
202+
buildCloudEventIfAbsent(message, cloudEventBuilder, ProtocolKey.PROTOCOL_DESC, Constants.PROTOCOL_DESC_GRPC_CLOUD_EVENT);
201203
buildCloudEventIfAbsent(message, cloudEventBuilder, ProtocolKey.PROTOCOL_VERSION, message.getSpecVersion().toString());
202204
buildCloudEventIfAbsent(message, cloudEventBuilder, ProtocolKey.UNIQUE_ID, RandomStringUtils.generateNum(30));
203205
buildCloudEventIfAbsent(message, cloudEventBuilder, ProtocolKey.SEQ_NUM, RandomStringUtils.generateNum(30));

0 commit comments

Comments
 (0)