Skip to content

[Bug] Unable to Consume Messages using AsyncSubscribe in RabbitMQ Storage Plugin #4394

@Pil0tXia

Description

@Pil0tXia

Search before asking

  • I had searched in the issues and found no similar issues.

Environment

Windows

EventMesh version

master

What happened

When using RabbitMQ as the storage plugin, the org.apache.eventmesh.tcp.demo.pub.cloudevents.AsyncPublish can place messages into the queue. However, the org.apache.eventmesh.tcp.demo.sub.cloudevents.AsyncSubscribe is unable to consume the messages.

The RabbitMQ storage plugin was not authored by xwm1992.

How to reproduce

image
image
image

Debug logs

org.apache.eventmesh.tcp.demo.pub.cloudevents.AsyncPublish:

2023-08-23 19:00:42,022 INFO  [main] CloudEventTCPSubClient(CloudEventTCPSubClient.java:73) - SimpleSubClientImpl|997|started!
2023-08-23 19:00:42,028 INFO  [main] AsyncPublish(AsyncPublish.java:58) - begin send async msg[0]: CloudEvent{id='ebe0443b-d6cd-49aa-8b14-46932343ed92', source=/, type='cloudevents', datacontenttype='application/cloudevents+json', subject='TEST-TOPIC-TCP-ASYNC', data=BytesCloudEventData{value=[123, 34, 99, 111, 110, 116, 101, 110, 116, 34, 58, 34, 116, 101, 115, 116, 65, 115, 121, 110, 99, 77, 101, 115, 115, 97, 103, 101, 34, 125]}, extensions={ttl=30000}}
2023-08-23 19:00:42,050 INFO  [main] CloudEventTCPPubClient(CloudEventTCPPubClient.java:108) - SimplePubClientImpl cloud event|997|publish|send|type=ASYNC_MESSAGE_TO_SERVER|protocol=cloudevents|msg=org.apache.eventmesh.common.protocol.tcp.Package@25f4878b
2023-08-23 19:00:42,050 INFO  [main] RequestContext(RequestContext.java:76) - _RequestContext|create|key=7750722003
2023-08-23 19:00:42,055 DEBUG [nioEventLoopGroup-2-1] Codec(Codec.java:62) - Encoder pkg={"header":{"cmd":"ASYNC_MESSAGE_TO_SERVER","code":0,"seq":"7750722003","properties":{"protocolversion":"1.0","protocoldesc":"tcp","protocoltype":"cloudevents"},"command":"ASYNC_MESSAGE_TO_SERVER"},"body":"eyJzcGVjdmVyc2lvbiI6IjEuMCIsImlkIjoiZWJlMDQ0M2ItZDZjZC00OWFhLThiMTQtNDY5MzIzNDNlZDkyIiwic291cmNlIjoiLyIsInR5cGUiOiJjbG91ZGV2ZW50cyIsImRhdGFjb250ZW50dHlwZSI6ImFwcGxpY2F0aW9uL2Nsb3VkZXZlbnRzK2pzb24iLCJzdWJqZWN0IjoiVEVTVC1UT1BJQy1UQ1AtQVNZTkMiLCJ0dGwiOiIzMDAwMCIsImRhdGEiOnsiY29udGVudCI6InRlc3RBc3luY01lc3NhZ2UifX0="}
2023-08-23 19:00:42,172 DEBUG [nioEventLoopGroup-2-1] Codec(Codec.java:150) - Decode headerJson={"cmd":"ASYNC_MESSAGE_TO_SERVER_ACK","code":0,"desc":"success","seq":"7750722003","properties":{},"command":"ASYNC_MESSAGE_TO_SERVER_ACK"}
2023-08-23 19:00:42,172 DEBUG [nioEventLoopGroup-2-1] Codec(Codec.java:162) - Decode bodyJson={"data":{"node":{"content":"testAsyncMessage"}},"id":"ebe0443b-d6cd-49aa-8b14-46932343ed92","source":"/","type":"cloudevents","subject":"TEST-TOPIC-TCP-ASYNC","dataContentType":"application/cloudevents+json","specVersion":"V1","extensionNames":["protocolversion","reqeventmesh2mqtimestamp","reqsendeventmeship","protocoldesc","protocoltype","ttl","reqc2eventmeshtimestamp"],"attributeNames":["datacontenttype","subject","specversion","id","source","type"]}
2023-08-23 19:00:42,173 INFO  [nioEventLoopGroup-2-1] AbstractEventMeshTCPPubHandler(AbstractEventMeshTCPPubHandler.java:45) - SimplePubClientImpl|receive|msg=org.apache.eventmesh.common.protocol.tcp.Package@4ae1b36b
2023-08-23 19:00:43,187 INFO  [main] AsyncPublish(AsyncPublish.java:58) - begin send async msg[1]: CloudEvent{id='ea09364e-e3ac-49b6-bcc4-31f61e432156', source=/, type='cloudevents', datacontenttype='application/cloudevents+json', subject='TEST-TOPIC-TCP-ASYNC', data=BytesCloudEventData{value=[123, 34, 99, 111, 110, 116, 101, 110, 116, 34, 58, 34, 116, 101, 115, 116, 65, 115, 121, 110, 99, 77, 101, 115, 115, 97, 103, 101, 34, 125]}, extensions={ttl=30000}}
2023-08-23 19:00:43,187 INFO  [main] CloudEventTCPPubClient(CloudEventTCPPubClient.java:108) - SimplePubClientImpl cloud event|997|publish|send|type=ASYNC_MESSAGE_TO_SERVER|protocol=cloudevents|msg=org.apache.eventmesh.common.protocol.tcp.Package@4e423aa2
2023-08-23 19:00:43,187 INFO  [main] RequestContext(RequestContext.java:76) - _RequestContext|create|key=1345627544
2023-08-23 19:00:43,187 DEBUG [nioEventLoopGroup-2-1] Codec(Codec.java:62) - Encoder pkg={"header":{"cmd":"ASYNC_MESSAGE_TO_SERVER","code":0,"seq":"1345627544","properties":{"protocolversion":"1.0","protocoldesc":"tcp","protocoltype":"cloudevents"},"command":"ASYNC_MESSAGE_TO_SERVER"},"body":"eyJzcGVjdmVyc2lvbiI6IjEuMCIsImlkIjoiZWEwOTM2NGUtZTNhYy00OWI2LWJjYzQtMzFmNjFlNDMyMTU2Iiwic291cmNlIjoiLyIsInR5cGUiOiJjbG91ZGV2ZW50cyIsImRhdGFjb250ZW50dHlwZSI6ImFwcGxpY2F0aW9uL2Nsb3VkZXZlbnRzK2pzb24iLCJzdWJqZWN0IjoiVEVTVC1UT1BJQy1UQ1AtQVNZTkMiLCJ0dGwiOiIzMDAwMCIsImRhdGEiOnsiY29udGVudCI6InRlc3RBc3luY01lc3NhZ2UifX0="}
2023-08-23 19:00:43,192 DEBUG [nioEventLoopGroup-2-1] Codec(Codec.java:150) - Decode headerJson={"cmd":"ASYNC_MESSAGE_TO_SERVER_ACK","code":0,"desc":"success","seq":"1345627544","properties":{},"command":"ASYNC_MESSAGE_TO_SERVER_ACK"}
2023-08-23 19:00:43,192 DEBUG [nioEventLoopGroup-2-1] Codec(Codec.java:162) - Decode bodyJson={"data":{"node":{"content":"testAsyncMessage"}},"id":"ea09364e-e3ac-49b6-bcc4-31f61e432156","source":"/","type":"cloudevents","subject":"TEST-TOPIC-TCP-ASYNC","dataContentType":"application/cloudevents+json","specVersion":"V1","extensionNames":["protocolversion","reqeventmesh2mqtimestamp","reqsendeventmeship","protocoldesc","protocoltype","ttl","reqc2eventmeshtimestamp"],"attributeNames":["datacontenttype","subject","specversion","id","source","type"]}
2023-08-23 19:00:43,192 INFO  [nioEventLoopGroup-2-1] AbstractEventMeshTCPPubHandler(AbstractEventMeshTCPPubHandler.java:45) - SimplePubClientImpl|receive|msg=org.apache.eventmesh.common.protocol.tcp.Package@1b2e16dc

