Description
Search before asking
- I had searched in the issues and found no similar issues.
Environment
Windows
EventMesh version
master
What happened
With HTTP subscriptions, a change to the subscription does not trigger the ConsumerGroupManager to refresh, so the update does not take effect.
The ConsumerGroupManager keeps using the old subscription even after it has been updated.
How to reproduce
Run the HTTP subscriber sample in eventmesh-examples (the http.demo.sub package) with the following changes:
public void afterPropertiesSet() throws Exception {
    ....
    eventMeshHttpConsumer.subscribe(topicList, url);
    // after 10s, update the subscription with a new topic
    Thread.sleep(10000);
    // a separate variable name avoids clashing with the topicList used above
    List<SubscriptionItem> updatedTopicList = Lists.newArrayList(
        new SubscriptionItem("TEST2-TOPIC-HTTP-ASYNC", SubscriptionMode.CLUSTERING, SubscriptionType.SYNC));
    eventMeshHttpConsumer.subscribe(updatedTopicList, url);
    ....
}
After running this test, the logs show that no consumer is started for topic TEST2-TOPIC-HTTP-ASYNC.
Eventmesh Runtime Log
2022-02-14 15:00:19,434 DEBUG [eventMesh-http-worker-1] http(AbstractHTTPServer.java:381) - httpCommand={REQ,POST/HTTP,requestCode=206,opaque=1,header=subscribeRequestHeader={code=206,language=JAVA,version=V1,env=P,idc=FT,sys=1234,pid=18328,ip=127.0.0.1:51386,username=,passwd=},body=subscribeBody{consumerGroup='EventMeshTest-consumerGroup', url='http://192.168.2.31:8088/sub/test', topics=[SubscriptionItem{topic=TEST-TOPIC-HTTP-ASYNC, mode=CLUSTERING, type=ASYNC}]}}
2022-02-14 15:00:19,589 INFO [pool-11-thread-1] ConsumerManager(ConsumerManager.java:78) - clientInfo check start.....
2022-02-14 15:00:19,608 INFO [eventMesh-clientManage-] http(SubscribeProcessor.java:78) - cmd=SUBSCRIBE|http|client2eventMesh|from=127.0.0.1:51386|to=192.168.2.31
2022-02-14 15:00:19,767 INFO [eventMesh-clientManage-] ConsumerManager(ConsumerManager.java:322) - onChange event:consumerGroupStateEvent={consumerGroup=EventMeshTest-consumerGroup,action=NEW}
2022-02-14 15:00:19,771 INFO [eventMesh-clientManage-] MetaInfExtensionClassLoader(MetaInfExtensionClassLoader.java:79) - load extension class success, extensionType: interface org.apache.eventmesh.api.consumer.Consumer, extensionClass: class org.apache.eventmesh.connector.standalone.consumer.StandaloneConsumerAdaptor
2022-02-14 15:00:19,771 INFO [eventMesh-clientManage-] EventMeshExtensionFactory(EventMeshExtensionFactory.java:101) - initialize extension instance success, extensionType: interface org.apache.eventmesh.api.consumer.Consumer, extensionName: standalone
2022-02-14 15:00:19,771 INFO [eventMesh-clientManage-] EventMeshExtensionFactory(EventMeshExtensionFactory.java:101) - initialize extension instance success, extensionType: interface org.apache.eventmesh.api.consumer.Consumer, extensionName: standalone
2022-02-14 15:00:19,778 INFO [eventMesh-clientManage-] EventMeshConsumer(EventMeshConsumer.java:105) - EventMeshConsumer [EventMeshTest-consumerGroup] inited.............
2022-02-14 15:00:19,783 DEBUG [StandaloneConsumerThread] SubScribeTask(SubScribeTask.java:56) - execute subscribe task, topic: TEST-TOPIC-HTTP-ASYNC, offset: null
2022-02-14 15:00:19,786 DEBUG [eventMesh-clientManage-] http(AbstractHTTPServer.java:429) - httpCommand={RES,POST/HTTP,requestCode=206,opaque=1,cost=534,header=org.apache.eventmesh.common.protocol.http.header.BaseResponseHeader@1679171f,body=org.apache.eventmesh.common.protocol.http.body.BaseResponseBody@47909e95}
2022-02-14 15:00:19,786 DEBUG [EventMesh-http-asyncContext-] http(SubscribeProcessor.java:257) - httpCommand={RES,POST/HTTP,requestCode=206,opaque=1,cost=534,header=org.apache.eventmesh.common.protocol.http.header.BaseResponseHeader@1679171f,body=org.apache.eventmesh.common.protocol.http.body.BaseResponseBody@47909e95}
2022-02-14 15:00:20,585 INFO [eventMesh-metrics-2] httpMonitor(HTTPMetricsServer.java:108) - ===========================================SERVER METRICS==================================================
2022-02-14 15:00:20,585 INFO [eventMesh-metrics-2] httpMonitor(HTTPMetricsServer.java:110) - {"maxHTTPTPS":"1.0","avgHTTPTPS":"0.0","maxHTTPCOST":"562","avgHTTPCOST":"549.5","avgHTTPBodyDecodeCost":"6.0", "httpDiscard":"0"}
2022-02-14 15:00:20,585 INFO [eventMesh-metrics-2] httpMonitor(HTTPMetricsServer.java:119) - {"maxBatchSendMsgTPS":"0.0","avgBatchSendMsgTPS":"0.0", "sum":"0", "sumFail":"0", "sumFailRate":"0.00", "discard":"0"}
2022-02-14 15:00:20,586 INFO [eventMesh-metrics-2] httpMonitor(HTTPMetricsServer.java:129) - {"maxSendMsgTPS":"0.0","avgSendMsgTPS":"0.0", "sum":"0", "sumFail":"0", "sumFailRate":"0.00", "replyMsg":"0", "replyFail":"0"}
2022-02-14 15:00:20,586 INFO [eventMesh-metrics-2] httpMonitor(HTTPMetricsServer.java:140) - {"maxPushMsgTPS":"0.0","avgPushMsgTPS":"0.0", "sum":"0", "sumFail":"0", "sumFailRate":"0.0", "maxClientLatency":"0.0", "avgClientLatency":"0.0"}
2022-02-14 15:00:20,586 INFO [eventMesh-metrics-2] httpMonitor(HTTPMetricsServer.java:151) - {"batchMsgQ":"0","sendMsgQ":"0","pushMsgQ":"0","httpRetryQ":"0"}
2022-02-14 15:00:20,586 INFO [eventMesh-metrics-2] httpMonitor(HTTPMetricsServer.java:157) - {"batchAvgSend2MQCost":"0.0", "avgSend2MQCost":"0.0", "avgReply2MQCost":"0.0"}
2022-02-14 15:00:20,789 DEBUG [StandaloneConsumerThread] SubScribeTask(SubScribeTask.java:56) - execute subscribe task, topic: TEST-TOPIC-HTTP-ASYNC, offset: null
2022-02-14 15:00:21,791 DEBUG [StandaloneConsumerThread] SubScribeTask(SubScribeTask.java:56) - execute subscribe task, topic: TEST-TOPIC-HTTP-ASYNC, offset: null
2022-02-14 15:00:22,792 DEBUG [StandaloneConsumerThread] SubScribeTask(SubScribeTask.java:56) - execute subscribe task, topic: TEST-TOPIC-HTTP-ASYNC, offset: null
2022-02-14 15:00:23,804 DEBUG [StandaloneConsumerThread] SubScribeTask(SubScribeTask.java:56) - execute subscribe task, topic: TEST-TOPIC-HTTP-ASYNC, offset: null
2022-02-14 15:00:24,590 INFO [BatchSpanProcessor_WorkerThread-1] LogExporter(LogExporter.java:61) - 'HTTP POST' : 13edb08ae53aa8fd95ca799e9c93e690 63b98f38a3ad1255 SERVER [tracer: class org.apache.eventmesh.runtime.boot.EventMeshHTTPServer:] AttributesMap{data={http.status_code=206, http.flavor=HTTP, http.method=POST}, capacity=128, totalAddedValues=3}
2022-02-14 15:00:29,589 INFO [pool-11-thread-1] ConsumerManager(ConsumerManager.java:78) - clientInfo check start.....
2022-02-14 15:00:29,883 DEBUG [eventMesh-http-worker-1] http(AbstractHTTPServer.java:381) - httpCommand={REQ,POST/HTTP,requestCode=206,opaque=3,header=subscribeRequestHeader={code=206,language=JAVA,version=V1,env=P,idc=FT,sys=1234,pid=18328,ip=127.0.0.1:51386,username=,passwd=},body=subscribeBody{consumerGroup='EventMeshTest-consumerGroup', url='http://192.168.2.31:8088/sub/test', topics=[SubscriptionItem{topic=TEST2-TOPIC-HTTP-ASYNC, mode=CLUSTERING, type=SYNC}]}}
2022-02-14 15:00:29,884 DEBUG [StandaloneConsumerThread] SubScribeTask(SubScribeTask.java:56) - execute subscribe task, topic: TEST-TOPIC-HTTP-ASYNC, offset: null
2022-02-14 15:00:30,094 INFO [eventMesh-clientManage-] http(SubscribeProcessor.java:78) - cmd=SUBSCRIBE|http|client2eventMesh|from=127.0.0.1:51386|to=192.168.2.31
2022-02-14 15:00:30,302 DEBUG [eventMesh-clientManage-] http(AbstractHTTPServer.java:429) - httpCommand={RES,POST/HTTP,requestCode=206,opaque=3,cost=420,header=org.apache.eventmesh.common.protocol.http.header.BaseResponseHeader@4206e247,body=org.apache.eventmesh.common.protocol.http.body.BaseResponseBody@46396edf}
2022-02-14 15:00:30,302 DEBUG [EventMesh-http-asyncContext-] http(SubscribeProcessor.java:257) - httpCommand={RES,POST/HTTP,requestCode=206,opaque=3,cost=420,header=org.apache.eventmesh.common.protocol.http.header.BaseResponseHeader@4206e247,body=org.apache.eventmesh.common.protocol.http.body.BaseResponseBody@46396edf}
2022-02-14 15:00:30,900 DEBUG [StandaloneConsumerThread] SubScribeTask(SubScribeTask.java:56) - execute subscribe task, topic: TEST-TOPIC-HTTP-ASYNC, offset: null
2022-02-14 15:00:31,912 DEBUG [StandaloneConsumerThread] SubScribeTask(SubScribeTask.java:56) - execute subscribe task, topic: TEST-TOPIC-HTTP-ASYNC, offset: null
2022-02-14 15:00:32,925 DEBUG [StandaloneConsumerThread] SubScribeTask(SubScribeTask.java:56) - execute subscribe task, topic: TEST-TOPIC-HTTP-ASYNC, offset: null
2022-02-14 15:00:33,937 DEBUG [StandaloneConsumerThread] SubScribeTask(SubScribeTask.java:56) - execute subscribe task, topic: TEST-TOPIC-HTTP-ASYNC, offset: null
Root Cause Analysis
The root cause lies in the ConsumerManager.notifyConsumerManager() method in the org.apache.eventmesh.runtime.core.protocol.http.consumer package.
- When a new latestConsumerGroupConfig object is added to ConsumerManager.consumerTable, the object reference itself is stored. So latestConsumerGroupConfig refers to the same object as the one inside ConsumerManager.consumerTable.
if (cgm == null) {
    ConsumerGroupStateEvent notification = new ConsumerGroupStateEvent();
    notification.action = ConsumerGroupStateEvent.ConsumerGroupStateAction.NEW;
    notification.consumerGroup = consumerGroup;
    notification.consumerGroupConfig = latestConsumerGroupConfig;
    eventMeshHTTPServer.getEventBus().post(notification);
    return;
}
- When we update the subscription, latestConsumerGroupConfig is updated, and because it is the same object reference, the object inside ConsumerManager.consumerTable is updated as well.
So latestConsumerGroupConfig.equals(cgm.getConsumerGroupConfig()) is always true,
and the following code is never executed:
if (!latestConsumerGroupConfig.equals(cgm.getConsumerGroupConfig())) {
    ConsumerGroupStateEvent notification = new ConsumerGroupStateEvent();
    notification.action = ConsumerGroupStateEvent.ConsumerGroupStateAction.CHANGE;
    notification.consumerGroup = consumerGroup;
    notification.consumerGroupConfig = latestConsumerGroupConfig;
    eventMeshHTTPServer.getEventBus().post(notification);
    return;
}
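The aliasing effect described above can be reproduced outside EventMesh. The following is a minimal sketch, not the actual runtime code: GroupConfig is a hypothetical stand-in for the consumer group configuration class, and the HashMap stands in for ConsumerManager.consumerTable.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

public class AliasingDemo {

    /** Hypothetical stand-in for the consumer group config: a group name plus its topics. */
    static final class GroupConfig {
        final String group;
        final Set<String> topics = new HashSet<>();

        GroupConfig(String group) {
            this.group = group;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof GroupConfig)) {
                return false;
            }
            GroupConfig other = (GroupConfig) o;
            return group.equals(other.group) && topics.equals(other.topics);
        }

        @Override
        public int hashCode() {
            return Objects.hash(group, topics);
        }
    }

    /** Returns true if the update is detected when the table holds the same reference. */
    static boolean changeDetectedWhenSharingReference() {
        // Stand-in for ConsumerManager.consumerTable (assumption for illustration).
        Map<String, GroupConfig> consumerTable = new HashMap<>();

        GroupConfig latest = new GroupConfig("EventMeshTest-consumerGroup");
        latest.topics.add("TEST-TOPIC-HTTP-ASYNC");
        consumerTable.put(latest.group, latest); // stores the reference, not a copy

        // Second subscribe call: the "latest" object is mutated in place.
        latest.topics.add("TEST2-TOPIC-HTTP-ASYNC");

        // The stored object is the very same instance, so equals() is trivially true.
        GroupConfig stored = consumerTable.get(latest.group);
        return !latest.equals(stored);
    }

    public static void main(String[] args) {
        System.out.println("change detected: " + changeDetectedWhenSharingReference());
        // prints "change detected: false"
    }
}
```

Because both sides of the comparison are the same instance, the check can never report a difference, which matches the missing CHANGE event in the runtime log.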
Solution
We need to pass a cloned object in notifyConsumerManager(), so that the comparison with the stored configuration actually detects the change.
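One way to picture the proposed fix is a defensive copy taken at the moment the configuration is stored. This is only a sketch under assumptions: GroupConfig and its copy() method are hypothetical, and the real class may need a deeper copy of its nested topic configuration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

public class DefensiveCopyDemo {

    /** Hypothetical stand-in for the consumer group config. */
    static final class GroupConfig {
        final String group;
        final Set<String> topics;

        GroupConfig(String group, Set<String> topics) {
            this.group = group;
            this.topics = new HashSet<>(topics); // copy the collection as well
        }

        /** Returns an independent snapshot of this config (hypothetical helper). */
        GroupConfig copy() {
            return new GroupConfig(group, topics);
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof GroupConfig)) {
                return false;
            }
            GroupConfig other = (GroupConfig) o;
            return group.equals(other.group) && topics.equals(other.topics);
        }

        @Override
        public int hashCode() {
            return Objects.hash(group, topics);
        }
    }

    /** Returns true if the update is detected when the table holds a snapshot. */
    static boolean changeDetectedWithSnapshot() {
        // Stand-in for ConsumerManager.consumerTable (assumption for illustration).
        Map<String, GroupConfig> consumerTable = new HashMap<>();

        GroupConfig latest = new GroupConfig("EventMeshTest-consumerGroup", new HashSet<>());
        latest.topics.add("TEST-TOPIC-HTTP-ASYNC");
        consumerTable.put(latest.group, latest.copy()); // store a snapshot, not the live object

        // Second subscribe call mutates only the live object.
        latest.topics.add("TEST2-TOPIC-HTTP-ASYNC");

        // The snapshot still holds the old topics, so the difference is visible.
        return !latest.equals(consumerTable.get(latest.group));
    }

    public static void main(String[] args) {
        System.out.println("change detected: " + changeDetectedWithSnapshot());
        // prints "change detected: true"
    }
}
```

With the snapshot in the table, the equals() check sees the old and new topic sets as different, so a CHANGE notification could be posted.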
Are you willing to submit PR?
- Yes I am willing to submit a PR!