
[Bug] HTTP Subscriber: Updating subscription information is not taking effect #768

@jinrongluo

Description


Search before asking

  • I have searched the issues and found no similar issues.

Environment

Windows

EventMesh version

master

What happened

With an HTTP subscription, changing the subscription does not trigger the ConsumerGroupManager to refresh, so the update does not take effect.

The ConsumerGroupManager keeps using the old subscription even after it has been updated.

How to reproduce

Run the HTTP subscriber sample in eventmesh-examples (http.demo.sub package) with the following changes:

public void afterPropertiesSet() throws Exception {

    ....

    eventMeshHttpConsumer.subscribe(topicList, url);

    // After 10 s, update the subscription with a new topic.
    Thread.sleep(10000);

    // A separate variable avoids clashing with the topicList used above.
    List<SubscriptionItem> updatedTopicList = Lists.newArrayList(
        new SubscriptionItem("TEST2-TOPIC-HTTP-ASYNC", SubscriptionMode.CLUSTERING, SubscriptionType.SYNC));
    eventMeshHttpConsumer.subscribe(updatedTopicList, url);

    ....
}

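After running this test, the log shows that no consumer is started for topic TEST2-TOPIC-HTTP-ASYNC. The second SUBSCRIBE request is received, but no consumerGroupStateEvent follows it, and the subscribe task keeps polling only TEST-TOPIC-HTTP-ASYNC.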

Eventmesh Runtime Log


2022-02-14 15:00:19,434 DEBUG [eventMesh-http-worker-1] http(AbstractHTTPServer.java:381) - httpCommand={REQ,POST/HTTP,requestCode=206,opaque=1,header=subscribeRequestHeader={code=206,language=JAVA,version=V1,env=P,idc=FT,sys=1234,pid=18328,ip=127.0.0.1:51386,username=,passwd=},body=subscribeBody{consumerGroup='EventMeshTest-consumerGroup', url='http://192.168.2.31:8088/sub/test', topics=[SubscriptionItem{topic=TEST-TOPIC-HTTP-ASYNC, mode=CLUSTERING, type=ASYNC}]}}
2022-02-14 15:00:19,589 INFO  [pool-11-thread-1] ConsumerManager(ConsumerManager.java:78) - clientInfo check start.....
2022-02-14 15:00:19,608 INFO  [eventMesh-clientManage-] http(SubscribeProcessor.java:78) - cmd=SUBSCRIBE|http|client2eventMesh|from=127.0.0.1:51386|to=192.168.2.31
2022-02-14 15:00:19,767 INFO  [eventMesh-clientManage-] ConsumerManager(ConsumerManager.java:322) - onChange event:consumerGroupStateEvent={consumerGroup=EventMeshTest-consumerGroup,action=NEW}
2022-02-14 15:00:19,771 INFO  [eventMesh-clientManage-] MetaInfExtensionClassLoader(MetaInfExtensionClassLoader.java:79) - load extension class success, extensionType: interface org.apache.eventmesh.api.consumer.Consumer, extensionClass: class org.apache.eventmesh.connector.standalone.consumer.StandaloneConsumerAdaptor
2022-02-14 15:00:19,771 INFO  [eventMesh-clientManage-] EventMeshExtensionFactory(EventMeshExtensionFactory.java:101) - initialize extension instance success, extensionType: interface org.apache.eventmesh.api.consumer.Consumer, extensionName: standalone
2022-02-14 15:00:19,771 INFO  [eventMesh-clientManage-] EventMeshExtensionFactory(EventMeshExtensionFactory.java:101) - initialize extension instance success, extensionType: interface org.apache.eventmesh.api.consumer.Consumer, extensionName: standalone
2022-02-14 15:00:19,778 INFO  [eventMesh-clientManage-] EventMeshConsumer(EventMeshConsumer.java:105) - EventMeshConsumer [EventMeshTest-consumerGroup] inited.............
2022-02-14 15:00:19,783 DEBUG [StandaloneConsumerThread] SubScribeTask(SubScribeTask.java:56) - execute subscribe task, topic: TEST-TOPIC-HTTP-ASYNC, offset: null
2022-02-14 15:00:19,786 DEBUG [eventMesh-clientManage-] http(AbstractHTTPServer.java:429) - httpCommand={RES,POST/HTTP,requestCode=206,opaque=1,cost=534,header=org.apache.eventmesh.common.protocol.http.header.BaseResponseHeader@1679171f,body=org.apache.eventmesh.common.protocol.http.body.BaseResponseBody@47909e95}
2022-02-14 15:00:19,786 DEBUG [EventMesh-http-asyncContext-] http(SubscribeProcessor.java:257) - httpCommand={RES,POST/HTTP,requestCode=206,opaque=1,cost=534,header=org.apache.eventmesh.common.protocol.http.header.BaseResponseHeader@1679171f,body=org.apache.eventmesh.common.protocol.http.body.BaseResponseBody@47909e95}
2022-02-14 15:00:20,585 INFO  [eventMesh-metrics-2] httpMonitor(HTTPMetricsServer.java:108) - ===========================================SERVER METRICS==================================================
2022-02-14 15:00:20,585 INFO  [eventMesh-metrics-2] httpMonitor(HTTPMetricsServer.java:110) - {"maxHTTPTPS":"1.0","avgHTTPTPS":"0.0","maxHTTPCOST":"562","avgHTTPCOST":"549.5","avgHTTPBodyDecodeCost":"6.0", "httpDiscard":"0"}
2022-02-14 15:00:20,585 INFO  [eventMesh-metrics-2] httpMonitor(HTTPMetricsServer.java:119) - {"maxBatchSendMsgTPS":"0.0","avgBatchSendMsgTPS":"0.0", "sum":"0", "sumFail":"0", "sumFailRate":"0.00", "discard":"0"}
2022-02-14 15:00:20,586 INFO  [eventMesh-metrics-2] httpMonitor(HTTPMetricsServer.java:129) - {"maxSendMsgTPS":"0.0","avgSendMsgTPS":"0.0", "sum":"0", "sumFail":"0", "sumFailRate":"0.00", "replyMsg":"0", "replyFail":"0"}
2022-02-14 15:00:20,586 INFO  [eventMesh-metrics-2] httpMonitor(HTTPMetricsServer.java:140) - {"maxPushMsgTPS":"0.0","avgPushMsgTPS":"0.0", "sum":"0", "sumFail":"0", "sumFailRate":"0.0", "maxClientLatency":"0.0", "avgClientLatency":"0.0"}
2022-02-14 15:00:20,586 INFO  [eventMesh-metrics-2] httpMonitor(HTTPMetricsServer.java:151) - {"batchMsgQ":"0","sendMsgQ":"0","pushMsgQ":"0","httpRetryQ":"0"}
2022-02-14 15:00:20,586 INFO  [eventMesh-metrics-2] httpMonitor(HTTPMetricsServer.java:157) - {"batchAvgSend2MQCost":"0.0", "avgSend2MQCost":"0.0", "avgReply2MQCost":"0.0"}
2022-02-14 15:00:20,789 DEBUG [StandaloneConsumerThread] SubScribeTask(SubScribeTask.java:56) - execute subscribe task, topic: TEST-TOPIC-HTTP-ASYNC, offset: null
2022-02-14 15:00:21,791 DEBUG [StandaloneConsumerThread] SubScribeTask(SubScribeTask.java:56) - execute subscribe task, topic: TEST-TOPIC-HTTP-ASYNC, offset: null
2022-02-14 15:00:22,792 DEBUG [StandaloneConsumerThread] SubScribeTask(SubScribeTask.java:56) - execute subscribe task, topic: TEST-TOPIC-HTTP-ASYNC, offset: null
2022-02-14 15:00:23,804 DEBUG [StandaloneConsumerThread] SubScribeTask(SubScribeTask.java:56) - execute subscribe task, topic: TEST-TOPIC-HTTP-ASYNC, offset: null
2022-02-14 15:00:24,590 INFO  [BatchSpanProcessor_WorkerThread-1] LogExporter(LogExporter.java:61) - 'HTTP POST' : 13edb08ae53aa8fd95ca799e9c93e690 63b98f38a3ad1255 SERVER [tracer: class org.apache.eventmesh.runtime.boot.EventMeshHTTPServer:] AttributesMap{data={http.status_code=206, http.flavor=HTTP, http.method=POST}, capacity=128, totalAddedValues=3}


2022-02-14 15:00:29,589 INFO  [pool-11-thread-1] ConsumerManager(ConsumerManager.java:78) - clientInfo check start.....
2022-02-14 15:00:29,883 DEBUG [eventMesh-http-worker-1] http(AbstractHTTPServer.java:381) - httpCommand={REQ,POST/HTTP,requestCode=206,opaque=3,header=subscribeRequestHeader={code=206,language=JAVA,version=V1,env=P,idc=FT,sys=1234,pid=18328,ip=127.0.0.1:51386,username=,passwd=},body=subscribeBody{consumerGroup='EventMeshTest-consumerGroup', url='http://192.168.2.31:8088/sub/test', topics=[SubscriptionItem{topic=TEST2-TOPIC-HTTP-ASYNC, mode=CLUSTERING, type=SYNC}]}}
2022-02-14 15:00:29,884 DEBUG [StandaloneConsumerThread] SubScribeTask(SubScribeTask.java:56) - execute subscribe task, topic: TEST-TOPIC-HTTP-ASYNC, offset: null
2022-02-14 15:00:30,094 INFO  [eventMesh-clientManage-] http(SubscribeProcessor.java:78) - cmd=SUBSCRIBE|http|client2eventMesh|from=127.0.0.1:51386|to=192.168.2.31
2022-02-14 15:00:30,302 DEBUG [eventMesh-clientManage-] http(AbstractHTTPServer.java:429) - httpCommand={RES,POST/HTTP,requestCode=206,opaque=3,cost=420,header=org.apache.eventmesh.common.protocol.http.header.BaseResponseHeader@4206e247,body=org.apache.eventmesh.common.protocol.http.body.BaseResponseBody@46396edf}
2022-02-14 15:00:30,302 DEBUG [EventMesh-http-asyncContext-] http(SubscribeProcessor.java:257) - httpCommand={RES,POST/HTTP,requestCode=206,opaque=3,cost=420,header=org.apache.eventmesh.common.protocol.http.header.BaseResponseHeader@4206e247,body=org.apache.eventmesh.common.protocol.http.body.BaseResponseBody@46396edf}
2022-02-14 15:00:30,900 DEBUG [StandaloneConsumerThread] SubScribeTask(SubScribeTask.java:56) - execute subscribe task, topic: TEST-TOPIC-HTTP-ASYNC, offset: null
2022-02-14 15:00:31,912 DEBUG [StandaloneConsumerThread] SubScribeTask(SubScribeTask.java:56) - execute subscribe task, topic: TEST-TOPIC-HTTP-ASYNC, offset: null
2022-02-14 15:00:32,925 DEBUG [StandaloneConsumerThread] SubScribeTask(SubScribeTask.java:56) - execute subscribe task, topic: TEST-TOPIC-HTTP-ASYNC, offset: null
2022-02-14 15:00:33,937 DEBUG [StandaloneConsumerThread] SubScribeTask(SubScribeTask.java:56) - execute subscribe task, topic: TEST-TOPIC-HTTP-ASYNC, offset: null

Root Cause Analysis

The root cause is in the ConsumerManager.notifyConsumerManager() method, in the org.apache.eventmesh.runtime.core.protocol.http.consumer package.

  1. When a new latestConsumerGroupConfig object is added to ConsumerManager.consumerTable, it is stored by reference, so latestConsumerGroupConfig and the entry inside ConsumerManager.consumerTable are the same object.

        if (cgm == null) {
            ConsumerGroupStateEvent notification = new ConsumerGroupStateEvent();
            notification.action = ConsumerGroupStateEvent.ConsumerGroupStateAction.NEW;
            notification.consumerGroup = consumerGroup;
            notification.consumerGroupConfig = latestConsumerGroupConfig;
            eventMeshHTTPServer.getEventBus().post(notification);
            return;
        }

  2. When we update the subscription, latestConsumerGroupConfig is updated, and because it is the same object reference, the entry inside ConsumerManager.consumerTable is updated along with it.

As a result, latestConsumerGroupConfig.equals(cgm.getConsumerGroupConfig()) is always true, because an object is being compared with itself.

The following code is therefore never executed:

        if (!latestConsumerGroupConfig.equals(cgm.getConsumerGroupConfig())) {
            ConsumerGroupStateEvent notification = new ConsumerGroupStateEvent();
            notification.action = ConsumerGroupStateEvent.ConsumerGroupStateAction.CHANGE;
            notification.consumerGroup = consumerGroup;
            notification.consumerGroupConfig = latestConsumerGroupConfig;
            eventMeshHTTPServer.getEventBus().post(notification);
            return;
        }
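
The aliasing described above can be reproduced outside EventMesh. The following is a minimal sketch, not EventMesh code: GroupConfig is a hypothetical stand-in for the real consumer group config class, and the topic-to-type map is an assumed simplification of its contents.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class AliasingDemo {
    // Hypothetical stand-in for the consumer group config (not the EventMesh class).
    static final class GroupConfig {
        final Map<String, String> topics = new HashMap<>();

        @Override
        public boolean equals(Object o) {
            return o instanceof GroupConfig && topics.equals(((GroupConfig) o).topics);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topics);
        }
    }

    // Returns true if the equals-based check notices the second subscription.
    static boolean changeDetectedWhenSharingReference() {
        GroupConfig latest = new GroupConfig();
        latest.topics.put("TEST-TOPIC-HTTP-ASYNC", "ASYNC");

        // The manager keeps the very same object, not a copy.
        GroupConfig stored = latest;

        // Second subscribe: mutating "latest" also mutates "stored".
        latest.topics.put("TEST2-TOPIC-HTTP-ASYNC", "SYNC");

        return !latest.equals(stored);
    }

    public static void main(String[] args) {
        System.out.println(changeDetectedWhenSharingReference()); // prints "false"
    }
}
```

Because both names refer to one object, the comparison can never report a difference, which matches the missing CHANGE event in the log.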

Solution

We need to pass a cloned copy of the config object in notifyConsumerManager(), so that the stored config and latestConsumerGroupConfig are distinct objects and the equals() comparison can detect changes.
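
One way to picture the proposed fix is a snapshot taken at the point where the config is handed over. This is only a sketch under assumptions: GroupConfig and its copy() method are hypothetical, and the real class may hold nested mutable objects that need a deep copy rather than the shallow map copy shown here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class SnapshotDemo {
    // Hypothetical stand-in for the consumer group config (not the EventMesh class).
    static final class GroupConfig {
        final Map<String, String> topics = new HashMap<>();

        // Copies the map; enough here because keys and values are immutable Strings.
        GroupConfig copy() {
            GroupConfig c = new GroupConfig();
            c.topics.putAll(topics);
            return c;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof GroupConfig && topics.equals(((GroupConfig) o).topics);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topics);
        }
    }

    // Returns true if the equals-based check notices the second subscription.
    static boolean changeDetectedWithSnapshot() {
        GroupConfig latest = new GroupConfig();
        latest.topics.put("TEST-TOPIC-HTTP-ASYNC", "ASYNC");

        // Hand over a snapshot instead of the live object.
        GroupConfig stored = latest.copy();

        // Second subscribe: only "latest" changes, "stored" keeps the old state.
        latest.topics.put("TEST2-TOPIC-HTTP-ASYNC", "SYNC");

        return !latest.equals(stored);
    }

    public static void main(String[] args) {
        System.out.println(changeDetectedWithSnapshot()); // prints "true"
    }
}
```

With the snapshot in place, the stored state lags behind the latest state until a CHANGE notification replaces it, so the comparison becomes meaningful.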

Are you willing to submit PR?

  • Yes, I am willing to submit a PR!

Metadata


Assignees

No one assigned

    Labels

    bug: Something isn't working
