diff --git a/.github/workflows/yb-confluent-package.yml b/.github/workflows/yb-confluent-package.yml new file mode 100644 index 00000000000..2b8fc6fe5e0 --- /dev/null +++ b/.github/workflows/yb-confluent-package.yml @@ -0,0 +1,56 @@ +name: Create package for Confluent + +on: + workflow_dispatch: + inputs: + version: + description: "Version of the connector to be packaged" + required: true + type: string + isSnapshotBuild: + description: "Snapshot build?" + required: true + type: boolean + default: false + +permissions: write-all + +jobs: + build: + name: "Create YugabyteDBConnector package for" + runs-on: ubuntu-latest + steps: + - name: Checkout Action + uses: actions/checkout@v4 + + - name: Set up Java 17 + uses: actions/setup-java@v4 + with: + distribution: 'temurin' + java-version: 17 + + - name: Set version for release + run: ./mvnw versions:set -DnewVersion=${{ inputs.version }} + + - name: Compile jar file + run: ./mvnw clean install -Dquick -pl debezium-connector-postgres -pl debezium-bom -pl support/ide-configs -am + - name: Create GitHub release + id: create_release + uses: actions/create-release@v1 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + with: + tag_name: ${{ inputs.version }} + release_name: Release ${{ inputs.version }} + draft: true + prerelease: ${{ inputs.isSnapshotBuild }} + - name: Upload zip package GitHub release + id: upload-zip-package + uses: actions/upload-release-asset@v1 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + with: + upload_url: ${{ steps.create_release.outputs.upload_url }} + asset_path: ./debezium-connector-postgres/target/components/packages/yugabyte-debezium-connector-yugabytedb-${{ inputs.version }}.zip + asset_name: yugabyte-debezium-connector-yugabytedb-${{ inputs.version }}.zip + asset_content_type: application/zip diff --git a/debezium-connector-postgres/logos/yugabytedb.png b/debezium-connector-postgres/logos/yugabytedb.png new file mode 100644 index 00000000000..e7cd9dedd1d Binary files /dev/null and b/debezium-connector-postgres/logos/yugabytedb.png differ diff --git a/debezium-connector-postgres/pom.xml b/debezium-connector-postgres/pom.xml index ad7e81020ab..f81d9dd06b1 100644 --- a/debezium-connector-postgres/pom.xml +++ b/debezium-connector-postgres/pom.xml @@ -166,6 +166,70 @@ + + io.confluent + 0.12.0 + kafka-connect-maven-plugin + + + + kafka-connect + + + Kafka Connect YugabyteDB + https://docs.yugabyte.com/preview/explore/change-data-capture/using-logical-replication/yugabytedb-connector/ + + The YugabyteDB Connector is based on the Debezium API, and captures row-level changes in the schemas of a YugabyteDB database using the PostgreSQL replication protocol. + + The first time it connects to a YugabyteDB server, the connector takes a consistent snapshot of all schemas. After that snapshot is complete, the connector continuously captures row-level changes that insert, update, and delete database content, and that were committed to a YugabyteDB database. + + The connector generates data change event records and streams them to Kafka topics. For each table, the default behavior is that the connector streams all generated events to a separate Kafka topic for that table. Applications and services consume data change event records from that topic. + + logos/yugabytedb.png + + Yugabyte Inc. + Yugabyte supports the YugabyteDB source connector. + http://support.yugabyte.com/ + logos/yugabytedb.png + + yugabyte + organization + Yugabyte Inc. + https://www.yugabyte.com// + logos/yugabytedb.png + + quay.io/yugabyte + ybdb-debezium + ${project.version} + + https://github.com/yugabyte/debezium/tree/ybdb-debezium-2.5.2/debezium-connector-postgres + + + source + + + + Yugabyte + yugabytedb + source + cdc + wal + replication + + + + YugabyteDB 2024.1.x + + + + atLeastOnce + + + true + + + + org.apache.maven.plugins maven-assembly-plugin diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeEventSourceCoordinator.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeEventSourceCoordinator.java index 7d25a68fa42..708a6b234fb 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeEventSourceCoordinator.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeEventSourceCoordinator.java @@ -73,7 +73,8 @@ protected void executeChangeEventSources(CdcSourceTaskContext taskContext, Snaps final PostgresPartition partition = previousOffsets.getTheOnlyPartition(); final PostgresOffsetContext previousOffset = previousOffsets.getTheOnlyOffset(); - previousLogContext.set(taskContext.configureLoggingContext("snapshot", partition)); + previousLogContext.set(taskContext.configureLoggingContext( + String.format("snapshot|%s", taskContext.getTaskId()), partition)); SnapshotResult snapshotResult = doSnapshot(snapshotSource, context, partition, previousOffset); getSignalProcessor(previousOffsets).ifPresent(s -> s.setContext(snapshotResult.getOffset())); @@ -94,7 +95,8 @@ protected void executeChangeEventSources(CdcSourceTaskContext taskContext, Snaps } } LOGGER.info("Transitioning to streaming"); - previousLogContext.set(taskContext.configureLoggingContext("streaming", partition)); + previousLogContext.set(taskContext.configureLoggingContext( + String.format("streaming|%s", taskContext.getTaskId()), partition)); streamEvents(context, partition, snapshotResult.getOffset()); } } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java index f342426dce3..7d70a05260a 100755 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java @@ -14,6 +14,7 @@ import java.util.regex.Pattern; import io.debezium.DebeziumException; +import io.debezium.connector.postgresql.snapshot.ParallelSnapshotter; import io.debezium.data.Envelope; import io.debezium.heartbeat.Heartbeat; import io.debezium.heartbeat.HeartbeatConnectionProvider; @@ -212,6 +213,11 @@ public enum SnapshotMode implements EnumeratedValue { */ INITIAL_ONLY("initial_only", (c) -> new InitialOnlySnapshotter()), + /** + * Perform a snapshot using parallel tasks. + */ + PARALLEL("parallel", (c) -> new ParallelSnapshotter()), + /** * Inject a custom snapshotter, which allows for more control over snapshots. */ @@ -983,6 +989,27 @@ public static AutoCreateMode parse(String value, String defaultValue) { public static final Field SOURCE_INFO_STRUCT_MAKER = CommonConnectorConfig.SOURCE_INFO_STRUCT_MAKER .withDefault(PostgresSourceInfoStructMaker.class.getName()); + public static final Field TASK_ID = Field.create("task.id") + .withDisplayName("ID of the connector task") + .withType(Type.INT) + .withDefault(0) + .withImportance(Importance.LOW) + .withDescription("Internal use only"); + + public static final Field PRIMARY_KEY_HASH_COLUMNS = Field.create("primary.key.hash.columns") + .withDisplayName("Comma separated primary key fields") + .withType(Type.STRING) + .withImportance(Importance.LOW) + .withDescription("A comma separated value having all the hash components of the primary key") + .withValidation((config, field, output) -> { + if (config.getString(SNAPSHOT_MODE).equalsIgnoreCase("parallel") && config.getString(field, "").isEmpty()) { + output.accept(field, "", "primary.key.hash.columns cannot be empty when snapshot.mode is 'parallel'"); + return 1; + } + + return 0; + }); + private final LogicalDecodingMessageFilter logicalDecodingMessageFilter; private final HStoreHandlingMode hStoreHandlingMode; private final IntervalHandlingMode intervalHandlingMode; @@ -1108,6 +1135,14 @@ public boolean isFlushLsnOnSource() { return flushLsnOnSource; } + public int taskId() { + return getConfig().getInteger(TASK_ID); + } + + public String primaryKeyHashColumns() { + return getConfig().getString(PRIMARY_KEY_HASH_COLUMNS); + } + @Override public byte[] getUnavailableValuePlaceholder() { String placeholder = getConfig().getString(UNAVAILABLE_VALUE_PLACEHOLDER); @@ -1181,6 +1216,7 @@ protected SourceInfoStructMaker getSourceInfoStruc SNAPSHOT_MODE, SNAPSHOT_MODE_CLASS, YB_CONSISTENT_SNAPSHOT, + PRIMARY_KEY_HASH_COLUMNS, HSTORE_HANDLING_MODE, BINARY_HANDLING_MODE, SCHEMA_NAME_ADJUSTMENT_MODE, diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java index 38d961b8b0d..99eaca22b00 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java @@ -109,7 +109,7 @@ public ChangeEventSourceCoordinator st final PostgresValueConverter valueConverter = valueConverterBuilder.build(typeRegistry); schema = new PostgresSchema(connectorConfig, defaultValueConverter, topicNamingStrategy, valueConverter); - this.taskContext = new PostgresTaskContext(connectorConfig, schema, topicNamingStrategy); + this.taskContext = new PostgresTaskContext(connectorConfig, schema, topicNamingStrategy, connectorConfig.taskId()); final Offsets previousOffsets = getPreviousOffsets( new PostgresPartition.Provider(connectorConfig, config), new PostgresOffsetContext.Loader(connectorConfig)); final Clock clock = Clock.system(); diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresPartition.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresPartition.java index 4e28d658bd8..d1ee3609de1 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresPartition.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresPartition.java @@ -21,15 +21,17 @@ public class PostgresPartition extends AbstractPartition implements Partition { private static final String SERVER_PARTITION_KEY = "server"; private final String serverName; + private final int taskId; - public PostgresPartition(String serverName, String databaseName) { + public PostgresPartition(String serverName, String databaseName, int taskId) { super(databaseName); this.serverName = serverName; + this.taskId = taskId; } @Override public Map getSourcePartition() { - return Collect.hashMapOf(SERVER_PARTITION_KEY, serverName); + return Collect.hashMapOf(SERVER_PARTITION_KEY, getPartitionIdentificationKey()); } @Override @@ -54,6 +56,10 @@ public String toString() { return "PostgresPartition [sourcePartition=" + getSourcePartition() + "]"; } + public String getPartitionIdentificationKey() { + return String.format("%s_%d", serverName, taskId); + } + static class Provider implements Partition.Provider { private final PostgresConnectorConfig connectorConfig; private final Configuration taskConfig; @@ -66,7 +72,8 @@ static class Provider implements Partition.Provider { @Override public Set getPartitions() { return Collections.singleton(new PostgresPartition( - connectorConfig.getLogicalName(), taskConfig.getString(DATABASE_NAME.name()))); + connectorConfig.getLogicalName(), taskConfig.getString(DATABASE_NAME.name()), + connectorConfig.taskId())); } } } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresTaskContext.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresTaskContext.java index f9b1f711202..721c408681d 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresTaskContext.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresTaskContext.java @@ -52,6 +52,18 @@ protected PostgresTaskContext(PostgresConnectorConfig config, PostgresSchema sch this.schema = schema; } + protected PostgresTaskContext(PostgresConnectorConfig config, PostgresSchema schema, TopicNamingStrategy topicNamingStrategy, int taskId) { + super(config.getContextName(), config.getLogicalName(), String.valueOf(taskId), config.getCustomMetricTags(), Collections::emptySet); + + this.config = config; + if (config.xminFetchInterval().toMillis() > 0) { + this.refreshXmin = ElapsedTimeStrategy.constant(Clock.SYSTEM, config.xminFetchInterval().toMillis()); + } + this.topicNamingStrategy = topicNamingStrategy; + assert schema != null; + this.schema = schema; + } + protected TopicNamingStrategy topicNamingStrategy() { return topicNamingStrategy; } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java index 335d73bfb53..2491e98d8c0 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java @@ -7,6 +7,7 @@ package io.debezium.connector.postgresql; import java.sql.SQLException; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -61,10 +62,81 @@ public void start(Map props) { @Override public List> taskConfigs(int maxTasks) { + if (props == null) { + return Collections.emptyList(); + } + + if (props.containsKey(PostgresConnectorConfig.SNAPSHOT_MODE.name()) + && props.get(PostgresConnectorConfig.SNAPSHOT_MODE.name()) + .equalsIgnoreCase(PostgresConnectorConfig.SnapshotMode.PARALLEL.getValue())) { + LOGGER.info("Initialising parallel snapshot consumption"); + + final String tableIncludeList = props.get(PostgresConnectorConfig.TABLE_INCLUDE_LIST.name()); + // Perform basic validations. + validateSingleTableProvidedForParallelSnapshot(tableIncludeList); + + // Publication auto create mode should not be for all tables. + if (props.containsKey(PostgresConnectorConfig.PUBLICATION_AUTOCREATE_MODE.name()) + && props.get(PostgresConnectorConfig.PUBLICATION_AUTOCREATE_MODE.name()) + .equalsIgnoreCase(PostgresConnectorConfig.AutoCreateMode.ALL_TABLES.getValue())) { + throw new DebeziumException("Snapshot mode parallel is not supported with publication.autocreate.mode all_tables, " + + "use publication.autocreate.mode=filtered"); + } + + // Add configuration for select override. + props.put(PostgresConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE.name(), tableIncludeList); + + return getConfigForParallelSnapshotConsumption(maxTasks); + } + + // YB Note: Only applicable when snapshot mode is not parallel. // this will always have just one task with the given list of properties return props == null ? Collections.emptyList() : Collections.singletonList(new HashMap<>(props)); } + protected void validateSingleTableProvidedForParallelSnapshot(String tableIncludeList) throws DebeziumException { + if (tableIncludeList == null) { + throw new DebeziumException("No table provided, provide a table in the table.include.list"); + } else if (tableIncludeList.contains(",")) { + // This might indicate the presence of multiple tables in the include list, we do not want that. + throw new DebeziumException("parallel snapshot consumption is only supported with one table at a time"); + } + } + + protected List> getConfigForParallelSnapshotConsumption(int maxTasks) { + List> taskConfigs = new ArrayList<>(); + + final long upperBoundExclusive = 64 * 1024; + final long rangeSize = upperBoundExclusive / maxTasks; + + for (int i = 0; i < maxTasks; ++i) { + Map taskProps = new HashMap<>(this.props); + + taskProps.put(PostgresConnectorConfig.TASK_ID.name(), String.valueOf(i)); + + long lowerBound = i * rangeSize; + long upperBound = (i == maxTasks - 1) ? upperBoundExclusive - 1 : (lowerBound + rangeSize - 1); + + LOGGER.info("Using query for task {}: {}", i, getQueryForParallelSnapshotSelect(lowerBound, upperBound)); + + taskProps.put( + PostgresConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE.name() + "." + taskProps.get(PostgresConnectorConfig.TABLE_INCLUDE_LIST.name()), + getQueryForParallelSnapshotSelect(lowerBound, upperBound) + ); + + taskConfigs.add(taskProps); + } + + return taskConfigs; + } + + protected String getQueryForParallelSnapshotSelect(long lowerBound, long upperBound) { + return String.format("SELECT * FROM %s WHERE yb_hash_code(%s) >= %d AND yb_hash_code(%s) <= %d", + props.get(PostgresConnectorConfig.TABLE_INCLUDE_LIST.name()), + props.get(PostgresConnectorConfig.PRIMARY_KEY_HASH_COLUMNS.name()), lowerBound, + props.get(PostgresConnectorConfig.PRIMARY_KEY_HASH_COLUMNS.name()), upperBound); + } + @Override public void stop() { this.props = null; diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/ParallelSnapshotter.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/ParallelSnapshotter.java new file mode 100644 index 00000000000..e4fdabf23a4 --- /dev/null +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/ParallelSnapshotter.java @@ -0,0 +1,46 @@ +package io.debezium.connector.postgresql.snapshot; + +import io.debezium.connector.postgresql.PostgresConnectorConfig; +import io.debezium.connector.postgresql.spi.OffsetState; +import io.debezium.connector.postgresql.spi.SlotState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Snapshotter class to take snapshot using parallel tasks. + * + * @author Vaibhav Kushwaha (vkushwaha@yugabyte.com) + */ +public class ParallelSnapshotter extends QueryingSnapshotter { + private final static Logger LOGGER = LoggerFactory.getLogger(ParallelSnapshotter.class); + private OffsetState sourceInfo; + + @Override + public void init(PostgresConnectorConfig config, OffsetState sourceInfo, SlotState slotState) { + super.init(config, sourceInfo, slotState); + this.sourceInfo = sourceInfo; + + LOGGER.info("Initialised ParallelSnapshotter for task {}", config.taskId()); + } + + @Override + public boolean shouldStream() { + return false; + } + + @Override + public boolean shouldSnapshot() { + if (sourceInfo == null) { + LOGGER.info("Taking parallel snapshot for new datasource"); + return true; + } + else if (sourceInfo.snapshotInEffect()) { + LOGGER.info("Found previous incomplete snapshot"); + return true; + } + else { + LOGGER.info("Previous snapshot completed, no snapshot will be performed"); + return false; + } + } +} diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java index 7361f8a7c53..e60d9d7205f 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java @@ -1123,6 +1123,62 @@ public void shouldHaveBeforeImageOfUpdatedRow() throws InterruptedException { assertThat(updateRecordValue.getStruct(Envelope.FieldName.AFTER).getStruct("aa").getInt32("value")).isEqualTo(404); } + @Test + public void shouldFailIfNoPrimaryKeyHashColumnSpecifiedWithSnapshotModeParallel() throws Exception { + Configuration.Builder configBuilder = TestHelper.defaultConfig() + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.PARALLEL.getValue()) + .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.test") + .with(PostgresConnectorConfig.PRIMARY_KEY_HASH_COLUMNS, ""); + + start(YugabyteDBConnector.class, configBuilder.build(), (success, message, error) -> { + assertFalse(success); + assertThat(message.contains("primary.key.hash.columns cannot be empty when snapshot.mode is 'parallel'")).isTrue(); + }); + } + + @Test + public void shouldFailIfParallelSnapshotRunWithMultipleTables() throws Exception { + Configuration.Builder configBuilder = TestHelper.defaultConfig() + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.PARALLEL.getValue()) + .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.test,public.test2") + .with(PostgresConnectorConfig.PRIMARY_KEY_HASH_COLUMNS, "id"); + + start(YugabyteDBConnector.class, configBuilder.build(), (success, message, error) -> { + assertFalse(success); + + assertThat(error.getMessage().contains("parallel snapshot consumption is only supported with one table at a time")).isTrue(); + }); + } + + @Test + public void shouldFailWithSnapshotModeParallelIfNoTableIncludeListProvided() throws Exception { + Configuration.Builder configBuilder = TestHelper.defaultConfig() + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.PARALLEL.getValue()) + .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "") + .with(PostgresConnectorConfig.PRIMARY_KEY_HASH_COLUMNS, "id"); + + start(YugabyteDBConnector.class, configBuilder.build(), (success, message, error) -> { + assertFalse(success); + + assertThat(error.getMessage().contains("No table provided, provide a table in the table.include.list")).isTrue(); + }); + } + + @Test + public void shouldFailIfSnapshotModeParallelHasPublicationAutoCreateModeAllTables() throws Exception { + Configuration.Builder configBuilder = TestHelper.defaultConfig() + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.PARALLEL.getValue()) + .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.test") + .with(PostgresConnectorConfig.PUBLICATION_AUTOCREATE_MODE, PostgresConnectorConfig.AutoCreateMode.ALL_TABLES) + .with(PostgresConnectorConfig.PRIMARY_KEY_HASH_COLUMNS, "id");; + + start(YugabyteDBConnector.class, configBuilder.build(), (success, message, error) -> { + assertFalse(success); + + assertThat(error.getMessage().contains("Snapshot mode parallel is not supported with publication.autocreate.mode all_tables")).isTrue(); + }); + } + @Test public void shouldResumeSnapshotIfFailingMidstream() throws Exception { // insert another set of rows so we can stop at certain point diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresPartitionTest.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresPartitionTest.java index 1a12573ab91..201f32792ef 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresPartitionTest.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresPartitionTest.java @@ -11,11 +11,11 @@ public class PostgresPartitionTest extends AbstractPartitionTest