[Bug] Apply DBZ-5398 postgres connector fix  #2710

@sandeepkdeva

Description

Search before asking

  • I searched in the issues and found nothing similar.

Flink version

1.16.0

Flink CDC version

2.4.2

Database and its version

AWS RDS - PostgreSQL 11.22 on x86_64-pc-linux-gnu, compiled by gcc (GCC) 7.3.1 20180712 (Red Hat 7.3.1-12), 64-bit

Minimal reproduce step

See the original bug: https://issues.redhat.com/browse/DBZ-5398

Create a table in PostgreSQL and add a unique index that uses a function such as COALESCE:

create table test_tbl
(
    id         text not null,
    parent_id  text,
    cnt        integer   default 0,
    created_at timestamp default LOCALTIMESTAMP,
    updated_at timestamp default LOCALTIMESTAMP
);

create unique index test_tbl_idx
    on test_tbl (id, COALESCE(parent_id, ''::text));

Start the Flink application so that it receives change events from this table.

What did you expect to see?

I expected the Flink application to start and receive the table's data.

What did you see instead?

The connector throws this error at startup:

14:51:09.530 [debezium-postgresconnector-postgres_cdc_source-change-event-source-coordinator] ERROR io.debezium.pipeline.ErrorHandler - Producer failure
java.lang.IllegalArgumentException: The column "COALESCE(parent_id, ''::text)" is referenced as PRIMARY KEY, but a matching column is not defined in table "public.test_tbl"!
	at io.debezium.relational.TableEditorImpl.lambda$updatePrimaryKeys$0(TableEditorImpl.java:106) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
	at java.util.ArrayList.removeIf(ArrayList.java:1702) ~[?:?]
	at java.util.ArrayList.removeIf(ArrayList.java:1690) ~[?:?]
	at io.debezium.relational.TableEditorImpl.updatePrimaryKeys(TableEditorImpl.java:102) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
	at io.debezium.relational.TableEditorImpl.create(TableEditorImpl.java:267) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
	at io.debezium.relational.Tables.lambda$overwriteTable$2(Tables.java:192) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
	at io.debezium.util.FunctionalReadWriteLock.write(FunctionalReadWriteLock.java:84) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
	at io.debezium.relational.Tables.overwriteTable(Tables.java:186) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
	at io.debezium.jdbc.JdbcConnection.readSchema(JdbcConnection.java:1214) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
	at io.debezium.connector.postgresql.PostgresSchema.refresh(PostgresSchema.java:87) ~[debezium-connector-postgres-1.9.7.Final.jar:1.9.7.Final]
	at io.debezium.connector.postgresql.PostgresTaskContext.refreshSchema(PostgresTaskContext.java:68) ~[debezium-connector-postgres-1.9.7.Final.jar:1.9.7.Final]
	at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.init(PostgresStreamingChangeEventSource.java:118) ~[flink-connector-postgres-cdc-2.4.2.jar:1.9.7.Final]
	at io.debezium.pipeline.ChangeEventSourceCoordinator.initStreamEvents(ChangeEventSourceCoordinator.java:182) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
	at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:172) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
	at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:141) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
	at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:109) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
	at java.lang.Thread.run(Thread.java:829) [?:?]
14:51:10.004 [debezium-engine] ERROR com.ververica.cdc.debezium.internal.Handover - Reporting error:
org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
	at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:50) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
	at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:116) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
	at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.lang.IllegalArgumentException: The column "COALESCE(parent_id, ''::text)" is referenced as PRIMARY KEY, but a matching column is not defined in table "public.test_tbl"!
	at io.debezium.relational.TableEditorImpl.lambda$updatePrimaryKeys$0(TableEditorImpl.java:106) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
	at java.util.ArrayList.removeIf(ArrayList.java:1702) ~[?:?]
	at java.util.ArrayList.removeIf(ArrayList.java:1690) ~[?:?]
	at io.debezium.relational.TableEditorImpl.updatePrimaryKeys(TableEditorImpl.java:102) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
	at io.debezium.relational.TableEditorImpl.create(TableEditorImpl.java:267) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
	at io.debezium.relational.Tables.lambda$overwriteTable$2(Tables.java:192) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
	at io.debezium.util.FunctionalReadWriteLock.write(FunctionalReadWriteLock.java:84) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
	at io.debezium.relational.Tables.overwriteTable(Tables.java:186) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
	at io.debezium.jdbc.JdbcConnection.readSchema(JdbcConnection.java:1214) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
	at io.debezium.connector.postgresql.PostgresSchema.refresh(PostgresSchema.java:87) ~[debezium-connector-postgres-1.9.7.Final.jar:1.9.7.Final]
	at io.debezium.connector.postgresql.PostgresTaskContext.refreshSchema(PostgresTaskContext.java:68) ~[debezium-connector-postgres-1.9.7.Final.jar:1.9.7.Final]
	at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.init(PostgresStreamingChangeEventSource.java:118) ~[flink-connector-postgres-cdc-2.4.2.jar:1.9.7.Final]
	at io.debezium.pipeline.ChangeEventSourceCoordinator.initStreamEvents(ChangeEventSourceCoordinator.java:182) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
	at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:172) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
	at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:141) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
	at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:109) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
	... 5 more
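
To make the failure easier to follow, here is a minimal, self-contained sketch of the kind of check the trace points at (a `removeIf` inside `TableEditorImpl.updatePrimaryKeys`). It is not the Debezium source: the class `PrimaryKeyCheckSketch` and the method `validateKeyColumns` are hypothetical names, and it assumes that, because `test_tbl` has no primary key, the entries of the unique index are used as key columns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class PrimaryKeyCheckSketch {

    // Hypothetical stand-in for the validation seen in the stack trace:
    // every key column name must match a column defined on the table.
    public static List<String> validateKeyColumns(String tableId,
                                                  Set<String> definedColumns,
                                                  List<String> keyColumns) {
        List<String> result = new ArrayList<>(keyColumns);
        result.removeIf(name -> {
            if (!definedColumns.contains(name)) {
                throw new IllegalArgumentException(
                        "The column \"" + name + "\" is referenced as PRIMARY KEY, "
                                + "but a matching column is not defined in table \""
                                + tableId + "\"!");
            }
            return false; // defined columns are kept
        });
        return result;
    }

    public static void main(String[] args) {
        Set<String> columns = Set.of("id", "parent_id", "cnt", "created_at", "updated_at");
        // Assumption: the index metadata reports the expression text as if it were a column name.
        List<String> indexEntries = List.of("id", "COALESCE(parent_id, ''::text)");
        try {
            validateKeyColumns("public.test_tbl", columns, indexEntries);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The expression `COALESCE(parent_id, ''::text)` is not a column of `test_tbl`, so the lookup fails and the exception aborts schema refresh.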

Anything else?

The fix is already implemented in Debezium (debezium/debezium#3718). However, it was applied only to the 2.x line, and this connector cannot move to 2.x because that is a major upgrade (the stack trace above shows Debezium 1.9.7.Final).
This connector also ships modified copies of some Debezium classes, and the fix touches one of them, so applying the same fix to those copies should solve the problem.
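
As a rough illustration of the idea (not the upstream patch, whose exact logic may differ), the sketch below ignores a unique index as a key candidate when any of its entries is not a plain column of the table. The class `ExpressionIndexFilterSketch` and the method `usableKeyColumns` are hypothetical names. Dropping the whole index rather than only the expression entry is a design assumption made here: in the example, `id` alone is not guaranteed to be unique.

```java
import java.util.List;
import java.util.Set;

public class ExpressionIndexFilterSketch {

    // Hypothetical helper: return the index entries as key columns only if
    // every entry names a real column; otherwise treat the table as keyless.
    public static List<String> usableKeyColumns(Set<String> definedColumns,
                                                List<String> indexEntries) {
        boolean allPlainColumns = indexEntries.stream().allMatch(definedColumns::contains);
        return allPlainColumns ? List.copyOf(indexEntries) : List.of();
    }

    public static void main(String[] args) {
        Set<String> columns = Set.of("id", "parent_id", "cnt", "created_at", "updated_at");

        // Index from this report: contains an expression, so it is skipped.
        System.out.println(usableKeyColumns(columns,
                List.of("id", "COALESCE(parent_id, ''::text)")));

        // An index on plain columns is still usable as a key.
        System.out.println(usableKeyColumns(columns, List.of("id", "parent_id")));
    }
}
```

With a filter like this in front of the key-column validation, the expression entry never reaches the check that throws, and the table is read without a key instead of failing at startup.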

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Labels

bug (Something isn't working)
