-
Notifications
You must be signed in to change notification settings - Fork 2.2k
[Fix][Flink-starter]Hotfix flink checkpoint default value #7040
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Fix][Flink-starter]Hotfix flink checkpoint default value #7040
Conversation
…angshenghang/seatunnel into hotfix-flink-checkpoint-default-value
| Options.key("checkpoint.interval") | ||
| .longType() | ||
| .noDefaultValue() | ||
| .defaultValue(10000L) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi, we should not change the default value. Because no checkpoint.interval value and have default value is different. The behavior of the two is different.
| break; | ||
| } | ||
| } | ||
| CheckpointConfig checkpointConfig = environment.getCheckpointConfig(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about doing some refactoring? Currently there are two copies of the same processing logic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Hisoka-X Are you saying that these two modules have duplicate code ?

There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
okey, I will optimize it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Hisoka-X The modification is completed, please check it , thk
Hisoka-X
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for update!
| protected void setCheckpoint() { | ||
| super.setCheckpoint(); | ||
| CheckpointConfig checkpointConfig = environment.getCheckpointConfig(); | ||
| if (config.hasPath(EnvCommonOptions.CHECKPOINT_TIMEOUT.key())) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not move set checkpoint timeout to abstract environment?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Hisoka-X flink13-starter use config.hasPath(EnvCommonOptions.CHECKPOINT_TIMEOUT.key())
flink-starter-common use EnvironmentUtil.hasPathAndWaring(config, EnvCommonOptions.CHECKPOINT_TIMEOUT.key())
The method used in the previous code was different. In the flink-starter-common model, a log prompt
log.warn(
"the parameter '{}' will be deprecated, please use the 'flink.' prefix with the flink official configuration item to set it",
configKey);
The developers may have designed it this way, so I didn't make any changes.
If they need to be unified, I can modify it
| protected void setCheckpoint() { | ||
| super.setCheckpoint(); | ||
| CheckpointConfig checkpointConfig = environment.getCheckpointConfig(); | ||
| if (EnvironmentUtil.hasPathAndWaring(config, EnvCommonOptions.CHECKPOINT_TIMEOUT.key())) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it should be same with another runtime environment. I believe it was a mistake.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it should be same with another runtime environment. I believe it was a mistake.
#7040 (comment)
Can you help me confirm this? If there is any problem, I will modify them to be the same.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. Please use if (config.hasPath(EnvCommonOptions.CHECKPOINT_TIMEOUT.key())) {.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. Please use
if (config.hasPath(EnvCommonOptions.CHECKPOINT_TIMEOUT.key())) {.
@Hisoka-X modify completed
fix #6986
Check list
New License Guide