diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java index fefcaf1a5c8..aa631cbcc1e 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java @@ -91,6 +91,7 @@ public class PostgresStreamingChangeEventSource implements StreamingChangeEventS * For DEBUGGING */ private OptionalLong lastTxnidForWhichCommitSeen = OptionalLong.empty(); + private long recordCount = 0; public PostgresStreamingChangeEventSource(PostgresConnectorConfig connectorConfig, Snapshotter snapshotter, PostgresConnection connection, PostgresEventDispatcher dispatcher, ErrorHandler errorHandler, Clock clock, @@ -297,6 +298,8 @@ private void processReplicationMessages(PostgresPartition partition, PostgresOff LOGGER.debug("Processing BEGIN with end LSN {} and txnid {}", lsn, message.getTransactionId()); } else { LOGGER.debug("Processing COMMIT with end LSN {} and txnid {}", lsn, message.getTransactionId()); + LOGGER.debug("Record count in the txn {} is {}", message.getTransactionId(), recordCount); + recordCount = 0; } OptionalLong currentTxnid = message.getTransactionId(); @@ -352,6 +355,7 @@ else if (message.getOperation() == Operation.MESSAGE) { // DML event else { LOGGER.trace("Processing DML event with lsn {} and lastCompletelyProcessedLsn {}", lsn, lastCompletelyProcessedLsn); + ++recordCount; TableId tableId = null; if (message.getOperation() != Operation.NOOP) {