Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

data sink destination #891

Merged
merged 38 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
8b6028b
start sink
sh-rp Jan 11, 2024
6a924d9
parquet sink prototype
sh-rp Jan 11, 2024
83c1af6
some more sink implementations
sh-rp Jan 12, 2024
b972f23
finish first batch of helpers
sh-rp Jan 12, 2024
9dabeff
add missing tests and fix linting
sh-rp Jan 13, 2024
af6defd
make configuration more versatile
sh-rp Jan 15, 2024
4d730e8
implement sink function progress state
sh-rp Jan 15, 2024
3b577b0
move to iterator
sh-rp Jan 15, 2024
5527689
persist sink load state in pipeline state
sh-rp Jan 25, 2024
0657034
fix unrelated typo
sh-rp Jan 30, 2024
b5db5b8
move sink state storage to loadpackage state
sh-rp Jan 30, 2024
189d24b
additional pr fixes
sh-rp Jan 30, 2024
57ed090
disable creating empty state file on loadpackage init
sh-rp Jan 30, 2024
4f53bc4
add sink docs page
sh-rp Jan 31, 2024
c6c06ba
small changes
sh-rp Feb 1, 2024
2f6a15a
Merge branch 'devel' into d#/data_sink_decorator
sh-rp Feb 6, 2024
374b267
make loadstorage state versioned and separate out common base functions
sh-rp Feb 6, 2024
daae33e
restrict access of destinations to load package state in accessor fun…
sh-rp Feb 6, 2024
644e6f3
fix tests
sh-rp Feb 6, 2024
9930ad6
add tests for state and new injectable context
sh-rp Feb 6, 2024
678d187
fix linter
sh-rp Feb 6, 2024
376832d
fix linter error
sh-rp Feb 7, 2024
79fce9e
some pr fixes
sh-rp Feb 7, 2024
8eb3e1a
Merge commit '17aea98527eab19f3300ee161114541f7eb2b5b5' into d#/data_…
sh-rp Feb 8, 2024
105569a
more pr fixes
sh-rp Feb 8, 2024
27b8b2c
small readme changes
sh-rp Feb 8, 2024
e60f2f1
Merge branch 'devel' into d#/data_sink_decorator
sh-rp Mar 4, 2024
3229745
add load id to loadpackage info in current
sh-rp Mar 4, 2024
dbbbe7c
add support for directly passing through the naming convention to the…
sh-rp Mar 4, 2024
db9b488
add support for batch size zero (filepath passthrough)
sh-rp Mar 4, 2024
3c39f41
use patched version of flake8 encoding
sh-rp Mar 5, 2024
3dfcf39
fix tests
sh-rp Mar 5, 2024
bc44618
add support for secrets and config in sink
sh-rp Mar 5, 2024
db8d0ed
update sink docs
sh-rp Mar 5, 2024
d8719c1
revert encodings branch
sh-rp Mar 5, 2024
d7eb19d
fix small linting problem
sh-rp Mar 5, 2024
ef35502
add support for config specs
sh-rp Mar 6, 2024
2db3430
add possibility to create a resolved partial
sh-rp Mar 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions dlt/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@

from dlt import sources
from dlt.extract.decorators import source, resource, transformer, defer
from dlt.destinations.decorators import destination

from dlt.pipeline import (
pipeline as _pipeline,
run,
Expand Down Expand Up @@ -62,6 +64,7 @@
"resource",
"transformer",
"defer",
"destination",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please also import load_package() into current module so it can be requested from within decorated function

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is there, no?

"pipeline",
"run",
"attach",
Expand Down
1 change: 1 addition & 0 deletions dlt/common/configuration/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ def _thread_context(
thread_id = int(m.group(1))
else:
thread_id = threading.get_ident()

# return main context for main thread
if thread_id == Container._MAIN_THREAD_ID:
return self.main_context
Expand Down
125 changes: 83 additions & 42 deletions dlt/common/configuration/inject.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import inspect
import threading

from functools import wraps
from typing import Callable, Dict, Type, Any, Optional, Tuple, TypeVar, overload
from inspect import Signature, Parameter
Expand Down Expand Up @@ -58,6 +60,7 @@ def with_config(
include_defaults: bool = True,
accept_partial: bool = False,
initial_config: Optional[BaseConfiguration] = None,
base: Type[BaseConfiguration] = BaseConfiguration,
) -> Callable[[TFun], TFun]:
"""Injects values into decorated function arguments following the specification in `spec` or by deriving one from function's signature.

Expand All @@ -71,10 +74,11 @@ def with_config(
prefer_existing_sections: (bool, optional): When joining existing section context, the existing context will be preferred to the one in `sections`. Default: False
auto_pipeline_section (bool, optional): If True, a top level pipeline section will be added if `pipeline_name` argument is present . Defaults to False.
include_defaults (bool, optional): If True then arguments with default values will be included in synthesized spec. If False only the required arguments marked with `dlt.secrets.value` and `dlt.config.value` are included

base (Type[BaseConfiguration], optional): A base class for synthesized spec. Defaults to BaseConfiguration.
Returns:
Callable[[TFun], TFun]: A decorated function
"""

section_f: Callable[[StrAny], str] = None
# section may be a function from function arguments to section
if callable(sections):
Expand All @@ -88,9 +92,8 @@ def decorator(f: TFun) -> TFun:
)
spec_arg: Parameter = None
pipeline_name_arg: Parameter = None

if spec is None:
SPEC = spec_from_signature(f, sig, include_defaults)
SPEC = spec_from_signature(f, sig, include_defaults, base=base)
else:
SPEC = spec

Expand All @@ -109,51 +112,53 @@ def decorator(f: TFun) -> TFun:
pipeline_name_arg = p
pipeline_name_arg_default = None if p.default == Parameter.empty else p.default

@wraps(f)
def _wrap(*args: Any, **kwargs: Any) -> Any:
def resolve_config(bound_args: inspect.BoundArguments) -> BaseConfiguration:
"""Resolve arguments using the provided spec"""
# bind parameters to signature
bound_args = sig.bind(*args, **kwargs)
# for calls containing resolved spec in the kwargs, we do not need to resolve again
config: BaseConfiguration = None
if _LAST_DLT_CONFIG in kwargs:
config = last_config(**kwargs)

# if section derivation function was provided then call it
if section_f:
curr_sections: Tuple[str, ...] = (section_f(bound_args.arguments),)
# sections may be a string
elif isinstance(sections, str):
curr_sections = (sections,)
else:
# if section derivation function was provided then call it
if section_f:
curr_sections: Tuple[str, ...] = (section_f(bound_args.arguments),)
# sections may be a string
elif isinstance(sections, str):
curr_sections = (sections,)
else:
curr_sections = sections

# if one of arguments is spec the use it as initial value
if initial_config:
config = initial_config
elif spec_arg:
config = bound_args.arguments.get(spec_arg.name, None)
# resolve SPEC, also provide section_context with pipeline_name
if pipeline_name_arg:
curr_pipeline_name = bound_args.arguments.get(
pipeline_name_arg.name, pipeline_name_arg_default
)
else:
curr_pipeline_name = None
section_context = ConfigSectionContext(
pipeline_name=curr_pipeline_name,
sections=curr_sections,
merge_style=sections_merge_style,
curr_sections = sections

# if one of arguments is spec the use it as initial value
if initial_config:
config = initial_config
elif spec_arg:
config = bound_args.arguments.get(spec_arg.name, None)
# resolve SPEC, also provide section_context with pipeline_name
if pipeline_name_arg:
curr_pipeline_name = bound_args.arguments.get(
pipeline_name_arg.name, pipeline_name_arg_default
)
# this may be called from many threads so section_context is thread affine
with inject_section(section_context):
# print(f"RESOLVE CONF in inject: {f.__name__}: {section_context.sections} vs {sections}")
config = resolve_configuration(
config or SPEC(),
explicit_value=bound_args.arguments,
accept_partial=accept_partial,
)
resolved_params = dict(config)
else:
curr_pipeline_name = None
section_context = ConfigSectionContext(
pipeline_name=curr_pipeline_name,
sections=curr_sections,
merge_style=sections_merge_style,
)

# this may be called from many threads so section_context is thread affine
with inject_section(section_context):
# print(f"RESOLVE CONF in inject: {f.__name__}: {section_context.sections} vs {sections}")
return resolve_configuration(
config or SPEC(),
explicit_value=bound_args.arguments,
accept_partial=accept_partial,
)

def update_bound_args(
bound_args: inspect.BoundArguments, config: BaseConfiguration, *args, **kwargs
) -> None:
# overwrite or add resolved params
resolved_params = dict(config)
for p in sig.parameters.values():
if p.name in resolved_params:
bound_args.arguments[p.name] = resolved_params.pop(p.name)
Expand All @@ -167,12 +172,48 @@ def _wrap(*args: Any, **kwargs: Any) -> Any:
bound_args.arguments[kwargs_arg.name].update(resolved_params)
bound_args.arguments[kwargs_arg.name][_LAST_DLT_CONFIG] = config
bound_args.arguments[kwargs_arg.name][_ORIGINAL_ARGS] = (args, kwargs)

def create_resolved_partial() -> Any:
# creates a pre-resolved partial of the decorated function
empty_bound_args = sig.bind_partial()
config = resolve_config(empty_bound_args)

# TODO: do some checks, for example fail if there is a spec arg

def creator(*args: Any, **kwargs: Any) -> Any:
nonlocal config

# we can still overwrite the config
if _LAST_DLT_CONFIG in kwargs:
config = last_config(**kwargs)

# call the function with the pre-resolved config
bound_args = sig.bind(*args, **kwargs)
update_bound_args(bound_args, config, args, kwargs)
return f(*bound_args.args, **bound_args.kwargs)

return creator

@wraps(f)
def _wrap(*args: Any, **kwargs: Any) -> Any:
# Resolve config
config: BaseConfiguration = None
bound_args = sig.bind(*args, **kwargs)
if _LAST_DLT_CONFIG in kwargs:
config = last_config(**kwargs)
else:
config = resolve_config(bound_args)

# call the function with resolved config
update_bound_args(bound_args, config, args, kwargs)
return f(*bound_args.args, **bound_args.kwargs)

# register the spec for a wrapped function
_FUNC_SPECS[id(_wrap)] = SPEC

# add a method to create a pre-resolved partial
_wrap.create_resolved_partial = create_resolved_partial

return _wrap # type: ignore

# See if we're being called as @with_config or @with_config().
Expand Down
10 changes: 9 additions & 1 deletion dlt/common/data_types/type_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from enum import Enum

from dlt.common import pendulum, json, Decimal, Wei
from dlt.common.json import custom_pua_remove
from dlt.common.json import custom_pua_remove, json
from dlt.common.json._simplejson import custom_encode as json_custom_encode
from dlt.common.arithmetics import InvalidOperation
from dlt.common.data_types.typing import TDataType
Expand Down Expand Up @@ -105,6 +105,14 @@ def coerce_value(to_type: TDataType, from_type: TDataType, value: Any) -> Any:
return int(value.value)
return value

if to_type == "complex":
# try to coerce from text
if from_type == "text":
try:
return json.loads(value)
except Exception:
raise ValueError(value)

if to_type == "text":
if from_type == "complex":
return complex_to_str(value)
Expand Down
2 changes: 1 addition & 1 deletion dlt/common/destination/capabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
]
ALL_SUPPORTED_FILE_FORMATS: Set[TLoaderFileFormat] = set(get_args(TLoaderFileFormat))
# file formats used internally by dlt
INTERNAL_LOADER_FILE_FORMATS: Set[TLoaderFileFormat] = {"puae-jsonl", "sql", "reference", "arrow"}
INTERNAL_LOADER_FILE_FORMATS: Set[TLoaderFileFormat] = {"sql", "reference", "arrow"}
# file formats that may be chosen by the user
EXTERNAL_LOADER_FILE_FORMATS: Set[TLoaderFileFormat] = (
set(get_args(TLoaderFileFormat)) - INTERNAL_LOADER_FILE_FORMATS
Expand Down
10 changes: 6 additions & 4 deletions dlt/common/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import datetime # noqa: 251
import humanize
import contextlib

from typing import (
Any,
Callable,
Expand Down Expand Up @@ -40,11 +41,15 @@
from dlt.common.schema.typing import TColumnNames, TColumnSchema, TWriteDisposition, TSchemaContract
from dlt.common.source import get_current_pipe_name
from dlt.common.storages.load_storage import LoadPackageInfo
from dlt.common.storages.load_package import PackageStorage

from dlt.common.time import ensure_pendulum_datetime, precise_time
from dlt.common.typing import DictStrAny, REPattern, StrAny, SupportsHumanize
from dlt.common.jsonpath import delete_matches, TAnyJsonPath
from dlt.common.data_writers.writers import DataWriterMetrics, TLoaderFileFormat
from dlt.common.utils import RowCounts, merge_row_counts
from dlt.common.versioned_state import TVersionedState
from dlt.common.storages.load_package import TLoadPackageState


class _StepInfo(NamedTuple):
Expand Down Expand Up @@ -448,7 +453,7 @@ class TPipelineLocalState(TypedDict, total=False):
"""Hash of state that was recently synced with destination"""


class TPipelineState(TypedDict, total=False):
class TPipelineState(TVersionedState, total=False):
"""Schema for a pipeline state that is stored within the pipeline working directory"""

pipeline_name: str
Expand All @@ -463,9 +468,6 @@ class TPipelineState(TypedDict, total=False):
staging_type: Optional[str]

# properties starting with _ are not automatically applied to pipeline object when state is restored
_state_version: int
_version_hash: str
_state_engine_version: int
_local: TPipelineLocalState
"""A section of state that is not synchronized with the destination and does not participate in change merging and version control"""

Expand Down
7 changes: 5 additions & 2 deletions dlt/common/reflection/spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ def _first_up(s: str) -> str:


def spec_from_signature(
f: AnyFun, sig: Signature, include_defaults: bool = True
f: AnyFun,
sig: Signature,
include_defaults: bool = True,
base: Type[BaseConfiguration] = BaseConfiguration,
) -> Type[BaseConfiguration]:
name = _get_spec_name_from_f(f)
module = inspect.getmodule(f)
Expand Down Expand Up @@ -109,7 +112,7 @@ def dlt_config_literal_to_type(arg_name: str) -> AnyType:
# set annotations so they are present in __dict__
fields["__annotations__"] = annotations
# synthesize type
T: Type[BaseConfiguration] = type(name, (BaseConfiguration,), fields)
T: Type[BaseConfiguration] = type(name, (base,), fields)
SPEC = configspec()(T)
# add to the module
setattr(module, spec_id, SPEC)
Expand Down
Loading