Description
I am testing the HTTP LiteConsumer of EventMesh using the Eventmesh-Test project.
Problem Description
I modified org.apache.eventmesh.http.demo.sub.service.SubService.java to perform a series of subscribe/unsubscribe operations, as follows:
liteConsumer = new LiteConsumer(eventMeshClientConfig);
liteConsumer.start();
liteConsumer.heartBeat(topicList, url);
for (int i = 0; i < 10; i++) {
    liteConsumer.subscribe(topicList, url);
    Thread.sleep(3000);
    liteConsumer.unsubscribe(topicList, url);
    Thread.sleep(3000);
}
I started org.apache.eventmesh.http.demo.sub.SpringBootDemoApplication.java for the test. While SubService was running, I observed the following exceptions in the EventMesh runtime log:
2021-05-11 13:51:00,848 DEBUG [eventMesh-http-worker-1] http(AbrstractHTTPServer.java:287) - httpCommand={REQ,POST/HTTP,requestCode=203,opaque=73,header=heartbeatRequestHeader={code=203,language=JAVA,version=V1,env=P,region=,idc=FT,dcn=FT0,sys=1234,pid=0,ip=127.0.0.1:60141,username=userName,passwd=password},body=heartbeatRequestBody={clientType=SUB}}
2021-05-11 13:51:00,855 DEBUG [eventMesh-http-worker-1] http(AbrstractHTTPServer.java:287) - httpCommand={REQ,POST/HTTP,requestCode=206,opaque=74,header=subscribeRequestHeader={code=206,language=JAVA,version=V1,env=P,region=,idc=FT,dcn=FT0,sys=1234,pid=0,ip=127.0.0.1:60141,username=userName,passwd=password},body=subscribeBody{url='http://10.48.245.117:8088/sub/test', topics=[FT0-e-80010001-01-1]}}
2021-05-11 13:51:01,078 INFO [eventMesh-clientmanage-2] http(SubscribeProcessor.java:67) - cmd=SUBSCRIBE|http|client2eventMesh|from=127.0.0.1:60141|to=10.48.245.117
2021-05-11 13:51:01,080 INFO [eventMesh-clientmanage-1] http(HeartBeatProcessor.java:62) - cmd=HEARTBEAT|http|client2eventMesh|from=127.0.0.1:60141|to=10.48.245.117
2021-05-11 13:51:01,315 ERROR [eventMesh-clientmanage-2] http(SubscribeProcessor.java:116) - group 1234-FT0 topic FT0-e-80010001-01-1 clients is empty
2021-05-11 13:51:01,315 ERROR [eventMesh-clientmanage-2] EventMeshHTTPServer(AbrstractHTTPServer.java:333) - process error
java.lang.NullPointerException: null
at org.apache.eventmesh.runtime.core.protocol.http.processor.SubscribeProcessor.processRequest(SubscribeProcessor.java:120) ~[eventmesh-runtime-1.2.0-SNAPSHOT.jar:1.2.0-SNAPSHOT]
at org.apache.eventmesh.runtime.boot.AbrstractHTTPServer$HTTPHandler.lambda$processEventMeshRequest$1(AbrstractHTTPServer.java:320) ~[eventmesh-runtime-1.2.0-SNAPSHOT.jar:1.2.0-SNAPSHOT]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_261]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_261]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_261]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_261]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_261]
2021-05-11 13:51:01,315 DEBUG [eventMesh-clientmanage-1] http(AbrstractHTTPServer.java:328) - httpCommand={RES,POST/HTTP,requestCode=203,opaque=73,cost=468,header=org.apache.eventmesh.common.protocol.http.header.BaseResponseHeader@185e56de,body=org.apache.eventmesh.common.protocol.http.body.BaseResponseBody@71864bc6}
2021-05-11 13:51:01,315 DEBUG [eventMesh-http-asyncContext-7] http(HeartBeatProcessor.java:164) - httpCommand={RES,POST/HTTP,requestCode=203,opaque=73,cost=468,header=org.apache.eventmesh.common.protocol.http.header.BaseResponseHeader@185e56de,body=org.apache.eventmesh.common.protocol.http.body.BaseResponseBody@71864bc6}
2021-05-11 13:51:01,376 INFO [eventMesh-tcp-scheduler-1] appMonitor(EventMeshTcpMonitor.java:143) - {"protocol":"tcp","s":"retryQueueSize","t":"0"}
2021-05-11 13:51:02,376 INFO [eventMesh-tcp-scheduler-5] appMonitor(EventMeshTcpMonitor.java:143) - {"protocol":"tcp","s":"retryQueueSize","t":"0"}
2021-05-11 13:51:03,378 INFO [eventMesh-tcp-scheduler-4] appMonitor(EventMeshTcpMonitor.java:143) - {"protocol":"tcp","s":"retryQueueSize","t":"0"}
2021-05-11 13:51:03,862 DEBUG [eventMesh-http-worker-1] http(AbrstractHTTPServer.java:287) - httpCommand={REQ,POST/HTTP,requestCode=203,opaque=76,header=heartbeatRequestHeader={code=203,language=JAVA,version=V1,env=P,region=,idc=FT,dcn=FT0,sys=1234,pid=0,ip=127.0.0.1:60141,username=userName,passwd=password},body=heartbeatRequestBody={clientType=SUB}}
2021-05-11 13:51:03,868 DEBUG [eventMesh-http-worker-1] http(AbrstractHTTPServer.java:287) - httpCommand={REQ,POST/HTTP,requestCode=207,opaque=77,header=subscribeRequestHeader={code=207,language=JAVA,version=V1,env=P,region=,idc=FT,dcn=FT0,sys=1234,pid=0,ip=127.0.0.1:60141,username=userName,passwd=password},body=unSubscribeRequestBody{url='http://10.48.245.117:8088/sub/test', topics=[FT0-e-80010001-01-1]}}
2021-05-11 13:51:04,043 INFO [eventMesh-clientmanage-3] http(HeartBeatProcessor.java:62) - cmd=HEARTBEAT|http|client2eventMesh|from=127.0.0.1:60141|to=10.48.245.117
2021-05-11 13:51:04,047 INFO [eventMesh-clientmanage-4] http(UnSubscribeProcessor.java:71) - cmd=UNSUBSCRIBE|http|client2eventMesh|from=127.0.0.1:60141|to=10.48.245.117
2021-05-11 13:51:04,242 DEBUG [eventMesh-clientmanage-3] http(AbrstractHTTPServer.java:328) - httpCommand={RES,POST/HTTP,requestCode=203,opaque=76,cost=380,header=org.apache.eventmesh.common.protocol.http.header.BaseResponseHeader@2f95daec,body=org.apache.eventmesh.common.protocol.http.body.BaseResponseBody@74bd4bb1}
2021-05-11 13:51:04,242 DEBUG [eventMesh-http-asyncContext-8] http(HeartBeatProcessor.java:164) - httpCommand={RES,POST/HTTP,requestCode=203,opaque=76,cost=380,header=org.apache.eventmesh.common.protocol.http.header.BaseResponseHeader@2f95daec,body=org.apache.eventmesh.common.protocol.http.body.BaseResponseBody@74bd4bb1}
2021-05-11 13:51:04,247 WARN [eventMesh-clientmanage-4] http(UnSubscribeProcessor.java:140) - client {"consumerGroup":"1234-FT0","dcn":"FT0","env":"P","idc":"FT","ip":"127.0.0.1:60141","lastUpTime":1620755464242,"pid":"0","sys":"1234","topic":"FT0-e-80010001-01-1","url":"http://10.48.245.117:8088/sub/test"} start unsubscribe
2021-05-11 13:51:04,247 INFO [eventMesh-clientmanage-4] ConsumerManager(ConsumerManager.java:280) - onChange event:consumerGroupStateEvent={consumerGroup=1234-FT0,action=NEW}
2021-05-11 13:51:04,247 ERROR [eventMesh-clientmanage-4] ConsumerManager(ConsumerManager.java:296) - onChange event:consumerGroupStateEvent={consumerGroup=1234-FT0,action=NEW} err
java.lang.NullPointerException: null
at org.apache.eventmesh.runtime.core.protocol.http.push.HTTPMessageHandler.<init>(HTTPMessageHandler.java:63) ~[eventmesh-runtime-1.2.0-SNAPSHOT.jar:1.2.0-SNAPSHOT]
at org.apache.eventmesh.runtime.core.protocol.http.consumer.EventMeshConsumer.init(EventMeshConsumer.java:78) ~[eventmesh-runtime-1.2.0-SNAPSHOT.jar:1.2.0-SNAPSHOT]
at org.apache.eventmesh.runtime.core.protocol.http.consumer.ConsumerGroupManager.init(ConsumerGroupManager.java:44) ~[eventmesh-runtime-1.2.0-SNAPSHOT.jar:1.2.0-SNAPSHOT]
at org.apache.eventmesh.runtime.core.protocol.http.consumer.ConsumerManager.addConsumer(ConsumerManager.java:211) ~[eventmesh-runtime-1.2.0-SNAPSHOT.jar:1.2.0-SNAPSHOT]
at org.apache.eventmesh.runtime.core.protocol.http.consumer.ConsumerManager.onChange(ConsumerManager.java:282) ~[eventmesh-runtime-1.2.0-SNAPSHOT.jar:1.2.0-SNAPSHOT]
at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source) ~[?:?]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_261]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_261]
at com.google.common.eventbus.Subscriber.invokeSubscriberMethod(Subscriber.java:87) ~[guava-29.0-jre.jar:?]
at com.google.common.eventbus.Subscriber$SynchronizedSubscriber.invokeSubscriberMethod(Subscriber.java:144) ~[guava-29.0-jre.jar:?]
at com.google.common.eventbus.Subscriber$1.run(Subscriber.java:72) ~[guava-29.0-jre.jar:?]
at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30) ~[guava-29.0-jre.jar:?]
at com.google.common.eventbus.Subscriber.dispatchEvent(Subscriber.java:67) ~[guava-29.0-jre.jar:?]
at com.google.common.eventbus.Dispatcher$PerThreadQueuedDispatcher.dispatch(Dispatcher.java:108) ~[guava-29.0-jre.jar:?]
at com.google.common.eventbus.EventBus.post(EventBus.java:212) ~[guava-29.0-jre.jar:?]
at org.apache.eventmesh.runtime.core.protocol.http.consumer.ConsumerManager.notifyConsumerManager(ConsumerManager.java:164) ~[eventmesh-runtime-1.2.0-SNAPSHOT.jar:1.2.0-SNAPSHOT]
at org.apache.eventmesh.runtime.core.protocol.http.processor.UnSubscribeProcessor.processRequest(UnSubscribeProcessor.java:212) ~[eventmesh-runtime-1.2.0-SNAPSHOT.jar:1.2.0-SNAPSHOT]
at org.apache.eventmesh.runtime.boot.AbrstractHTTPServer$HTTPHandler.lambda$processEventMeshRequest$1(AbrstractHTTPServer.java:320) ~[eventmesh-runtime-1.2.0-SNAPSHOT.jar:1.2.0-SNAPSHOT]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_261]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_261]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_261]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_261]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_261]
2021-05-11 13:51:04,248 DEBUG [eventMesh-http-asyncContext-9] http(UnSubscribeProcessor.java:123) - httpCommand={RES,POST/HTTP,requestCode=207,opaque=77,cost=381,header=org.apache.eventmesh.common.protocol.http.header.BaseResponseHeader@52dcd19,body=org.apache.eventmesh.common.protocol.http.body.BaseResponseBody@1125964}
2021-05-11 13:51:04,248 DEBUG [eventMesh-clientmanage-4] http(AbrstractHTTPServer.java:328) - httpCommand={RES,POST/HTTP,requestCode=207,opaque=77,cost=381,header=org.apache.eventmesh.common.protocol.http.header.BaseResponseHeader@52dcd19,body=org.apache.eventmesh.common.protocol.http.body.BaseResponseBody@1125964}
Root Cause Analysis:
- The LiteConsumer.subscribe client API issues two HTTP POST requests to the EventMesh runtime: Heartbeat and Subscribe.
- The Heartbeat request is handled by HeartBeatProcessor, which registers the client in eventMeshHttpServer.localClientInfoMapping (https://github.com/apache/incubator-eventmesh/blob/develop/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java#L132-L154).
- The Subscribe request is handled by SubscribeProcessor, which looks up the client in eventMeshHttpServer.localClientInfoMapping and processes the subscription against it (https://github.com/apache/incubator-eventmesh/blob/develop/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java#L109-L117).
- The problem is that the Heartbeat and Subscribe requests are handled by two separate worker threads inside the EventMesh runtime. The Subscribe request may be handled before the Heartbeat request, in which case the client is not registered yet and SubscribeProcessor throws the NullPointerException shown in the logs above.
- Therefore, there is a race condition between the worker threads handling the Heartbeat and Subscribe requests. The same race condition is also observed in the Unsubscribe client API.
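To make the ordering problem concrete, here is a minimal, deterministic sketch of the two handling orders. It is not the EventMesh code: the class `RaceSketch`, the methods `onHeartbeat` and `onSubscribe`, the `clients` map, and the `<consumerGroup>@<topic>` key format are all hypothetical stand-ins for the processors and for `localClientInfoMapping`. It only shows that an unchecked lookup fails when the Subscribe handler runs first.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

public class RaceSketch {
    // Hypothetical stand-in for eventMeshHttpServer.localClientInfoMapping:
    // key is "<consumerGroup>@<topic>" (assumed format), value is the client URLs.
    static final Map<String, List<String>> clients = new ConcurrentHashMap<>();

    // Heartbeat handler in this sketch: registers the client.
    static void onHeartbeat(String key, String url) {
        clients.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(url);
    }

    // Subscribe handler in this sketch: looks up the registered clients and
    // dereferences the result without a null check.
    static int onSubscribe(String key) {
        List<String> registered = clients.get(key);
        return registered.size(); // NullPointerException if no heartbeat was handled yet
    }

    // Returns true if the subscribe step succeeds for the given handling order.
    static boolean subscribeSucceeds(boolean heartbeatFirst) {
        clients.clear();
        String key = "1234-FT0@FT0-e-80010001-01-1";
        String url = "http://10.48.245.117:8088/sub/test";
        try {
            if (heartbeatFirst) {
                onHeartbeat(key, url);
            }
            onSubscribe(key);
            return true;
        } catch (NullPointerException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("heartbeat handled first: " + subscribeSucceeds(true));
        System.out.println("subscribe handled first: " + subscribeSucceeds(false));
    }
}
```

In the real runtime the order is decided by thread scheduling rather than by a flag, which is why the exception appears only intermittently.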
Fix Suggestion
SubscribeProcessor.java should register the client itself if it has not been registered yet (or if it was removed because its heartbeat expired). The Subscribe client API should then send only the Subscribe HTTP POST request, rather than combining it with a Heartbeat request.
The same fix applies to the Unsubscribe client API.
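One possible shape for this suggestion is sketched below, assuming the client registry is a concurrent map. The names `SelfRegisteringSubscribe`, `onSubscribe`, `onUnsubscribe`, and `clients` are hypothetical and do not correspond to the actual EventMesh classes. The idea is that the subscribe path creates the registry entry atomically when it is missing, and the unsubscribe path tolerates a missing entry, so neither depends on a heartbeat having been handled first.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class SelfRegisteringSubscribe {
    // Hypothetical registry: "<consumerGroup>@<topic>" (assumed format) -> client URLs.
    static final Map<String, Set<String>> clients = new ConcurrentHashMap<>();

    // Subscribe registers the client itself. computeIfAbsent creates the entry
    // atomically, so concurrent heartbeat and subscribe handlers cannot observe null.
    // Returns the number of clients registered under the key.
    static int onSubscribe(String key, String url) {
        Set<String> registered =
                clients.computeIfAbsent(key, k -> ConcurrentHashMap.<String>newKeySet());
        registered.add(url);
        return registered.size();
    }

    // Unsubscribe treats a missing entry as "nothing to remove" instead of failing.
    // Returns the number of clients still registered under the key.
    static int onUnsubscribe(String key, String url) {
        Set<String> registered = clients.get(key);
        if (registered == null) {
            return 0;
        }
        registered.remove(url);
        return registered.size();
    }

    public static void main(String[] args) {
        String key = "1234-FT0@FT0-e-80010001-01-1";
        String url = "http://10.48.245.117:8088/sub/test";
        // Mirrors the test loop from the report, with no heartbeat call at all.
        for (int i = 0; i < 10; i++) {
            onSubscribe(key, url);
            onUnsubscribe(key, url);
        }
        System.out.println("remaining clients: " + clients.get(key).size());
    }
}
```

A set is used here so that repeated subscribe calls from the same URL stay idempotent. Whether the real processors should also refresh the client's last heartbeat time on subscribe is left open in this sketch.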