From fceb9864a3bd74010268c485e4bb320e674308fc Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Tue, 30 Jul 2024 16:17:01 +0530 Subject: [PATCH 1/4] added test with composite PK --- .../postgresql/PostgresConnectorIT.java | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java index e0d38d5897f..5163a7cfbbe 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java @@ -2933,6 +2933,52 @@ public void testYBCustomChangesForUpdate() throws Exception { assertValueField(actualRecords.allRecordsInOrder().get(2), "after/bb", null); } + @Test + public void testTableWithCompositePrimaryKey() throws Exception { + TestHelper.dropDefaultReplicationSlot(); + TestHelper.execute(CREATE_TABLES_STMT); + TestHelper.execute("CREATE TABLE s1.test_composite_pk (id INT, text_col TEXT, first_name VARCHAR(60), age INT, PRIMARY KEY(id, text_col));"); + + final Configuration.Builder configBuilder = TestHelper.defaultConfig() + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER) + .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "s1.test_composite_pk"); + + start(YugabyteDBConnector.class, configBuilder.build()); + assertConnectorIsRunning(); + waitForStreamingRunning(); + + TestHelper.execute("INSERT INTO s1.test_composite_pk VALUES (1, 'ffff-ffff', 'Vaibhav', 25);"); + TestHelper.execute("UPDATE s1.test_composite_pk SET first_name='Vaibhav K' WHERE id = 1 AND text_col='ffff-ffff';"); + TestHelper.execute("DELETE FROM s1.test_composite_pk;"); + + SourceRecords actualRecords = consumeRecordsByTopic(4 /* 1 + 1 + 1 + tombstone */); + List records = actualRecords.allRecordsInOrder(); + + assertThat(records.size()).isEqualTo(4); + + // Assert insert record. + assertValueField(records.get(0), "after/id/value", 1); + assertValueField(records.get(0), "after/text_col/value", "ffff-ffff"); + assertValueField(records.get(0), "after/first_name/value", "Vaibhav"); + assertValueField(records.get(0), "after/age/value", 25); + + // Assert update record. + assertValueField(records.get(1), "after/id/value", 1); + assertValueField(records.get(1), "after/text_col/value", "ffff-ffff"); + assertValueField(records.get(1), "after/first_name/value", "Vaibhav K"); + assertValueField(records.get(1), "after/age", null); + + // Assert delete record. + assertValueField(records.get(2), "before/id/value", 1); + assertValueField(records.get(2), "before/text_col/value", "ffff-ffff"); + assertValueField(records.get(2), "before/first_name/value", null); + assertValueField(records.get(2), "before/age/value", null); + assertValueField(records.get(2), "after", null); + + // Validate tombstone record. + assertTombstone(records.get(3)); + } + @Test public void shouldNotSkipMessagesWithoutChangeWithReplicaIdentityChange() throws Exception { testSkipMessagesWithoutChange(ReplicaIdentityInfo.ReplicaIdentity.CHANGE); From 2240aebc895dcf2a437182d18db373953367d0a6 Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Mon, 5 Aug 2024 12:57:50 +0530 Subject: [PATCH 2/4] added more tests for hstore and a negative scenario --- .../connector/postgresql/PostgresConnectorIT.java | 13 +++++++++++++ .../postgresql/YBRecordsStreamProducerIT.java | 11 +++++------ 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java index 5163a7cfbbe..5ba70630265 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java @@ -2979,6 +2979,19 @@ public void testTableWithCompositePrimaryKey() throws Exception { assertTombstone(records.get(3)); } + @Test + public void shouldNotWorkWithReplicaIdentityChangeAndPgOutput() throws Exception { + final Configuration.Builder configBuilder = TestHelper.defaultConfig() + .with(PostgresConnectorConfig.SLOT_NAME, ReplicationConnection.Builder.DEFAULT_SLOT_NAME) + .with(PostgresConnectorConfig.PLUGIN_NAME, LogicalDecoder.PGOUTPUT) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER) + .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "s2.a"); + + start(YugabyteDBConnector.class, configBuilder.build(), (success, message, error) -> { + assertFalse(success); + }); + } + @Test public void shouldNotSkipMessagesWithoutChangeWithReplicaIdentityChange() throws Exception { testSkipMessagesWithoutChange(ReplicaIdentityInfo.ReplicaIdentity.CHANGE); diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/YBRecordsStreamProducerIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/YBRecordsStreamProducerIT.java index 1a4c115dcdb..d577cd860fa 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/YBRecordsStreamProducerIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/YBRecordsStreamProducerIT.java @@ -1352,7 +1352,6 @@ public void shouldHandleToastedJsonArrayColumn() throws Exception { Envelope.FieldName.AFTER); } - @Ignore("hstore not supported yet") @Test @FixFor("DBZ-6379") public void shouldHandleToastedHstoreInHstoreMapMode() throws Exception { @@ -1376,6 +1375,8 @@ public void shouldHandleToastedHstoreInHstoreMapMode() throws Exception { Envelope.FieldName.AFTER); statement = "UPDATE test_toast_table SET text = 'text';"; + LOGGER.info("VKVK test verified till here"); + consumer.expects(1); executeAndWait(statement); consumer.process(record -> { @@ -1384,12 +1385,10 @@ public void shouldHandleToastedHstoreInHstoreMapMode() throws Exception { assertEquals(Arrays.asList("id", "text", "col"), tbl.retrieveColumnNames()); }); }); - colValue.clear(); - colValue.put(DecoderDifferences.optionalToastedValuePlaceholder(), DecoderDifferences.optionalToastedValuePlaceholder()); + + // YB Note: Value for 'col' will not be present since replica identity is CHANGE. assertRecordSchemaAndValues(Arrays.asList( - new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "text"), - new SchemaAndValueField("col", SchemaBuilder.map(SchemaBuilder.STRING_SCHEMA, - SchemaBuilder.OPTIONAL_STRING_SCHEMA).optional().build(), colValue)), + new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "text")), consumer.remove(), Envelope.FieldName.AFTER); } From 0499763b3235d93d0030d8d7410ca97881313020 Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Mon, 5 Aug 2024 13:26:54 +0530 Subject: [PATCH 3/4] modified replica identity tests for pgoutput too --- .../postgresql/YugabyteReplicaIdentityIT.java | 136 +++++++++++++++--- 1 file changed, 113 insertions(+), 23 deletions(-) diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/YugabyteReplicaIdentityIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/YugabyteReplicaIdentityIT.java index 9d92f342bcd..8bd777c81ed 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/YugabyteReplicaIdentityIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/YugabyteReplicaIdentityIT.java @@ -2,6 +2,7 @@ import io.debezium.config.Configuration; import io.debezium.data.Envelope; +import io.debezium.data.VerifyRecord; import io.debezium.embedded.AbstractConnectorTest; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; @@ -61,11 +62,22 @@ public void after() { } @Test - public void shouldProduceOldValuesWithReplicaIdentityFull() throws Exception { + public void oldValuesWithReplicaIdentityFullForPgOutput() throws Exception { + shouldProduceOldValuesWithReplicaIdentityFull(PostgresConnectorConfig.LogicalDecoder.PGOUTPUT); + } + + @Test + public void oldValuesWithReplicaIdentityFullForYbOutput() throws Exception { + shouldProduceOldValuesWithReplicaIdentityFull(PostgresConnectorConfig.LogicalDecoder.YBOUTPUT); + } + + public void shouldProduceOldValuesWithReplicaIdentityFull(PostgresConnectorConfig.LogicalDecoder logicalDecoder) throws Exception { TestHelper.execute("ALTER TABLE s1.a REPLICA IDENTITY FULL;"); + TestHelper.execute("ALTER TABLE s2.a REPLICA IDENTITY FULL;"); Configuration config = TestHelper.defaultConfig() .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER.getValue()) + .with(PostgresConnectorConfig.PLUGIN_NAME, logicalDecoder.getPostgresPluginName()) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE) .build(); start(YugabyteDBConnector.class, config); @@ -88,22 +100,44 @@ public void shouldProduceOldValuesWithReplicaIdentityFull() throws Exception { SourceRecord insertRecord = records.get(0); SourceRecord updateRecord = records.get(1); - YBVerifyRecord.isValidInsert(insertRecord, PK_FIELD, 1); - YBVerifyRecord.isValidUpdate(updateRecord, PK_FIELD, 1); + if (logicalDecoder.isYBOutput()) { + YBVerifyRecord.isValidInsert(insertRecord, PK_FIELD, 1); + YBVerifyRecord.isValidUpdate(updateRecord, PK_FIELD, 1); + } else { + VerifyRecord.isValidInsert(insertRecord, PK_FIELD, 1); + VerifyRecord.isValidUpdate(updateRecord, PK_FIELD, 1); + } Struct updateRecordValue = (Struct) updateRecord.value(); assertThat(updateRecordValue.get(Envelope.FieldName.AFTER)).isNotNull(); assertThat(updateRecordValue.get(Envelope.FieldName.BEFORE)).isNotNull(); - assertThat(updateRecordValue.getStruct(Envelope.FieldName.BEFORE).getStruct("aa").getInt32("value")).isEqualTo(1); - assertThat(updateRecordValue.getStruct(Envelope.FieldName.AFTER).getStruct("aa").getInt32("value")).isEqualTo(12345); + + if (logicalDecoder.isYBOutput()) { + assertThat(updateRecordValue.getStruct(Envelope.FieldName.BEFORE).getStruct("aa").getInt32("value")).isEqualTo(1); + assertThat(updateRecordValue.getStruct(Envelope.FieldName.AFTER).getStruct("aa").getInt32("value")).isEqualTo(12345); + } else { + assertThat(updateRecordValue.getStruct(Envelope.FieldName.BEFORE).get("aa")).isEqualTo(1); + assertThat(updateRecordValue.getStruct(Envelope.FieldName.AFTER).get("aa")).isEqualTo(12345); + } } @Test - public void shouldProduceExpectedValuesWithReplicaIdentityDefault() throws Exception { + public void replicaIdentityDefaultWithPgOutput() throws Exception { + shouldProduceExpectedValuesWithReplicaIdentityDefault(PostgresConnectorConfig.LogicalDecoder.PGOUTPUT); + } + + @Test + public void replicaIdentityDefaultWithYbOutput() throws Exception { + shouldProduceExpectedValuesWithReplicaIdentityDefault(PostgresConnectorConfig.LogicalDecoder.YBOUTPUT); + } + + public void shouldProduceExpectedValuesWithReplicaIdentityDefault(PostgresConnectorConfig.LogicalDecoder logicalDecoder) throws Exception { + TestHelper.execute("ALTER TABLE s1.a REPLICA IDENTITY DEFAULT;"); TestHelper.execute("ALTER TABLE s2.a REPLICA IDENTITY DEFAULT;"); Configuration config = TestHelper.defaultConfig() .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER.getValue()) + .with(PostgresConnectorConfig.PLUGIN_NAME, logicalDecoder.getPostgresPluginName()) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE) .build(); start(YugabyteDBConnector.class, config); @@ -126,17 +160,28 @@ public void shouldProduceExpectedValuesWithReplicaIdentityDefault() throws Excep SourceRecord insertRecord = records.get(0); SourceRecord updateRecord = records.get(1); - YBVerifyRecord.isValidInsert(insertRecord, PK_FIELD, 1); - YBVerifyRecord.isValidUpdate(updateRecord, PK_FIELD, 1); + if (logicalDecoder.isYBOutput()) { + YBVerifyRecord.isValidInsert(insertRecord, PK_FIELD, 1); + YBVerifyRecord.isValidUpdate(updateRecord, PK_FIELD, 1); + } else { + VerifyRecord.isValidInsert(insertRecord, PK_FIELD, 1); + VerifyRecord.isValidUpdate(updateRecord, PK_FIELD, 1); + } Struct updateRecordValue = (Struct) updateRecord.value(); assertThat(updateRecordValue.get(Envelope.FieldName.AFTER)).isNotNull(); assertThat(updateRecordValue.get(Envelope.FieldName.BEFORE)).isNull(); // After field will have entries for all the columns. - assertThat(updateRecordValue.getStruct(Envelope.FieldName.AFTER).getStruct("pk").getInt32("value")).isEqualTo(1); - assertThat(updateRecordValue.getStruct(Envelope.FieldName.AFTER).getStruct("aa").getInt32("value")).isEqualTo(12345); - assertThat(updateRecordValue.getStruct(Envelope.FieldName.AFTER).getStruct("bb").getString("value")).isEqualTo("random text value"); + if (logicalDecoder.isYBOutput()) { + assertThat(updateRecordValue.getStruct(Envelope.FieldName.AFTER).getStruct("pk").getInt32("value")).isEqualTo(1); + assertThat(updateRecordValue.getStruct(Envelope.FieldName.AFTER).getStruct("aa").getInt32("value")).isEqualTo(12345); + assertThat(updateRecordValue.getStruct(Envelope.FieldName.AFTER).getStruct("bb").getString("value")).isEqualTo("random text value"); + } else { + assertThat(updateRecordValue.getStruct(Envelope.FieldName.AFTER).get("pk")).isEqualTo(1); + assertThat(updateRecordValue.getStruct(Envelope.FieldName.AFTER).get("aa")).isEqualTo(12345); + assertThat(updateRecordValue.getStruct(Envelope.FieldName.AFTER).get("bb")).isEqualTo("random text value"); + } } @Test @@ -200,6 +245,7 @@ public void shouldThrowExceptionWithReplicaIdentityNothingOnUpdatesAndDeletes() Details: https://www.postgresql.org/docs/current/logical-replication-publication.html */ + TestHelper.execute("ALTER TABLE s1.a REPLICA IDENTITY NOTHING;"); TestHelper.execute("ALTER TABLE s2.a REPLICA IDENTITY NOTHING;"); Configuration config = TestHelper.defaultConfig() @@ -235,10 +281,21 @@ public void shouldThrowExceptionWithReplicaIdentityNothingOnUpdatesAndDeletes() } @Test - public void shouldHaveBeforeImageForDeletesForReplicaIdentityFull() throws Exception { + public void beforeImageForDeleteWithReplicaIdentityFullAndPgOutput() throws Exception { + shouldHaveBeforeImageForDeletesForReplicaIdentityFull(PostgresConnectorConfig.LogicalDecoder.PGOUTPUT); + } + + @Test + public void beforeImageForDeleteWithReplicaIdentityFullAndYbOutput() throws Exception { + shouldHaveBeforeImageForDeletesForReplicaIdentityFull(PostgresConnectorConfig.LogicalDecoder.YBOUTPUT); + } + + public void shouldHaveBeforeImageForDeletesForReplicaIdentityFull(PostgresConnectorConfig.LogicalDecoder logicalDecoder) throws Exception { + TestHelper.execute("ALTER TABLE s1.a REPLICA IDENTITY FULL;"); TestHelper.execute("ALTER TABLE s2.a REPLICA IDENTITY FULL;"); Configuration config = TestHelper.defaultConfig() .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER.getValue()) + .with(PostgresConnectorConfig.PLUGIN_NAME, logicalDecoder.getPostgresPluginName()) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE) .build(); start(YugabyteDBConnector.class, config); @@ -261,25 +318,47 @@ public void shouldHaveBeforeImageForDeletesForReplicaIdentityFull() throws Excep SourceRecord insertRecord = records.get(0); SourceRecord deleteRecord = records.get(1); - YBVerifyRecord.isValidInsert(insertRecord, PK_FIELD, 1); - YBVerifyRecord.isValidDelete(deleteRecord, PK_FIELD, 1); + if (logicalDecoder.isYBOutput()) { + YBVerifyRecord.isValidInsert(insertRecord, PK_FIELD, 1); + YBVerifyRecord.isValidDelete(deleteRecord, PK_FIELD, 1); + } else { + VerifyRecord.isValidInsert(insertRecord, PK_FIELD, 1); + VerifyRecord.isValidDelete(deleteRecord, PK_FIELD, 1); + } Struct deleteRecordValue = (Struct) deleteRecord.value(); assertThat(deleteRecordValue.get(Envelope.FieldName.AFTER)).isNull(); assertThat(deleteRecordValue.get(Envelope.FieldName.BEFORE)).isNotNull(); // Before field will have entries for all the columns. - assertThat(deleteRecordValue.getStruct(Envelope.FieldName.BEFORE).getStruct("pk").getInt32("value")).isEqualTo(1); - assertThat(deleteRecordValue.getStruct(Envelope.FieldName.BEFORE).getStruct("aa").getInt32("value")).isEqualTo(22); - assertThat(deleteRecordValue.getStruct(Envelope.FieldName.BEFORE).getStruct("bb").getString("value")).isEqualTo("random text value"); + if (logicalDecoder.isYBOutput()) { + assertThat(deleteRecordValue.getStruct(Envelope.FieldName.BEFORE).getStruct("pk").getInt32("value")).isEqualTo(1); + assertThat(deleteRecordValue.getStruct(Envelope.FieldName.BEFORE).getStruct("aa").getInt32("value")).isEqualTo(22); + assertThat(deleteRecordValue.getStruct(Envelope.FieldName.BEFORE).getStruct("bb").getString("value")).isEqualTo("random text value"); + } else { + assertThat(deleteRecordValue.getStruct(Envelope.FieldName.BEFORE).get("pk")).isEqualTo(1); + assertThat(deleteRecordValue.getStruct(Envelope.FieldName.BEFORE).get("aa")).isEqualTo(22); + assertThat(deleteRecordValue.getStruct(Envelope.FieldName.BEFORE).get("bb")).isEqualTo("random text value"); + } } @Test - public void shouldHaveBeforeImageForDeletesForReplicaIdentityDefault() throws Exception { + public void beforeImageForDeleteWithReplicaIdentityDefaultAndPgOutput() throws Exception { + shouldHaveBeforeImageForDeletesForReplicaIdentityDefault(PostgresConnectorConfig.LogicalDecoder.PGOUTPUT); + } + + @Test + public void beforeImageForDeleteWithReplicaIdentityDefaultAndYbOutput() throws Exception { + shouldHaveBeforeImageForDeletesForReplicaIdentityDefault(PostgresConnectorConfig.LogicalDecoder.YBOUTPUT); + } + + public void shouldHaveBeforeImageForDeletesForReplicaIdentityDefault(PostgresConnectorConfig.LogicalDecoder logicalDecoder) throws Exception { + TestHelper.execute("ALTER TABLE s1.a REPLICA IDENTITY DEFAULT;"); TestHelper.execute("ALTER TABLE s2.a REPLICA IDENTITY DEFAULT;"); Configuration config = TestHelper.defaultConfig() .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER.getValue()) + .with(PostgresConnectorConfig.PLUGIN_NAME, logicalDecoder.getPostgresPluginName()) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE) .build(); start(YugabyteDBConnector.class, config); @@ -302,17 +381,28 @@ public void shouldHaveBeforeImageForDeletesForReplicaIdentityDefault() throws Ex SourceRecord insertRecord = records.get(0); SourceRecord deleteRecord = records.get(1); - YBVerifyRecord.isValidInsert(insertRecord, PK_FIELD, 1); - YBVerifyRecord.isValidDelete(deleteRecord, PK_FIELD, 1); + if (logicalDecoder.isYBOutput()) { + YBVerifyRecord.isValidInsert(insertRecord, PK_FIELD, 1); + YBVerifyRecord.isValidDelete(deleteRecord, PK_FIELD, 1); + } else { + VerifyRecord.isValidInsert(insertRecord, PK_FIELD, 1); + VerifyRecord.isValidDelete(deleteRecord, PK_FIELD, 1); + } Struct deleteRecordValue = (Struct) deleteRecord.value(); assertThat(deleteRecordValue.get(Envelope.FieldName.AFTER)).isNull(); assertThat(deleteRecordValue.get(Envelope.FieldName.BEFORE)).isNotNull(); // Before field will have entries only for the primary key columns. - assertThat(deleteRecordValue.getStruct(Envelope.FieldName.BEFORE).getStruct("pk").getInt32("value")).isEqualTo(1); - assertThat(deleteRecordValue.getStruct(Envelope.FieldName.BEFORE).getStruct("aa").getInt32("value")).isNull(); - assertThat(deleteRecordValue.getStruct(Envelope.FieldName.BEFORE).getStruct("bb").getString("value")).isNull(); + if (logicalDecoder.isYBOutput()) { + assertThat(deleteRecordValue.getStruct(Envelope.FieldName.BEFORE).getStruct("pk").getInt32("value")).isEqualTo(1); + assertThat(deleteRecordValue.getStruct(Envelope.FieldName.BEFORE).getStruct("aa").getInt32("value")).isNull(); + assertThat(deleteRecordValue.getStruct(Envelope.FieldName.BEFORE).getStruct("bb").getString("value")).isNull(); + } else { + assertThat(deleteRecordValue.getStruct(Envelope.FieldName.BEFORE).get("pk")).isEqualTo(1); + assertThat(deleteRecordValue.getStruct(Envelope.FieldName.BEFORE).get("aa")).isNull(); + assertThat(deleteRecordValue.getStruct(Envelope.FieldName.BEFORE).get("bb")).isNull(); + } } @Test From 95dd289fe585a4df9351ef2c005a0be2d8c719f2 Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Mon, 5 Aug 2024 16:17:13 +0530 Subject: [PATCH 4/4] added test to stream table with foreign key --- debezium-connector-postgres/YB_DEV_NOTES.md | 4 +-- .../postgresql/PostgresConnectorIT.java | 29 +++++++++++++++++++ 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/debezium-connector-postgres/YB_DEV_NOTES.md b/debezium-connector-postgres/YB_DEV_NOTES.md index 5190e5fae3f..359d16adbf4 100644 --- a/debezium-connector-postgres/YB_DEV_NOTES.md +++ b/debezium-connector-postgres/YB_DEV_NOTES.md @@ -5,7 +5,7 @@ Since the smart driver changes require us to build the debezium core as well, build can be completed using: ```bash -./mvnw clean install -Dquick +./mvnw clean install -Passembly -DskipITs -DskipTests -pl debezium-connector-postgres -am ``` ## Running tests @@ -13,6 +13,6 @@ Since the smart driver changes require us to build the debezium core as well, bu 1. Compile PG connector code from the root directory with the above command. 2. Start YugabyteDB instance using `yugabyted`: ```bash - ./bin/yugabyted start --ui=false --advertise_address=127.0.0.1 --master_flags="yb_enable_cdc_consistent_snapshot_streams=true,allowed_preview_flags_csv={yb_enable_cdc_consistent_snapshot_streams,ysql_yb_enable_replication_commands},ysql_yb_enable_replication_commands=true,ysql_TEST_enable_replication_slot_consumption=true" --tserver_flags="allowed_preview_flags_csv={yb_enable_cdc_consistent_snapshot_streams,ysql_yb_enable_replication_commands,cdcsdk_enable_dynamic_table_support},cdcsdk_enable_dynamic_table_support=true,cdcsdk_publication_list_refresh_interval_secs=3,ysql_yb_enable_replication_commands=true,yb_enable_cdc_consistent_snapshot_streams=true,ysql_TEST_enable_replication_slot_consumption=true,ysql_cdc_active_replication_slot_window_ms=0,ysql_sequence_cache_method=server" + ./bin/yugabyted start --ui=false --advertise_address=127.0.0.1 --master_flags="ysql_cdc_active_replication_slot_window_ms=0" --tserver_flags="allowed_preview_flags_csv={cdcsdk_enable_dynamic_table_support},cdcsdk_enable_dynamic_table_support=true,cdcsdk_publication_list_refresh_interval_secs=3,ysql_cdc_active_replication_slot_window_ms=0,ysql_sequence_cache_method=server" ``` 3. Run tests \ No newline at end of file diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java index 5ba70630265..709239155db 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java @@ -2992,6 +2992,35 @@ public void shouldNotWorkWithReplicaIdentityChangeAndPgOutput() throws Exception }); } + @Test + public void foreignKeyOnTheTableShouldNotCauseIssues() throws Exception { + TestHelper.execute(CREATE_TABLES_STMT); + TestHelper.execute("CREATE TABLE s1.department (dept_id INT PRIMARY KEY, dept_name TEXT);"); + TestHelper.execute("CREATE TABLE s1.users (id SERIAL PRIMARY KEY, name TEXT, dept_id INT, FOREIGN KEY (dept_id) REFERENCES s1.department(dept_id));"); + + final Configuration.Builder configBuilder = TestHelper.defaultConfig() + .with(PostgresConnectorConfig.SLOT_NAME, "slot_for_fk") + .with(PostgresConnectorConfig.PUBLICATION_NAME, "publication_for_fk") + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER) + .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "s1.users"); + + start(YugabyteDBConnector.class, configBuilder.build()); + assertConnectorIsRunning(); + waitForStreamingRunning(); + + TestHelper.execute("INSERT INTO s1.department VALUES (11, 'Industrial equipments');"); + TestHelper.execute("INSERT INTO s1.users VALUES (1, 'Vaibhav', 11);"); + + SourceRecords records = consumeRecordsByTopic(1); + assertThat(records.allRecordsInOrder().size()).isEqualTo(1); + + SourceRecord record = records.allRecordsInOrder().get(0); + YBVerifyRecord.isValidInsert(record, "id", 1); + assertValueField(record, "after/id/value", 1); + assertValueField(record, "after/name/value", "Vaibhav"); + assertValueField(record, "after/dept_id/value", 11); + } + @Test public void shouldNotSkipMessagesWithoutChangeWithReplicaIdentityChange() throws Exception { testSkipMessagesWithoutChange(ReplicaIdentityInfo.ReplicaIdentity.CHANGE);