
[Bug] [connector-v2-rabbit] Rabbit Checkpoint exception in Flink mode #7098

@zhangshenghang

Description

Search before asking

  • I had searched in the issues and found no similar issues.

What happened

I ran the RabbitMQ e2e test on Flink 1.15.3 using the configuration file rabbitmq-to-rabbitmq.conf.
When checkpointing is enabled, the snapshotState method of the org.apache.seatunnel.connectors.seatunnel.rabbitmq.source.RabbitmqSplitEnumerator class returns null directly.
This causes the enumeratorCheckpoint in Flink's SourceCoordinator to be null.
The program eventually reports a checkpoint exception.
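To illustrate why a null enumerator checkpoint can end in the NullPointerException thrown from SourceCoordinator.writeCheckpointBytes, here is a minimal, self-contained Java model. It is not Flink's code: the class and method names are hypothetical, and it assumes, as a simplification, that the coordinator serializes the checkpoint and writes the length of the serialized bytes before the payload, so a null serialized result fails when its length is read.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class NullCheckpointDemo {

    // Hypothetical stand-in for a checkpoint serializer: like many Java
    // serializers, it passes a null object through as null bytes.
    public static byte[] serialize(Serializable state) {
        if (state == null) {
            return null;
        }
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(state);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Simplified model of a coordinator writing "length + payload" for the
    // enumerator checkpoint. Not Flink's actual implementation.
    public static byte[] writeCheckpointBytes(Serializable enumeratorCheckpoint) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(baos)) {
            byte[] payload = serialize(enumeratorCheckpoint);
            out.writeInt(payload.length); // NullPointerException when payload is null
            out.write(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return baos.toByteArray();
    }

    public static void main(String[] args) {
        // A non-null state is written without problems.
        System.out.println(writeCheckpointBytes("some-state").length > 4); // true
        // A null state, like the one returned by the RabbitMQ enumerator, fails.
        try {
            writeCheckpointBytes(null);
        } catch (NullPointerException e) {
            System.out.println("NullPointerException while writing checkpoint bytes");
        }
    }
}
```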
The correct approach is to return a RabbitSourceState object instead of null.
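A minimal sketch of the shape of this fix, under stated assumptions: SimpleSplitEnumerator and EmptyEnumeratorState below are hypothetical, simplified stand-ins, not SeaTunnel's actual SourceSplitEnumerator interface or RabbitSourceState class, and the state deliberately carries no fields. It only contrasts returning null with always returning a (possibly empty) state object from snapshotState.

```java
import java.io.Serializable;

public class RabbitmqEnumeratorFixSketch {

    // Hypothetical, simplified stand-in for a split-enumerator contract;
    // only the method relevant to this issue is modelled.
    public interface SimpleSplitEnumerator<StateT extends Serializable> {
        StateT snapshotState(long checkpointId);
    }

    // Hypothetical empty enumerator state. Which fields the real state object
    // should carry, if any, is left to the actual fix.
    public static final class EmptyEnumeratorState implements Serializable {
        private static final long serialVersionUID = 1L;
    }

    // Behaviour described in the issue: returning null breaks the Flink checkpoint.
    public static final class BrokenEnumerator
            implements SimpleSplitEnumerator<EmptyEnumeratorState> {
        @Override
        public EmptyEnumeratorState snapshotState(long checkpointId) {
            return null;
        }
    }

    // Proposed behaviour: always return a state object, even an empty one.
    public static final class FixedEnumerator
            implements SimpleSplitEnumerator<EmptyEnumeratorState> {
        @Override
        public EmptyEnumeratorState snapshotState(long checkpointId) {
            return new EmptyEnumeratorState();
        }
    }

    public static void main(String[] args) {
        System.out.println(new BrokenEnumerator().snapshotState(1L));        // null
        System.out.println(new FixedEnumerator().snapshotState(1L) != null); // true
    }
}
```

Returning an empty object rather than null keeps the enumerator compatible with any coordinator that unconditionally serializes the snapshot, at the cost of a few bytes per checkpoint.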

This problem causes the build of my PR #7040 to fail.

Please assign this issue to me and I will fix it. Thanks!

SeaTunnel Version

2.3.5

SeaTunnel Config

rabbitmq-to-rabbitmq.conf

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

env {
  parallelism = 1
  job.mode = "STREAMING"
  #checkpoint.interval=5000
  execution.checkpoint.interval=5000
}

source {
  RabbitMQ {
    host = "localhost"
    port = 5672
    virtual_host = "/"
    username = "guest"
    password = "guest"
    queue_name = "test"
    for_e2e_testing = true
    schema = {
      fields {
        id = bigint
        c_map = "map<string, smallint>"
        c_array = "array<tinyint>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(2, 1)"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
  }
}

transform {
}

sink {
  RabbitMQ {
    host = "localhost"
    port = 5672
    virtual_host = "/"
    username = "guest"
    password = "guest"
    queue_name = "test1"
  }
}

Running Command

Run the Flink example, or run the e2e tests automatically.

Error Exception

WARN  org.apache.flink.runtime.checkpoint.CheckpointFailureManager - Failed to trigger or complete checkpoint 1 for job 9a061a0f2dd38089b2684598fdd6bf0d. (0 consecutive failed attempts so far)
org.apache.flink.runtime.checkpoint.CheckpointException: Trigger checkpoint failure.
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$getCheckpointException$19(CheckpointCoordinator.java:2162) ~[flink-runtime-1.15.3.jar:1.15.3]
	at java.util.Optional.orElseGet(Optional.java:267) ~[?:1.8.0_392]
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.getCheckpointException(CheckpointCoordinator.java:2161) ~[flink-runtime-1.15.3.jar:1.15.3]
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:930) ~[flink-runtime-1.15.3.jar:1.15.3]
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:908) ~[flink-runtime-1.15.3.jar:1.15.3]
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$startTriggeringCheckpoint$7(CheckpointCoordinator.java:636) ~[flink-runtime-1.15.3.jar:1.15.3]
	at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836) [?:1.8.0_392]
	at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811) [?:1.8.0_392]
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) [?:1.8.0_392]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_392]
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [?:1.8.0_392]
	at java.util.concurrent.FutureTask.run(FutureTask.java) [?:1.8.0_392]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_392]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_392]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_392]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_392]
	at java.lang.Thread.run(Thread.java:750) [?:1.8.0_392]
Caused by: java.lang.NullPointerException
	at org.apache.flink.runtime.source.coordinator.SourceCoordinator.writeCheckpointBytes(SourceCoordinator.java:463) ~[flink-runtime-1.15.3.jar:1.15.3]
	at org.apache.flink.runtime.source.coordinator.SourceCoordinator.toBytes(SourceCoordinator.java:447) ~[flink-runtime-1.15.3.jar:1.15.3]
	at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$checkpointCoordinator$6(SourceCoordinator.java:321) ~[flink-runtime-1.15.3.jar:1.15.3]
	at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$9(SourceCoordinator.java:406) ~[flink-runtime-1.15.3.jar:1.15.3]
	at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40) ~[flink-core-1.15.3.jar:1.15.3]
	... 8 more

Zeta or Flink or Spark Version

Flink 1.15.3

Java or Scala Version

1.8

Screenshots

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct
