Skip to content

Commit

Permalink
fixed task id issue
Browse files Browse the repository at this point in the history
  • Loading branch information
vaibhav-yb committed Oct 17, 2024
1 parent 0aad529 commit 103b25e
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ protected void executeChangeEventSources(CdcSourceTaskContext taskContext, Snaps
final PostgresOffsetContext previousOffset = previousOffsets.getTheOnlyOffset();

previousLogContext.set(taskContext.configureLoggingContext(
String.format("snapshot|{}", taskContext.getTaskId()), partition));
String.format("snapshot|%s", taskContext.getTaskId()), partition));
SnapshotResult<PostgresOffsetContext> snapshotResult = doSnapshot(snapshotSource, context, partition, previousOffset);

getSignalProcessor(previousOffsets).ifPresent(s -> s.setContext(snapshotResult.getOffset()));
Expand All @@ -96,7 +96,7 @@ protected void executeChangeEventSources(CdcSourceTaskContext taskContext, Snaps
}
LOGGER.info("Transitioning to streaming");
previousLogContext.set(taskContext.configureLoggingContext(
String.format("streaming|{}", taskContext.getTaskId()), partition));
String.format("streaming|%s", taskContext.getTaskId()), partition));
streamEvents(context, partition, snapshotResult.getOffset());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1216,6 +1216,7 @@ protected SourceInfoStructMaker<? extends AbstractSourceInfo> getSourceInfoStruc
SNAPSHOT_MODE,
SNAPSHOT_MODE_CLASS,
YB_CONSISTENT_SNAPSHOT,
PRIMARY_KEY_HASH_COLUMNS,
HSTORE_HANDLING_MODE,
BINARY_HANDLING_MODE,
SCHEMA_NAME_ADJUSTMENT_MODE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1116,11 +1116,25 @@ public void shouldHaveBeforeImageOfUpdatedRow() throws InterruptedException {
assertThat(updateRecordValue.getStruct(Envelope.FieldName.AFTER).getStruct("aa").getInt32("value")).isEqualTo(404);
}

@Test
public void shouldFailIfNoPrimaryKeyHashColumnSpecifiedWithSnapshotModeParallel() throws Exception {
Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.PARALLEL.getValue())
.with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.test")
.with(PostgresConnectorConfig.PRIMARY_KEY_HASH_COLUMNS, "");

start(YugabyteDBConnector.class, configBuilder.build(), (success, message, error) -> {
assertFalse(success);
assertThat(message.contains("primary.key.hash.columns cannot be empty when snapshot.mode is 'parallel'")).isTrue();
});
}

@Test
public void shouldFailIfParallelSnapshotRunWithMultipleTables() throws Exception {
Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.PARALLEL.getValue())
.with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.test,public.test2");
.with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.test,public.test2")
.with(PostgresConnectorConfig.PRIMARY_KEY_HASH_COLUMNS, "id");

start(YugabyteDBConnector.class, configBuilder.build(), (success, message, error) -> {
assertFalse(success);
Expand All @@ -1133,7 +1147,8 @@ public void shouldFailIfParallelSnapshotRunWithMultipleTables() throws Exception
public void shouldFailWithSnapshotModeParallelIfNoTableIncludeListProvided() throws Exception {
Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.PARALLEL.getValue())
.with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "");
.with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "")
.with(PostgresConnectorConfig.PRIMARY_KEY_HASH_COLUMNS, "id");

start(YugabyteDBConnector.class, configBuilder.build(), (success, message, error) -> {
assertFalse(success);
Expand All @@ -1147,7 +1162,8 @@ public void shouldFailIfSnapshotModeParallelHasPublicationAutoCreateModeAllTable
Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.PARALLEL.getValue())
.with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.test")
.with(PostgresConnectorConfig.PUBLICATION_AUTOCREATE_MODE, PostgresConnectorConfig.AutoCreateMode.ALL_TABLES);
.with(PostgresConnectorConfig.PUBLICATION_AUTOCREATE_MODE, PostgresConnectorConfig.AutoCreateMode.ALL_TABLES)
.with(PostgresConnectorConfig.PRIMARY_KEY_HASH_COLUMNS, "id");;

start(YugabyteDBConnector.class, configBuilder.build(), (success, message, error) -> {
assertFalse(success);
Expand Down

0 comments on commit 103b25e

Please sign in to comment.