Description
Search before asking
- I searched in the issues and found nothing similar.
Flink version
1.16.0
Flink CDC version
2.4.2
Database and its version
AWS RDS - PostgreSQL 11.22 on x86_64-pc-linux-gnu, compiled by gcc (GCC) 7.3.1 20180712 (Red Hat 7.3.1-12), 64-bit
Minimal reproduce step
See the original Debezium bug: https://issues.redhat.com/browse/DBZ-5398
Create a table in PostgreSQL and add a unique index that uses a function such as COALESCE:
create table test_tbl
(
    id         text not null,
    parent_id  text,
    cnt        integer default 0,
    created_at timestamp default LOCALTIMESTAMP,
    updated_at timestamp default LOCALTIMESTAMP
);
create unique index test_tbl_idx
    on test_tbl (id, COALESCE(parent_id, ''::text));
Start the Flink application to receive change events from this table.
What did you expect to see?
Expected the Flink application to start and receive the data from this table.
What did you see instead?
The connector throws this error at startup:
14:51:09.530 [debezium-postgresconnector-postgres_cdc_source-change-event-source-coordinator] ERROR io.debezium.pipeline.ErrorHandler - Producer failure
java.lang.IllegalArgumentException: The column "COALESCE(parent_id, ''::text)" is referenced as PRIMARY KEY, but a matching column is not defined in table "public.test_tbl"!
at io.debezium.relational.TableEditorImpl.lambda$updatePrimaryKeys$0(TableEditorImpl.java:106) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
at java.util.ArrayList.removeIf(ArrayList.java:1702) ~[?:?]
at java.util.ArrayList.removeIf(ArrayList.java:1690) ~[?:?]
at io.debezium.relational.TableEditorImpl.updatePrimaryKeys(TableEditorImpl.java:102) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
at io.debezium.relational.TableEditorImpl.create(TableEditorImpl.java:267) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
at io.debezium.relational.Tables.lambda$overwriteTable$2(Tables.java:192) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
at io.debezium.util.FunctionalReadWriteLock.write(FunctionalReadWriteLock.java:84) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
at io.debezium.relational.Tables.overwriteTable(Tables.java:186) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
at io.debezium.jdbc.JdbcConnection.readSchema(JdbcConnection.java:1214) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
at io.debezium.connector.postgresql.PostgresSchema.refresh(PostgresSchema.java:87) ~[debezium-connector-postgres-1.9.7.Final.jar:1.9.7.Final]
at io.debezium.connector.postgresql.PostgresTaskContext.refreshSchema(PostgresTaskContext.java:68) ~[debezium-connector-postgres-1.9.7.Final.jar:1.9.7.Final]
at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.init(PostgresStreamingChangeEventSource.java:118) ~[flink-connector-postgres-cdc-2.4.2.jar:1.9.7.Final]
at io.debezium.pipeline.ChangeEventSourceCoordinator.initStreamEvents(ChangeEventSourceCoordinator.java:182) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:172) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:141) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:109) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at java.lang.Thread.run(Thread.java:829) [?:?]
14:51:10.004 [debezium-engine] ERROR com.ververica.cdc.debezium.internal.Handover - Reporting error:
org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:50) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:116) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.lang.IllegalArgumentException: The column "COALESCE(parent_id, ''::text)" is referenced as PRIMARY KEY, but a matching column is not defined in table "public.test_tbl"!
at io.debezium.relational.TableEditorImpl.lambda$updatePrimaryKeys$0(TableEditorImpl.java:106) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
at java.util.ArrayList.removeIf(ArrayList.java:1702) ~[?:?]
at java.util.ArrayList.removeIf(ArrayList.java:1690) ~[?:?]
at io.debezium.relational.TableEditorImpl.updatePrimaryKeys(TableEditorImpl.java:102) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
at io.debezium.relational.TableEditorImpl.create(TableEditorImpl.java:267) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
at io.debezium.relational.Tables.lambda$overwriteTable$2(Tables.java:192) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
at io.debezium.util.FunctionalReadWriteLock.write(FunctionalReadWriteLock.java:84) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
at io.debezium.relational.Tables.overwriteTable(Tables.java:186) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
at io.debezium.jdbc.JdbcConnection.readSchema(JdbcConnection.java:1214) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
at io.debezium.connector.postgresql.PostgresSchema.refresh(PostgresSchema.java:87) ~[debezium-connector-postgres-1.9.7.Final.jar:1.9.7.Final]
at io.debezium.connector.postgresql.PostgresTaskContext.refreshSchema(PostgresTaskContext.java:68) ~[debezium-connector-postgres-1.9.7.Final.jar:1.9.7.Final]
at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.init(PostgresStreamingChangeEventSource.java:118) ~[flink-connector-postgres-cdc-2.4.2.jar:1.9.7.Final]
at io.debezium.pipeline.ChangeEventSourceCoordinator.initStreamEvents(ChangeEventSourceCoordinator.java:182) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:172) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:141) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:109) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
... 5 more
Anything else?
The fix is already implemented in Debezium (debezium/debezium#3718), but it was applied only to the 2.x line, which this connector cannot adopt because it is a major version change.
This connector also ships modified copies of some Debezium classes, and the fix is in one of those classes. Applying the same fix to the connector's copies should solve this problem.
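To illustrate the idea, here is a minimal sketch of one possible approach, not the actual Debezium patch. The stack trace suggests that the entries of the unique index were treated as key columns, and that the expression entry then failed the "matching column" check. The sketch skips any unique index that contains an entry that is not a real table column. The class name UniqueIndexKeySketch and the method pickKeyColumns are hypothetical and exist only for this example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical helper for illustration only; it is not part of Debezium or Flink CDC.
public class UniqueIndexKeySketch {

    /**
     * Returns the columns of the first unique index whose entries are all
     * real table columns. An index that contains an expression entry, such as
     * COALESCE(parent_id, ''::text), is skipped instead of causing a failure.
     */
    public static Optional<List<String>> pickKeyColumns(
            Set<String> tableColumns,
            Map<String, List<String>> uniqueIndexes) {
        for (Map.Entry<String, List<String>> index : uniqueIndexes.entrySet()) {
            List<String> indexColumns = index.getValue();
            if (!indexColumns.isEmpty() && tableColumns.containsAll(indexColumns)) {
                return Optional.of(new ArrayList<>(indexColumns));
            }
        }
        // No usable unique index: the table is treated as having no key.
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Columns and index taken from the reproduce step in this issue.
        Set<String> columns = Set.of("id", "parent_id", "cnt", "created_at", "updated_at");
        Map<String, List<String>> indexes = new LinkedHashMap<>();
        indexes.put("test_tbl_idx", List.of("id", "COALESCE(parent_id, ''::text)"));

        // prints "false": the expression index is not usable as a key
        System.out.println(pickKeyColumns(columns, indexes).isPresent());
    }
}
```

With this approach test_tbl would be captured as a table without key columns rather than stopping the connector. Whether that matches the behavior of the upstream fix should be checked against debezium/debezium#3718.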
Are you willing to submit a PR?
- I'm willing to submit a PR!