diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresPartition.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresPartition.java index 4e28d658bd8..d1ee3609de1 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresPartition.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresPartition.java @@ -21,15 +21,17 @@ public class PostgresPartition extends AbstractPartition implements Partition { private static final String SERVER_PARTITION_KEY = "server"; private final String serverName; + private final int taskId; - public PostgresPartition(String serverName, String databaseName) { + public PostgresPartition(String serverName, String databaseName, int taskId) { super(databaseName); this.serverName = serverName; + this.taskId = taskId; } @Override public Map getSourcePartition() { - return Collect.hashMapOf(SERVER_PARTITION_KEY, serverName); + return Collect.hashMapOf(SERVER_PARTITION_KEY, getPartitionIdentificationKey()); } @Override @@ -54,6 +56,10 @@ public String toString() { return "PostgresPartition [sourcePartition=" + getSourcePartition() + "]"; } + public String getPartitionIdentificationKey() { + return String.format("%s_%d", serverName, taskId); + } + static class Provider implements Partition.Provider { private final PostgresConnectorConfig connectorConfig; private final Configuration taskConfig; @@ -66,7 +72,8 @@ static class Provider implements Partition.Provider { @Override public Set getPartitions() { return Collections.singleton(new PostgresPartition( - connectorConfig.getLogicalName(), taskConfig.getString(DATABASE_NAME.name()))); + connectorConfig.getLogicalName(), taskConfig.getString(DATABASE_NAME.name()), + connectorConfig.taskId())); } } } diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresPartitionTest.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresPartitionTest.java index 1a12573ab91..201f32792ef 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresPartitionTest.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresPartitionTest.java @@ -11,11 +11,11 @@ public class PostgresPartitionTest extends AbstractPartitionTest