Description
Search before asking
- I searched the issues and found no similar issues.
What happened
I ran the RabbitMQ e2e test on Flink 1.15.3 with the configuration file rabbitmq-to-rabbitmq.conf.

When checkpointing is enabled, the snapshotState method of the org.apache.seatunnel.connectors.seatunnel.rabbitmq.source.RabbitmqSplitEnumerator class returns null directly.

As a result, the enumeratorCheckpoint in Flink's SourceCoordinator is null.

The job eventually reports a CheckpointException caused by a NullPointerException.

The correct approach is to return a RabbitSourceState object instead of null.
This problem causes the build of my PR #7040 to fail.
Please assign this issue to me and I will fix it. Thanks.
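To illustrate the failure mode described above, here is a minimal, self-contained sketch. It does not use the real SeaTunnel or Flink classes: `SourceState`, `Enumerator`, `NullReturningEnumerator`, `FixedEnumerator`, and `writeCheckpoint` are hypothetical stand-ins, and `writeCheckpoint` only assumes that the caller serializes whatever `snapshotState` returns without a null check. Under those assumptions, returning an empty state object instead of null is enough to let the checkpoint proceed.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Objects;

final class SnapshotStateSketch {

    /** Hypothetical stand-in for the enumerator state; empty because nothing needs to be tracked. */
    static final class SourceState implements Serializable {
        private static final long serialVersionUID = 1L;
    }

    /** Hypothetical, reduced enumerator contract: only the checkpoint hook. */
    interface Enumerator<S extends Serializable> {
        S snapshotState(long checkpointId);
    }

    /** Mirrors the reported behavior: the checkpoint hook returns null. */
    static final class NullReturningEnumerator implements Enumerator<SourceState> {
        @Override
        public SourceState snapshotState(long checkpointId) {
            return null;
        }
    }

    /** Mirrors the proposed fix: return an (empty) state object instead of null. */
    static final class FixedEnumerator implements Enumerator<SourceState> {
        @Override
        public SourceState snapshotState(long checkpointId) {
            return new SourceState();
        }
    }

    /** Stand-in for a coordinator that assumes the state is non-null when serializing it. */
    static byte[] writeCheckpoint(Serializable state) {
        Objects.requireNonNull(state, "enumerator checkpoint state is null");
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(state);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        try {
            writeCheckpoint(new NullReturningEnumerator().snapshotState(1L));
        } catch (NullPointerException e) {
            System.out.println("null state failed: " + e.getMessage());
        }
        byte[] ok = writeCheckpoint(new FixedEnumerator().snapshotState(1L));
        System.out.println("non-null state serialized, bytes > 0: " + (ok.length > 0));
    }
}
```

In the real connector the equivalent change would be made in RabbitmqSplitEnumerator.snapshotState; the exact state class and constructor to use there should be checked against the connector source.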
SeaTunnel Version
2.3.5
SeaTunnel Config
rabbitmq-to-rabbitmq.conf
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
env {
  parallelism = 1
  job.mode = "STREAMING"
  #checkpoint.interval=5000
  execution.checkpoint.interval=5000
}
source {
  RabbitMQ {
    host = "localhost"
    port = 5672
    virtual_host = "/"
    username = "guest"
    password = "guest"
    queue_name = "test"
    for_e2e_testing = true
    schema = {
      fields {
        id = bigint
        c_map = "map<string, smallint>"
        c_array = "array<tinyint>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(2, 1)"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
  }
}
transform {
}
sink {
  RabbitMQ {
    host = "localhost"
    port = 5672
    virtual_host = "/"
    username = "guest"
    password = "guest"
    queue_name = "test1"
  }
}
Running Command
Run the Flink example, or run the e2e tests automatically.
Error Exception
WARN org.apache.flink.runtime.checkpoint.CheckpointFailureManager - Failed to trigger or complete checkpoint 1 for job 9a061a0f2dd38089b2684598fdd6bf0d. (0 consecutive failed attempts so far)
org.apache.flink.runtime.checkpoint.CheckpointException: Trigger checkpoint failure.
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$getCheckpointException$19(CheckpointCoordinator.java:2162) ~[flink-runtime-1.15.3.jar:1.15.3]
at java.util.Optional.orElseGet(Optional.java:267) ~[?:1.8.0_392]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.getCheckpointException(CheckpointCoordinator.java:2161) ~[flink-runtime-1.15.3.jar:1.15.3]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:930) ~[flink-runtime-1.15.3.jar:1.15.3]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:908) ~[flink-runtime-1.15.3.jar:1.15.3]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$startTriggeringCheckpoint$7(CheckpointCoordinator.java:636) ~[flink-runtime-1.15.3.jar:1.15.3]
at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836) [?:1.8.0_392]
at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811) [?:1.8.0_392]
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) [?:1.8.0_392]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_392]
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [?:1.8.0_392]
at java.util.concurrent.FutureTask.run(FutureTask.java) [?:1.8.0_392]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_392]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_392]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_392]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_392]
at java.lang.Thread.run(Thread.java:750) [?:1.8.0_392]
Caused by: java.lang.NullPointerException
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.writeCheckpointBytes(SourceCoordinator.java:463) ~[flink-runtime-1.15.3.jar:1.15.3]
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.toBytes(SourceCoordinator.java:447) ~[flink-runtime-1.15.3.jar:1.15.3]
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$checkpointCoordinator$6(SourceCoordinator.java:321) ~[flink-runtime-1.15.3.jar:1.15.3]
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$9(SourceCoordinator.java:406) ~[flink-runtime-1.15.3.jar:1.15.3]
at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40) ~[flink-core-1.15.3.jar:1.15.3]
... 8 more
Zeta or Flink or Spark Version
Flink 1.15.3
Java or Scala Version
1.8
Screenshots
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct