Race Condition Observed after multiple subscribe / unsubscribe calls #344

@jinrongluo

I am testing the HTTP LiteConsumer of EventMesh using the Eventmesh-Test project.

Problem Description

I modified org.apache.eventmesh.http.demo.sub.service.SubService.java to perform a series of subscribe / unsubscribe operations, as follows:

        liteConsumer = new LiteConsumer(eventMeshClientConfig);
        liteConsumer.start();
        liteConsumer.heartBeat(topicList, url);

        for(int i = 0; i < 10; i++) {
            liteConsumer.subscribe(topicList, url);
            Thread.sleep(3000);
            liteConsumer.unsubscribe(topicList, url);
            Thread.sleep(3000);
        }

I started org.apache.eventmesh.http.demo.sub.SpringBootDemoApplication.java for the test. While SubService was running, I observed the following exceptions in the EventMesh runtime log:

2021-05-11 13:51:00,848 DEBUG [eventMesh-http-worker-1] http(AbrstractHTTPServer.java:287) - httpCommand={REQ,POST/HTTP,requestCode=203,opaque=73,header=heartbeatRequestHeader={code=203,language=JAVA,version=V1,env=P,region=,idc=FT,dcn=FT0,sys=1234,pid=0,ip=127.0.0.1:60141,username=userName,passwd=password},body=heartbeatRequestBody={clientType=SUB}}
2021-05-11 13:51:00,855 DEBUG [eventMesh-http-worker-1] http(AbrstractHTTPServer.java:287) - httpCommand={REQ,POST/HTTP,requestCode=206,opaque=74,header=subscribeRequestHeader={code=206,language=JAVA,version=V1,env=P,region=,idc=FT,dcn=FT0,sys=1234,pid=0,ip=127.0.0.1:60141,username=userName,passwd=password},body=subscribeBody{url='http://10.48.245.117:8088/sub/test', topics=[FT0-e-80010001-01-1]}}
2021-05-11 13:51:01,078 INFO  [eventMesh-clientmanage-2] http(SubscribeProcessor.java:67) - cmd=SUBSCRIBE|http|client2eventMesh|from=127.0.0.1:60141|to=10.48.245.117
2021-05-11 13:51:01,080 INFO  [eventMesh-clientmanage-1] http(HeartBeatProcessor.java:62) - cmd=HEARTBEAT|http|client2eventMesh|from=127.0.0.1:60141|to=10.48.245.117
2021-05-11 13:51:01,315 ERROR [eventMesh-clientmanage-2] http(SubscribeProcessor.java:116) - group 1234-FT0 topic FT0-e-80010001-01-1 clients is empty
2021-05-11 13:51:01,315 ERROR [eventMesh-clientmanage-2] EventMeshHTTPServer(AbrstractHTTPServer.java:333) - process error
java.lang.NullPointerException: null
	at org.apache.eventmesh.runtime.core.protocol.http.processor.SubscribeProcessor.processRequest(SubscribeProcessor.java:120) ~[eventmesh-runtime-1.2.0-SNAPSHOT.jar:1.2.0-SNAPSHOT]
	at org.apache.eventmesh.runtime.boot.AbrstractHTTPServer$HTTPHandler.lambda$processEventMeshRequest$1(AbrstractHTTPServer.java:320) ~[eventmesh-runtime-1.2.0-SNAPSHOT.jar:1.2.0-SNAPSHOT]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_261]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_261]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_261]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_261]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_261]
2021-05-11 13:51:01,315 DEBUG [eventMesh-clientmanage-1] http(AbrstractHTTPServer.java:328) - httpCommand={RES,POST/HTTP,requestCode=203,opaque=73,cost=468,header=org.apache.eventmesh.common.protocol.http.header.BaseResponseHeader@185e56de,body=org.apache.eventmesh.common.protocol.http.body.BaseResponseBody@71864bc6}
2021-05-11 13:51:01,315 DEBUG [eventMesh-http-asyncContext-7] http(HeartBeatProcessor.java:164) - httpCommand={RES,POST/HTTP,requestCode=203,opaque=73,cost=468,header=org.apache.eventmesh.common.protocol.http.header.BaseResponseHeader@185e56de,body=org.apache.eventmesh.common.protocol.http.body.BaseResponseBody@71864bc6}
2021-05-11 13:51:01,376 INFO  [eventMesh-tcp-scheduler-1] appMonitor(EventMeshTcpMonitor.java:143) - {"protocol":"tcp","s":"retryQueueSize","t":"0"}
2021-05-11 13:51:02,376 INFO  [eventMesh-tcp-scheduler-5] appMonitor(EventMeshTcpMonitor.java:143) - {"protocol":"tcp","s":"retryQueueSize","t":"0"}
2021-05-11 13:51:03,378 INFO  [eventMesh-tcp-scheduler-4] appMonitor(EventMeshTcpMonitor.java:143) - {"protocol":"tcp","s":"retryQueueSize","t":"0"}
2021-05-11 13:51:03,862 DEBUG [eventMesh-http-worker-1] http(AbrstractHTTPServer.java:287) - httpCommand={REQ,POST/HTTP,requestCode=203,opaque=76,header=heartbeatRequestHeader={code=203,language=JAVA,version=V1,env=P,region=,idc=FT,dcn=FT0,sys=1234,pid=0,ip=127.0.0.1:60141,username=userName,passwd=password},body=heartbeatRequestBody={clientType=SUB}}
2021-05-11 13:51:03,868 DEBUG [eventMesh-http-worker-1] http(AbrstractHTTPServer.java:287) - httpCommand={REQ,POST/HTTP,requestCode=207,opaque=77,header=subscribeRequestHeader={code=207,language=JAVA,version=V1,env=P,region=,idc=FT,dcn=FT0,sys=1234,pid=0,ip=127.0.0.1:60141,username=userName,passwd=password},body=unSubscribeRequestBody{url='http://10.48.245.117:8088/sub/test', topics=[FT0-e-80010001-01-1]}}
2021-05-11 13:51:04,043 INFO  [eventMesh-clientmanage-3] http(HeartBeatProcessor.java:62) - cmd=HEARTBEAT|http|client2eventMesh|from=127.0.0.1:60141|to=10.48.245.117
2021-05-11 13:51:04,047 INFO  [eventMesh-clientmanage-4] http(UnSubscribeProcessor.java:71) - cmd=UNSUBSCRIBE|http|client2eventMesh|from=127.0.0.1:60141|to=10.48.245.117
2021-05-11 13:51:04,242 DEBUG [eventMesh-clientmanage-3] http(AbrstractHTTPServer.java:328) - httpCommand={RES,POST/HTTP,requestCode=203,opaque=76,cost=380,header=org.apache.eventmesh.common.protocol.http.header.BaseResponseHeader@2f95daec,body=org.apache.eventmesh.common.protocol.http.body.BaseResponseBody@74bd4bb1}
2021-05-11 13:51:04,242 DEBUG [eventMesh-http-asyncContext-8] http(HeartBeatProcessor.java:164) - httpCommand={RES,POST/HTTP,requestCode=203,opaque=76,cost=380,header=org.apache.eventmesh.common.protocol.http.header.BaseResponseHeader@2f95daec,body=org.apache.eventmesh.common.protocol.http.body.BaseResponseBody@74bd4bb1}
2021-05-11 13:51:04,247 WARN  [eventMesh-clientmanage-4] http(UnSubscribeProcessor.java:140) - client {"consumerGroup":"1234-FT0","dcn":"FT0","env":"P","idc":"FT","ip":"127.0.0.1:60141","lastUpTime":1620755464242,"pid":"0","sys":"1234","topic":"FT0-e-80010001-01-1","url":"http://10.48.245.117:8088/sub/test"} start unsubscribe
2021-05-11 13:51:04,247 INFO  [eventMesh-clientmanage-4] ConsumerManager(ConsumerManager.java:280) - onChange event:consumerGroupStateEvent={consumerGroup=1234-FT0,action=NEW}
2021-05-11 13:51:04,247 ERROR [eventMesh-clientmanage-4] ConsumerManager(ConsumerManager.java:296) - onChange event:consumerGroupStateEvent={consumerGroup=1234-FT0,action=NEW} err
java.lang.NullPointerException: null
	at org.apache.eventmesh.runtime.core.protocol.http.push.HTTPMessageHandler.<init>(HTTPMessageHandler.java:63) ~[eventmesh-runtime-1.2.0-SNAPSHOT.jar:1.2.0-SNAPSHOT]
	at org.apache.eventmesh.runtime.core.protocol.http.consumer.EventMeshConsumer.init(EventMeshConsumer.java:78) ~[eventmesh-runtime-1.2.0-SNAPSHOT.jar:1.2.0-SNAPSHOT]
	at org.apache.eventmesh.runtime.core.protocol.http.consumer.ConsumerGroupManager.init(ConsumerGroupManager.java:44) ~[eventmesh-runtime-1.2.0-SNAPSHOT.jar:1.2.0-SNAPSHOT]
	at org.apache.eventmesh.runtime.core.protocol.http.consumer.ConsumerManager.addConsumer(ConsumerManager.java:211) ~[eventmesh-runtime-1.2.0-SNAPSHOT.jar:1.2.0-SNAPSHOT]
	at org.apache.eventmesh.runtime.core.protocol.http.consumer.ConsumerManager.onChange(ConsumerManager.java:282) ~[eventmesh-runtime-1.2.0-SNAPSHOT.jar:1.2.0-SNAPSHOT]
	at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source) ~[?:?]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_261]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_261]
	at com.google.common.eventbus.Subscriber.invokeSubscriberMethod(Subscriber.java:87) ~[guava-29.0-jre.jar:?]
	at com.google.common.eventbus.Subscriber$SynchronizedSubscriber.invokeSubscriberMethod(Subscriber.java:144) ~[guava-29.0-jre.jar:?]
	at com.google.common.eventbus.Subscriber$1.run(Subscriber.java:72) ~[guava-29.0-jre.jar:?]
	at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30) ~[guava-29.0-jre.jar:?]
	at com.google.common.eventbus.Subscriber.dispatchEvent(Subscriber.java:67) ~[guava-29.0-jre.jar:?]
	at com.google.common.eventbus.Dispatcher$PerThreadQueuedDispatcher.dispatch(Dispatcher.java:108) ~[guava-29.0-jre.jar:?]
	at com.google.common.eventbus.EventBus.post(EventBus.java:212) ~[guava-29.0-jre.jar:?]
	at org.apache.eventmesh.runtime.core.protocol.http.consumer.ConsumerManager.notifyConsumerManager(ConsumerManager.java:164) ~[eventmesh-runtime-1.2.0-SNAPSHOT.jar:1.2.0-SNAPSHOT]
	at org.apache.eventmesh.runtime.core.protocol.http.processor.UnSubscribeProcessor.processRequest(UnSubscribeProcessor.java:212) ~[eventmesh-runtime-1.2.0-SNAPSHOT.jar:1.2.0-SNAPSHOT]
	at org.apache.eventmesh.runtime.boot.AbrstractHTTPServer$HTTPHandler.lambda$processEventMeshRequest$1(AbrstractHTTPServer.java:320) ~[eventmesh-runtime-1.2.0-SNAPSHOT.jar:1.2.0-SNAPSHOT]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_261]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_261]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_261]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_261]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_261]
2021-05-11 13:51:04,248 DEBUG [eventMesh-http-asyncContext-9] http(UnSubscribeProcessor.java:123) - httpCommand={RES,POST/HTTP,requestCode=207,opaque=77,cost=381,header=org.apache.eventmesh.common.protocol.http.header.BaseResponseHeader@52dcd19,body=org.apache.eventmesh.common.protocol.http.body.BaseResponseBody@1125964}
2021-05-11 13:51:04,248 DEBUG [eventMesh-clientmanage-4] http(AbrstractHTTPServer.java:328) - httpCommand={RES,POST/HTTP,requestCode=207,opaque=77,cost=381,header=org.apache.eventmesh.common.protocol.http.header.BaseResponseHeader@52dcd19,body=org.apache.eventmesh.common.protocol.http.body.BaseResponseBody@1125964}

Root Cause Analysis:

  1. The LiteConsumer.subscribe client API involves two HTTP POST requests to EventMesh Runtime: Heartbeat and Subscribe.

  2. The Heartbeat request is handled by HeartBeatProcessor, which registers the client in eventMeshHttpServer.localClientInfoMapping (https://github.com/apache/incubator-eventmesh/blob/develop/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java#L132-L154)

  3. The Subscribe request is handled by SubscribeProcessor, which looks up the client in eventMeshHttpServer.localClientInfoMapping and processes the subscription against it. (https://github.com/apache/incubator-eventmesh/blob/develop/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java#L109-L117)

  4. The problem is that the Heartbeat request and the Subscribe request are handled by two separate worker threads inside the EventMesh runtime. The Subscribe request can therefore be processed before the Heartbeat request, in which case the client is not registered yet and SubscribeProcessor throws the NullPointerException shown in the logs above.

  5. There is therefore a race condition between the worker threads handling the Heartbeat request and the Subscribe request. The same race condition is also observed in the Unsubscribe client API.
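
To illustrate the interleaving described in steps 2 to 5, here is a minimal standalone sketch. It is not EventMesh code: the class name, the map's key and value types, and the "group@topic" key format are assumptions made for the illustration, and a CountDownLatch forces the unlucky ordering so that the result is reproducible. In the real runtime the ordering depends on thread scheduling.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class RaceSketch {
    // Stand-in for eventMeshHttpServer.localClientInfoMapping (types are assumed).
    static final Map<String, Set<String>> clientMapping = new ConcurrentHashMap<>();

    /** Returns what the "subscribe" task sees when it runs before the "heartbeat" task. */
    static Set<String> subscribeBeforeHeartbeat() {
        clientMapping.clear();
        String key = "1234-FT0@FT0-e-80010001-01-1"; // assumed key format
        String url = "http://10.48.245.117:8088/sub/test";
        ExecutorService workers = Executors.newFixedThreadPool(2);
        CountDownLatch subscribeDone = new CountDownLatch(1);
        try {
            // "Heartbeat" worker: registers the client, but is held back by the
            // latch to reproduce the bad interleaving deterministically.
            Future<Object> heartbeat = workers.submit(() -> {
                subscribeDone.await();
                clientMapping.computeIfAbsent(key, k -> ConcurrentHashMap.newKeySet()).add(url);
                return null;
            });
            // "Subscribe" worker: only looks the client up, as in step 3.
            Future<Set<String>> subscribe = workers.submit(() -> {
                try {
                    return clientMapping.get(key);
                } finally {
                    subscribeDone.countDown();
                }
            });
            Set<String> seen = subscribe.get(5, TimeUnit.SECONDS);
            heartbeat.get(5, TimeUnit.SECONDS);
            return seen;
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            workers.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // The lookup ran before registration, so the result is null;
        // dereferencing it without a check would throw a NullPointerException.
        System.out.println("clients seen by subscribe: " + subscribeBeforeHeartbeat());
    }
}
```

Both tasks complete normally here; the failure only appears once the subscribe path dereferences the missing entry.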

Fix Suggestion

SubscribeProcessor.java should itself register the client when it is not registered yet (or when its registration was removed because the heartbeat expired). The Subscribe client API should then send only the Subscribe HTTP POST request, without an accompanying Heartbeat request.

The same fix applies to the Unsubscribe client API.
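
A rough sketch of what the suggested behaviour could look like, assuming the client registry can be modelled as a concurrent map from a group/topic key to a set of client URLs. The class, method names, and key format below are hypothetical and are not the actual SubscribeProcessor or UnSubscribeProcessor code.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class SubscribeFixSketch {
    // Stand-in for eventMeshHttpServer.localClientInfoMapping (types are assumed).
    static final Map<String, Set<String>> clientMapping = new ConcurrentHashMap<>();

    /**
     * Hypothetical subscribe handler: registers the client itself if no
     * heartbeat has done so yet, so it never dereferences a missing entry.
     */
    static Set<String> subscribe(String consumerGroup, String topic, String url) {
        String key = consumerGroup + "@" + topic; // assumed key format
        // computeIfAbsent is atomic on ConcurrentHashMap, so two concurrent
        // requests for the same key end up sharing a single client set.
        Set<String> clients = clientMapping.computeIfAbsent(key, k -> ConcurrentHashMap.newKeySet());
        clients.add(url); // adding an already-present URL is a no-op
        return clients;
    }

    /**
     * Hypothetical unsubscribe handler: tolerates a client that was never
     * registered or whose registration has already expired.
     */
    static boolean unsubscribe(String consumerGroup, String topic, String url) {
        Set<String> clients = clientMapping.get(consumerGroup + "@" + topic);
        return clients != null && clients.remove(url);
    }
}
```

With this shape, the order in which Heartbeat and Subscribe requests reach the worker threads no longer matters for the lookup, and an Unsubscribe that arrives first simply reports that there was nothing to remove.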