org.apache.eventmesh.tcp.demo.sub.cloudevents.AsyncSubscribe:

2023-08-23 19:00:43,479 DEBUG [nioEventLoopGroup-3-1] Codec(Codec.java:62) - Encoder pkg={"header":{"cmd":"HELLO_REQUEST","code":0,"seq":"3280786264","properties":{},"command":"HELLO_REQUEST"},"body":{"env":"test","subsystem":"5017","path":"/data/app/umg_proxy","pid":42893,"host":"localhost","port":9362,"version":"2.0.11","username":"PU4283","password":"21524617","idc":"FT","group":"EventmeshTestGroup","purpose":"sub","unack":0}}
2023-08-23 19:00:43,484 DEBUG [nioEventLoopGroup-3-1] Codec(Codec.java:150) - Decode headerJson={"cmd":"HELLO_RESPONSE","code":0,"desc":"success","seq":"3280786264","properties":{},"command":"HELLO_RESPONSE"}
2023-08-23 19:00:43,484 INFO  [nioEventLoopGroup-3-1] AbstractEventMeshTCPSubHandler(AbstractEventMeshTCPSubHandler.java:48) - |receive|type=HELLO_RESPONSE|msg=org.apache.eventmesh.common.protocol.tcp.Package@3e758221
2023-08-23 19:00:43,485 ERROR [nioEventLoopGroup-3-1] AbstractEventMeshTCPSubHandler(AbstractEventMeshTCPSubHandler.java:66) - msg ignored|HELLO_RESPONSE|org.apache.eventmesh.common.protocol.tcp.Package@3e758221
2023-08-23 19:00:43,485 INFO  [main] CloudEventTCPSubClient(CloudEventTCPSubClient.java:73) - SimpleSubClientImpl|745|started!
2023-08-23 19:00:43,489 INFO  [main] RequestContext(RequestContext.java:76) - _RequestContext|create|key=1162117433
2023-08-23 19:00:43,497 DEBUG [nioEventLoopGroup-3-1] Codec(Codec.java:62) - Encoder pkg={"header":{"cmd":"SUBSCRIBE_REQUEST","code":0,"seq":"1162117433","properties":{},"command":"SUBSCRIBE_REQUEST"},"body":{"topicList":[{"topic":"TEST-TOPIC-TCP-ASYNC","mode":"CLUSTERING","type":"ASYNC"}]}}
2023-08-23 19:00:43,538 DEBUG [nioEventLoopGroup-3-1] Codec(Codec.java:150) - Decode headerJson={"cmd":"SUBSCRIBE_RESPONSE","code":0,"desc":"success","seq":"1162117433","properties":{},"command":"SUBSCRIBE_RESPONSE"}
2023-08-23 19:00:43,539 INFO  [nioEventLoopGroup-3-1] AbstractEventMeshTCPSubHandler(AbstractEventMeshTCPSubHandler.java:48) - |receive|type=SUBSCRIBE_RESPONSE|msg=org.apache.eventmesh.common.protocol.tcp.Package@6b701d42
2023-08-23 19:00:43,539 ERROR [nioEventLoopGroup-3-1] AbstractEventMeshTCPSubHandler(AbstractEventMeshTCPSubHandler.java:66) - msg ignored|SUBSCRIBE_RESPONSE|org.apache.eventmesh.common.protocol.tcp.Package@6b701d42
2023-08-23 19:00:43,539 INFO  [main] RequestContext(RequestContext.java:76) - _RequestContext|create|key=8573122138
2023-08-23 19:00:43,540 DEBUG [nioEventLoopGroup-3-1] Codec(Codec.java:62) - Encoder pkg={"header":{"cmd":"LISTEN_REQUEST","code":0,"seq":"8573122138","properties":{},"command":"LISTEN_REQUEST"}}
2023-08-23 19:00:43,631 DEBUG [nioEventLoopGroup-3-1] Codec(Codec.java:150) - Decode headerJson={"cmd":"LISTEN_RESPONSE","code":0,"desc":"success","seq":"8573122138","properties":{},"command":"LISTEN_RESPONSE"}
2023-08-23 19:00:43,631 INFO  [nioEventLoopGroup-3-1] AbstractEventMeshTCPSubHandler(AbstractEventMeshTCPSubHandler.java:48) - |receive|type=LISTEN_RESPONSE|msg=org.apache.eventmesh.common.protocol.tcp.Package@13383e23
2023-08-23 19:00:43,631 ERROR [nioEventLoopGroup-3-1] AbstractEventMeshTCPSubHandler(AbstractEventMeshTCPSubHandler.java:66) - msg ignored|LISTEN_RESPONSE|org.apache.eventmesh.common.protocol.tcp.Package@13383e23
2023-08-23 19:01:13,482 INFO  [TCPClientScheduler-1] RequestContext(RequestContext.java:76) - _RequestContext|create|key=8077210517

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata

Metadata

Assignees

No one assigned

    Labels

    bugSomething isn't workinghelp wantedExtra attention is needed

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions