Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

data pond: expose readable datasets as dataframes and arrow tables #1507

Merged
merged 119 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from 111 commits
Commits
Show all changes
119 commits
Select commit Hold shift + click to select a range
af6a40e
add simple ibis helper
sh-rp Jun 19, 2024
3a69ece
start working on dataframe reading interface
sh-rp Jun 20, 2024
4324650
a bit more work
sh-rp Jun 20, 2024
7c960df
first simple implementation
sh-rp Jun 21, 2024
86b89ac
small change
sh-rp Jun 21, 2024
5a8ea54
more work on dataset
sh-rp Jun 21, 2024
36e94af
some work on filesystem destination
sh-rp Jun 24, 2024
20bf9ce
add support for parquet files and compression on jsonl files in files…
sh-rp Jun 26, 2024
6dce626
Merge branch 'devel' into exp/1095-expose-readable-datasets
sh-rp Jul 17, 2024
a0ff55f
fix test after devel merge
sh-rp Jul 17, 2024
c297e96
add nice composable pipeline example
sh-rp Jul 17, 2024
d020403
small updates to demo
sh-rp Jul 18, 2024
5c3db47
Merge branch 'devel' into exp/1095-expose-readable-datasets
sh-rp Aug 6, 2024
79ef7dd
enable tests for all bucket providers
sh-rp Aug 6, 2024
ff40079
fix tests
sh-rp Aug 6, 2024
ac415b9
create views in duckdb filesystem accessor
sh-rp Aug 6, 2024
c92a527
move to relations based interface
sh-rp Aug 6, 2024
13ec73b
add generic duckdb interface to filesystem
sh-rp Aug 6, 2024
46e0226
move code for accessing frames and tables to the cursor and use duckd…
sh-rp Aug 6, 2024
7cf69a7
add native db api cursor fetching to exposed dataset
sh-rp Aug 7, 2024
6ffe302
some small changes
sh-rp Aug 7, 2024
c200262
switch dataaccess pandas to pyarrow
sh-rp Aug 7, 2024
226454f
add native bigquery support for df and arrow tables
sh-rp Aug 7, 2024
3296e63
change iter functions to always expect chunk size (None will default …
sh-rp Aug 7, 2024
6f6500f
add native implementation for databricks
sh-rp Aug 7, 2024
152b788
add dremio native implementation for full frames and tables
sh-rp Aug 7, 2024
6d73bc5
fix filesystem test
sh-rp Aug 7, 2024
bdb39ba
add test for evolving filesystem
sh-rp Aug 7, 2024
3ead92b
fix empty dataframe retrieval
sh-rp Aug 7, 2024
9fcbd00
remove old df test
sh-rp Aug 7, 2024
28ee1c6
clean up interfaces a bit (more to come?)
sh-rp Aug 8, 2024
28cb282
move dataset creation into destination client and clean up interfaces…
sh-rp Aug 8, 2024
77926fa
renames some interfaces and adds brief docstrings
sh-rp Aug 8, 2024
6ef04bc
add filesystem cached duckdb and remove the need to declare needed vi…
sh-rp Aug 8, 2024
ec13b49
fix tests for snowflake
sh-rp Aug 8, 2024
b222d1d
make data set a function
sh-rp Aug 8, 2024
9f0a6a5
fix db-types depdency for bigquery
sh-rp Aug 8, 2024
289b63c
create duckdb based sql client for filesystem
sh-rp Aug 13, 2024
779bca6
fix example pipeline
sh-rp Aug 13, 2024
584ab47
enable filesystem sql client to work on streamlit
sh-rp Aug 13, 2024
6594053
add comments
sh-rp Aug 13, 2024
9e0a61d
rename sql to query
sh-rp Aug 13, 2024
dd47326
fix tests that rely on sql client
sh-rp Aug 13, 2024
9f8f79b
Merge branch 'devel' into exp/1095-expose-readable-datasets
sh-rp Sep 18, 2024
fda1cb5
post merge cleanups
sh-rp Sep 18, 2024
c7a0e05
move imports around a bit
sh-rp Sep 18, 2024
8497036
exclude abfss buckets from test
sh-rp Sep 19, 2024
3dc2c90
add support for arrow schema creation from known dlt schema
sh-rp Aug 13, 2024
d6bec38
re-use sqldatabase code for cursors
sh-rp Sep 19, 2024
62ea3ba
fix bug
sh-rp Sep 19, 2024
3fd4d61
add default columns where needed
sh-rp Sep 19, 2024
eeca4ac
Merge branch 'devel' into exp/1095-expose-readable-datasets
sh-rp Sep 20, 2024
52f8523
add sql glot to filesystem deps
sh-rp Sep 20, 2024
90c669a
store filesystem tables in correct dataset
sh-rp Sep 20, 2024
7657fb1
move cursor columns location
sh-rp Sep 20, 2024
352b238
fix snowflake and mssql
sh-rp Sep 20, 2024
5fadeeb
Merge branch 'devel' into exp/1095-expose-readable-datasets
sh-rp Sep 20, 2024
9a1752d
clean up compose files a bit
sh-rp Sep 20, 2024
a77192f
fix sqlalchemy
sh-rp Sep 20, 2024
420eaf1
add mysql docker compose file
sh-rp Sep 20, 2024
97e2757
fix linting
sh-rp Sep 20, 2024
df4f6d0
prepare hint checking
sh-rp Sep 20, 2024
6b27b98
disable part of state test
sh-rp Sep 22, 2024
ffba901
enable hint check
sh-rp Sep 23, 2024
fab5232
add column type support for filesystem json
sh-rp Sep 23, 2024
0de4a6c
rename dataset implementation to DBAPI
sh-rp Sep 23, 2024
077a25a
wrap functions in dbapi readable dataset
sh-rp Sep 23, 2024
13a759b
remove example pipeline
sh-rp Sep 23, 2024
10e04d6
rename test_decimal_name
sh-rp Sep 23, 2024
5077ce1
make column code a bit clearer and fix mssql again
sh-rp Sep 23, 2024
1025560
rename df methods to pandas
sh-rp Sep 23, 2024
f8927d3
fix bug in default columns
sh-rp Sep 23, 2024
7fd3c62
fix hints test and columns bug
sh-rp Sep 23, 2024
3a76178
catch mysql error if no rows returned
sh-rp Sep 23, 2024
27104e3
add exceptions for not implemented bucket and filetypes
sh-rp Sep 23, 2024
1c06d11
fix docs
sh-rp Sep 23, 2024
e5b3688
Merge branch 'devel' into exp/1095-expose-readable-datasets
sh-rp Sep 23, 2024
7d09bdb
add config section for getting pipeline clients
sh-rp Sep 26, 2024
dbe4baa
set default dataset in filesystem sqlclient
sh-rp Sep 26, 2024
f4e0099
add config section for sync_destination
sh-rp Sep 26, 2024
80fe898
Merge branch 'devel' into exp/1095-expose-readable-datasets
sh-rp Sep 26, 2024
d698cd5
Merge branch 'devel' into exp/1095-expose-readable-datasets
sh-rp Sep 27, 2024
857803c
rename readablerelation methods
sh-rp Sep 30, 2024
8055529
use more functions of the duckdb sql client in filesystem version
sh-rp Sep 30, 2024
24c7308
update dependencies
sh-rp Sep 30, 2024
76759cf
use active pipeline capabilities if available for arrow table
sh-rp Sep 30, 2024
d3d8381
update types
sh-rp Sep 30, 2024
f9a766d
rename dataset accessor function
sh-rp Sep 30, 2024
b6c7fbc
add test for accessing tables with unquqlified tablename
sh-rp Sep 30, 2024
86fc914
fix sql client
sh-rp Sep 30, 2024
58380ec
add duckdb native support for azure, s3 and gcs (via s3)
sh-rp Sep 30, 2024
0a24b3a
some typing
sh-rp Sep 30, 2024
bef50d7
add dataframes tests back in
sh-rp Sep 30, 2024
b13e492
add join table and update view tests for filesystem
sh-rp Sep 30, 2024
92ea515
start adding tests for creating views on remote duckdb
sh-rp Sep 30, 2024
e1fa308
fix snippets
sh-rp Sep 30, 2024
a7958d5
fix some dependencies and mssql/synapse tests
sh-rp Sep 30, 2024
ed197ea
fix bigquery dependencies and abfss tests
sh-rp Oct 1, 2024
0ec1656
add tests for adding view to external dbs and persistent secrets
sh-rp Oct 1, 2024
9cd4173
add support for delta tables
sh-rp Oct 1, 2024
7dba771
add duckdb to read interface tests
sh-rp Oct 1, 2024
3e96a6c
fix delta tests
sh-rp Oct 1, 2024
355f5b6
make default secret name derived from bucket url
sh-rp Oct 1, 2024
9002f02
try fix azure tests again
sh-rp Oct 1, 2024
c3050d4
fix df access tests
sh-rp Oct 2, 2024
bbc0525
PR fixes
sh-rp Oct 2, 2024
ef148c3
Merge branch 'devel' into exp/1095-expose-readable-datasets
sh-rp Oct 2, 2024
a99e987
Merge branch 'devel' into exp/1095-expose-readable-datasets
sh-rp Oct 2, 2024
eaf1cd8
correct internal table access
sh-rp Oct 4, 2024
6bb7117
allow datasets without schema
sh-rp Oct 4, 2024
6648b86
skips parametrized queries, skips tables from non-dataset schemas
rudolfix Oct 6, 2024
89a9861
move filesystem specific sql_client tests to correct location and tes…
sh-rp Oct 7, 2024
631d50b
fix sql client tests
sh-rp Oct 7, 2024
8e2e37c
make secret name when dropping optional
sh-rp Oct 7, 2024
dc383fc
fix gs test
sh-rp Oct 7, 2024
41926ae
remove moved filesystem tests from test_read_interfaces
sh-rp Oct 7, 2024
9b8437a
fix sql client tests again... :)
sh-rp Oct 7, 2024
5d14045
clear duckdb secrets
sh-rp Oct 8, 2024
fb9a445
disable secrets deleting for delta tests
sh-rp Oct 8, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test_destinations.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ jobs:

- name: Install dependencies
# if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
run: poetry install --no-interaction -E redshift -E gs -E s3 -E az -E parquet -E duckdb -E cli --with sentry-sdk --with pipeline -E deltalake
run: poetry install --no-interaction -E redshift -E gs -E s3 -E az -E parquet -E duckdb -E cli -E filesystem --with sentry-sdk --with pipeline -E deltalake

- name: create secrets.toml
run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test_doc_snippets.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ jobs:
uses: actions/checkout@master

- name: Start weaviate
run: docker compose -f ".github/weaviate-compose.yml" up -d
run: docker compose -f "tests/load/weaviate/docker-compose.yml" up -d

- name: Setup Python
uses: actions/setup-python@v4
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/test_local_destinations.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ jobs:
uses: actions/checkout@master

- name: Start weaviate
run: docker compose -f ".github/weaviate-compose.yml" up -d
run: docker compose -f "tests/load/weaviate/docker-compose.yml" up -d

- name: Setup Python
uses: actions/setup-python@v4
Expand Down Expand Up @@ -122,7 +122,7 @@ jobs:

- name: Stop weaviate
if: always()
run: docker compose -f ".github/weaviate-compose.yml" down -v
run: docker compose -f "tests/load/weaviate/docker-compose.yml" down -v

- name: Stop SFTP server
if: always()
Expand Down
5 changes: 3 additions & 2 deletions .github/workflows/test_pyarrow17.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,12 @@ jobs:
key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-pyarrow17

- name: Install dependencies
run: poetry install --no-interaction --with sentry-sdk --with pipeline -E deltalake -E gs -E s3 -E az
run: poetry install --no-interaction --with sentry-sdk --with pipeline -E deltalake -E duckdb -E filesystem -E gs -E s3 -E az


- name: Upgrade pyarrow
run: poetry run pip install pyarrow==17.0.0

- name: create secrets.toml
run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml

Expand Down
3 changes: 0 additions & 3 deletions .github/workflows/test_sqlalchemy_destinations.yml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,3 @@ jobs:
# always run full suite, also on branches
- run: poetry run pytest tests/load -x --ignore tests/load/sources
name: Run tests Linux
env:
DESTINATION__SQLALCHEMY_MYSQL__CREDENTIALS: mysql://root:[email protected]:3306/dlt_data # Use root cause we need to create databases
DESTINATION__SQLALCHEMY_SQLITE__CREDENTIALS: sqlite:///_storage/dl_data.sqlite
9 changes: 8 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -109,4 +109,11 @@ test-build-images: build-library

preprocess-docs:
# run docs preprocessing to run a few checks and ensure examples can be parsed
cd docs/website && npm i && npm run preprocess-docs
cd docs/website && npm i && npm run preprocess-docs

