Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Example: fast postgres to postgres #1428

Merged
merged 25 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
fd2d809
add example
sspaeti May 14, 2024
da06e9c
Merge pull request #1354 from sspaeti/speed-improvement-duckdb-initia…
AstrakhantsevaAA May 21, 2024
478f0f3
move postgres to postgres example to separate folder, change credenti…
AstrakhantsevaAA May 22, 2024
f456f19
deleted unused functions, uncomment norm and loading
AstrakhantsevaAA May 31, 2024
2f309bb
fix bugs, reformat, rename vars
AstrakhantsevaAA May 31, 2024
73e2631
delete old file, fix typing
AstrakhantsevaAA May 31, 2024
825f4d9
make title shorter
AstrakhantsevaAA May 31, 2024
9485fcf
fix type for load_type
AstrakhantsevaAA May 31, 2024
4d216d4
ignore load_type type
AstrakhantsevaAA May 31, 2024
db95490
add tests
AstrakhantsevaAA Jun 3, 2024
7f2c395
fix typing
AstrakhantsevaAA Jun 3, 2024
44ed251
fix dateime
AstrakhantsevaAA Jun 3, 2024
21b391d
add deps
AstrakhantsevaAA Jun 3, 2024
57c9801
update lock file
AstrakhantsevaAA Jun 3, 2024
8c4c666
Merge remote-tracking branch 'origin/devel' into example/fast_postgres
AstrakhantsevaAA Jun 3, 2024
17a1909
merge and update lock file
AstrakhantsevaAA Jun 3, 2024
df571f2
install duckdb extensions
AstrakhantsevaAA Jun 3, 2024
aaf34fc
fix install duckdb extensions
AstrakhantsevaAA Jun 3, 2024
07de252
fix install duckdb extensions
AstrakhantsevaAA Jun 3, 2024
c0faf7f
fix install duckdb extensions
AstrakhantsevaAA Jun 3, 2024
747c26a
fix version of duckdb for extensions
AstrakhantsevaAA Jun 3, 2024
d8acdb4
fix path to duckdb dump
AstrakhantsevaAA Jun 3, 2024
dcc7901
update duckdb
AstrakhantsevaAA Jun 3, 2024
f9e51d8
update duckdb in makefile
AstrakhantsevaAA Jun 3, 2024
88aff7c
delete manual duckdb extension installation
AstrakhantsevaAA Jun 5, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ lint-and-test-examples:
poetry run mypy --config-file mypy.ini docs/examples
poetry run flake8 --max-line-length=200 docs/examples
cd docs/tools && poetry run python prepare_examples_tests.py
# install duckdb extension for postgres_to_postgres example
AstrakhantsevaAA marked this conversation as resolved.
Show resolved Hide resolved
cd docs/examples/postgres_to_postgres && wget http://extensions.duckdb.org/v0.10.3/linux_amd64_gcc4/postgres_scanner.duckdb_extension.gz && gunzip ./postgres_scanner.duckdb_extension.gz
cd docs/examples/postgres_to_postgres && poetry run python ./install_duckdb_extensions.py
# run tests
cd docs/examples && poetry run pytest


Expand Down
15 changes: 15 additions & 0 deletions docs/examples/postgres_to_postgres/.dlt/example.secrets.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[destination.postgres.credentials]
host = ""
database = ""
username = ""
password = ""
port = ""
connection_timeout = 15

[sources.postgres.credentials]
host = ""
database = ""
username = ""
password = ""
port = ""
chunk_size = 1000000
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import duckdb

conn = duckdb.connect()
conn.sql('from duckdb_extensions()').show() # checking if extensions are loaded
conn.install_extension("./postgres_scanner.duckdb_extension") # this is where I copied it manually
conn.sql("LOAD postgres;")
conn.sql('from duckdb_extensions()').show() # checking if extensions are loaded
292 changes: 292 additions & 0 deletions docs/examples/postgres_to_postgres/postgres_to_postgres.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,292 @@
"""
---
title: Load from Postgres to Postgres faster
description: Load data fast from Postgres to Postgres with ConnectorX & Arrow export as Parquet, normalizing and exporting as DuckDB, and attaching it to Postgres for bigger Postgres tables (GBs)
keywords: [connector x, pyarrow, zero copy, duckdb, postgres, initial load]
---

This examples shows you how to export and import data from Postgres to Postgres in a fast way with ConnectorX and DuckDB
AstrakhantsevaAA marked this conversation as resolved.
Show resolved Hide resolved
since the default export will generate `Insert_statement` during the normalization phase, which is super slow for large tables.

As it's an initial load, we create a separate schema with timestamp initially and then replace the existing schema with the new one.

:::note
This approach is tested and works well for an initial load (`--replace`), however, the incremental load (`--merge`) might need some adjustments (loading of load-tables of dlt, setting up first run after an initial
load, etc.).
:::

We'll learn:

- How to get arrow tables from [connector X](https://github.com/sfu-db/connector-x) and yield them in chunks.
- That merge and incremental loads work with arrow tables.
- How to use DuckDB for a speedy normalization.
- How to use `argparse` to turn your pipeline script into a CLI.
- How to work with `ConnectionStringCredentials` spec.


Be aware that you need to define the database credentials in `.dlt/secrets.toml` or dlt ENVs and adjust the tables names ("table_1" and "table_2").

Install `dlt` with `duckdb` as extra, also `connectorx`, Postgres adapter and progress bar tool:

```sh
pip install dlt[duckdb] connectorx pyarrow psycopg2-binary alive-progress
```

Run the example:
```sh
python postgres_to_postgres.py --replace
```

:::warn
Attention: There were problems with data type TIME that includes nano seconds. More details in
[Slack](https://dlthub-community.slack.com/archives/C04DQA7JJN6/p1711579390028279?thread_ts=1711477727.553279&cid=C04DQA7JJN60)

As well as with installing DuckDB extension (see [issue
here](https://github.com/duckdb/duckdb/issues/8035#issuecomment-2020803032)), that's why I manually installed the `postgres_scanner.duckdb_extension` in my Dockerfile to load the data into Postgres.
:::
"""

import argparse
import os
from dlt.common import pendulum
from typing import List

import connectorx as cx
import duckdb
import psycopg2

import dlt
from dlt.sources.credentials import ConnectionStringCredentials

CHUNKSIZE = int(
os.getenv("CHUNKSIZE", 1000000)
) # 1 mio rows works well with 1GiB RAM memory (if no parallelism)


def read_sql_x_chunked(conn_str: str, query: str, chunk_size: int = CHUNKSIZE):
offset = 0
while True:
chunk_query = f"{query} LIMIT {chunk_size} OFFSET {offset}"
data_chunk = cx.read_sql(
conn_str,
chunk_query,
return_type="arrow2",
protocol="binary",
)
yield data_chunk
if data_chunk.num_rows < chunk_size:
break # No more data to read
offset += chunk_size


@dlt.source(max_table_nesting=0)
def pg_resource_chunked(
table_name: str,
primary_key: List[str],
schema_name: str,
order_date: str,
load_type: str = "merge",
columns: str = "*",
credentials: ConnectionStringCredentials = dlt.secrets[
"sources.postgres.credentials"
],
):
print(
f"dlt.resource write_disposition: `{load_type}` -- ",
f"connection string: postgresql://{credentials.username}:*****@{credentials.host}:{credentials.host}/{credentials.database}",
)

query = f"SELECT {columns} FROM {schema_name}.{table_name} ORDER BY {order_date}" # Needed to have an idempotent query

source = dlt.resource( # type: ignore
name=table_name,
table_name=table_name,
write_disposition=load_type, # use `replace` for initial load, `merge` for incremental
primary_key=primary_key,
standalone=True,
parallelized=True,
)(read_sql_x_chunked)(
credentials.to_native_representation(), # Pass the connection string directly
query,
)

if load_type == "merge":
# Retrieve the last value processed for incremental loading
source.apply_hints(incremental=dlt.sources.incremental(order_date))

return source


def table_desc(table_name, pk, schema_name, order_date, columns="*"):
return {
"table_name": table_name,
"pk": pk,
"schema_name": schema_name,
"order_date": order_date,
"columns": columns,
}


if __name__ == "__main__":
# Input Handling
parser = argparse.ArgumentParser(
description="Run specific functions in the script."
)
parser.add_argument("--replace", action="store_true", help="Run initial load")
parser.add_argument("--merge", action="store_true", help="Run delta load")
args = parser.parse_args()

source_schema_name = "example_data_1"
target_schema_name = "example_data_2"
pipeline_name = "loading_postgres_to_postgres"

tables = [
table_desc("table_1", ["pk"], source_schema_name, "updated_at"),
table_desc("table_2", ["pk"], source_schema_name, "updated_at"),
]

# default is initial loading (replace)
load_type = "merge" if args.merge else "replace"
print(f"LOAD-TYPE: {load_type}")

resources = []
for table in tables:
resources.append(
pg_resource_chunked(
table["table_name"],
table["pk"],
table["schema_name"],
table["order_date"],
load_type=load_type,
columns=table["columns"],
)
)

if load_type == "replace":
pipeline = dlt.pipeline(
pipeline_name=pipeline_name,
destination="duckdb",
dataset_name=target_schema_name,
full_refresh=True,
progress="alive_progress",
)
else:
pipeline = dlt.pipeline(
pipeline_name=pipeline_name,
destination="postgres",
dataset_name=target_schema_name,
full_refresh=False,
) # full_refresh=False

# start timer
startTime = pendulum.now()

# 1. extract
print("##################################### START EXTRACT ########")
pipeline.extract(resources)
print(f"--Time elapsed: {pendulum.now() - startTime}")

# 2. normalize
print("##################################### START NORMALIZATION ########")
if load_type == "replace":
info = pipeline.normalize(
workers=2, loader_file_format="parquet"
) # https://dlthub.com/docs/blog/dlt-arrow-loading
else:
info = pipeline.normalize()

print(info)
print(pipeline.last_trace.last_normalize_info)
print(f"--Time elapsed: {pendulum.now() - startTime}")

# 3. load
print("##################################### START LOAD ########")
load_info = pipeline.load()
print(load_info)
print(f"--Time elapsed: {pendulum.now() - startTime}")

# check that stuff was loaded
row_counts = pipeline.last_trace.last_normalize_info.row_counts
assert row_counts["table_1"] == 9
assert row_counts["table_2"] == 9

# make sure nothing failed
load_info.raise_on_failed_jobs()

if load_type == "replace":
# 4. Load DuckDB local database into Postgres
print("##################################### START DUCKDB LOAD ########")
# connect to local duckdb dump
conn = duckdb.connect(f"{load_info.destination_displayable_credentials}".split(":///")[1])
conn.install_extension(
"./postgres_scanner.duckdb_extension"
) # duckdb_extension is downloaded/installed manually here, only needed if `LOAD/INSTALL postgres` throws an error
conn.sql("LOAD postgres;")
# select generated timestamp schema
timestamped_schema = conn.sql(
f"""select distinct table_schema from information_schema.tables
where table_schema like '{target_schema_name}%'
and table_schema NOT LIKE '%_staging'
order by table_schema desc"""
).fetchone()[0]
print(f"timestamped_schema: {timestamped_schema}")

target_credentials = ConnectionStringCredentials(dlt.secrets["destination.postgres.credentials"])
# connect to destination (timestamped schema)
conn.sql(
f"ATTACH 'dbname={target_credentials.database} user={target_credentials.username} password={target_credentials.password} host={target_credentials.host} port={target_credentials.port}' AS pg_db (TYPE postgres);"
)
conn.sql(f"CREATE SCHEMA IF NOT EXISTS pg_db.{timestamped_schema};")

for table in tables:
print(
f"LOAD DuckDB -> Postgres: table: {timestamped_schema}.{table['table_name']} TO Postgres {timestamped_schema}.{table['table_name']}"
)

conn.sql(
f"CREATE OR REPLACE TABLE pg_db.{timestamped_schema}.{table['table_name']} AS SELECT * FROM {timestamped_schema}.{table['table_name']};"
)
conn.sql(
f"SELECT count(*) as count FROM pg_db.{timestamped_schema}.{table['table_name']};"
).show()

print(f"--Time elapsed: {pendulum.now() - startTime}")
print("##################################### FINISHED ########")

# check that stuff was loaded
rows = conn.sql(
f"SELECT count(*) as count FROM pg_db.{timestamped_schema}.{table['table_name']};"
).fetchone()[0]
assert int(rows) == 9

# 5. Cleanup and rename Schema
print(
"##################################### RENAME Schema and CLEANUP ########"
)
try:
con_hd = psycopg2.connect(
dbname=target_credentials.database,
user=target_credentials.username,
password=target_credentials.password,
host=target_credentials.host,
port=target_credentials.port,
)
con_hd.autocommit = True
print(
"Connected to HD-DB: "
+ target_credentials.host
+ ", DB: "
+ target_credentials.username
)
except Exception as e:
print(f"Unable to connect to HD-database! The reason: {e}")

with con_hd.cursor() as cur:
# Drop existing target_schema_name
print(f"Drop existing {target_schema_name}")
cur.execute(f"DROP SCHEMA IF EXISTS {target_schema_name} CASCADE;")
# Rename timestamped-target_schema_name to target_schema_name
print(f"Going to rename schema {timestamped_schema} to {target_schema_name}")
cur.execute(f"ALTER SCHEMA {timestamped_schema} RENAME TO {target_schema_name};")

con_hd.close()
Loading
Loading