-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[DBZ-PGYB][yugabyte/yuyabyte-db#24204] Changes to support LSN types with replication slot #162
base: ybdb-debezium-2.5.2
Are you sure you want to change the base?
Conversation
...tgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java
Outdated
Show resolved
Hide resolved
...tgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java
Outdated
Show resolved
Hide resolved
...tgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java
Outdated
Show resolved
Hide resolved
...tgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java
Show resolved
Hide resolved
...tgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java
Show resolved
Hide resolved
...tgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java
Show resolved
Hide resolved
...tgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java
Show resolved
Hide resolved
resumeLsn.set(walPosition.resumeFromLsn(lsn, message).orElse(null)); | ||
|
||
if (resumeLsn.get() == null) { | ||
LOGGER.info("Resume LSN is null"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
when is this possible? Please out comment
|
||
if (this.connectorConfig.slotLsnType().isHybridTime()) { | ||
if (message.getOperation() == Operation.COMMIT) { | ||
LOGGER.info("Adding '{}' as lsn to the commit times queue", Lsn.valueOf(lsn.asLong() - 1)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Info?
@@ -463,18 +518,30 @@ public void commitOffset(Map<String, ?> partition, Map<String, ?> offset) { | |||
final Lsn changeLsn = Lsn.valueOf((Long) offset.get(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY)); | |||
final Lsn lsn = (commitLsn != null) ? commitLsn : changeLsn; | |||
|
|||
LOGGER.debug("Received offset commit request on commit LSN '{}' and change LSN '{}'", commitLsn, changeLsn); | |||
LOGGER.info("Received offset commit request on commit LSN '{}' and change LSN '{}'", commitLsn, changeLsn); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All these info is for debugging now otherwise it will create lots of logs
...tgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java
Show resolved
Hide resolved
@suranjan There are certain logs that I have kept for the debugging runs, I will ensure to remove/convert them back to DEBUG before merging this. |
YugabyteDB logical replication now supports creating replication slots with two types of LSN:
SEQUENCE
- This is a monotonic increasing number that will determine the record in global order within the context of a slot. However, this LSN can’t be compared across two LSN’s of different slots.HYBRID_TIME
- This will mean that the LSN will be denoted by theHybridTime
of the transaction commit record. All the records of the transaction that is streamed will have the same LSN as that of the commit record. The user has to ensure that the changes of a transaction are applied in totality and the acknowledgement is sent only if the commit record of a transaction is processed. With this mode, the LSN value can be compared across the different slots.To ensure that the connector also supports streaming for both LSN types, this PR introduces the following changes:
slot.lsn.type
which accepts two parameters i.e.SEQUENCE
orHYBRID_TIME
with the default beingSEQUENCE
a. Note that this property only accepts parameters in uppercase.
This closes yugabyte/yuyabyte-db#24204