start-test-containers:
docker compose -f "tests/load/dremio/docker-compose.yml" up -d
docker compose -f "tests/load/postgres/docker-compose.yml" up -d
docker compose -f "tests/load/weaviate/docker-compose.yml" up -d
docker compose -f "tests/load/filesystem_sftp/docker-compose.yml" up -d
docker compose -f "tests/load/sqlalchemy/docker-compose.yml" up -d
17 changes: 2 additions & 15 deletions dlt/common/data_writers/writers.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,23 +320,10 @@ def _create_writer(self, schema: "pa.Schema") -> "pa.parquet.ParquetWriter":
)

def write_header(self, columns_schema: TTableSchemaColumns) -> None:
from dlt.common.libs.pyarrow import pyarrow, get_py_arrow_datatype
from dlt.common.libs.pyarrow import columns_to_arrow

# build schema
self.schema = pyarrow.schema(
[
pyarrow.field(
name,
get_py_arrow_datatype(
schema_item,
self._caps,
self.timestamp_timezone,
),
nullable=is_nullable_column(schema_item),
)
for name, schema_item in columns_schema.items()
]
)
self.schema = columns_to_arrow(columns_schema, self._caps, self.timestamp_timezone)
# find row items that are of the json type (could be abstracted out for use in other writers?)
self.nested_indices = [
i for i, field in columns_schema.items() if field["data_type"] == "json"
Expand Down
85 changes: 84 additions & 1 deletion dlt/common/destination/reference.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from abc import ABC, abstractmethod
import dataclasses
from importlib import import_module
from contextlib import contextmanager

from types import TracebackType
from typing import (
Callable,
Expand All @@ -18,24 +20,33 @@
Any,
TypeVar,
Generic,
Generator,
TYPE_CHECKING,
Protocol,
Tuple,
AnyStr,
)
from typing_extensions import Annotated
import datetime # noqa: 251
import inspect

from dlt.common import logger, pendulum

from dlt.common.configuration.specs.base_configuration import extract_inner_hint
from dlt.common.destination.typing import PreparedTableSchema
from dlt.common.destination.utils import verify_schema_capabilities, verify_supported_data_types
from dlt.common.exceptions import TerminalException
from dlt.common.metrics import LoadJobMetrics
from dlt.common.normalizers.naming import NamingConvention
from dlt.common.schema import Schema, TSchemaTables
from dlt.common.schema.typing import TTableSchemaColumns

from dlt.common.schema import Schema, TSchemaTables, TTableSchema
from dlt.common.schema.typing import (
C_DLT_LOAD_ID,
TLoaderReplaceStrategy,
)
from dlt.common.schema.utils import fill_hints_from_parent_and_clone_table

from dlt.common.configuration import configspec, resolve_configuration, known_sections, NotResolved
from dlt.common.configuration.specs import BaseConfiguration, CredentialsConfiguration
from dlt.common.destination.capabilities import DestinationCapabilitiesContext
Expand All @@ -49,13 +60,26 @@
from dlt.common.storages import FileStorage
from dlt.common.storages.load_storage import ParsedLoadJobFileName
from dlt.common.storages.load_package import LoadJobInfo, TPipelineStateDoc
from dlt.common.exceptions import MissingDependencyException


TDestinationConfig = TypeVar("TDestinationConfig", bound="DestinationClientConfiguration")
TDestinationClient = TypeVar("TDestinationClient", bound="JobClientBase")
TDestinationDwhClient = TypeVar("TDestinationDwhClient", bound="DestinationClientDwhConfiguration")

DEFAULT_FILE_LAYOUT = "{table_name}/{load_id}.{file_id}.{ext}"

if TYPE_CHECKING:
try:
from dlt.common.libs.pandas import DataFrame
from dlt.common.libs.pyarrow import Table as ArrowTable
except MissingDependencyException:
DataFrame = Any
ArrowTable = Any
else:
DataFrame = Any
ArrowTable = Any


class StorageSchemaInfo(NamedTuple):
version_hash: str
Expand Down Expand Up @@ -442,6 +466,65 @@ def create_followup_jobs(self, final_state: TLoadJobState) -> List[FollowupJobRe
return []


class SupportsReadableRelation(Protocol):
"""A readable relation retrieved from a destination that supports it"""

schema_columns: TTableSchemaColumns
"""Known dlt table columns for this relation"""

def df(self, chunk_size: int = None) -> Optional[DataFrame]:
rudolfix marked this conversation as resolved.
Show resolved Hide resolved
"""Fetches the results as data frame. For large queries the results may be chunked

Fetches the results into a data frame. The default implementation uses helpers in `pandas.io.sql` to generate Pandas data frame.
This function will try to use native data frame generation for particular destination. For `BigQuery`: `QueryJob.to_dataframe` is used.
For `duckdb`: `DuckDBPyConnection.df'

Args:
chunk_size (int, optional): Will chunk the results into several data frames. Defaults to None
**kwargs (Any): Additional parameters which will be passed to native data frame generation function.

Returns:
Optional[DataFrame]: A data frame with query results. If chunk_size > 0, None will be returned if there is no more data in results
"""
...

def arrow(self, chunk_size: int = None) -> Optional[ArrowTable]: ...

def iter_df(self, chunk_size: int) -> Generator[DataFrame, None, None]: ...

def iter_arrow(self, chunk_size: int) -> Generator[ArrowTable, None, None]: ...

def fetchall(self) -> List[Tuple[Any, ...]]: ...
rudolfix marked this conversation as resolved.
Show resolved Hide resolved

def fetchmany(self, chunk_size: int) -> List[Tuple[Any, ...]]: ...

def iter_fetch(self, chunk_size: int) -> Generator[List[Tuple[Any, ...]], Any, Any]: ...

def fetchone(self) -> Optional[Tuple[Any, ...]]: ...


class DBApiCursor(SupportsReadableRelation):
"""Protocol for DBAPI cursor"""

description: Tuple[Any, ...]

native_cursor: "DBApiCursor"
"""Cursor implementation native to current destination"""

def execute(self, query: AnyStr, *args: Any, **kwargs: Any) -> None: ...
def close(self) -> None: ...


class SupportsReadableDataset(Protocol):
"""A readable dataset retrieved from a destination, has support for creating readable relations for a query or table"""

def __call__(self, query: Any) -> SupportsReadableRelation: ...

def __getitem__(self, table: str) -> SupportsReadableRelation: ...

def __getattr__(self, table: str) -> SupportsReadableRelation: ...


class JobClientBase(ABC):
def __init__(
self,
Expand Down
1 change: 1 addition & 0 deletions dlt/common/libs/pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

try:
import pandas
from pandas import DataFrame
except ModuleNotFoundError:
raise MissingDependencyException("dlt Pandas Helpers", ["pandas"])

Expand Down
Loading
Loading