[Bug] [connector-rabbitmq] Rabbitmq source connector requires config options for the sink connector. #9616

@jan-gajewski-tuttle

Search before asking

  • I searched the issues and found no similar issues.

What happened

After configuring the RabbitMQ source connector with all the required fields from the docs, the connector failed with:

Exception in org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask@17a9cf6c
org.apache.seatunnel.connectors.seatunnel.rabbitmq.exception.RabbitmqConnectorException: ErrorCode:[RABBITMQ-02], ErrorDescription:[create rabbitmq client failed] - Error while create RMQ client with seatunnel_queue at rabbitmq
        at org.apache.seatunnel.connectors.seatunnel.rabbitmq.client.RabbitmqClient.<init>(RabbitmqClient.java:69) ~[connector-rabbitmq-2.3.11.jar:2.3.11]
        at org.apache.seatunnel.connectors.seatunnel.rabbitmq.source.RabbitmqSourceReader.<init>(RabbitmqSourceReader.java:82) ~[connector-rabbitmq-2.3.11.jar:2.3.11]
        at org.apache.seatunnel.connectors.seatunnel.rabbitmq.source.RabbitmqSource.createReader(RabbitmqSource.java:81) ~[connector-rabbitmq-2.3.11.jar:2.3.11]
        at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.init(SourceFlowLifeCycle.java:124) ~[seatunnel-starter.jar:2.3.11]
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.init(SeaTunnelTask.java:131) ~[seatunnel-starter.jar:2.3.11]
        at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.init(SourceSeaTunnelTask.java:72) ~[seatunnel-starter.jar:2.3.11]
        at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:692) [seatunnel-starter.jar:2.3.11]
        at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1023) [seatunnel-starter.jar:2.3.11]
        at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43) [seatunnel-starter.jar:2.3.11]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_342]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_342]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_342]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_342]
        at java.lang.Thread.run(Thread.java:750) [?:1.8.0_342]
Caused by: java.lang.NullPointerException
        at org.apache.seatunnel.connectors.seatunnel.rabbitmq.client.RabbitmqClient.declareQueueDefaults(RabbitmqClient.java:199) ~[connector-rabbitmq-2.3.11.jar:2.3.11]
        at org.apache.seatunnel.connectors.seatunnel.rabbitmq.client.RabbitmqClient.setupQueue(RabbitmqClient.java:192) ~[connector-rabbitmq-2.3.11.jar:2.3.11]
        at org.apache.seatunnel.connectors.seatunnel.rabbitmq.client.RabbitmqClient.<init>(RabbitmqClient.java:65) ~[connector-rabbitmq-2.3.11.jar:2.3.11]
        ... 13 more

After adding

    durable = true
    exclusive = false
    auto_delete = false

to the config, the job runs as expected.
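
For reference, the workaround can be written out as a complete source block. This is only a sketch that merges the three options above into the source config from this report. Placing them inside the RabbitMQ source block, next to the other connector options, is an assumption, and <my-field-name> is still a placeholder.

```hocon
source {
  RabbitMQ {
    plugin_output = "rabbitmq_source"
    host = "rabbitmq"
    port = 5672
    virtual_host = "/"
    username = "admin"
    password = "admin"
    queue_name = "seatunnel_queue"
    # Workaround for 2.3.11: set the queue declaration options explicitly
    # (assumed placement: inside the source block, next to queue_name)
    durable = true
    exclusive = false
    auto_delete = false
    schema = {
        fields {
            <my-field-name> = "string"
        }
    }
  }
}
```

The values are the ones that made the job run in my setup. I have not checked whether other combinations behave differently.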

I think the changes intended for the sink in #7365 leaked into the source connector, specifically the new declareQueueDefaults method, which is where the NullPointerException in the stack trace above is thrown.

If this is intended for the source connector, then the source connector docs are missing these options. However, I don't think they should be required for the source connector.

SeaTunnel Version

2.3.11

SeaTunnel Config

env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 2000
}

source {
  RabbitMQ {
    plugin_output = "rabbitmq_source"
    host = "rabbitmq"
    port = 5672
    virtual_host = "/"
    username = "admin"
    password = "admin"
    queue_name = "seatunnel_queue"
    schema = {
        fields {
            <my-field-name> = "string"
        }
    }
  }
}

sink {
  Console {
  }
}

Running Command

./bin/seatunnel.sh -m local -c config/my.template

Error Exception

Same stack trace as shown under "What happened" above.

Zeta or Flink or Spark Version

Default engine shipped with the 2.3.11 Docker image

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
