Skip to content

Commit 69813b2

Browse files
authored
Merge 165e12a into 4e5428b
2 parents 4e5428b + 165e12a commit 69813b2

File tree

18 files changed

+129
-81
lines changed

18 files changed

+129
-81
lines changed

eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/app/controller/SubController.java

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,10 @@
1717

1818
package org.apache.eventmesh.grpc.sub.app.controller;
1919

20-
import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
2120
import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
2221
import org.apache.eventmesh.common.utils.JsonUtils;
2322
import org.apache.eventmesh.grpc.sub.app.service.SubService;
2423

25-
import org.apache.commons.lang3.StringUtils;
26-
27-
import java.nio.charset.StandardCharsets;
2824
import java.util.HashMap;
2925
import java.util.Map;
3026

@@ -35,11 +31,6 @@
3531
import org.springframework.web.bind.annotation.RequestMethod;
3632
import org.springframework.web.bind.annotation.RestController;
3733

38-
import io.cloudevents.CloudEvent;
39-
import io.cloudevents.CloudEventData;
40-
import io.cloudevents.core.format.EventFormat;
41-
import io.cloudevents.core.provider.EventFormatProvider;
42-
4334
import lombok.extern.slf4j.Slf4j;
4435

4536
@Slf4j
@@ -59,22 +50,6 @@ public String subTest(final HttpServletRequest request) {
5950
log.info("=======receive message======= {}", content);
6051
}
6152

62-
if (StringUtils.equals(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME, protocolType)) {
63-
final String contentType = request.getHeader(ProtocolKey.CONTENT_TYPE);
64-
65-
final EventFormat eventFormat = EventFormatProvider.getInstance().resolveFormat(contentType);
66-
if (eventFormat != null) {
67-
final CloudEvent event = eventFormat.deserialize(content.getBytes(StandardCharsets.UTF_8));
68-
final CloudEventData cloudEventData = event.getData();
69-
if (cloudEventData != null) {
70-
final String data = new String(cloudEventData.toBytes(), StandardCharsets.UTF_8);
71-
if (log.isInfoEnabled()) {
72-
log.info("=======receive data======= {}", data);
73-
}
74-
}
75-
}
76-
}
77-
7853
subService.consumeMessage(content);
7954

8055
final Map<String, Object> map = new HashMap<>();

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/AbstractPublishBatchCloudEventProcessor.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,18 +56,18 @@ public void process(CloudEventBatch cloudEvent, EventEmitter<CloudEvent> emitter
5656
// control flow rate limit
5757
if (!eventMeshGrpcServer.getMsgRateLimiter().tryAcquire(EventMeshConstants.DEFAULT_FASTFAIL_TIMEOUT_IN_MILLISECONDS, TimeUnit.MILLISECONDS)) {
5858
log.error("Send message speed over limit.");
59-
ServiceUtils.completed(StatusCode.EVENTMESH_SEND_MESSAGE_SPEED_OVER_LIMIT_ERR, emitter);
59+
ServiceUtils.sendResponseCompleted(StatusCode.EVENTMESH_SEND_MESSAGE_SPEED_OVER_LIMIT_ERR, emitter);
6060
return;
6161
}
6262

6363
StatusCode cloudEventCheck = cloudEventBatchCheck(cloudEvent);
6464
if (cloudEventCheck != StatusCode.SUCCESS) {
65-
ServiceUtils.completed(cloudEventCheck, emitter);
65+
ServiceUtils.sendResponseCompleted(cloudEventCheck, emitter);
6666
return;
6767
}
6868
StatusCode aclCheck = this.aclCheck(cloudEvent.getEvents(0));
6969
if (aclCheck != StatusCode.SUCCESS) {
70-
ServiceUtils.completed(aclCheck, emitter);
70+
ServiceUtils.sendResponseCompleted(aclCheck, emitter);
7171
return;
7272
}
7373
handleCloudEvent(cloudEvent, emitter);

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/AbstractPublishCloudEventProcessor.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,18 +55,18 @@ public void process(CloudEvent cloudEvent, EventEmitter<CloudEvent> emitter) thr
5555
// control flow rate limit
5656
if (!eventMeshGrpcServer.getMsgRateLimiter().tryAcquire(EventMeshConstants.DEFAULT_FASTFAIL_TIMEOUT_IN_MILLISECONDS, TimeUnit.MILLISECONDS)) {
5757
log.error("Send message speed over limit.");
58-
ServiceUtils.streamCompleted(cloudEvent, StatusCode.EVENTMESH_SEND_MESSAGE_SPEED_OVER_LIMIT_ERR, emitter);
58+
ServiceUtils.sendStreamResponseCompleted(cloudEvent, StatusCode.EVENTMESH_SEND_MESSAGE_SPEED_OVER_LIMIT_ERR, emitter);
5959
return;
6060
}
6161

6262
StatusCode cloudEventCheck = cloudEventCheck(cloudEvent);
6363
if (cloudEventCheck != StatusCode.SUCCESS) {
64-
ServiceUtils.completed(cloudEventCheck, emitter);
64+
ServiceUtils.sendResponseCompleted(cloudEventCheck, emitter);
6565
return;
6666
}
6767
StatusCode aclCheck = this.aclCheck(cloudEvent);
6868
if (aclCheck != StatusCode.SUCCESS) {
69-
ServiceUtils.completed(aclCheck, emitter);
69+
ServiceUtils.sendResponseCompleted(aclCheck, emitter);
7070
return;
7171
}
7272
handleCloudEvent(cloudEvent, emitter);

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/BatchPublishCloudEventProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public void onException(OnExceptionContext context) {
8585
}
8686
});
8787
}
88-
ServiceUtils.completed(StatusCode.SUCCESS, "batch publish success", emitter);
88+
ServiceUtils.sendResponseCompleted(StatusCode.SUCCESS, "batch publish success", emitter);
8989
}
9090

9191

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/HeartbeatProcessor.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,27 +56,27 @@ public HeartbeatProcessor(final EventMeshGrpcServer eventMeshGrpcServer) {
5656
public void process(CloudEvent heartbeat, EventEmitter<CloudEvent> emitter) throws Exception {
5757

5858
if (!ServiceUtils.validateCloudEventAttributes(heartbeat)) {
59-
ServiceUtils.completed(StatusCode.EVENTMESH_PROTOCOL_HEADER_ERR, emitter);
59+
ServiceUtils.sendResponseCompleted(StatusCode.EVENTMESH_PROTOCOL_HEADER_ERR, emitter);
6060
return;
6161
}
6262

6363
if (!ServiceUtils.validateHeartBeat(heartbeat)) {
64-
ServiceUtils.completed(StatusCode.EVENTMESH_PROTOCOL_BODY_ERR, emitter);
64+
ServiceUtils.sendResponseCompleted(StatusCode.EVENTMESH_PROTOCOL_BODY_ERR, emitter);
6565
return;
6666
}
6767

6868
try {
6969
doAclCheck(heartbeat);
7070
} catch (AclException e) {
7171
aclLogger.warn("CLIENT HAS NO PERMISSION, HeartbeatProcessor failed", e);
72-
ServiceUtils.completed(StatusCode.EVENTMESH_ACL_ERR, e.getMessage(), emitter);
72+
ServiceUtils.sendResponseCompleted(StatusCode.EVENTMESH_ACL_ERR, e.getMessage(), emitter);
7373
return;
7474
}
7575

7676
// only handle heartbeat for consumers
7777
org.apache.eventmesh.common.protocol.grpc.common.ClientType clientType = EventMeshCloudEventUtils.getClientType(heartbeat);
7878
if (org.apache.eventmesh.common.protocol.grpc.common.ClientType.SUB != clientType) {
79-
ServiceUtils.completed(StatusCode.EVENTMESH_PROTOCOL_BODY_ERR, emitter);
79+
ServiceUtils.sendResponseCompleted(StatusCode.EVENTMESH_PROTOCOL_BODY_ERR, emitter);
8080
return;
8181
}
8282

@@ -106,11 +106,11 @@ public void process(CloudEvent heartbeat, EventEmitter<CloudEvent> emitter) thro
106106

107107
// consumer group client is lost, and the client needs to resubscribe.
108108
if (!consumerManager.updateClientTime(hbClient)) {
109-
ServiceUtils.completed(StatusCode.CLIENT_RESUBSCRIBE, emitter);
109+
ServiceUtils.sendResponseCompleted(StatusCode.CLIENT_RESUBSCRIBE, emitter);
110110
return;
111111
}
112112
}
113-
ServiceUtils.completed(StatusCode.SUCCESS, "heartbeat success", emitter);
113+
ServiceUtils.sendResponseCompleted(StatusCode.SUCCESS, "heartbeat success", emitter);
114114
}
115115

116116
private void doAclCheck(CloudEvent heartbeat) throws AclException {

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/PublishCloudEventsProcessor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public void handleCloudEvent(CloudEvent message, EventEmitter<CloudEvent> emitte
6666
eventMeshProducer.send(sendMessageContext, new SendCallback() {
6767
@Override
6868
public void onSuccess(SendResult sendResult) {
69-
ServiceUtils.completed(StatusCode.SUCCESS, sendResult.toString(), emitter);
69+
ServiceUtils.sendResponseCompleted(StatusCode.SUCCESS, sendResult.toString(), emitter);
7070
long endTime = System.currentTimeMillis();
7171
log.info("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
7272
endTime - startTime, topic, seqNum, uniqueId);
@@ -75,7 +75,7 @@ public void onSuccess(SendResult sendResult) {
7575

7676
@Override
7777
public void onException(OnExceptionContext context) {
78-
ServiceUtils.completed(StatusCode.EVENTMESH_SEND_ASYNC_MSG_ERR,
78+
ServiceUtils.sendResponseCompleted(StatusCode.EVENTMESH_SEND_ASYNC_MSG_ERR,
7979
EventMeshUtil.stackTrace(context.getException(), 2), emitter);
8080
long endTime = System.currentTimeMillis();
8181
log.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/ReplyMessageProcessor.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,28 +66,28 @@ public ReplyMessageProcessor(final EventMeshGrpcServer eventMeshGrpcServer) {
6666
public void process(CloudEvent message, EventEmitter<CloudEvent> emitter) throws Exception {
6767

6868
if (!ServiceUtils.validateCloudEventAttributes(message)) {
69-
ServiceUtils.streamCompleted(message, StatusCode.EVENTMESH_PROTOCOL_HEADER_ERR, emitter);
69+
ServiceUtils.sendStreamResponseCompleted(message, StatusCode.EVENTMESH_PROTOCOL_HEADER_ERR, emitter);
7070
return;
7171
}
7272

7373
if (!ServiceUtils.validateCloudEventData(message)) {
74-
ServiceUtils.streamCompleted(message, StatusCode.EVENTMESH_PROTOCOL_BODY_ERR, emitter);
74+
ServiceUtils.sendStreamResponseCompleted(message, StatusCode.EVENTMESH_PROTOCOL_BODY_ERR, emitter);
7575
return;
7676
}
7777

7878
try {
7979
doAclCheck(message);
8080
} catch (Exception e) {
8181
aclLogger.warn("CLIENT HAS NO PERMISSION,RequestReplyMessageProcessor reply failed", e);
82-
ServiceUtils.streamCompleted(message, StatusCode.EVENTMESH_ACL_ERR, e.getMessage(), emitter);
82+
ServiceUtils.sendStreamResponseCompleted(message, StatusCode.EVENTMESH_ACL_ERR, e.getMessage(), emitter);
8383
return;
8484
}
8585

8686
// control flow rate limit
8787
if (!eventMeshGrpcServer.getMsgRateLimiter()
8888
.tryAcquire(EventMeshConstants.DEFAULT_FASTFAIL_TIMEOUT_IN_MILLISECONDS, TimeUnit.MILLISECONDS)) {
8989
log.error("Send message speed over limit.");
90-
ServiceUtils.streamCompleted(message, StatusCode.EVENTMESH_SEND_MESSAGE_SPEED_OVER_LIMIT_ERR, emitter);
90+
ServiceUtils.sendStreamResponseCompleted(message, StatusCode.EVENTMESH_SEND_MESSAGE_SPEED_OVER_LIMIT_ERR, emitter);
9191
return;
9292
}
9393

@@ -122,7 +122,7 @@ public void onSuccess(SendResult sendResult) {
122122

123123
@Override
124124
public void onException(OnExceptionContext onExceptionContext) {
125-
ServiceUtils.streamCompleted(messageReply, StatusCode.EVENTMESH_REPLY_MSG_ERR,
125+
ServiceUtils.sendStreamResponseCompleted(messageReply, StatusCode.EVENTMESH_REPLY_MSG_ERR,
126126
EventMeshUtil.stackTrace(onExceptionContext.getException(), 2), emitter);
127127
long endTime = System.currentTimeMillis();
128128
log.error("message|mq2eventmesh|REPLY|ReplyToServer|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/RequestCloudEventProcessor.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ public void onSuccess(io.cloudevents.CloudEvent event) {
7979
endTime - startTime, topic, seqNum, uniqueId);
8080
eventMeshGrpcServer.getMetricsMonitor().recordSendMsgToClient();
8181
} catch (Exception e) {
82-
ServiceUtils.streamCompleted(message, StatusCode.EVENTMESH_REQUEST_REPLY_MSG_ERR, EventMeshUtil.stackTrace(e, 2), emitter);
82+
ServiceUtils.sendStreamResponseCompleted(message, StatusCode.EVENTMESH_REQUEST_REPLY_MSG_ERR, EventMeshUtil.stackTrace(e, 2),
83+
emitter);
8384
long endTime = System.currentTimeMillis();
8485
log.error("message|mq2eventmesh|REPLY|RequestReply|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
8586
endTime - startTime, topic, seqNum, uniqueId, e);
@@ -88,7 +89,8 @@ public void onSuccess(io.cloudevents.CloudEvent event) {
8889

8990
@Override
9091
public void onException(Throwable e) {
91-
ServiceUtils.streamCompleted(message, StatusCode.EVENTMESH_REQUEST_REPLY_MSG_ERR, EventMeshUtil.stackTrace(e, 2), emitter);
92+
ServiceUtils.sendStreamResponseCompleted(message, StatusCode.EVENTMESH_REQUEST_REPLY_MSG_ERR, EventMeshUtil.stackTrace(e, 2),
93+
emitter);
9294
long endTime = System.currentTimeMillis();
9395
log.error("message|eventMesh2mq|REPLY|RequestReply|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
9496
endTime - startTime, topic, seqNum, uniqueId, e);

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/SubscribeProcessor.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,12 @@ public void process(final CloudEvent subscription, final EventEmitter<CloudEvent
6262
Objects.requireNonNull(emitter, "emitter can not be null");
6363

6464
if (!ServiceUtils.validateCloudEventAttributes(subscription)) {
65-
ServiceUtils.completed(StatusCode.EVENTMESH_PROTOCOL_HEADER_ERR, emitter);
65+
ServiceUtils.sendResponseCompleted(StatusCode.EVENTMESH_PROTOCOL_HEADER_ERR, emitter);
6666
return;
6767
}
6868

6969
if (!ServiceUtils.validateSubscription(grpcType, subscription)) {
70-
ServiceUtils.completed(StatusCode.EVENTMESH_PROTOCOL_BODY_ERR, emitter);
70+
ServiceUtils.sendResponseCompleted(StatusCode.EVENTMESH_PROTOCOL_BODY_ERR, emitter);
7171
return;
7272
}
7373

@@ -77,7 +77,7 @@ public void process(final CloudEvent subscription, final EventEmitter<CloudEvent
7777
if (log.isWarnEnabled()) {
7878
log.warn("CLIENT HAS NO PERMISSION to Subscribe. failed", e);
7979
}
80-
ServiceUtils.completed(StatusCode.EVENTMESH_ACL_ERR, e.getMessage(), emitter);
80+
ServiceUtils.sendResponseCompleted(StatusCode.EVENTMESH_ACL_ERR, e.getMessage(), emitter);
8181
return;
8282
}
8383

@@ -136,7 +136,7 @@ public void process(final CloudEvent subscription, final EventEmitter<CloudEvent
136136
log.warn("EventMesh consumer [{}] didn't restart.", consumerGroup);
137137
}
138138
}
139-
ServiceUtils.completed(StatusCode.SUCCESS, "subscribe success", emitter);
139+
ServiceUtils.sendResponseCompleted(StatusCode.SUCCESS, "subscribe success", emitter);
140140
}
141141

142142
private void doAclCheck(final CloudEvent subscription) throws AclException {

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/SubscribeStreamProcessor.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,20 +64,20 @@ public SubscribeStreamProcessor(final EventMeshGrpcServer eventMeshGrpcServer) {
6464
public void process(CloudEvent subscription, EventEmitter<CloudEvent> emitter) throws Exception {
6565

6666
if (!ServiceUtils.validateCloudEventAttributes(subscription)) {
67-
ServiceUtils.completed(StatusCode.EVENTMESH_PROTOCOL_HEADER_ERR, emitter);
67+
ServiceUtils.sendResponseCompleted(StatusCode.EVENTMESH_PROTOCOL_HEADER_ERR, emitter);
6868
return;
6969
}
7070

7171
if (!ServiceUtils.validateSubscription(grpcType, subscription)) {
72-
ServiceUtils.completed(StatusCode.EVENTMESH_PROTOCOL_BODY_ERR, emitter);
72+
ServiceUtils.sendResponseCompleted(StatusCode.EVENTMESH_PROTOCOL_BODY_ERR, emitter);
7373
return;
7474
}
7575

7676
try {
7777
doAclCheck(subscription);
7878
} catch (AclException e) {
7979
aclLogger.warn("CLIENT HAS NO PERMISSION to Subscribe. failed", e);
80-
ServiceUtils.streamCompleted(subscription, StatusCode.EVENTMESH_ACL_ERR, e.getMessage(), emitter);
80+
ServiceUtils.sendStreamResponseCompleted(subscription, StatusCode.EVENTMESH_ACL_ERR, e.getMessage(), emitter);
8181
return;
8282
}
8383

@@ -135,7 +135,7 @@ public void process(CloudEvent subscription, EventEmitter<CloudEvent> emitter) t
135135
log.warn("EventMesh consumer [{}] didn't restart.", consumerGroup);
136136
}
137137

138-
ServiceUtils.streamCompleted(subscription, StatusCode.SUCCESS, "subscribe success", emitter);
138+
ServiceUtils.sendStreamResponse(subscription, StatusCode.SUCCESS, "subscribe success", emitter);
139139
}
140140

141141
private void doAclCheck(CloudEvent subscription) throws AclException {

0 commit comments

Comments
 (0)