-
Notifications
You must be signed in to change notification settings - Fork 641
Description
EventMesh SDK's Http LiteConsumer is using HttpClient to make a RPC to EventMesh Runtime. But the HttpClient is static and shared by all LiteConsumer instances (as Class variable). There are two problems for using static variable here:
-
when multiple LiteConsumer instances are sharing the same HttClient, when one LiteConsumer shutdown(), the other LiteConsumer cannot make any RPC calls. That's because shutdown() will invoke httpClient.close();
-
I also observed racing condition of using this HttpClient. That's because the HttpClient is not close() and clean up its resource until LiteConsumer.shutdown(). This HttpClient is managed by PooledHttpClientManager, and it is meant to be used by multi-thread process, and needs to close() after finishing using it.
To reproduce the racing condition issue. I slightly modify the eventmesh-test project's org.apache.eventmesh.http.demo.sub.service.SubService class, with the followings
liteConsumer = new LiteConsumer(eventMeshClientConfig);
liteConsumer.start();
liteConsumer.subscribe(topicList, url);
Thread.sleep(30000);
liteConsumer.unsubscribe(topicList, url);
Thread.sleep(30000);
liteConsumer.subscribe(topicList, url);
Thread.sleep(30000);
liteConsumer.unsubscribe(topicList, url);
The relevant log is attach in a file.
When examine the response time coming from the four RPC calls above. we can see the racing condition is happening.
- liteConsumer.subscribe(topicList, url); The response time is matching with the Log time
2021-05-26 12:32:24,995 DEBUG [main] LiteConsumer(LiteConsumer.java:136) - subscribe message by await, targetEventMesh:http://127.0.0.1:10105, cost:2131ms, subscribeParam:{"body":{"topic":"["FT0-e-80010001-01-1"]","url":"http://192.168.0.12:8088/sub/test"},"headers":{"Language":"JAVA","Username":"userName","Version":"1.0","Ip":"192.168.0.12","Idc":"FT","Pid":"0","Env":"P","Sys":"1234","Code":"206","Dcn":"FT0","Passwd":"password"},"httpMethod":{},"logger":{"debugEnabled":true,"errorEnabled":true,"infoEnabled":true,"name":"org.apache.eventmesh.client.http.http.RequestParam","traceEnabled":false,"warnEnabled":true},"queryParams":"","timeout":3000}, rtn:{"resTime":1622046744916,"retCode":0,"retMsg":"success"}
1622046744916 = 2021-05-26 12:32:24.916
- liteConsumer.unsubscribe(topicList, url); The response time is NOT matching with the Log time, it is from the last RPC.
2021-05-26 12:32:55,042 DEBUG [main] LiteConsumer(LiteConsumer.java:251) - unSubscribe message by await, targetEventMesh:http://127.0.0.1:10105, cost:0ms, unSubscribeParam:{"body":{"topic":"["FT0-e-80010001-01-1"]","url":"http://192.168.0.12:8088/sub/test"},"headers":{"Language":"JAVA","Username":"userName","Version":"1.0","Ip":"192.168.0.12","Idc":"FT","Pid":"0","Env":"P","Sys":"1234","Code":"207","Dcn":"FT0","Passwd":"password"},"httpMethod":{},"logger":{"debugEnabled":true,"errorEnabled":true,"infoEnabled":true,"name":"org.apache.eventmesh.client.http.http.RequestParam","traceEnabled":false,"warnEnabled":true},"queryParams":"","timeout":3000}, rtn:{"resTime":1622046744916,"retCode":0,"retMsg":"success"}
1622046744916 = 2021-05-26 12:32:24.916
- liteConsumer.subscribe(topicList, url); The response time is NOT matching with the Log time, it is from the last RPC.
2021-05-26 12:33:25,057 DEBUG [main] LiteConsumer(LiteConsumer.java:136) - subscribe message by await, targetEventMesh:http://127.0.0.1:10105, cost:0ms, subscribeParam:{"body":{"topic":"["FT0-e-80010001-01-1"]","url":"http://192.168.0.12:8088/sub/test"},"headers":{"Language":"JAVA","Username":"userName","Version":"1.0","Ip":"192.168.0.12","Idc":"FT","Pid":"0","Env":"P","Sys":"1234","Code":"206","Dcn":"FT0","Passwd":"password"},"httpMethod":{},"logger":{"debugEnabled":true,"errorEnabled":true,"infoEnabled":true,"name":"org.apache.eventmesh.client.http.http.RequestParam","traceEnabled":false,"warnEnabled":true},"queryParams":"","timeout":3000}, rtn:{"resTime":1622046775571,"retCode":0,"retMsg":"success"}
1622046775571 = 2021-05-26 12:32:55.571
- liteConsumer.unsubscribe(topicList, url); The response time is NOT matching with the Log time, it is from the last RPC.
2021-05-26 12:33:55,088 DEBUG [main] LiteConsumer(LiteConsumer.java:251) - unSubscribe message by await, targetEventMesh:http://127.0.0.1:10105, cost:15ms, unSubscribeParam:{"body":{"topic":"["FT0-e-80010001-01-1"]","url":"http://192.168.0.12:8088/sub/test"},"headers":{"Language":"JAVA","Username":"userName","Version":"1.0","Ip":"192.168.0.12","Idc":"FT","Pid":"0","Env":"P","Sys":"1234","Code":"207","Dcn":"FT0","Passwd":"password"},"httpMethod":{},"logger":{"debugEnabled":true,"errorEnabled":true,"infoEnabled":true,"name":"org.apache.eventmesh.client.http.http.RequestParam","traceEnabled":false,"warnEnabled":true},"queryParams":"","timeout":3000}, rtn:{"resTime":1622046775571,"retCode":0,"retMsg":"success"}
1622046775571 = 2021-05-26 12:32:55.571