Closed
Description
Search before asking
- I had searched in the issues and found no similar issues.
What happened
After configuring the RabbitMQ source connector with all the required fields from the docs, the connector failed with:
Exception in org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask@17a9cf6c
org.apache.seatunnel.connectors.seatunnel.rabbitmq.exception.RabbitmqConnectorException: ErrorCode:[RABBITMQ-02], ErrorDescription:[create rabbitmq client failed] - Error while create RMQ client with seatunnel_queue at rabbitmq
at org.apache.seatunnel.connectors.seatunnel.rabbitmq.client.RabbitmqClient.<init>(RabbitmqClient.java:69) ~[connector-rabbitmq-2.3.11.jar:2.3.11]
at org.apache.seatunnel.connectors.seatunnel.rabbitmq.source.RabbitmqSourceReader.<init>(RabbitmqSourceReader.java:82) ~[connector-rabbitmq-2.3.11.jar:2.3.11]
at org.apache.seatunnel.connectors.seatunnel.rabbitmq.source.RabbitmqSource.createReader(RabbitmqSource.java:81) ~[connector-rabbitmq-2.3.11.jar:2.3.11]
at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.init(SourceFlowLifeCycle.java:124) ~[seatunnel-starter.jar:2.3.11]
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.init(SeaTunnelTask.java:131) ~[seatunnel-starter.jar:2.3.11]
at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.init(SourceSeaTunnelTask.java:72) ~[seatunnel-starter.jar:2.3.11]
at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:692) [seatunnel-starter.jar:2.3.11]
at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1023) [seatunnel-starter.jar:2.3.11]
at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43) [seatunnel-starter.jar:2.3.11]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_342]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_342]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_342]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_342]
at java.lang.Thread.run(Thread.java:750) [?:1.8.0_342]
Caused by: java.lang.NullPointerException
at org.apache.seatunnel.connectors.seatunnel.rabbitmq.client.RabbitmqClient.declareQueueDefaults(RabbitmqClient.java:199) ~[connector-rabbitmq-2.3.11.jar:2.3.11]
at org.apache.seatunnel.connectors.seatunnel.rabbitmq.client.RabbitmqClient.setupQueue(RabbitmqClient.java:192) ~[connector-rabbitmq-2.3.11.jar:2.3.11]
at org.apache.seatunnel.connectors.seatunnel.rabbitmq.client.RabbitmqClient.<init>(RabbitmqClient.java:65) ~[connector-rabbitmq-2.3.11.jar:2.3.11]
... 13 more
After adding
durable = true
exclusive = false
auto_delete = false
to the source config, the job runs as expected.
I think the changes intended for the sink in #7365 leaked into the source connector, specifically the new declareQueueDefaults method.
If this is intended for the source connector, then the source connector docs are missing these options. However, I don't think they should be required for the source connector.
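To illustrate the kind of failure suspected here, the following is a minimal, self-contained sketch, not the connector's actual code. It assumes (unconfirmed) that the NullPointerException in declareQueueDefaults comes from auto-unboxing a Boolean option that was never set in the job config. The class QueueFlagDefaults and its methods are hypothetical names, and the fallback values simply mirror the workaround above rather than any documented connector defaults.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only: none of these names come from the SeaTunnel code base.
public class QueueFlagDefaults {

    // Mimics the suspected bug: unboxing a Boolean that is null throws NullPointerException.
    public static boolean unsafeUnbox(Boolean configured) {
        return configured; // auto-unboxing; fails when configured == null
    }

    // Null-safe alternative: use the fallback when the option was not set.
    public static boolean orDefault(Boolean configured, boolean fallback) {
        return configured == null ? fallback : configured.booleanValue();
    }

    // Resolves the three queue flags. The fallbacks (true, false, false) are an
    // assumption taken from the workaround in this issue, not confirmed defaults.
    public static Map<String, Boolean> resolve(Boolean durable, Boolean exclusive, Boolean autoDelete) {
        Map<String, Boolean> flags = new LinkedHashMap<>();
        flags.put("durable", orDefault(durable, true));
        flags.put("exclusive", orDefault(exclusive, false));
        flags.put("auto_delete", orDefault(autoDelete, false));
        return flags;
    }

    public static void main(String[] args) {
        // Options omitted from the config are modelled as null.
        System.out.println(resolve(null, null, null));
        try {
            unsafeUnbox(null);
        } catch (NullPointerException e) {
            System.out.println("unboxing a missing option throws NullPointerException");
        }
    }
}
```

If the root cause is indeed a missing default, a guard like orDefault would let a source config without durable, exclusive, and auto_delete start normally, while explicitly configured values would still take precedence.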
SeaTunnel Version
2.3.11
SeaTunnel Config
env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 2000
}
source {
  RabbitMQ {
    plugin_output = "rabbitmq_source"
    host = "rabbitmq"
    port = 5672
    virtual_host = "/"
    username = "admin"
    password = "admin"
    queue_name = "seatunnel_queue"
    schema = {
      fields {
        <my-field-name> = "string"
      }
    }
  }
}
sink {
  Console {
  }
}
Running Command
./bin/seatunnel.sh -m local -c config/my.template
Error Exception
(Identical to the stack trace under "What happened".)
Zeta or Flink or Spark Version
default shipped with the 2.3.11 docker image
Java or Scala Version
No response
Screenshots
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct