data pond: expose readable datasets as dataframes and arrow tables #1507

Merged: 119 commits from exp/1095-expose-readable-datasets into devel, Oct 8, 2024. The file changes shown below are from a single commit.
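For orientation, this is roughly what the merged feature enables. The sketch below is illustrative only: `_dataset()` and `fetchall()` appear in the tests in this PR, while `df()` and `arrow()` are assumed names for the dataframe and arrow-table accessors the PR title describes.

```python
# Hypothetical usage sketch of the readable-dataset API introduced by this PR.
# Accessor names follow the tests and commit messages here; treat them as
# illustrative rather than a documented, stable interface.
import dlt

pipeline = dlt.pipeline("docs_pipeline", destination="duckdb", dataset_name="docs_data")
pipeline.run([{"id": i} for i in range(3)], table_name="items")

dataset = pipeline._dataset()    # readable view over the destination dataset
rows = dataset.items.fetchall()  # plain DB-API rows
df = dataset.items.df()          # pandas DataFrame (assumed name)
table = dataset.items.arrow()    # pyarrow Table (assumed name)
```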
Commits
af6a40e
add simple ibis helper
sh-rp Jun 19, 2024
3a69ece
start working on dataframe reading interface
sh-rp Jun 20, 2024
4324650
a bit more work
sh-rp Jun 20, 2024
7c960df
first simple implementation
sh-rp Jun 21, 2024
86b89ac
small change
sh-rp Jun 21, 2024
5a8ea54
more work on dataset
sh-rp Jun 21, 2024
36e94af
some work on filesystem destination
sh-rp Jun 24, 2024
20bf9ce
add support for parquet files and compression on jsonl files in files…
sh-rp Jun 26, 2024
6dce626
Merge branch 'devel' into exp/1095-expose-readable-datasets
sh-rp Jul 17, 2024
a0ff55f
fix test after devel merge
sh-rp Jul 17, 2024
c297e96
add nice composable pipeline example
sh-rp Jul 17, 2024
d020403
small updates to demo
sh-rp Jul 18, 2024
5c3db47
Merge branch 'devel' into exp/1095-expose-readable-datasets
sh-rp Aug 6, 2024
79ef7dd
enable tests for all bucket providers
sh-rp Aug 6, 2024
ff40079
fix tests
sh-rp Aug 6, 2024
ac415b9
create views in duckdb filesystem accessor
sh-rp Aug 6, 2024
c92a527
move to relations based interface
sh-rp Aug 6, 2024
13ec73b
add generic duckdb interface to filesystem
sh-rp Aug 6, 2024
46e0226
move code for accessing frames and tables to the cursor and use duckd…
sh-rp Aug 6, 2024
7cf69a7
add native db api cursor fetching to exposed dataset
sh-rp Aug 7, 2024
6ffe302
some small changes
sh-rp Aug 7, 2024
c200262
switch dataaccess pandas to pyarrow
sh-rp Aug 7, 2024
226454f
add native bigquery support for df and arrow tables
sh-rp Aug 7, 2024
3296e63
change iter functions to always expect chunk size (None will default …
sh-rp Aug 7, 2024
6f6500f
add native implementation for databricks
sh-rp Aug 7, 2024
152b788
add dremio native implementation for full frames and tables
sh-rp Aug 7, 2024
6d73bc5
fix filesystem test
sh-rp Aug 7, 2024
bdb39ba
add test for evolving filesystem
sh-rp Aug 7, 2024
3ead92b
fix empty dataframe retrieval
sh-rp Aug 7, 2024
9fcbd00
remove old df test
sh-rp Aug 7, 2024
28ee1c6
clean up interfaces a bit (more to come?)
sh-rp Aug 8, 2024
28cb282
move dataset creation into destination client and clean up interfaces…
sh-rp Aug 8, 2024
77926fa
renames some interfaces and adds brief docstrings
sh-rp Aug 8, 2024
6ef04bc
add filesystem cached duckdb and remove the need to declare needed vi…
sh-rp Aug 8, 2024
ec13b49
fix tests for snowflake
sh-rp Aug 8, 2024
b222d1d
make data set a function
sh-rp Aug 8, 2024
9f0a6a5
fix db-types dependency for bigquery
sh-rp Aug 8, 2024
289b63c
create duckdb based sql client for filesystem
sh-rp Aug 13, 2024
779bca6
fix example pipeline
sh-rp Aug 13, 2024
584ab47
enable filesystem sql client to work on streamlit
sh-rp Aug 13, 2024
6594053
add comments
sh-rp Aug 13, 2024
9e0a61d
rename sql to query
sh-rp Aug 13, 2024
dd47326
fix tests that rely on sql client
sh-rp Aug 13, 2024
9f8f79b
Merge branch 'devel' into exp/1095-expose-readable-datasets
sh-rp Sep 18, 2024
fda1cb5
post merge cleanups
sh-rp Sep 18, 2024
c7a0e05
move imports around a bit
sh-rp Sep 18, 2024
8497036
exclude abfss buckets from test
sh-rp Sep 19, 2024
3dc2c90
add support for arrow schema creation from known dlt schema
sh-rp Aug 13, 2024
d6bec38
re-use sqldatabase code for cursors
sh-rp Sep 19, 2024
62ea3ba
fix bug
sh-rp Sep 19, 2024
3fd4d61
add default columns where needed
sh-rp Sep 19, 2024
eeca4ac
Merge branch 'devel' into exp/1095-expose-readable-datasets
sh-rp Sep 20, 2024
52f8523
add sql glot to filesystem deps
sh-rp Sep 20, 2024
90c669a
store filesystem tables in correct dataset
sh-rp Sep 20, 2024
7657fb1
move cursor columns location
sh-rp Sep 20, 2024
352b238
fix snowflake and mssql
sh-rp Sep 20, 2024
5fadeeb
Merge branch 'devel' into exp/1095-expose-readable-datasets
sh-rp Sep 20, 2024
9a1752d
clean up compose files a bit
sh-rp Sep 20, 2024
a77192f
fix sqlalchemy
sh-rp Sep 20, 2024
420eaf1
add mysql docker compose file
sh-rp Sep 20, 2024
97e2757
fix linting
sh-rp Sep 20, 2024
df4f6d0
prepare hint checking
sh-rp Sep 20, 2024
6b27b98
disable part of state test
sh-rp Sep 22, 2024
ffba901
enable hint check
sh-rp Sep 23, 2024
fab5232
add column type support for filesystem json
sh-rp Sep 23, 2024
0de4a6c
rename dataset implementation to DBAPI
sh-rp Sep 23, 2024
077a25a
wrap functions in dbapi readable dataset
sh-rp Sep 23, 2024
13a759b
remove example pipeline
sh-rp Sep 23, 2024
10e04d6
rename test_decimal_name
sh-rp Sep 23, 2024
5077ce1
make column code a bit clearer and fix mssql again
sh-rp Sep 23, 2024
1025560
rename df methods to pandas
sh-rp Sep 23, 2024
f8927d3
fix bug in default columns
sh-rp Sep 23, 2024
7fd3c62
fix hints test and columns bug
sh-rp Sep 23, 2024
3a76178
catch mysql error if no rows returned
sh-rp Sep 23, 2024
27104e3
add exceptions for not implemented bucket and filetypes
sh-rp Sep 23, 2024
1c06d11
fix docs
sh-rp Sep 23, 2024
e5b3688
Merge branch 'devel' into exp/1095-expose-readable-datasets
sh-rp Sep 23, 2024
7d09bdb
add config section for getting pipeline clients
sh-rp Sep 26, 2024
dbe4baa
set default dataset in filesystem sqlclient
sh-rp Sep 26, 2024
f4e0099
add config section for sync_destination
sh-rp Sep 26, 2024
80fe898
Merge branch 'devel' into exp/1095-expose-readable-datasets
sh-rp Sep 26, 2024
d698cd5
Merge branch 'devel' into exp/1095-expose-readable-datasets
sh-rp Sep 27, 2024
857803c
rename readablerelation methods
sh-rp Sep 30, 2024
8055529
use more functions of the duckdb sql client in filesystem version
sh-rp Sep 30, 2024
24c7308
update dependencies
sh-rp Sep 30, 2024
76759cf
use active pipeline capabilities if available for arrow table
sh-rp Sep 30, 2024
d3d8381
update types
sh-rp Sep 30, 2024
f9a766d
rename dataset accessor function
sh-rp Sep 30, 2024
b6c7fbc
add test for accessing tables with unqualified tablename
sh-rp Sep 30, 2024
86fc914
fix sql client
sh-rp Sep 30, 2024
58380ec
add duckdb native support for azure, s3 and gcs (via s3)
sh-rp Sep 30, 2024
0a24b3a
some typing
sh-rp Sep 30, 2024
bef50d7
add dataframes tests back in
sh-rp Sep 30, 2024
b13e492
add join table and update view tests for filesystem
sh-rp Sep 30, 2024
92ea515
start adding tests for creating views on remote duckdb
sh-rp Sep 30, 2024
e1fa308
fix snippets
sh-rp Sep 30, 2024
a7958d5
fix some dependencies and mssql/synapse tests
sh-rp Sep 30, 2024
ed197ea
fix bigquery dependencies and abfss tests
sh-rp Oct 1, 2024
0ec1656
add tests for adding view to external dbs and persistent secrets
sh-rp Oct 1, 2024
9cd4173
add support for delta tables
sh-rp Oct 1, 2024
7dba771
add duckdb to read interface tests
sh-rp Oct 1, 2024
3e96a6c
fix delta tests
sh-rp Oct 1, 2024
355f5b6
make default secret name derived from bucket url
sh-rp Oct 1, 2024
9002f02
try fix azure tests again
sh-rp Oct 1, 2024
c3050d4
fix df access tests
sh-rp Oct 2, 2024
bbc0525
PR fixes
sh-rp Oct 2, 2024
ef148c3
Merge branch 'devel' into exp/1095-expose-readable-datasets
sh-rp Oct 2, 2024
a99e987
Merge branch 'devel' into exp/1095-expose-readable-datasets
sh-rp Oct 2, 2024
eaf1cd8
correct internal table access
sh-rp Oct 4, 2024
6bb7117
allow datasets without schema
sh-rp Oct 4, 2024
6648b86
skips parametrized queries, skips tables from non-dataset schemas
rudolfix Oct 6, 2024
89a9861
move filesystem specific sql_client tests to correct location and tes…
sh-rp Oct 7, 2024
631d50b
fix sql client tests
sh-rp Oct 7, 2024
8e2e37c
make secret name when dropping optional
sh-rp Oct 7, 2024
dc383fc
fix gs test
sh-rp Oct 7, 2024
41926ae
remove moved filesystem tests from test_read_interfaces
sh-rp Oct 7, 2024
9b8437a
fix sql client tests again... :)
sh-rp Oct 7, 2024
5d14045
clear duckdb secrets
sh-rp Oct 8, 2024
fb9a445
disable secrets deleting for delta tests
sh-rp Oct 8, 2024
74 changes: 50 additions & 24 deletions dlt/destinations/impl/filesystem/sql_client.py
@@ -46,31 +46,26 @@ def __init__(
         )
         self.fs_client = fs_client
         self.using_external_database = duckdb_connection is not None
-        self.create_persistent_secrets = False
-        self.autocreate_required_views = False
+        self.autocreate_required_views = True

         if self.fs_client.config.protocol not in SUPPORTED_PROTOCOLS:
             raise NotImplementedError(
                 f"Protocol {self.fs_client.config.protocol} currently not supported for"
                 f" FilesystemSqlClient. Supported protocols are {SUPPORTED_PROTOCOLS}."
             )

-    def open_connection(self) -> duckdb.DuckDBPyConnection:
-        # we keep the in memory instance around, so if this prop is set, return it
-        if self._conn:
-            return self._conn
-        super().open_connection()
+    def create_authentication(self, persistent: bool = False, secret_name: str = None) -> None:
+        if not secret_name:
+            secret_name = f"secret_{self.fs_client.config.protocol}"

-        # set up connection and dataset
-        self._existing_views: List[str] = []  # remember which views were already created
-        if not self.has_dataset():
-            self.create_dataset()
-        self._conn.sql(f"USE {self.dataset_name}")
-        self.autocreate_required_views = True
+        persistent_stmt = ""
+        if persistent:
+            persistent_stmt = " PERSISTENT "

-        persistent = ""
-        if self.create_persistent_secrets:
-            persistent = " PERSISTENT "
+        # abfss buckets have an @ component
+        scope = self.fs_client.config.bucket_url
+        if "@" in scope:
+            scope = scope.split("@")[0]

         # add secrets required for creating views
         if self.fs_client.config.protocol == "s3":
@@ -81,12 +76,13 @@ def open_connection(self) -> duckdb.DuckDBPyConnection:
else "s3.amazonaws.com"
)
self._conn.sql(f"""
CREATE {persistent} SECRET secret_aws (
CREATE OR REPLACE {persistent_stmt} SECRET {secret_name} (
TYPE S3,
KEY_ID '{aws_creds.aws_access_key_id}',
SECRET '{aws_creds.aws_secret_access_key}',
REGION '{aws_creds.region_name}',
ENDPOINT '{endpoint}'
ENDPOINT '{endpoint}',
SCOPE '{scope}'
);""")

# azure with storage account creds
@@ -95,9 +91,10 @@
         ):
             azsa_creds = self.fs_client.config.credentials
             self._conn.sql(f"""
-            CREATE {persistent} SECRET secret_az (
+            CREATE OR REPLACE {persistent_stmt} SECRET {secret_name} (
                 TYPE AZURE,
-                CONNECTION_STRING 'AccountName={azsa_creds.azure_storage_account_name};AccountKey={azsa_creds.azure_storage_account_key}'
+                CONNECTION_STRING 'AccountName={azsa_creds.azure_storage_account_name};AccountKey={azsa_creds.azure_storage_account_key}',
+                SCOPE '{scope}'
             );""")

         # azure with service principal creds
@@ -106,14 +103,21 @@
         ):
             azsp_creds = self.fs_client.config.credentials
             self._conn.sql(f"""
-            CREATE SECRET secret_az (
+            CREATE OR REPLACE {persistent_stmt} SECRET {secret_name} (
                 TYPE AZURE,
                 PROVIDER SERVICE_PRINCIPAL,
                 TENANT_ID '{azsp_creds.azure_tenant_id}',
                 CLIENT_ID '{azsp_creds.azure_client_id}',
                 CLIENT_SECRET '{azsp_creds.azure_client_secret}',
-                ACCOUNT_NAME '{azsp_creds.azure_storage_account_name}'
+                ACCOUNT_NAME '{azsp_creds.azure_storage_account_name}',
+                SCOPE '{scope}'
             );""")
+        elif persistent:
+            raise Exception(
+                "Cannot create persistent secret for filesystem protocol"
+                f" {self.fs_client.config.protocol}. If you are trying to use persistent secrets"
+                " with gs/gcs, please use the s3 compatibility layer."
+            )

         # native google storage implementation is not supported..
         elif self.fs_client.config.protocol in ["gs", "gcs"]:
@@ -127,6 +131,28 @@
         elif self.fs_client.config.protocol == "memory":
             self._conn.register_filesystem(self.fs_client.fs_client)

+    def open_connection(self) -> duckdb.DuckDBPyConnection:
+        # we keep the in memory instance around, so if this prop is set, return it
+        if self._conn:
+            return self._conn
+        super().open_connection()
+
+        # set up connection and dataset
+        self._existing_views: List[str] = []  # remember which views were already created
+
+        self.autocreate_required_views = False
+        if not self.has_dataset():
+            self.create_dataset()
+        self.autocreate_required_views = True
+        self._conn.sql(f"USE {self.dataset_name}")
+
+        # the line below solves problems with certificate path lookup on linux
+        # see duckdb docs
+        self._conn.sql("SET azure_transport_option_type = 'curl';")
+
+        # create authentication to data provider
+        self.create_authentication()
+
+        return self._conn

     def close_connection(self) -> None:
Collaborator: you won't need the override if you pass the connection instance in credentials.

Collaborator (Author): I removed it.

@@ -135,7 +161,7 @@ def close_connection(self) -> None:
         return super().close_connection()

     @raise_database_error
-    def create_view_for_tables(self, tables: Dict[str, str]) -> None:
+    def create_views_for_tables(self, tables: Dict[str, str]) -> None:
         """Add the required tables as views to the duckdb in memory instance"""

         # create all tables in duck instance
@@ -208,7 +234,7 @@ def execute_query(self, query: AnyStr, *args: Any, **kwargs: Any) -> Iterator[DB
         if self.autocreate_required_views:  # skip this step when operating on the schema..
             expression = sqlglot.parse_one(query, read="duckdb")  # type: ignore
             load_tables = {t.name: t.name for t in expression.find_all(exp.Table)}
-            self.create_view_for_tables(load_tables)
+            self.create_views_for_tables(load_tables)

         # TODO: raise on non-select queries here, they do not make sense in this context
         with super().execute_query(query, *args, **kwargs) as cursor:
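The view autocreation above hinges on sqlglot's table extraction. A minimal standalone sketch of that lookup, using a query shape borrowed from the tests below:

```python
# Standalone sketch of the table lookup execute_query performs above:
# sqlglot parses the incoming query and yields every referenced table name.
import sqlglot
from sqlglot import expressions as exp

query = "SELECT i.id, di.double_id FROM items AS i JOIN double_items AS di ON i.id = di.id"
expression = sqlglot.parse_one(query, read="duckdb")
load_tables = {t.name: t.name for t in expression.find_all(exp.Table)}
print(load_tables)  # {'items': 'items', 'double_items': 'double_items'}
```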
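For reference, a minimal sketch of the scoped secret DDL that `create_authentication` renders for s3. The credential values and the secret name are placeholders; `duckdb_secrets()` is DuckDB's built-in listing function.

```python
# Minimal sketch of the scoped DuckDB secret the client above generates for
# s3; every credential value here is a placeholder.
import duckdb

conn = duckdb.connect()
scope = "s3://my-bucket"  # abfss-style URLs are cut at the "@" before use as scope

conn.sql(f"""
    CREATE OR REPLACE SECRET secret_s3 (
        TYPE S3,
        KEY_ID 'AKIA_PLACEHOLDER',
        SECRET 'PLACEHOLDER',
        REGION 'eu-central-1',
        ENDPOINT 's3.amazonaws.com',
        SCOPE '{scope}'
    );""")

# SCOPE restricts the secret to paths under the bucket, so secrets for several
# buckets can coexist on one connection; duckdb_secrets() lists what is defined
print(conn.sql("SELECT name, type, scope FROM duckdb_secrets()").fetchall())
```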
59 changes: 40 additions & 19 deletions tests/load/test_read_interfaces.py
@@ -1,9 +1,12 @@
+from typing import Any
+
 import pytest
 import dlt
 import os

 from dlt import Pipeline
 from dlt.common import Decimal
+from dlt.common.utils import uniq_id

 from typing import List
 from functools import reduce
@@ -200,7 +203,7 @@ def double_items():

     # check we can create new tables from the views
     with pipeline.sql_client() as c:
-        c.create_view_for_tables({"items": "items", "double_items": "double_items"})
+        c.create_views_for_tables({"items": "items", "double_items": "double_items"})
         c.execute_sql(
             "CREATE TABLE items_joined AS (SELECT i.id, di.double_id FROM items as i JOIN"
             " double_items as di ON (i.id = di.id));"
@@ -218,30 +221,48 @@
     except Exception as exc:
         assert "double_items is not an table" in str(exc)

-    # we create a second duckdb pipeline and will see if we can make our filesystem views available there
-    other_pipeline = dlt.pipeline("other_pipeline", dev_mode=True, destination="duckdb")
-    other_db_location = (
-        other_pipeline.destination_client().config.credentials.database  # type: ignore
-    )
-    other_pipeline.run([1, 2, 3], table_name="items")
-    assert len(other_pipeline._dataset().items.fetchall()) == 3
-
-    # TODO: implement these tests
-    return
+    # we create a duckdb with a table and see whether we can add more views
+    duck_db_location = "_storage/" + uniq_id()
+    external_db = duckdb.connect(duck_db_location)
+    external_db.execute("CREATE SCHEMA first;")
+    external_db.execute("CREATE SCHEMA second;")
+    external_db.execute("CREATE TABLE first.items AS SELECT i FROM range(0, 3) t(i)")
+    assert len(external_db.sql("SELECT * FROM first.items").fetchall()) == 3

+    # now we can use the FilesystemSqlClient to create the needed views
+    fs_client: Any = pipeline.destination_client()
Collaborator (Author): this interface is a bit clunky at the moment. we should probably have a unified interface that enables us to mount views into duckdb from any destination that supports it, if you have an idea of where it could live and what the signature should be you can let me know. for the first little while it should be ok, since we are using inner classes here. the secrets are actually also not stored in the database file but in a global directory for duckdb, so I've built it in a way that whoever controls the duckdb connection from the outside can also create secrets with a given key and is then able to delete them again.

Collaborator: the interface is already there. it is called update_stored_schema on the job client. in this case filesystem is a staging destination for duckdb and you create all tables as views, pass the credentials etc. maybe we need some convenience method that creates a dataset instance out of such a structure (so we take not just destination but also staging as input to dataset). the nice thing is that all this duckdb related code that creates views and does permission handover could go to the duckdb client. this is a good followup ticket: #1692

     fs_sql_client = FilesystemSqlClient(
-        pipeline.destination_client(),
-        dataset_name=other_pipeline.dataset_name,
-        duckdb_connection=duckdb.connect(other_db_location),
+        dataset_name="second",
+        fs_client=fs_client,
+        duckdb_connection=external_db,
     )
-    fs_sql_client.create_persistent_secrets = True
     with fs_sql_client as sql_client:
-        sql_client.create_view_for_tables({"items": "referenced_items"})
+        sql_client.create_views_for_tables({"items": "referenced_items"})
+    assert len(external_db.sql("SELECT * FROM second.referenced_items").fetchall()) == 3000
+    assert len(external_db.sql("SELECT * FROM first.items").fetchall()) == 3

+    # test creating persistent secrets
+    # NOTE: there is some kind of duckdb cache that makes testing persistent secrets impossible
+    # because somehow the non-persistent secrets are around as long as the python process runs, even
+    # when closing the db connection, renaming the db file and reconnecting
+    secret_name = f"secret_{uniq_id()}_secret"
+
+    supports_persistent_secrets = (
+        destination_config.bucket_url.startswith("s3")
+        or destination_config.bucket_url.startswith("az")
+        or destination_config.bucket_url.startswith("abfss")
+    )

-    # we now have access to this view on the original dataset
-    assert len(other_pipeline._dataset().items.fetchall()) == 3
-    assert len(other_pipeline._dataset().referenced_items.fetchall()) == 3000
+    try:
+        with fs_sql_client as sql_client:
+            fs_sql_client.create_authentication(persistent=True, secret_name=secret_name)
+            # the line below would error if there were no persistent secrets of the given name
+            external_db.execute(f"DROP PERSISTENT SECRET {secret_name}")
+    except Exception as exc:
+        assert (
+            not supports_persistent_secrets
+        ), f"{destination_config.bucket_url} is expected to support persistent secrets"
+        assert "Cannot create persistent secret" in str(exc)


@pytest.mark.essential
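Putting the pieces together, here is a condensed sketch of the external-database flow the test above exercises. It assumes an existing `pipeline` already loaded to a filesystem destination; everything else mirrors the test code.

```python
# Condensed sketch of the external-duckdb flow tested above; assumes an
# existing `pipeline` object loaded to a filesystem destination.
import duckdb
from dlt.destinations.impl.filesystem.sql_client import FilesystemSqlClient

external_db = duckdb.connect("analytics.duckdb")
external_db.execute("CREATE SCHEMA IF NOT EXISTS second;")

fs_sql_client = FilesystemSqlClient(
    dataset_name="second",
    fs_client=pipeline.destination_client(),  # filesystem destination client
    duckdb_connection=external_db,
)

# mount loaded tables as views in the external database
with fs_sql_client as sql_client:
    sql_client.create_views_for_tables({"items": "referenced_items"})

# persist credentials under a known name so other processes can query the
# views, then drop them again when done (s3/az/abfss only, per the client above)
with fs_sql_client as sql_client:
    sql_client.create_authentication(persistent=True, secret_name="my_named_secret")
external_db.execute("DROP PERSISTENT SECRET my_named_secret")
```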