Search before asking
- I had searched in the issues and found no similar issues.
What happened
As described in the documentation, I submit the Flink job to YARN with the following command:

```shell
bin/start-seatunnel-flink-13-connector-v2.sh --config ./config/test.config --master yarn-per-job
```
test.config:

```hocon
env {
  parallelism = 2
  job.mode = "STREAMING"
  # flink.execution.checkpointing.interval=5000
}

source {
  Kafka {
    schema = {
      fields {
        comment_num = string
        insert_time = string
        user_info = {
          username = string,
          age = string
        }
      }
    }
    topic = "test-topic"
    consumer.group = "test-group"
    bootstrap.servers = "xxxx"
    kafka.config = {
      client.id = client_1
      max.poll.records = 500
      auto.offset.reset = "earliest"
      enable.auto.commit = "false"
    }
    result_table_name = "kafka_table"
  }
}

sink {
  Elasticsearch {
    source_table_name = "kafka_table"
    hosts = ["xxxx"]
    index = "test-588"
  }
}
```
I did not set the flink.execution.checkpointing.interval parameter in the job configuration file, and checkpoint.interval is not set in the Flink configuration file either. Under these conditions, writing to Elasticsearch loses data: the Elasticsearch sink submits requests in batches of maxBatchSize, and the remaining buffered data is only flushed in prepareCommit(), which is driven by checkpoints:
```java
@Override
public Optional<ElasticsearchCommitInfo> prepareCommit() {
    bulkEsWithRetry(this.esRestClient, this.requestEsList);
    return Optional.empty();
}

@Override
public void write(SeaTunnelRow element) {
    if (RowKind.UPDATE_BEFORE.equals(element.getRowKind())) {
        return;
    }
    String indexRequestRow = seaTunnelRowSerializer.serializeRow(element);
    requestEsList.add(indexRequestRow);
    if (requestEsList.size() >= maxBatchSize) {
        bulkEsWithRetry(this.esRestClient, this.requestEsList);
    }
}
```
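As an illustration of one possible safeguard (this is only my sketch, not the actual connector code; the class and the helper body below are stand-ins), the writer could also drain its buffer when it is closed, so a stop or cancel does not silently drop the rows still buffered below maxBatchSize:

```java
// Sketch of a flush-on-close safeguard; requestEsList/bulkEsWithRetry mirror the
// quoted snippet above, but the surrounding class and the helper body are stand-ins.
import java.util.ArrayList;
import java.util.List;

public class FlushOnCloseSketch implements AutoCloseable {
    private final List<String> requestEsList = new ArrayList<>();

    // Stand-in for bulkEsWithRetry(this.esRestClient, this.requestEsList).
    private void bulkEsWithRetry(List<String> requests) {
        // Send the buffered index requests to Elasticsearch, then clear the buffer.
        requests.clear();
    }

    public void write(String indexRequestRow) {
        requestEsList.add(indexRequestRow);
    }

    @Override
    public void close() {
        if (!requestEsList.isEmpty()) {
            // Flush whatever is still buffered below maxBatchSize before shutdown.
            bulkEsWithRetry(requestEsList);
        }
    }
}
```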
This appears to be because the code does not set a default value for checkpoint.interval when the Flink job runs in STREAMING mode.
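A minimal sketch of the idea, assuming the job submission path has access to Flink's StreamExecutionEnvironment (the 10-second fallback value and the helper variable below are purely illustrative, not values the project defines):

```java
// Sketch only: fall back to a default checkpoint interval in STREAMING mode
// when the user has not configured one.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointFallbackSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stands in for flink.execution.checkpointing.interval read from the job config;
        // -1 means the user did not set it.
        long configuredIntervalMs = -1L;

        if (configuredIntervalMs <= 0) {
            // Without any interval, a STREAMING job never checkpoints, prepareCommit()
            // is never invoked, and rows buffered below maxBatchSize are lost.
            env.enableCheckpointing(10_000L);
        } else {
            env.enableCheckpointing(configuredIntervalMs);
        }
    }
}
```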

If this is confirmed as a bug, please assign it to me.
SeaTunnel Version
2.3.5
SeaTunnel Config
seatunnel.yaml:

```yaml
seatunnel:
  engine:
    history-job-expire-minutes: 1440
    backup-count: 1
    queue-type: blockingqueue
    print-execution-info-interval: 60
    print-job-metrics-info-interval: 60
    slot-service:
      dynamic-slot: true
    checkpoint:
      interval: 10000
      timeout: 60000
      storage:
        type: hdfs
        max-retained: 3
        plugin-config:
          namespace: /tmp/seatunnel/checkpoint_snapshot
          storage.type: hdfs
```
Running Command

```shell
bin/start-seatunnel-flink-13-connector-v2.sh --config ./config/test.config --master yarn-per-job
```
Error Exception
When the buffered data has not reached maxBatchSize, the writer does not send it to Elasticsearch, so without checkpoints the data is never flushed.
Zeta or Flink or Spark Version
No response
Java or Scala Version
No response
Screenshots
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct