Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DBZ-PGYB] Add tests to enhance test suite for YugabyteDBConnector #150

Merged
merged 4 commits into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions debezium-connector-postgres/YB_DEV_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@
Since the smart driver changes require us to build the debezium core as well, build can be completed using:

```bash
./mvnw clean install -Dquick
./mvnw clean install -Passembly -DskipITs -DskipTests -pl debezium-connector-postgres -am
```

## Running tests

1. Compile PG connector code from the root directory with the above command.
2. Start YugabyteDB instance using `yugabyted`:
```bash
./bin/yugabyted start --ui=false --advertise_address=127.0.0.1 --master_flags="yb_enable_cdc_consistent_snapshot_streams=true,allowed_preview_flags_csv={yb_enable_cdc_consistent_snapshot_streams,ysql_yb_enable_replication_commands},ysql_yb_enable_replication_commands=true,ysql_TEST_enable_replication_slot_consumption=true" --tserver_flags="allowed_preview_flags_csv={yb_enable_cdc_consistent_snapshot_streams,ysql_yb_enable_replication_commands,cdcsdk_enable_dynamic_table_support},cdcsdk_enable_dynamic_table_support=true,cdcsdk_publication_list_refresh_interval_secs=3,ysql_yb_enable_replication_commands=true,yb_enable_cdc_consistent_snapshot_streams=true,ysql_TEST_enable_replication_slot_consumption=true,ysql_cdc_active_replication_slot_window_ms=0,ysql_sequence_cache_method=server"
./bin/yugabyted start --ui=false --advertise_address=127.0.0.1 --master_flags="ysql_cdc_active_replication_slot_window_ms=0" --tserver_flags="allowed_preview_flags_csv={cdcsdk_enable_dynamic_table_support},cdcsdk_enable_dynamic_table_support=true,cdcsdk_publication_list_refresh_interval_secs=3,ysql_cdc_active_replication_slot_window_ms=0,ysql_sequence_cache_method=server"
```
3. Run tests
Original file line number Diff line number Diff line change
Expand Up @@ -2933,6 +2933,94 @@ public void testYBCustomChangesForUpdate() throws Exception {
assertValueField(actualRecords.allRecordsInOrder().get(2), "after/bb", null);
}

@Test
public void testTableWithCompositePrimaryKey() throws Exception {
TestHelper.dropDefaultReplicationSlot();
TestHelper.execute(CREATE_TABLES_STMT);
TestHelper.execute("CREATE TABLE s1.test_composite_pk (id INT, text_col TEXT, first_name VARCHAR(60), age INT, PRIMARY KEY(id, text_col));");

final Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER)
.with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "s1.test_composite_pk");

start(YugabyteDBConnector.class, configBuilder.build());
assertConnectorIsRunning();
waitForStreamingRunning();

TestHelper.execute("INSERT INTO s1.test_composite_pk VALUES (1, 'ffff-ffff', 'Vaibhav', 25);");
TestHelper.execute("UPDATE s1.test_composite_pk SET first_name='Vaibhav K' WHERE id = 1 AND text_col='ffff-ffff';");
TestHelper.execute("DELETE FROM s1.test_composite_pk;");

SourceRecords actualRecords = consumeRecordsByTopic(4 /* 1 + 1 + 1 + tombstone */);
List<SourceRecord> records = actualRecords.allRecordsInOrder();

assertThat(records.size()).isEqualTo(4);

// Assert insert record.
assertValueField(records.get(0), "after/id/value", 1);
assertValueField(records.get(0), "after/text_col/value", "ffff-ffff");
assertValueField(records.get(0), "after/first_name/value", "Vaibhav");
assertValueField(records.get(0), "after/age/value", 25);

// Assert update record.
assertValueField(records.get(1), "after/id/value", 1);
assertValueField(records.get(1), "after/text_col/value", "ffff-ffff");
assertValueField(records.get(1), "after/first_name/value", "Vaibhav K");
assertValueField(records.get(1), "after/age", null);

// Assert delete record.
assertValueField(records.get(2), "before/id/value", 1);
assertValueField(records.get(2), "before/text_col/value", "ffff-ffff");
assertValueField(records.get(2), "before/first_name/value", null);
assertValueField(records.get(2), "before/age/value", null);
assertValueField(records.get(2), "after", null);

// Validate tombstone record.
assertTombstone(records.get(3));
}

@Test
public void shouldNotWorkWithReplicaIdentityChangeAndPgOutput() throws Exception {
final Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SLOT_NAME, ReplicationConnection.Builder.DEFAULT_SLOT_NAME)
.with(PostgresConnectorConfig.PLUGIN_NAME, LogicalDecoder.PGOUTPUT)
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER)
.with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "s2.a");

start(YugabyteDBConnector.class, configBuilder.build(), (success, message, error) -> {
assertFalse(success);
});
}

@Test
public void foreignKeyOnTheTableShouldNotCauseIssues() throws Exception {
TestHelper.execute(CREATE_TABLES_STMT);
TestHelper.execute("CREATE TABLE s1.department (dept_id INT PRIMARY KEY, dept_name TEXT);");
TestHelper.execute("CREATE TABLE s1.users (id SERIAL PRIMARY KEY, name TEXT, dept_id INT, FOREIGN KEY (dept_id) REFERENCES s1.department(dept_id));");

final Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SLOT_NAME, "slot_for_fk")
.with(PostgresConnectorConfig.PUBLICATION_NAME, "publication_for_fk")
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER)
.with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "s1.users");

start(YugabyteDBConnector.class, configBuilder.build());
assertConnectorIsRunning();
waitForStreamingRunning();

TestHelper.execute("INSERT INTO s1.department VALUES (11, 'Industrial equipments');");
TestHelper.execute("INSERT INTO s1.users VALUES (1, 'Vaibhav', 11);");

SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.allRecordsInOrder().size()).isEqualTo(1);

SourceRecord record = records.allRecordsInOrder().get(0);
YBVerifyRecord.isValidInsert(record, "id", 1);
assertValueField(record, "after/id/value", 1);
assertValueField(record, "after/name/value", "Vaibhav");
assertValueField(record, "after/dept_id/value", 11);
}

@Test
public void shouldNotSkipMessagesWithoutChangeWithReplicaIdentityChange() throws Exception {
testSkipMessagesWithoutChange(ReplicaIdentityInfo.ReplicaIdentity.CHANGE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1352,7 +1352,6 @@ public void shouldHandleToastedJsonArrayColumn() throws Exception {
Envelope.FieldName.AFTER);
}

@Ignore("hstore not supported yet")
@Test
@FixFor("DBZ-6379")
public void shouldHandleToastedHstoreInHstoreMapMode() throws Exception {
Expand All @@ -1376,6 +1375,8 @@ public void shouldHandleToastedHstoreInHstoreMapMode() throws Exception {
Envelope.FieldName.AFTER);
statement = "UPDATE test_toast_table SET text = 'text';";

LOGGER.info("VKVK test verified till here");

consumer.expects(1);
executeAndWait(statement);
consumer.process(record -> {
Expand All @@ -1384,12 +1385,10 @@ public void shouldHandleToastedHstoreInHstoreMapMode() throws Exception {
assertEquals(Arrays.asList("id", "text", "col"), tbl.retrieveColumnNames());
});
});
colValue.clear();
colValue.put(DecoderDifferences.optionalToastedValuePlaceholder(), DecoderDifferences.optionalToastedValuePlaceholder());

// YB Note: Value for 'col' will not be present since replica identity is CHANGE.
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "text"),
new SchemaAndValueField("col", SchemaBuilder.map(SchemaBuilder.STRING_SCHEMA,
SchemaBuilder.OPTIONAL_STRING_SCHEMA).optional().build(), colValue)),
new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "text")),
consumer.remove(),
Envelope.FieldName.AFTER);
}
Expand Down
Loading