
Postgres database replication #933

Closed
jorritsandbrink opened this issue Feb 5, 2024 · 13 comments
Labels: enhancement (New feature or request), support (This issue is monitored by Solution Engineer)
@jorritsandbrink (Collaborator) commented Feb 5, 2024

Feature description

dlt should support Postgres database replication. This should be done efficiently, using a CDC-based approach that leverages the transaction log (WAL) rather than querying the tables directly.

Implementation should be as generic as possible, so database replication support for other databases (e.g. SQL Server) can easily be added later.

Are you a dlt user?

None

Use case

Multiple users have requested this in the Slack community, e.g. here.

Proposed solution

  • Use Postgres' logical decoding functionality
  • Do batch CDC, not streaming CDC
    • Start Python process, process all new changes since last time, terminate process.
    • Not an always-running Python process that streams changes into the destination as they occur in the source.
  • State is maintained in Postgres (on the replication slot), not by dlt
  • Use psycopg2's support for replication, which dlt already uses for postgres connections
  • Use pgoutput, so users do not have to install an output plugin such as wal2json
  • Use this message decoder to decode binary messages (or at least use it as an example)
    • I tried this and it works. I also tried pypgoutput but that seems to only work on Linux because of this issue.
  • Create a Postgres CDC resource that yields change events from decoded messages (see below)
  • Initial load:
    • In the first run, we create a replication slot
    • When creating the replication slot, a snapshot gets exported
    • Using that snapshot, we select the entire table and do a full load
    • In subsequent runs we read changes from the replication slot and apply them incrementally
    • Using this approach, the transition from initial load to incremental processing is synchronized and no changes are missed (see the sketch after this list)
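A minimal sketch of that initial-load handshake, assuming psycopg2's replication support; the DSN, slot, and table names are placeholders and error handling is omitted:

```python
import psycopg2
import psycopg2.extras

DSN = "dbname=src user=repl_user host=localhost"  # placeholder connection string

# 1. Create the slot on a replication connection; EXPORT_SNAPSHOT makes Postgres
#    return a snapshot name that stays importable while this session stays idle.
repl_conn = psycopg2.connect(
    DSN, connection_factory=psycopg2.extras.LogicalReplicationConnection
)
repl_cur = repl_conn.cursor()
repl_cur.execute("CREATE_REPLICATION_SLOT dlt_slot LOGICAL pgoutput EXPORT_SNAPSHOT")
slot_name, consistent_point, snapshot_name, _plugin = repl_cur.fetchone()

# 2. In a second, regular connection, import that snapshot and read the table.
#    Rows read here reflect exactly the state at the slot's consistent_point, so
#    the WAL changes consumed later continue seamlessly from the initial load.
conn = psycopg2.connect(DSN)
conn.set_session(isolation_level="REPEATABLE READ")
with conn.cursor() as cur:
    cur.execute("SET TRANSACTION SNAPSHOT %s", (snapshot_name,))
    cur.execute("SELECT * FROM my_table")  # placeholder table
    initial_rows = cur.fetchall()
conn.rollback()
```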

The above is Postgres-specific logic that needs to be integrated into new generic functionality for database replication / CDC processing that will be implemented in another ticket (which @rudolfix will create). I think we need something along the lines of:

  • An abstraction for "change events"—each change event has at least:
    • An operation type: insert, update, or delete
    • A log sequence number (LSN)
    • Column names, types, and values for insert and update operations
    • Primary key values for delete operations
  • An abstraction for "CDC resources"— a special type of resource that yields change events
  • To extend the current logic for the merge write disposition to handle deletes and partial updates
  • Do we need a new write dispostion or can we use merge and recognize we're handling a CDC resource instead of a regular resource?
  • Do we fit this into our current sql_database source and sql_table resource somehow? Postgres CDC doesn't really fit there very naturally because it doesn't use SQLAlchemy and it doesn't persist state for incremental processing.
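To make the abstraction concrete, a purely illustrative sketch of what such a change event could look like (field names are assumptions, not a settled design):

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, Optional


class Operation(Enum):
    INSERT = "insert"
    UPDATE = "update"
    DELETE = "delete"


@dataclass
class ChangeEvent:
    """One decoded change from the WAL; a CDC resource would yield these."""
    op: Operation
    lsn: int                                        # log sequence number
    table: str
    columns: Optional[Dict[str, Any]] = None        # name -> value (insert/update)
    column_types: Optional[Dict[str, str]] = None   # name -> dlt type (insert/update)
    primary_key: Optional[Dict[str, Any]] = None    # pk values (delete)
```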

Related issues

No response

@rudolfix (Collaborator) commented Feb 5, 2024

@jorritsandbrink a few comments from my side:

  1. This ticket may go to the verified sources' sql_database at some point - we have code there that does full and incremental table replication and reflects database tables with SQLAlchemy
  2. Looking at the singer tap (https://github.com/singer-io/tap-postgres/blob/master/tap_postgres/sync_strategies/logical_replication.py), I wonder if we can use duckdb or pyarrow to consume the WAL (is it really always JSON?) and avoid row-by-row processing in Python
  3. you'll need to update the incoming core lib ticket :)

rudolfix moved this from Todo to In Progress in dlt core library on Feb 6, 2024
@rudolfix (Collaborator) commented Feb 7, 2024

Overall it looks good!

  1. How do you initialize replication? I think we need to perform a full load first and then start replication from a slot. People do not keep WALs from the moment the database was created... If we do not have the correct initial state, we'll be updating and deleting non-existing records
  2. Super cool that we can support pgoutput in Python! One remark: wal2json will be way faster because the JSON decoding happens on the server, so maybe we can also support it. Could be an add-on after implementing a first version
  3. We need to be perfect with data types. A new column can be added to a table during replication. You can emit a column definition with the data (see #938, "add mark function to emit resource hints from decorated function"). You'll need to write a mapper from postgres types into dlt types (or use SQLAlchemy for that) - a rough sketch follows this list
  4. Regarding the above: we'll need to use Python types like decimal, datetime, time, etc. if we want to emit JSON messages
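A rough sketch of such a postgres-to-dlt type mapper; the mapping below is illustrative and incomplete, and the dlt type names should be checked against the library's schema types:

```python
# Illustrative only: maps a few common Postgres type names to dlt data types.
# A real implementation would need to cover the full pg_type catalog (or reuse
# SQLAlchemy's reflection) and handle per-column precision/scale.
PG_TO_DLT_TYPE = {
    "bigint": "bigint",
    "integer": "bigint",
    "smallint": "bigint",
    "numeric": "decimal",
    "double precision": "double",
    "real": "double",
    "boolean": "bool",
    "text": "text",
    "character varying": "text",
    "timestamp with time zone": "timestamp",
    "timestamp without time zone": "timestamp",
    "date": "date",
    "time without time zone": "time",
    "bytea": "binary",
    "jsonb": "complex",
}


def to_dlt_type(pg_type: str) -> str:
    # Fall back to text for anything we do not recognize yet.
    return PG_TO_DLT_TYPE.get(pg_type, "text")
```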

engine upgrade:

  1. I think adding a separate write disposition for the replication makes sense
  2. I think a primary key / unique constraint must always be present for the replication to be set up, right?
  3. A special column with the update type is the way to go. You can hardcode a special column name or you could add another column hint (i.e. "update_type") to recognize the field with this type - see the sketch after this list
  4. I think we can use the MERGE SQL statement to easily execute updates for most of the destinations
  5. My take is to create a separate resource for postgres replication in sql_database.
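For illustration, a row yielded by such a replication resource could look like the dict below; the operation column name is a placeholder to be recognized via the column hint mentioned above:

```python
# Hypothetical shape of a yielded change row; "_pg_operation" is a made-up
# column name that the proposed column hint (e.g. "update_type") would mark.
change_row = {
    "id": 42,                    # primary key value from the source table
    "name": "Alice",             # regular column values (for insert/update)
    "lsn": 123456789,            # WAL position, useful for ordering/deduplication
    "_pg_operation": "update",   # one of "insert", "update", "delete"
}
```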

@rudolfix (Collaborator) commented Feb 7, 2024

One more note regarding the replication state management.
If you rely on the slot to manage replication state, then if we extract a batch, loading of that batch fails, and we retry, we lose data.
For our stream resources (Kinesis / Kafka) we take a batch of messages, save the offset, and mark messages as consumed up to that offset on the next extract run. This makes sure that the data was loaded and, in case of an error, we can survive a wipeout of the local runner instance.

Is a similar thing possible? We'll probably need to keep the LSN in the state, but can we tell Postgres to consume the slot on the next run?

@jorritsandbrink (Collaborator Author)

psycopg2 has a send_feedback(flush_lsn=<max_lsn_in_batch>) function we can use to flush the state on the replication slot. I see two ways:

  1. Flush the state at the end of a run, after dlt has successfully processed a batch - does not require persisted state in dlt (see the sketch after this list)
  2. Flush the state at the beginning of the next run, similar to Kinesis/Kafka - requires persisted state in dlt
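For reference, a minimal sketch of option 1, assuming psycopg2's replication protocol support; the slot/publication names and the load step are placeholders:

```python
import psycopg2
from psycopg2.extras import LogicalReplicationConnection

DSN = "dbname=src user=repl_user host=localhost"  # placeholder connection string

conn = psycopg2.connect(DSN, connection_factory=LogicalReplicationConnection)
cur = conn.cursor()
cur.start_replication(
    slot_name="dlt_slot",
    decode=False,  # pgoutput messages are binary, decode them ourselves
    options={"proto_version": "1", "publication_names": "dlt_pub"},
)

batch, max_lsn = [], 0
while True:
    msg = cur.read_message()  # simplified: None means nothing buffered right now
    if msg is None:
        break
    batch.append(msg.payload)  # binary pgoutput message to be decoded
    max_lsn = max(max_lsn, msg.data_start)

load_batch(batch)  # placeholder for decoding + running the dlt pipeline

# Advance the slot only after the batch was loaded successfully, so a failed
# load leaves the WAL in place and the next run re-reads the same changes.
if max_lsn:
    cur.send_feedback(flush_lsn=max_lsn)
```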

@rudolfix which has your preference?

@rudolfix (Collaborator) commented Feb 7, 2024

I prefer (2) as a default, and we can implement a helper function that does (1). It can take an instance of the dlt resource, and each resource exposes its state so we can flush after the run (i.e. when your destination does not support state sync or you disable it).

@jorritsandbrink (Collaborator Author)

@rudolfix (Collaborator) commented Feb 8, 2024

Added some notes on how the initial load will be handled.

this snapshot thing is neat

@rudolfix (Collaborator)

@jorritsandbrink we've got a requirement from one of the users to be able to easily select columns for the replication. The reason is to be able to unselect sensitive data.

That may be implemented in the source/resource itself, or by providing a map transform (add_map) that drops the selected columns and demonstrating that approach (see the sketch below).
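A sketch of the add_map variant, assuming a stand-in resource and a hypothetical list of sensitive columns:

```python
import dlt

SENSITIVE_COLUMNS = {"ssn", "email"}  # hypothetical columns to keep out of the destination


def drop_sensitive(item: dict) -> dict:
    # Remove the selected columns from every record before it is loaded.
    return {k: v for k, v in item.items() if k not in SENSITIVE_COLUMNS}


@dlt.resource  # stand-in for the future replication/CDC resource
def replication_resource():
    yield {"id": 1, "name": "Alice", "ssn": "123-45-6789"}


filtered = replication_resource().add_map(drop_sensitive)
```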

@codingcyclist (Collaborator)

Hey :) I see there is a lot of amazing stuff in the making here 🚀 . I think I'm the mysterious "user" that @rudolfix mentioned in his previous comment ;) and I'm really excited that we at Taktile will be among the first to try out DB replication via DLT.

As a follow-up to a conversation that I had with @matthauskrzykowski on Monday, and also to tie up some loose ends from a DM thread that I had with Marcin, I briefly wanted to comment here and summarize the requirements from our point of view. As far as I can tell from the conversation above, all of them are already covered (I still left one open question at the end):

  • Supported DB engine(s): Postgres (Aurora + “plain” AWS RDS)
  • Replication mode: “Batch-CDC”
    • WAL gets consumed in batches, not streamed in real time
    • WAL gets flushed after every successful pipeline run.
    • WAL does not get flushed for failed runs
  • Schema evolution: To prevent leaking sensitive information into the destination, we need control over the columns that get synced to the destination (e.g. via column-ignore schema contracts).
  • Output encoding: either pgoutput or wal2json (both would work for us)
  • Auth-mechanism: username + password
  • Handling of deletions: we’d like to handle deletions from the source as “soft deletes” in the destination (i.e. keep the row, but mark it as “is_deleted”) - see the sketch below
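For illustration, a minimal sketch of what that soft-delete mapping could look like; the helper and the is_deleted column name are assumptions, not existing dlt functionality:

```python
# Illustrative only: one way the pipeline could represent source deletions as
# soft deletes - keep the row and flag it instead of removing it downstream.
def to_soft_delete_row(op: str, row: dict) -> dict:
    # op is "insert", "update" or "delete"; row holds column (or primary key) values
    return {**row, "is_deleted": op == "delete"}
```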

Open question on schema evolution: Would we be able to "back-fill/re-sync" columns that were part of the initial source table, but not part of the initial schema contract?

@jorritsandbrink (Collaborator Author)

Thanks for neatly listing your requirements @codingcyclist!

Most of your points are indeed within the scope of this issue. I just wanted to respond to two of them:

Schema evolution: To prevent leaking sensitive information into the destination, we need control over the columns that get synced to the destination (e.g. via column-ignore schema contracts).

Do you want to be able to provide a list of columns that should be ignored, while allowing columns that are not in the list to be added as the schema evolves? I don't think that's supported yet with schema contracts or any other dlt functionality.

Handling of deletions: we’d like to handle deletions from the source as “soft deletes” in the destination (i.e. keep the row, but mark it as “is_deleted”)

We're currently adding support for hard deletes in this PR. The next step we had in mind is soft deletes, which would be a natural follow-up to that PR.

I would say we use separate issues for the column-ignoring and soft-delete features.

I'll leave your open question regarding backfilling to @rudolfix.

@rudolfix (Collaborator)

Re. selecting columns: my idea was to ignore certain columns already in the CDC resource. The other option is to filter out values with an add_map transform (we can provide a helper); in the latter case an empty column will still be created in the database (if the resource emits schema columns). I think we could also use pydantic models to filter out unknown columns.

Re. backfill: not sure what you mean here @codingcyclist. A situation where certain columns were ignored and then we un-ignore them after the initial (and possibly subsequent) sync already happened? If so, we'd need to implement partial updates (or do a full re-sync, which could be painful).

my take on next steps

@codingcyclist (Collaborator) commented Feb 22, 2024

Re. selecting columns: I think we have a small misunderstanding here. What we'd need is a mechanism to explicitly define a table schema and let only the columns in that schema be synced into the destination. All other columns should be ignored. If I'm not mistaken, this is already possible with the current version of schema contracts, right? (see the sketch after the list below)

  • table: discard_row (i.e. don't auto-sync newly added tables)
  • column: discard_value (i.e. don't auto-sync newly added columns)
  • data_type: freeze (i.e. raise an exception instead of auto-creating variant columns for changing data types)
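Roughly, those settings would translate to something like the snippet below on a resource; the exact contract keys and modes should be double-checked against the (still experimental) schema contract docs:

```python
import dlt


@dlt.resource(
    schema_contract={
        "tables": "discard_row",     # don't auto-sync newly added tables
        "columns": "discard_value",  # don't auto-sync newly added columns
        "data_type": "freeze",       # raise instead of creating variant columns
    }
)
def my_table():  # placeholder resource
    yield {"id": 1, "value": "a"}
```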

Re. backfill: Exactly, this is about back-filling columns that were initially ignored when they get "un-ignored" at a later point in time. But it's more of a nice-to-have; let's not worry too much about it for the time being. Soft deletes would be much more important to have, as they are crucial to ensure referential integrity between our entity and event tables. Here's an example: a decision flow gets deleted, but we still need to keep a record for that decision flow in the destination so that we can map it to the historic decisions that were executed on that decision flow. At first glance, it looks like this would be covered by the SCD2 PR that you linked above.

@jorritsandbrink (Collaborator Author)

@codingcyclist

Re. selecting columns: I had a little chat with @rudolfix about this on Slack. Schema contracts are still experimental and likely to change significantly, hence not a good idea to rely on them for now. Also, it's more efficient if we can do the filtering further upstream. My idea is to include an argument on the replication resource that lets you provide a column list, which is then used to filter columns already in the resource.

VioletM added the support (This issue is monitored by Solution Engineer) label on Feb 26, 2024
rudolfix moved this from In Progress to Done in dlt core library on May 6, 2024
rudolfix closed this as completed on May 6, 2024