Skip to content

Commit d3473de

Browse files
[FLINK-36076][minor][cdc-runtime] Set isSchemaChangeApplying as volatile for thread safe consideration
This closes apache#3556.
1 parent 7f08c6c commit d3473de

File tree

3 files changed

+7
-5
lines changed

3 files changed

+7
-5
lines changed

flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,8 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
9797
// Build Source Operator
9898
DataSourceTranslator sourceTranslator = new DataSourceTranslator();
9999
DataStream<Event> stream =
100-
sourceTranslator.translate(pipelineDef.getSource(), env, pipelineDef.getConfig());
100+
sourceTranslator.translate(
101+
pipelineDef.getSource(), env, pipelineDef.getConfig(), parallelism);
101102

102103
// Build PreTransformOperator for processing Schema Event
103104
TransformTranslator transformTranslator = new TransformTranslator();

flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.flink.cdc.common.event.Event;
2424
import org.apache.flink.cdc.common.factories.DataSourceFactory;
2525
import org.apache.flink.cdc.common.factories.FactoryHelper;
26-
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
2726
import org.apache.flink.cdc.common.source.DataSource;
2827
import org.apache.flink.cdc.common.source.EventSourceProvider;
2928
import org.apache.flink.cdc.common.source.FlinkSourceFunctionProvider;
@@ -41,12 +40,14 @@
4140
public class DataSourceTranslator {
4241

4342
public DataStreamSource<Event> translate(
44-
SourceDef sourceDef, StreamExecutionEnvironment env, Configuration pipelineConfig) {
43+
SourceDef sourceDef,
44+
StreamExecutionEnvironment env,
45+
Configuration pipelineConfig,
46+
int sourceParallelism) {
4547
// Create data source
4648
DataSource dataSource = createDataSource(sourceDef, env, pipelineConfig);
4749

4850
// Get source provider
49-
final int sourceParallelism = pipelineConfig.get(PipelineOptions.PIPELINE_PARALLELISM);
5051
EventSourceProvider eventSourceProvider = dataSource.getEventSourceProvider();
5152
if (eventSourceProvider instanceof FlinkSourceProvider) {
5253
// Source

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public class SchemaRegistryRequestHandler implements Closeable {
9494
private final Set<Integer> flushedSinkWriters;
9595

9696
/** Status of the execution of current schema change request. */
97-
private boolean isSchemaChangeApplying;
97+
private volatile boolean isSchemaChangeApplying;
9898
/** Executor service to execute schema change. */
9999
private final ExecutorService schemaChangeThreadPool;
100100

0 commit comments

Comments
 (0)