Message correlation on a non-interrupting message event is not idempotent #26926

@remcowesterhoud

Describe the bug

When a message is published with a message id we claim that it is only processed once. This is not always true. With a non-interrupting message event, like the one in the process below, a single message can be correlated multiple times.

Image

To Reproduce

I've reproduced it in a test. For this test I had to "hack" some things in the code as I wasn't sure how else to do it.

First we need to modify the timeout and interval of the MessageObserver:

  public static final Duration SUBSCRIPTION_TIMEOUT = Duration.ofSeconds(1);
  public static final Duration SUBSCRIPTION_CHECK_INTERVAL = Duration.ofSeconds(1);

Next we need to make it possible to create a process instance on a specific partition in our test case. We add a method to the ProcessInstanceClient:

    public long createOnPartition(final int partitionId) {
      final long position =
          writer.writeCommandOnPartition(
              partitionId, ProcessInstanceCreationIntent.CREATE, processInstanceCreationRecord);

      final var resultingRecord = expectation.apply(position);
      return resultingRecord.getValue().getProcessInstanceKey();
    }

Finally we run the test case below:

  @Test
  public void test() throws InterruptedException { // TODO rename
    // given
    final var processId = Strings.newRandomValidBpmnId();
    final var messageName = "event_message";
    final var correlationKey = CORRELATION_KEYS.get(START_PARTITION_ID);
    final var eventSubProcessStartId = "eventSubProcessStart";
    engine
        .deployment()
        .withXmlResource(
            Bpmn.createExecutableProcess(processId)
                .eventSubProcess(
                    "subprocess",
                    s ->
                        s.startEvent(eventSubProcessStartId)
                            .interrupting(false)
                            .message(
                                m ->
                                    m.name(messageName)
                                        .zeebeCorrelationKeyExpression("correlationKey"))
                            .userTask()
                            .endEvent())
                .startEvent()
                .userTask()
                .endEvent()
                .done())
        .deploy();
    final var processInstanceKey =
        engine
            .processInstance()
            .ofBpmnProcessId(processId)
            .withVariable("correlationKey", correlationKey)
            .createOnPartition(2);

    // when
    engine.pauseProcessing(2);
    engine
        .message()
        .withId("messageId")
        .withName(messageName)
        .withCorrelationKey(correlationKey)
        .withTimeToLive(Duration.ofMinutes(30))
        .onPartition(1)
        .publish();

    // This sleep can be replaced by waiting for multiple CORRELATE commands on partition 2
    Thread.sleep(3000);
    engine.resumeProcessing(2);

    // then
    RecordingExporter.processMessageSubscriptionRecords(ProcessMessageSubscriptionIntent.CREATED)
        .withMessageName(messageName)
        .withCorrelationKey(correlationKey)
        .withPartitionId(2)
        .await();
    assertThat(
            RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATED)
                .withProcessInstanceKey(processInstanceKey)
                .withElementId(eventSubProcessStartId)
                .limit(2)
                .count())
        .isEqualTo(1);
  }

The test starts a process instance on partition 2. The process has a message subscription because of the non-interrupting message start event in the event subprocess. The correlation key we use ensures the message gets published to partition 1. It's important that this is a different partition!

We simulate a scenario where partition 2 is very busy and it takes a while to process new commands. We do this by pausing processing on partition 2. This means new commands can be written to the log, but they won't be processed yet.

Whilst partition 2 is working through the backlog of commands on the queue, the PendingMessageSubscriptionChecker of partition 1 will keep sending new CORRELATE commands to partition 2. These all end up on the log.

Once partition 2 catches up with the CORRELATE commands (we resume processing) it will process them all. Since the event is non-interrupting, the subscription is not cleared after the first correlation. Instead, all of the correlations sent by partition 1 correlate and trigger the message event. As a result, a single published message is correlated multiple times to the same event.
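
The race can be illustrated with a small toy model. This is only a sketch and not Zeebe code: the class, the method and the message key are hypothetical, and the log of partition 2 is reduced to a simple queue. The number of retries is arbitrary. It assumes that an interrupting subscription is closed after its first correlation, while a non-interrupting one stays open.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy model of the race described above. None of these names exist in Zeebe.
class DuplicateCorrelationDemo {

  // Returns how often the message event triggers once partition 2 resumes.
  static int triggerCountAfterResume(int retriesWhilePaused, boolean interrupting) {
    // While processing is paused, the initial CORRELATE command and every
    // retry sent by partition 1 are appended to the log of partition 2.
    Queue<Long> log = new ArrayDeque<>();
    long messageKey = 4L; // hypothetical key of the single published message
    for (int i = 0; i < 1 + retriesWhilePaused; i++) {
      log.add(messageKey);
    }

    boolean subscriptionOpen = true;
    int triggers = 0;
    // Processing resumes: every queued command is handled in order.
    while (!log.isEmpty()) {
      log.poll();
      if (!subscriptionOpen) {
        continue; // subscription already closed, the command has no effect
      }
      triggers++;
      if (interrupting) {
        subscriptionOpen = false; // closed after the first correlation
      }
      // Non-interrupting: the subscription stays open, so the next
      // duplicate command correlates again.
    }
    return triggers;
  }

  public static void main(String[] args) {
    System.out.println(triggerCountAfterResume(3, false)); // prints 4
    System.out.println(triggerCountAfterResume(3, true)); // prints 1
  }
}
```

In this model the interrupting case is protected only because the subscription disappears after the first correlation. Nothing compares the incoming command with what was already correlated.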

Expected behavior

The non-interrupting event should only trigger a single time per message. Subsequent correlations on the log should not succeed.
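
One way to picture the expected behavior is a subscription that remembers which message keys it has already correlated and ignores repeats. The sketch below is an assumption about how such a guard could look, not a description of how Zeebe implements or should implement it. The class and method names are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of a non-interrupting subscription with a duplicate guard.
class IdempotentSubscription {

  // Keys of the messages that were already correlated to this subscription.
  private final Set<Long> correlatedMessageKeys = new HashSet<>();
  private int triggerCount = 0;

  // Returns true if the event was triggered, false if the command was a duplicate.
  boolean correlate(long messageKey) {
    if (!correlatedMessageKeys.add(messageKey)) {
      return false; // same message seen before: do not trigger again
    }
    triggerCount++;
    return true;
  }

  int triggerCount() {
    return triggerCount;
  }
}
```

With this guard, four CORRELATE commands for the same message key trigger the event once, while a second message with a different key still triggers it again, which is what a non-interrupting event needs.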

Log/Stacktrace

Logged records from the test

1 C DPLY         CREATE         - #01-> -1    -1 - 
1 E PROC         CREATED        - #02->#01 P1K01 - process.xml -> "id-6a03..ac4affc" (version:1)
2 C DPLY         CREATE         - #01-> -1 P1K02 - process.xml
3 C DPLY         CREATE         - #01-> -1 P1K02 - process.xml
1 E DPLY         CREATED        - #03->#01 P1K02 - process.xml
1 E DSTR         STARTED        - #04->#01 P1K02 - DEPLOYMENT CREATE on partition 1
1 E DSTR         DISTRIBUTING   - #05->#01 P1K02 - DEPLOYMENT CREATE to partition 2
1 E DSTR         DISTRIBUTING   - #06->#01 P1K02 - DEPLOYMENT CREATE to partition 3
2 E PROC         CREATED        - #02->#01 P1K01 - process.xml -> "id-6a03..ac4affc" (version:1)
1 C DSTR         ACKNOWLEDGE    - #07-> -1 P1K02 - DEPLOYMENT CREATE for partition 2
2 E DPLY         CREATED        - #03->#01 P1K02 - process.xml
1 E DSTR         ACKNOWLEDGED   - #08->#07 P1K02 - DEPLOYMENT CREATE for partition 2
3 E PROC         CREATED        - #02->#01 P1K01 - process.xml -> "id-6a03..ac4affc" (version:1)
1 C DSTR         ACKNOWLEDGE    - #09-> -1 P1K02 - DEPLOYMENT CREATE for partition 3
3 E DPLY         CREATED        - #03->#01 P1K02 - process.xml
1 E DSTR         ACKNOWLEDGED   - #10->#09 P1K02 - DEPLOYMENT CREATE for partition 3
1 E DSTR         FINISHED       - #11->#09 P1K02 - DEPLOYMENT CREATE on partition 1
2 C CREA         CREATE         - #04-> -1    -1 - new <process "id-6a03..ac4affc"> (default start)  with variables: {correlationKey=item-2}
2 E VAR          CREATED        - #05->#04 P2K02 - correlationKey->"item-2" in <process [P2K01]>
1 C MSG_SUB      CREATE         - #12-> -1    -1 - "event_message" correlationKey: item-2 @[P2K01] in <process "id-6a03..ac4affc"[P2K01]> (no vars)
2 C PI           ACTIVATE       - #06->#04 P2K01 - PROCESS "id-6a03..ac4affc" in <process "id-6a03..ac4affc"[P2K01]>
2 E CREA         CREATED        - #07->#04 P2K03 - new <process "id-6a03..ac4affc"> (default start)  with variables: {correlationKey=item-2}
2 E PI           ACTIVATING     - #08->#04 P2K01 - PROCESS "id-6a03..ac4affc" in <process "id-6a03..ac4affc"[P2K01]>
2 E PI           ACTIVATED      - #09->#04 P2K01 - PROCESS "id-6a03..ac4affc" in <process "id-6a03..ac4affc"[P2K01]>
2 C PI           ACTIVATE       - #10->#04    -1 - START_EVENT "startEv..6ec9b27" in <process "id-6a03..ac4affc"[P2K01]>
2 E PI           ACTIVATING     - #11->#04 P2K04 - START_EVENT "startEv..6ec9b27" in <process "id-6a03..ac4affc"[P2K01]>
2 E PI           ACTIVATED      - #12->#04 P2K04 - START_EVENT "startEv..6ec9b27" in <process "id-6a03..ac4affc"[P2K01]>
2 C PI           COMPLETE       - #13->#04 P2K04 - START_EVENT "startEv..6ec9b27" in <process "id-6a03..ac4affc"[P2K01]>
2 E PI           COMPLETING     - #14->#04 P2K04 - START_EVENT "startEv..6ec9b27" in <process "id-6a03..ac4affc"[P2K01]>
2 E PROC_MSG_SUB CREATING       - #15->#04 P2K05 - "event_message" correlationKey: item-2 @[P2K01] in <process "id-6a03..ac4affc"[P2K01]> (no vars) (tenant: <default>)
2 E PI           COMPLETED      - #16->#04 P2K04 - START_EVENT "startEv..6ec9b27" in <process "id-6a03..ac4affc"[P2K01]>
2 E PI           SEQ_FLOW_TAKEN - #17->#04 P2K06 - SEQUENCE_FLOW "sequenc..a549079" in <process "id-6a03..ac4affc"[P2K01]>
2 C PI           ACTIVATE       - #18->#04 P2K07 - USER_TASK "userTas..96cb32f" in <process "id-6a03..ac4affc"[P2K01]>
2 E PI           ACTIVATING     - #19->#04 P2K07 - USER_TASK "userTas..96cb32f" in <process "id-6a03..ac4affc"[P2K01]>
2 E JOB          CREATED        - #20->#04 P2K08 - P2K08 "io.camunda.zeebe:userTask" @"userTas..96cb32f"[P2K07] (BPMN_ELEMENT), 1 retries, in <process "id-6a03..ac4affc"[P2K01]> (no vars)
2 E PI           ACTIVATED      - #21->#04 P2K07 - USER_TASK "userTas..96cb32f" in <process "id-6a03..ac4affc"[P2K01]>
1 E MSG_SUB      CREATED        - #13->#12 P1K03 - "event_message" correlationKey: item-2 @[P2K01] in <process "id-6a03..ac4affc"[P2K01]> (no vars)
2 C PROC_MSG_SUB CREATE         - #22-> -1    -1 - "event_message" @[P2K01] in <process ?[P2K01]> (no vars) (tenant: <default>)
1 C MSG          PUBLISH        - #14-> -1    -1 - "event_message" correlationKey: item-2 (no vars)
1 E MSG          PUBLISHED      - #15->#14 P1K04 - "event_message" correlationKey: item-2 (no vars)
2 C PROC_MSG_SUB CORRELATE      - #23-> -1    -1 - "event_message" (inter.) correlationKey: item-2 @[P2K01] in <process "id-6a03..ac4affc"[P2K01]> (no vars) (tenant: <default>)
1 E MSG_SUB      CORRELATING    - #16->#14 P1K03 - "event_message" correlationKey: item-2 @[P2K01] in <process "id-6a03..ac4affc"[P2K01]> (no vars)
2 C PROC_MSG_SUB CORRELATE      - #24-> -1    -1 - "event_message" (inter.) correlationKey: item-2 @[P2K01] in <process "id-6a03..ac4affc"[P2K01]> (no vars) (tenant: <default>)
2 C PROC_MSG_SUB CORRELATE      - #25-> -1    -1 - "event_message" (inter.) correlationKey: item-2 @[P2K01] in <process "id-6a03..ac4affc"[P2K01]> (no vars) (tenant: <default>)
2 C PROC_MSG_SUB CORRELATE      - #26-> -1    -1 - "event_message" (inter.) correlationKey: item-2 @[P2K01] in <process "id-6a03..ac4affc"[P2K01]> (no vars) (tenant: <default>)
2 E PROC_MSG_SUB CREATED        - #27->#22 P2K05 - "event_message" correlationKey: item-2 @[P2K01] in <process "id-6a03..ac4affc"[P2K01]> (no vars) (tenant: <default>)
1 C MSG_SUB      CORRELATE      - #17-> -1    -1 - "event_message" (inter.) @[P2K01] in <process "id-6a03..ac4affc"[P2K01]> (no vars)
2 E PROC_MSG_SUB CORRELATED     - #28->#23 P2K05 - "event_message" correlationKey: item-2 @[P2K01] in <process "id-6a03..ac4affc"[P2K01]> (no vars) (tenant: <default>)
2 E PROC_EVNT    TRIGGERING     - #29->#23 P2K09 -  @"eventSu..ssStart"[P2K01] in <process P1K01[P2K01]> (no vars)
2 C PI           ACTIVATE       - #30->#23    -1 - EVENT_SUB_PROCESS "subprocess" in <process "id-6a03..ac4affc"[P2K01]>
2 E PI           ACTIVATING     - #31->#23 P2K10 - EVENT_SUB_PROCESS "subprocess" in <process "id-6a03..ac4affc"[P2K01]>
2 E PI           ACTIVATED      - #32->#23 P2K10 - EVENT_SUB_PROCESS "subprocess" in <process "id-6a03..ac4affc"[P2K01]>
2 C PI           ACTIVATE       - #33->#23    -1 - START_EVENT "eventSu..ssStart" in <process "id-6a03..ac4affc"[P2K01]>
2 E PI           ACTIVATING     - #34->#23 P2K11 - START_EVENT "eventSu..ssStart" in <process "id-6a03..ac4affc"[P2K01]>
2 E PI           ACTIVATED      - #35->#23 P2K11 - START_EVENT "eventSu..ssStart" in <process "id-6a03..ac4affc"[P2K01]>
2 C PI           COMPLETE       - #36->#23 P2K11 - START_EVENT "eventSu..ssStart" in <process "id-6a03..ac4affc"[P2K01]>
2 E PI           COMPLETING     - #37->#23 P2K11 - START_EVENT "eventSu..ssStart" in <process "id-6a03..ac4affc"[P2K01]>
2 E PROC_EVNT    TRIGGERED      - #38->#23 P2K09 -  @"eventSu..ssStart"[P2K11] in <process P1K01[P2K01]> (no vars)
2 E PI           COMPLETED      - #39->#23 P2K11 - START_EVENT "eventSu..ssStart" in <process "id-6a03..ac4affc"[P2K01]>
2 E PI           SEQ_FLOW_TAKEN - #40->#23 P2K12 - SEQUENCE_FLOW "sequenc..36f2c26" in <process "id-6a03..ac4affc"[P2K01]>
2 C PI           ACTIVATE       - #41->#23 P2K13 - USER_TASK "userTas..62687fb" in <process "id-6a03..ac4affc"[P2K01]>
2 E PI           ACTIVATING     - #42->#23 P2K13 - USER_TASK "userTas..62687fb" in <process "id-6a03..ac4affc"[P2K01]>
2 E JOB          CREATED        - #43->#23 P2K14 - P2K14 "io.camunda.zeebe:userTask" @"userTas..62687fb"[P2K13] (BPMN_ELEMENT), 1 retries, in <process "id-6a03..ac4affc"[P2K01]> (no vars)
2 E PI           ACTIVATED      - #44->#23 P2K13 - USER_TASK "userTas..62687fb" in <process "id-6a03..ac4affc"[P2K01]>
2 E PROC_MSG_SUB CORRELATED     - #45->#24 P2K05 - "event_message" correlationKey: item-2 @[P2K01] in <process "id-6a03..ac4affc"[P2K01]> (no vars) (tenant: <default>)
2 E PROC_EVNT    TRIGGERING     - #46->#24 P2K15 -  @"eventSu..ssStart"[P2K01] in <process P1K01[P2K01]> (no vars)
2 C PI           ACTIVATE       - #47->#24    -1 - EVENT_SUB_PROCESS "subprocess" in <process "id-6a03..ac4affc"[P2K01]>
2 E PI           ACTIVATING     - #48->#24 P2K16 - EVENT_SUB_PROCESS "subprocess" in <process "id-6a03..ac4affc"[P2K01]>
2 E PI           ACTIVATED      - #49->#24 P2K16 - EVENT_SUB_PROCESS "subprocess" in <process "id-6a03..ac4affc"[P2K01]>
2 C PI           ACTIVATE       - #50->#24    -1 - START_EVENT "eventSu..ssStart" in <process "id-6a03..ac4affc"[P2K01]>
2 E PI           ACTIVATING     - #51->#24 P2K17 - START_EVENT "eventSu..ssStart" in <process "id-6a03..ac4affc"[P2K01]>
1 C MSG_SUB      CORRELATE      - #18-> -1    -1 - "event_message" (inter.) @[P2K01] in <process "id-6a03..ac4affc"[P2K01]> (no vars)
1 E MSG_SUB      CORRELATED     - #19->#17 P1K03 - "event_message" correlationKey: item-2 @[P2K01] in <process "id-6a03..ac4affc"[P2K01]> (no vars)
2 E PI           ACTIVATED      - #52->#24 P2K17 - START_EVENT "eventSu..ssStart" in <process "id-6a03..ac4affc"[P2K01]>
2 C PI           COMPLETE       - #53->#24 P2K17 - START_EVENT "eventSu..ssStart" in <process "id-6a03..ac4affc"[P2K01]>
1 E MSG_SUB      CORRELATED     - #20->#18 P1K03 - "event_message" correlationKey: item-2 @[P2K01] in <process "id-6a03..ac4affc"[P2K01]> (no vars)
2 E PI           COMPLETING     - #54->#24 P2K17 - START_EVENT "eventSu..ssStart" in <process "id-6a03..ac4affc"[P2K01]>
2 E PROC_EVNT    TRIGGERED      - #55->#24 P2K15 -  @"eventSu..ssStart"[P2K17] in <process P1K01[P2K01]> (no vars)
2 E PI           COMPLETED      - #56->#24 P2K17 - START_EVENT "eventSu..ssStart" in <process "id-6a03..ac4affc"[P2K01]>
2 E PI           SEQ_FLOW_TAKEN - #57->#24 P2K18 - SEQUENCE_FLOW "sequenc..36f2c26" in <process "id-6a03..ac4affc"[P2K01]>
2 C PI           ACTIVATE       - #58->#24 P2K19 - USER_TASK "userTas..62687fb" in <process "id-6a03..ac4affc"[P2K01]>
2 E PI           ACTIVATING     - #59->#24 P2K19 - USER_TASK "userTas..62687fb" in <process "id-6a03..ac4affc"[P2K01]>
2 E JOB          CREATED        - #60->#24 P2K20 - P2K20 "io.camunda.zeebe:userTask" @"userTas..62687fb"[P2K19] (BPMN_ELEMENT), 1 retries, in <process "id-6a03..ac4affc"[P2K01]> (no vars)
2 E PI           ACTIVATED      - #61->#24 P2K19 - USER_TASK "userTas..62687fb" in <process "id-6a03..ac4affc"[P2K01]>
1 C MSG_SUB      CORRELATE      - #21-> -1    -1 - "event_message" (inter.) @[P2K01] in <process "id-6a03..ac4affc"[P2K01]> (no vars)
1 E MSG_SUB      CORRELATED     - #22->#21 P1K03 - "event_message" correlationKey: item-2 @[P2K01] in <process "id-6a03..ac4affc"[P2K01]> (no vars)
2 E PROC_MSG_SUB CORRELATED     - #62->#25 P2K05 - "event_message" correlationKey: item-2 @[P2K01] in <process "id-6a03..ac4affc"[P2K01]> (no vars) (tenant: <default>)
2 E PROC_EVNT    TRIGGERING     - #63->#25 P2K21 -  @"eventSu..ssStart"[P2K01] in <process P1K01[P2K01]> (no vars)
2 C PI           ACTIVATE       - #64->#25    -1 - EVENT_SUB_PROCESS "subprocess" in <process "id-6a03..ac4affc"[P2K01]>
2 E PI           ACTIVATING     - #65->#25 P2K22 - EVENT_SUB_PROCESS "subprocess" in <process "id-6a03..ac4affc"[P2K01]>
2 E PI           ACTIVATED      - #66->#25 P2K22 - EVENT_SUB_PROCESS "subprocess" in <process "id-6a03..ac4affc"[P2K01]>
2 C PI           ACTIVATE       - #67->#25    -1 - START_EVENT "eventSu..ssStart" in <process "id-6a03..ac4affc"[P2K01]>
2 E PI           ACTIVATING     - #68->#25 P2K23 - START_EVENT "eventSu..ssStart" in <process "id-6a03..ac4affc"[P2K01]>
2 E PI           ACTIVATED      - #69->#25 P2K23 - START_EVENT "eventSu..ssStart" in <process "id-6a03..ac4affc"[P2K01]>
2 C PI           COMPLETE       - #70->#25 P2K23 - START_EVENT "eventSu..ssStart" in <process "id-6a03..ac4affc"[P2K01]>
2 E PI           COMPLETING     - #71->#25 P2K23 - START_EVENT "eventSu..ssStart" in <process "id-6a03..ac4affc"[P2K01]>
2 E PROC_EVNT    TRIGGERED      - #72->#25 P2K21 -  @"eventSu..ssStart"[P2K23] in <process P1K01[P2K01]> (no vars)
2 E PI           COMPLETED      - #73->#25 P2K23 - START_EVENT "eventSu..ssStart" in <process "id-6a03..ac4affc"[P2K01]>
2 E PI           SEQ_FLOW_TAKEN - #74->#25 P2K24 - SEQUENCE_FLOW "sequenc..36f2c26" in <process "id-6a03..ac4affc"[P2K01]>
2 C PI           ACTIVATE       - #75->#25 P2K25 - USER_TASK "userTas..62687fb" in <process "id-6a03..ac4affc"[P2K01]>
2 E PI           ACTIVATING     - #76->#25 P2K25 - USER_TASK "userTas..62687fb" in <process "id-6a03..ac4affc"[P2K01]>
2 E JOB          CREATED        - #77->#25 P2K26 - P2K26 "io.camunda.zeebe:userTask" @"userTas..62687fb"[P2K25] (BPMN_ELEMENT), 1 retries, in <process "id-6a03..ac4affc"[P2K01]> (no vars)
2 E PI           ACTIVATED      - #78->#25 P2K25 - USER_TASK "userTas..62687fb" in <process "id-6a03..ac4affc"[P2K01]>
2 E PROC_MSG_SUB CORRELATED     - #79->#26 P2K05 - "event_message" correlationKey: item-2 @[P2K01] in <process "id-6a03..ac4affc"[P2K01]> (no vars) (tenant: <default>)
1 C MSG_SUB      CORRELATE      - #23-> -1    -1 - "event_message" (inter.) @[P2K01] in <process "id-6a03..ac4affc"[P2K01]> (no vars)
2 E PROC_EVNT    TRIGGERING     - #80->#26 P2K27 -  @"eventSu..ssStart"[P2K01] in <process P1K01[P2K01]> (no vars)
2 C PI           ACTIVATE       - #81->#26    -1 - EVENT_SUB_PROCESS "subprocess" in <process "id-6a03..ac4affc"[P2K01]>
2 E PI           ACTIVATING     - #82->#26 P2K28 - EVENT_SUB_PROCESS "subprocess" in <process "id-6a03..ac4affc"[P2K01]>
2 E PI           ACTIVATED      - #83->#26 P2K28 - EVENT_SUB_PROCESS "subprocess" in <process "id-6a03..ac4affc"[P2K01]>
2 C PI           ACTIVATE       - #84->#26    -1 - START_EVENT "eventSu..ssStart" in <process "id-6a03..ac4affc"[P2K01]>
2 E PI           ACTIVATING     - #85->#26 P2K29 - START_EVENT "eventSu..ssStart" in <process "id-6a03..ac4affc"[P2K01]>
2 E PI           ACTIVATED      - #86->#26 P2K29 - START_EVENT "eventSu..ssStart" in <process "id-6a03..ac4affc"[P2K01]>
2 C PI           COMPLETE       - #87->#26 P2K29 - START_EVENT "eventSu..ssStart" in <process "id-6a03..ac4affc"[P2K01]>
2 E PI           COMPLETING     - #88->#26 P2K29 - START_EVENT "eventSu..ssStart" in <process "id-6a03..ac4affc"[P2K01]>
2 E PROC_EVNT    TRIGGERED      - #89->#26 P2K27 -  @"eventSu..ssStart"[P2K29] in <process P1K01[P2K01]> (no vars)
2 E PI           COMPLETED      - #90->#26 P2K29 - START_EVENT "eventSu..ssStart" in <process "id-6a03..ac4affc"[P2K01]>
1 E MSG_SUB      CORRELATED     - #24->#23 P1K03 - "event_message" correlationKey: item-2 @[P2K01] in <process "id-6a03..ac4affc"[P2K01]> (no vars)
2 E PI           SEQ_FLOW_TAKEN - #91->#26 P2K30 - SEQUENCE_FLOW "sequenc..36f2c26" in <process "id-6a03..ac4affc"[P2K01]>
2 C PI           ACTIVATE       - #92->#26 P2K31 - USER_TASK "userTas..62687fb" in <process "id-6a03..ac4affc"[P2K01]>
2 E PI           ACTIVATING     - #93->#26 P2K31 - USER_TASK "userTas..62687fb" in <process "id-6a03..ac4affc"[P2K01]>
2 E JOB          CREATED        - #94->#26 P2K32 - P2K32 "io.camunda.zeebe:userTask" @"userTas..62687fb"[P2K31] (BPMN_ELEMENT), 1 retries, in <process "id-6a03..ac4affc"[P2K01]> (no vars)
2 E PI           ACTIVATED      - #95->#26 P2K31 - USER_TASK "userTas..62687fb" in <process "id-6a03..ac4affc"[P2K01]>

In these records we see that:

  • P1 creates a message subscription (position 13)
  • P1 receives a published message (position 15)
  • P2 has 4 CORRELATE commands on the log (positions 23, 24, 25, 26). This is due to the PendingMessageSubscriptionChecker
  • P2 resumes processing and triggers the message event 4 times (positions 38, 55, 72, 89)

This problem arose in a support case: https://jira.camunda.com/browse/SUPPORT-25086. We received some logging that clearly showcases that the extra correlations are triggered by the PendingMessageSubscriptionChecker:

2025-01-09 13:42:29.355 [Broker-2] [zb-fs-workers-1] [Exporter-3] INFO 
      io.camunda.zeebe.broker.exporter.trace - Exporting record with key 6755399441584969 at position 1258771 on partition 3 of type EVENT with value type PROCESS_MESSAGE_SUBSCRIPTION and intent CORRELATED
2025-01-09 13:42:32.139 [Broker-2] [zb-fs-workers-1] [Exporter-3] INFO 
      io.camunda.zeebe.broker.exporter.trace - Exporting record with key 6755399441584969 at position 1259289 on partition 3 of type EVENT with value type PROCESS_MESSAGE_SUBSCRIPTION and intent CORRELATED

This specific log occurred 194 times for the key 6755399441584969, but with different positions. This indicates that the same message is correlated multiple times. Aside from that there's logging like:

2025-01-09 14:51:40.615 [Broker-2] [zb-actors-0] [AsyncProcessingScheduleActor-3] WARN 
      io.camunda.zeebe.broker.logstreams - Expected to find a subscription with key 11258999068657018 and message name ..., but none found. The state is inconsistent.
2025-01-09 14:51:40.615 [Broker-2] [zb-actors-0] [AsyncProcessingScheduleActor-3] WARN 
      io.camunda.zeebe.broker.logstreams - Expected to find a subscription with key 9007199254944655 and message name ..., but none found. The state is inconsistent.
2025-01-09 14:51:40.615 [Broker-2] [zb-actors-0] [AsyncProcessingScheduleActor-3] WARN 
      io.camunda.zeebe.broker.logstreams - Expected to find a subscription with key 2251799813878699 and message name ..., but none found. The state is inconsistent.

The only place where we log this message is here. This method is called in 1 place, by the PendingMessageSubscriptionChecker.
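
To check how often one subscription key was exported as CORRELATED, as with the 194 occurrences mentioned above, the exporter trace lines can be grouped by key. The helper below is a minimal sketch. The class and method names are made up, and the regular expression assumes the exact message format shown in the exporter log excerpt above.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for analysing exporter trace logs; not part of Zeebe.
class CorrelatedExportCounter {

  // Matches the trace message format shown in the log excerpt above.
  private static final Pattern CORRELATED =
      Pattern.compile(
          "Exporting record with key (\\d+) at position (\\d+) on partition (\\d+) of type EVENT"
              + " with value type PROCESS_MESSAGE_SUBSCRIPTION and intent CORRELATED");

  // Maps each subscription key to the number of distinct positions it was exported at.
  static Map<Long, Integer> countDistinctPositionsPerKey(List<String> logLines) {
    Map<Long, Set<Long>> positionsByKey = new TreeMap<>();
    for (String line : logLines) {
      Matcher matcher = CORRELATED.matcher(line);
      if (matcher.find()) {
        long key = Long.parseLong(matcher.group(1));
        long position = Long.parseLong(matcher.group(2));
        positionsByKey.computeIfAbsent(key, k -> new HashSet<>()).add(position);
      }
    }
    Map<Long, Integer> counts = new TreeMap<>();
    positionsByKey.forEach((key, positions) -> counts.put(key, positions.size()));
    return counts;
  }
}
```

A key that maps to more than one position was correlated more than once. For a non-interrupting event this can be legitimate when several different messages arrive, so the count only points to a problem when a single message was published.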

Environment:

  • Zeebe Version: 8.5.4 & 8.5.7, but likely exists on all versions.

Support cases:
