Skip to content

Commit 07db169

Browse files
authored
[postgres] Close idle readers when snapshot finished (#2400)
1 parent b0e6492 commit 07db169

File tree

5 files changed

+49
-10
lines changed

5 files changed

+49
-10
lines changed

flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceBuilder.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,23 @@ public PostgresSourceBuilder<T> debeziumProperties(Properties properties) {
215215
return this;
216216
}
217217

218+
/**
219+
* scan.incremental.close-idle-reader.enabled
220+
*
221+
* <p>Whether to close idle readers at the end of the snapshot phase. This feature depends on
222+
* FLIP-147: Support Checkpoints After Tasks Finished. The flink version is required to be
223+
* greater than or equal to 1.14, and the configuration <code>
224+
* 'execution.checkpointing.checkpoints-after-tasks-finish.enabled'</code> needs to be set to
225+
* true.
226+
*
227+
* <p>See more
228+
* https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished.
229+
*/
230+
public PostgresSourceBuilder<T> closeIdleReaders(boolean closeIdleReaders) {
231+
this.configFactory.closeIdleReaders(closeIdleReaders);
232+
return this;
233+
}
234+
218235
/**
219236
* The deserializer used to convert from consumed {@link
220237
* org.apache.kafka.connect.source.SourceRecord}.

flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.Properties;
2929
import java.util.UUID;
3030

31+
import static com.ververica.cdc.connectors.base.utils.EnvironmentUtils.checkSupportCheckpointsAfterTasksFinished;
3132
import static com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions.HEARTBEAT_INTERVAL;
3233
import static org.apache.flink.util.Preconditions.checkNotNull;
3334

@@ -51,6 +52,7 @@ public class PostgresSourceConfigFactory extends JdbcSourceConfigFactory {
5152
/** Creates a new {@link PostgresSourceConfig} for the given subtask {@code subtaskId}. */
5253
@Override
5354
public PostgresSourceConfig create(int subtaskId) {
55+
checkSupportCheckpointsAfterTasksFinished(closeIdleReaders);
5456
Properties props = new Properties();
5557
props.setProperty("connector.class", PostgresConnector.class.getCanonicalName());
5658
props.setProperty("plugin.name", pluginName);

flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLTableFactory.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.SCHEMA_NAME;
4343
import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.TABLE_NAME;
4444
import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.USERNAME;
45+
import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
4546
import static com.ververica.cdc.connectors.base.utils.ObjectUtils.doubleCompare;
4647
import static com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions.CHANGELOG_MODE;
4748
import static com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions.CHUNK_META_GROUP_SIZE;
@@ -108,6 +109,8 @@ public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context c
108109
String chunkKeyColumn =
109110
config.getOptional(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN).orElse(null);
110111

112+
boolean closeIdlerReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
113+
111114
if (enableParallelRead) {
112115
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
113116
validateIntegerOption(SCAN_SNAPSHOT_FETCH_SIZE, fetchSize, 1);
@@ -148,7 +151,8 @@ public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context c
148151
distributionFactorLower,
149152
heartbeatInterval,
150153
startupOptions,
151-
chunkKeyColumn);
154+
chunkKeyColumn,
155+
closeIdlerReaders);
152156
}
153157

154158
@Override
@@ -187,6 +191,7 @@ public Set<ConfigOption<?>> optionalOptions() {
187191
options.add(CONNECT_MAX_RETRIES);
188192
options.add(CONNECTION_POOL_SIZE);
189193
options.add(HEARTBEAT_INTERVAL);
194+
options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
190195
return options;
191196
}
192197

flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLTableSource.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMe
8181
private final Duration heartbeatInterval;
8282
private final StartupOptions startupOptions;
8383
private final String chunkKeyColumn;
84+
private final boolean closeIdleReaders;
8485

