Skip to content

Conversation

@zhangshenghang
Copy link
Member

fix #6986

  • Set Flink default Checkpoint time
  • In Flink BATCH mode, there is no need to set Checkpoint because it is useless.

Check list

Options.key("checkpoint.interval")
.longType()
.noDefaultValue()
.defaultValue(10000L)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, we should not change the default value. Because no checkpoint.interval value and have default value is different. The behavior of the two is different.

break;
}
}
CheckpointConfig checkpointConfig = environment.getCheckpointConfig();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about doing some refactoring? Currently there are two copies of the same processing logic.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Hisoka-X Are you saying that these two modules have duplicate code ?
image

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okey, I will optimize it

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Hisoka-X The modification is completed, please check it , thk

@Hisoka-X
Copy link
Member

Hisoka-X commented Jul 8, 2024

cc @TyrantLucifer

Copy link
Member

@Hisoka-X Hisoka-X left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for update!

protected void setCheckpoint() {
super.setCheckpoint();
CheckpointConfig checkpointConfig = environment.getCheckpointConfig();
if (config.hasPath(EnvCommonOptions.CHECKPOINT_TIMEOUT.key())) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not move set checkpoint timeout to abstract environment?

Copy link
Member Author

@zhangshenghang zhangshenghang Jul 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Hisoka-X flink13-starter use config.hasPath(EnvCommonOptions.CHECKPOINT_TIMEOUT.key())
flink-starter-common use EnvironmentUtil.hasPathAndWaring(config, EnvCommonOptions.CHECKPOINT_TIMEOUT.key())

The method used in the previous code was different. In the flink-starter-common model, a log prompt

log.warn(
                    "the parameter '{}' will be deprecated, please use the 'flink.' prefix with the flink official configuration item to set it",
                    configKey);

The developers may have designed it this way, so I didn't make any changes.
If they need to be unified, I can modify it

protected void setCheckpoint() {
super.setCheckpoint();
CheckpointConfig checkpointConfig = environment.getCheckpointConfig();
if (EnvironmentUtil.hasPathAndWaring(config, EnvCommonOptions.CHECKPOINT_TIMEOUT.key())) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it should be same with another runtime environment. I believe it was a mistake.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it should be same with another runtime environment. I believe it was a mistake.

#7040 (comment)
Can you help me confirm this? If there is any problem, I will modify them to be the same.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Please use if (config.hasPath(EnvCommonOptions.CHECKPOINT_TIMEOUT.key())) {.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Please use if (config.hasPath(EnvCommonOptions.CHECKPOINT_TIMEOUT.key())) {.

@Hisoka-X modify completed

@EricJoy2048 EricJoy2048 merged commit 93c63d1 into apache:dev Jul 9, 2024
hawk9821 pushed a commit to hawk9821/seatunnel that referenced this pull request Jul 10, 2024
Thomas-HuWei pushed a commit to zilliztech/seatunnel that referenced this pull request Jul 10, 2024
Thomas-HuWei pushed a commit to zilliztech/seatunnel that referenced this pull request Jul 10, 2024
litiliu pushed a commit to litiliu/seatunnel that referenced this pull request Jul 12, 2024
hawk9821 pushed a commit to hawk9821/seatunnel that referenced this pull request Jul 13, 2024
chaorongzhi pushed a commit to chaorongzhi/seatunnel that referenced this pull request Aug 21, 2024
@zhangshenghang zhangshenghang deleted the hotfix-flink-checkpoint-default-value branch September 6, 2024 02:31
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug] [seatunnel-flink-starter] If checkpoint is not set, data will be lost

3 participants