Skip to content

Commit

Permalink
Merge branch 'd#/data_sink_decorator' into devel
Browse files Browse the repository at this point in the history
# Conflicts:
#	dlt/extract/incremental/__init__.py
  • Loading branch information
sh-rp committed Mar 7, 2024
2 parents 3761335 + 2db3430 commit f7ec07c
Show file tree
Hide file tree
Showing 37 changed files with 1,563 additions and 181 deletions.
3 changes: 3 additions & 0 deletions dlt/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@

from dlt import sources
from dlt.extract.decorators import source, resource, transformer, defer
from dlt.destinations.decorators import destination

from dlt.pipeline import (
pipeline as _pipeline,
run,
Expand Down Expand Up @@ -62,6 +64,7 @@
"resource",
"transformer",
"defer",
"destination",
"pipeline",
"run",
"attach",
Expand Down
1 change: 1 addition & 0 deletions dlt/common/configuration/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ def _thread_context(
thread_id = int(m.group(1))
else:
thread_id = threading.get_ident()

# return main context for main thread
if thread_id == Container._MAIN_THREAD_ID:
return self.main_context
Expand Down
125 changes: 83 additions & 42 deletions dlt/common/configuration/inject.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import inspect
import threading

from functools import wraps
from typing import Callable, Dict, Type, Any, Optional, Tuple, TypeVar, overload
from inspect import Signature, Parameter
Expand Down Expand Up @@ -58,6 +60,7 @@ def with_config(
include_defaults: bool = True,
accept_partial: bool = False,
initial_config: Optional[BaseConfiguration] = None,
base: Type[BaseConfiguration] = BaseConfiguration,
) -> Callable[[TFun], TFun]:
"""Injects values into decorated function arguments following the specification in `spec` or by deriving one from function's signature.
Expand All @@ -71,10 +74,11 @@ def with_config(
prefer_existing_sections: (bool, optional): When joining existing section context, the existing context will be preferred to the one in `sections`. Default: False
auto_pipeline_section (bool, optional): If True, a top level pipeline section will be added if `pipeline_name` argument is present . Defaults to False.
include_defaults (bool, optional): If True then arguments with default values will be included in synthesized spec. If False only the required arguments marked with `dlt.secrets.value` and `dlt.config.value` are included
base (Type[BaseConfiguration], optional): A base class for synthesized spec. Defaults to BaseConfiguration.
Returns:
Callable[[TFun], TFun]: A decorated function
"""

section_f: Callable[[StrAny], str] = None
# section may be a function from function arguments to section
if callable(sections):
Expand All @@ -88,9 +92,8 @@ def decorator(f: TFun) -> TFun:
)
spec_arg: Parameter = None
pipeline_name_arg: Parameter = None

if spec is None:
SPEC = spec_from_signature(f, sig, include_defaults)
SPEC = spec_from_signature(f, sig, include_defaults, base=base)
else:
SPEC = spec

Expand All @@ -109,51 +112,53 @@ def decorator(f: TFun) -> TFun:
pipeline_name_arg = p
pipeline_name_arg_default = None if p.default == Parameter.empty else p.default

@wraps(f)
def _wrap(*args: Any, **kwargs: Any) -> Any:
def resolve_config(bound_args: inspect.BoundArguments) -> BaseConfiguration:
"""Resolve arguments using the provided spec"""
# bind parameters to signature
bound_args = sig.bind(*args, **kwargs)
# for calls containing resolved spec in the kwargs, we do not need to resolve again
config: BaseConfiguration = None
if _LAST_DLT_CONFIG in kwargs:
config = last_config(**kwargs)

# if section derivation function was provided then call it
if section_f:
curr_sections: Tuple[str, ...] = (section_f(bound_args.arguments),)
# sections may be a string
elif isinstance(sections, str):
curr_sections = (sections,)
else:
# if section derivation function was provided then call it
if section_f:
curr_sections: Tuple[str, ...] = (section_f(bound_args.arguments),)
# sections may be a string
elif isinstance(sections, str):
curr_sections = (sections,)
else:
curr_sections = sections

# if one of arguments is spec the use it as initial value
if initial_config:
config = initial_config
elif spec_arg:
config = bound_args.arguments.get(spec_arg.name, None)
# resolve SPEC, also provide section_context with pipeline_name
if pipeline_name_arg:
curr_pipeline_name = bound_args.arguments.get(
pipeline_name_arg.name, pipeline_name_arg_default
)
else:
curr_pipeline_name = None
section_context = ConfigSectionContext(
pipeline_name=curr_pipeline_name,
sections=curr_sections,
merge_style=sections_merge_style,
curr_sections = sections

# if one of arguments is spec the use it as initial value
if initial_config:
config = initial_config
elif spec_arg:
config = bound_args.arguments.get(spec_arg.name, None)
# resolve SPEC, also provide section_context with pipeline_name
if pipeline_name_arg:
curr_pipeline_name = bound_args.arguments.get(
pipeline_name_arg.name, pipeline_name_arg_default
)
# this may be called from many threads so section_context is thread affine
with inject_section(section_context):
# print(f"RESOLVE CONF in inject: {f.__name__}: {section_context.sections} vs {sections}")
config = resolve_configuration(
config or SPEC(),
explicit_value=bound_args.arguments,
accept_partial=accept_partial,
)
resolved_params = dict(config)
else:
curr_pipeline_name = None
section_context = ConfigSectionContext(
pipeline_name=curr_pipeline_name,
sections=curr_sections,
merge_style=sections_merge_style,
)

# this may be called from many threads so section_context is thread affine
with inject_section(section_context):
# print(f"RESOLVE CONF in inject: {f.__name__}: {section_context.sections} vs {sections}")
return resolve_configuration(
config or SPEC(),
explicit_value=bound_args.arguments,
accept_partial=accept_partial,
)

def update_bound_args(
bound_args: inspect.BoundArguments, config: BaseConfiguration, *args, **kwargs
) -> None:
# overwrite or add resolved params
resolved_params = dict(config)
for p in sig.parameters.values():
if p.name in resolved_params:
bound_args.arguments[p.name] = resolved_params.pop(p.name)
Expand All @@ -167,12 +172,48 @@ def _wrap(*args: Any, **kwargs: Any) -> Any:
bound_args.arguments[kwargs_arg.name].update(resolved_params)
bound_args.arguments[kwargs_arg.name][_LAST_DLT_CONFIG] = config
bound_args.arguments[kwargs_arg.name][_ORIGINAL_ARGS] = (args, kwargs)

def create_resolved_partial() -> Any:
# creates a pre-resolved partial of the decorated function
empty_bound_args = sig.bind_partial()
config = resolve_config(empty_bound_args)

# TODO: do some checks, for example fail if there is a spec arg

def creator(*args: Any, **kwargs: Any) -> Any:
nonlocal config

# we can still overwrite the config
if _LAST_DLT_CONFIG in kwargs:
config = last_config(**kwargs)

# call the function with the pre-resolved config
bound_args = sig.bind(*args, **kwargs)
update_bound_args(bound_args, config, args, kwargs)
return f(*bound_args.args, **bound_args.kwargs)

return creator

@wraps(f)
def _wrap(*args: Any, **kwargs: Any) -> Any:
# Resolve config
config: BaseConfiguration = None
bound_args = sig.bind(*args, **kwargs)
if _LAST_DLT_CONFIG in kwargs:
config = last_config(**kwargs)
else:
config = resolve_config(bound_args)

# call the function with resolved config
update_bound_args(bound_args, config, args, kwargs)
return f(*bound_args.args, **bound_args.kwargs)

# register the spec for a wrapped function
_FUNC_SPECS[id(_wrap)] = SPEC

# add a method to create a pre-resolved partial
_wrap.create_resolved_partial = create_resolved_partial

return _wrap # type: ignore

# See if we're being called as @with_config or @with_config().
Expand Down
10 changes: 9 additions & 1 deletion dlt/common/data_types/type_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from enum import Enum

from dlt.common import pendulum, json, Decimal, Wei
from dlt.common.json import custom_pua_remove
from dlt.common.json import custom_pua_remove, json
from dlt.common.json._simplejson import custom_encode as json_custom_encode
from dlt.common.arithmetics import InvalidOperation
from dlt.common.data_types.typing import TDataType
Expand Down Expand Up @@ -105,6 +105,14 @@ def coerce_value(to_type: TDataType, from_type: TDataType, value: Any) -> Any:
return int(value.value)
return value

if to_type == "complex":
# try to coerce from text
if from_type == "text":
try:
return json.loads(value)
except Exception:
raise ValueError(value)

if to_type == "text":
if from_type == "complex":
return complex_to_str(value)
Expand Down
2 changes: 1 addition & 1 deletion dlt/common/destination/capabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
]
ALL_SUPPORTED_FILE_FORMATS: Set[TLoaderFileFormat] = set(get_args(TLoaderFileFormat))
# file formats used internally by dlt
INTERNAL_LOADER_FILE_FORMATS: Set[TLoaderFileFormat] = {"puae-jsonl", "sql", "reference", "arrow"}
INTERNAL_LOADER_FILE_FORMATS: Set[TLoaderFileFormat] = {"sql", "reference", "arrow"}
# file formats that may be chosen by the user
EXTERNAL_LOADER_FILE_FORMATS: Set[TLoaderFileFormat] = (
set(get_args(TLoaderFileFormat)) - INTERNAL_LOADER_FILE_FORMATS
Expand Down
10 changes: 6 additions & 4 deletions dlt/common/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import datetime # noqa: 251
import humanize
import contextlib

from typing import (
Any,
Callable,
Expand Down Expand Up @@ -40,11 +41,15 @@
from dlt.common.schema.typing import TColumnNames, TColumnSchema, TWriteDisposition, TSchemaContract
from dlt.common.source import get_current_pipe_name
from dlt.common.storages.load_storage import LoadPackageInfo
from dlt.common.storages.load_package import PackageStorage

from dlt.common.time import ensure_pendulum_datetime, precise_time
from dlt.common.typing import DictStrAny, REPattern, StrAny, SupportsHumanize
from dlt.common.jsonpath import delete_matches, TAnyJsonPath
from dlt.common.data_writers.writers import DataWriterMetrics, TLoaderFileFormat
from dlt.common.utils import RowCounts, merge_row_counts
from dlt.common.versioned_state import TVersionedState
from dlt.common.storages.load_package import TLoadPackageState


class _StepInfo(NamedTuple):
Expand Down Expand Up @@ -454,7 +459,7 @@ class TPipelineLocalState(TypedDict, total=False):
"""Hash of state that was recently synced with destination"""


class TPipelineState(TypedDict, total=False):
class TPipelineState(TVersionedState, total=False):
"""Schema for a pipeline state that is stored within the pipeline working directory"""

pipeline_name: str
Expand All @@ -469,9 +474,6 @@ class TPipelineState(TypedDict, total=False):
staging_type: Optional[str]

# properties starting with _ are not automatically applied to pipeline object when state is restored
_state_version: int
_version_hash: str
_state_engine_version: int
_local: TPipelineLocalState
"""A section of state that is not synchronized with the destination and does not participate in change merging and version control"""

Expand Down
7 changes: 5 additions & 2 deletions dlt/common/reflection/spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ def _first_up(s: str) -> str:


def spec_from_signature(
f: AnyFun, sig: Signature, include_defaults: bool = True
f: AnyFun,
sig: Signature,
include_defaults: bool = True,
base: Type[BaseConfiguration] = BaseConfiguration,
) -> Type[BaseConfiguration]:
name = _get_spec_name_from_f(f)
module = inspect.getmodule(f)
Expand Down Expand Up @@ -109,7 +112,7 @@ def dlt_config_literal_to_type(arg_name: str) -> AnyType:
# set annotations so they are present in __dict__
fields["__annotations__"] = annotations
# synthesize type
T: Type[BaseConfiguration] = type(name, (BaseConfiguration,), fields)
T: Type[BaseConfiguration] = type(name, (base,), fields)
SPEC = configspec()(T)
# add to the module
setattr(module, spec_id, SPEC)
Expand Down
Loading

0 comments on commit f7ec07c

Please sign in to comment.