8586
// --------------------------------------------------------------------------------------------
8687
// Mutable attributes
@@ -116,7 +117,8 @@ public PostgreSQLTableSource(
116117
double distributionFactorLower,
117118
Duration heartbeatInterval,
118119
StartupOptions startupOptions,
119-
@Nullable String chunkKeyColumn) {
120+
@Nullable String chunkKeyColumn,
121+
boolean closeIdleReaders) {
120122
this.physicalSchema = physicalSchema;
121123
this.port = port;
122124
this.hostname = checkNotNull(hostname);
@@ -144,6 +146,7 @@ public PostgreSQLTableSource(
144146
// Mutable attributes
145147
this.producedDataType = physicalSchema.toPhysicalRowDataType();
146148
this.metadataKeys = Collections.emptyList();
149+
this.closeIdleReaders = closeIdleReaders;
147150
}
148151

149152
@Override
@@ -202,6 +205,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
202205
.startupOptions(startupOptions)
203206
.chunkKeyColumn(chunkKeyColumn)
204207
.heartbeatInterval(heartbeatInterval)
208+
.closeIdleReaders(closeIdleReaders)
205209
.build();
206210
return SourceProvider.of(parallelSource);
207211
} else {
@@ -266,7 +270,8 @@ public DynamicTableSource copy() {
266270
distributionFactorLower,
267271
heartbeatInterval,
268272
startupOptions,
269-
chunkKeyColumn);
273+
chunkKeyColumn,
274+
closeIdleReaders);
270275
source.metadataKeys = metadataKeys;
271276
source.producedDataType = producedDataType;
272277
return source;
@@ -306,7 +311,8 @@ public boolean equals(Object o) {
306311
&& Objects.equals(distributionFactorLower, that.distributionFactorLower)
307312
&& Objects.equals(heartbeatInterval, that.heartbeatInterval)
308313
&& Objects.equals(startupOptions, that.startupOptions)
309-
&& Objects.equals(chunkKeyColumn, that.chunkKeyColumn);
314+
&& Objects.equals(chunkKeyColumn, that.chunkKeyColumn)
315+
&& Objects.equals(closeIdleReaders, that.closeIdleReaders);
310316
}
311317

312318
@Override
@@ -337,7 +343,8 @@ public int hashCode() {
337343
distributionFactorLower,
338344
heartbeatInterval,
339345
startupOptions,
340-
chunkKeyColumn);
346+
chunkKeyColumn,
347+
closeIdleReaders);
341348
}
342349

343350
@Override

flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import java.util.Map;
5050
import java.util.Properties;
5151

52+
import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
5253
import static com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions.CHUNK_META_GROUP_SIZE;
5354
import static com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions.CONNECTION_POOL_SIZE;
5455
import static com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions.CONNECT_MAX_RETRIES;
@@ -110,6 +111,8 @@ public class PostgreSQLTableFactoryTest {
110111
private static final String MY_SCHEMA = "public";
111112
private static final String MY_SLOT_NAME = "flinktest";
112113
private static final Properties PROPERTIES = new Properties();
114+
private static final boolean SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT =
115+
SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue();
113116

114117
@Test
115118
public void testCommonProperties() {
@@ -142,7 +145,8 @@ public void testCommonProperties() {
142145
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
143146
HEARTBEAT_INTERVAL.defaultValue(),
144147
StartupOptions.initial(),
145-
null);
148+
null,
149+
SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT);
146150
assertEquals(expectedSource, actualSource);
147151
}
148152

@@ -182,7 +186,8 @@ public void testOptionalProperties() {
182186
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
183187
HEARTBEAT_INTERVAL.defaultValue(),
184188
StartupOptions.initial(),
185-
null);
189+
null,
190+
SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT);
186191
assertEquals(expectedSource, actualSource);
187192
}
188193

@@ -222,7 +227,8 @@ public void testMetadataColumns() {
222227
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
223228
HEARTBEAT_INTERVAL.defaultValue(),
224229
StartupOptions.initial(),
225-
null);
230+
null,
231+
SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT);
226232
expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
227233
expectedSource.metadataKeys =
228234
Arrays.asList("op_ts", "database_name", "schema_name", "table_name");
@@ -272,7 +278,8 @@ public void testEnableParallelReadSource() {
272278
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
273279
HEARTBEAT_INTERVAL.defaultValue(),
274280
StartupOptions.initial(),
275-
null);
281+
null,
282+
SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT);
276283
assertEquals(expectedSource, actualSource);
277284
}
278285

@@ -312,7 +319,8 @@ public void testStartupFromLatestOffset() {
312319
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
313320
HEARTBEAT_INTERVAL.defaultValue(),
314321
StartupOptions.latest(),
315-
null);
322+
null,
323+
SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT);
316324
assertEquals(expectedSource, actualSource);
317325
}
318326

0 commit comments

Comments
 (0)