
[Bug] [seatunnel-connectors-v2] [connector-kafka] In the KafkaSource code, the elementsQueue is not assigned a size, resulting in an OutOfMemory (OOM) exception. #8956

@mrzhugit

Description


Search before asking

  • I have searched the issues and found no similar issues.

What happened

Deploy a SeaTunnel Engine cluster in separated mode with one master and one worker, configured with 8 cores and 12 GB of memory. Submit a streaming job that moves data from Kafka to HDFS. The Kafka topic contains 10 million records.

env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 10000
  read_limit.rows_per_second=1
}

After the job starts, the worker's memory usage keeps rising, from 200 MB to 5 GB within 5 minutes. Stopping the job through the API does not release the memory. If the job is then restored to the RUNNING state, memory keeps rising until an OutOfMemoryError (OOM) occurs and the worker node restarts.

read_limit.rows_per_second=1 does not actually limit how fast data is read from Kafka. In the createReader method of org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSource, the queue is created as elementsQueue = new LinkedBlockingQueue<>(); with no capacity specified, so its capacity defaults to Integer.MAX_VALUE and it is effectively unbounded. I suspect this is the cause of the memory overflow: the rate limit is not tied to the actual data fetching, so records can pile up in the queue much faster than the reader emits them.
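To illustrate the suspected mechanism, here is a minimal, self-contained sketch using only java.util.concurrent. It is not SeaTunnel's code: the class QueueBoundDemo, the method bufferedAfter, and the capacity of 1,000 are hypothetical. It simulates a fetcher that keeps enqueuing records while a rate-limited reader drains only a few, and compares the no-arg LinkedBlockingQueue with a bounded one.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueBoundDemo {

    // Hypothetical capacity chosen for illustration; not a SeaTunnel default.
    static final int BOUNDED_CAPACITY = 1_000;

    /**
     * Simulates a fetcher that tries to enqueue `produced` records while the
     * rate-limited reader drains only `consumed` of them.
     * Returns how many records remain buffered in the queue.
     */
    static int bufferedAfter(BlockingQueue<Integer> queue, int produced, int consumed) {
        for (int i = 0; i < produced; i++) {
            // offer() never blocks; it returns false once a bounded queue is full.
            // Stopping here mimics a fetcher that would block in put() until
            // the reader frees space (backpressure).
            if (!queue.offer(i)) {
                break;
            }
        }
        // The reader emits only `consumed` records (e.g. 1 row/s for 10 s).
        for (int i = 0; i < consumed; i++) {
            queue.poll();
        }
        return queue.size();
    }

    public static void main(String[] args) {
        // No-arg constructor: capacity is Integer.MAX_VALUE, effectively unbounded.
        BlockingQueue<Integer> unbounded = new LinkedBlockingQueue<>();
        System.out.println("unbounded remaining capacity: " + unbounded.remainingCapacity());

        // Explicit capacity: the queue can never hold more than BOUNDED_CAPACITY records.
        BlockingQueue<Integer> bounded = new LinkedBlockingQueue<>(BOUNDED_CAPACITY);

        System.out.println("unbounded buffered: " + bufferedAfter(unbounded, 100_000, 10));
        System.out.println("bounded buffered:   " + bufferedAfter(bounded, 100_000, 10));
    }
}
```

Running main prints a remaining capacity of 2147483647 for the no-arg queue, then 99990 buffered records for the unbounded queue versus 990 for the bounded one. A bounded queue alone does not enforce read_limit.rows_per_second, but it turns unbounded memory growth into the fetch thread blocking, so memory stays proportional to the chosen capacity.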



SeaTunnel Version

2.3.9

SeaTunnel Config

null

Running Command

null

Error Exception

OOM

Zeta or Flink or Spark Version

Zeta

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata


Assignees: No one assigned
Type: No type
Projects: Status: Done
Milestone: No milestone
Relationships: None yet
Development: No branches or pull requests
