data sink destination #891
Conversation
Force-pushed from 9056025 to 9947a62
```python
try:
    return json.loads(value)
except Exception:
    pass
```
we should raise ValueError I suppose? so the calling code knows that the value cannot be coerced?
Maybe json.JSONDecodeError is better for debugging?
dlt/destinations/decorators.py
Outdated
```python
def sink(loader_file_format: TLoaderFileFormat = None, batch_size: int = 10) -> Any:
    def decorator(f: TSinkCallable) -> TDestinationReferenceArg:
        return _sink(credentials=f, loader_file_format=loader_file_format, batch_size=batch_size)
```
IMO this is the best place to extract the destination name and clean it up, the same way we extract the source/resource name in the relevant decorator?
another thing we should solve is passing credentials and additional parameters to the sink function. we can reuse the dlt.transformer decorator; it does exactly what we want here (it wraps the callable in another callable where only the additional parameters are visible, and uses with_config). we can take a look at this together, or do it in another PR after merging this one
I'm not sure what you mean by cleaning up the name? Do you mean extracting it from the callable name or something like that?
Ok, I added it so it will take a name param or use the callable name if possible
dlt/destinations/impl/sink/sink.py
Outdated
```python
# TODO: implement proper state storage somewhere, can this somehow go into the loadpackage?
job_execution_storage: Dict[str, int] = {}
```
IMO the best would be to use the existing pipeline state to keep it, exactly like we handle the source state. you may extend the pipeline_state() method in common/pipeline.py. there are a few difficulties though:
- state should be cleared when a new package starts to load (see the sketch below)
- we actually must persist state after each batch. right now state stays in memory and is written at the end of the context manager in the pipeline.
another problem with the state is that it does not implement the state sync interface. so if the working directory is wiped, all the state is lost (source and destination). this will prevent incremental loading from working.
btw. we could do state sync as another plugin: if specified, state and schemas would be stored and restored via the plugin. we could use buckets or a KV database like DynamoDB, or wrap any destination as such a plugin (for destinations that support state sync)
does the sink work with a staging destination?
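A rough sketch of the state-clearing point above (all names here are hypothetical, not the PR's actual API): keying the destination section of the pipeline state by load id means a new package starts clean:

```python
from threading import Lock
from typing import Any, Dict

_state_lock = Lock()


def destination_state(pipeline_state: Dict[str, Any], load_id: str) -> Dict[str, Any]:
    """Return the mutable destination state for the current load package."""
    with _state_lock:
        destinations = pipeline_state.setdefault("destinations", {})
        # drop state belonging to previous load packages so a new
        # package always starts with a clean slate
        for stale in [lid for lid in destinations if lid != load_id]:
            del destinations[stale]
        return destinations.setdefault(load_id, {})
```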
dlt/destinations/impl/sink/sink.py
Outdated
```python
self._schema = schema
self._job_execution_storage = job_execution_storage

# TODO: is this the correct way to tell dlt to retry this job in the next attempt?
```
read the docstrings for LoadJob. a particular instance will not be retried; "retry" is a terminal state for the LoadJob class. loader.py will create a new instance of SinkLoadJob.
dlt/common/pipeline.py
Outdated
```python
def commit_pipeline_state() -> None:
    container = Container()
```
make sure this code is thread safe! sinks are called from many threads and work in parallel
I did this in the new code, in the place where the context is created; do you think we should create the lock here?
dlt/destinations/impl/sink/sink.py
Outdated
```python
    self._state = "completed"
except Exception as e:
    self._state = "retry"
    raise e
finally:
    # save progress
    commit_pipeline_state()
```
hmmmm this looks good enough. better would be to save state after each batch.
dlt/destinations/impl/sink/sink.py
Outdated
```diff
@@ -193,23 +144,21 @@ def update_stored_schema(
     return super().update_stored_schema(only_tables, expected_update)

 def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob:
+    load_state = destination_state().setdefault(load_id, {})
```
setdefault is not thread safe, but there won't be anyone else accessing this key, so OK
Force-pushed from 8b3da5b to df5d025
```diff
@@ -14,7 +14,7 @@ def name(self) -> str:

 def _look_vault(self, full_key: str, hint: type) -> str:
     """Get Airflow Variable with given `full_key`, return None if not found"""

     return
```
Oops, for a moment I thought that I had committed this in my PR
Force-pushed from df5d025 to 0657034
dlt/load/load.py
Outdated
```diff
@@ -556,12 +566,29 @@ def run(self, pool: Optional[Executor]) -> TRunMetrics:
     schema = self.load_storage.normalized_packages.load_schema(load_id)
     logger.info(f"Loaded schema name {schema.name} and version {schema.stored_version}")

+    # prepare load package state context
```
I am injecting the load package state context here. I think it makes the most sense, but it also clutters the load class a bit, wdyt? For now we only use the load package state for the destinations, so there is no need to implement this at any other stage than load.
Now when I look at this, I think we do not need to inject state at all; what we could inject is the load id and an instance of the package storage as a context available to a function decorated with sink. we can do that later.
@rudolfix you can have another look at this PR. The main questions are:
dlt/common/storages/load_package.py
Outdated
```python
    )
    return cast(DictStrAny, json.loads(state))
except FileNotFoundError:
    return {}
```
Would logging the details be useful for debugging?
no, this just means that there is no state file yet
```python
try:
    return json.loads(value)
except Exception:
    raise ValueError("Cannot load text as json for complex type")
```
Should we make the message clearer? To me "Cannot" sounds like "we do not support"; maybe "were unable to load text as json" or "Was unable to load text as json", or something better?
the pattern is to raise ValueError(value). you'll get the original error message from the exception's __context__
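A minimal sketch of that pattern (coerce_complex is a hypothetical name): raising inside the except block chains the original JSONDecodeError as __context__, so the decode details still show up in the traceback:

```python
import json
from typing import Any


def coerce_complex(value: str) -> Any:
    try:
        return json.loads(value)
    except Exception:
        # the JSONDecodeError is kept as __context__ of the ValueError
        # ("During handling of the above exception, another exception
        # occurred" in the traceback)
        raise ValueError(value)
```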
dlt/destinations/impl/sink/sink.py
Outdated
```python
file_path: str,
config: SinkClientConfiguration,
schema: Schema,
load_package_state: Dict[str, int],
```
Should this use the TLoadPackageState alias instead?
no, but I renamed it to make it clearer what it is :)
```python
@dlt.sink(batch_size=10, loader_file_format="jsonl", name="my_sink")
def sink(items: TDataItems, table: TTableSchema) -> None:
```
Should we add a comment clarifying where users can import our internal types from?
you can see it a couple of lines above this example
```python
if expect_filtered_null_columns:
    for key, expected in expected_rows.items():
        if expected is None:
            assert db_mapping.get(key, None) is None
```
maybe better to check that the key is not in db_mapping (key not in db_mapping)?
no, the key could be there and set to None, and that would also be ok I would say.
```python
"""A section of state that does not participate in change merging and version control"""
destinations: NotRequired[Dict[str, Dict[str, Any]]]
"""private space for destinations to store state relevant only to the load package"""
```
What is this style of docstrings called?
I have no idea; I took this from the way it is done in other places. @rudolfix is this some convention?
this is an unofficial convention for adding docstrings to fields. I think there's no PEP for it
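A small illustration of the convention (the class and field names here are made up): the string literal placed directly under a field is a no-op at runtime, but IDEs and documentation tools surface it as the field's docstring:

```python
from typing import TypedDict


class ExamplePackageInfo(TypedDict, total=False):
    created_at: str
    """ISO timestamp set when the package is created"""
    state: str
    """current state of the package, for example new or loaded"""
```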
logic LGTM! I think we have the basics covered.
- Should we rename sink to something else? Alex is using "publisher"; we could also use "dlt.destination", but maybe that would be too big a name?
- could we add dlt.current.destination_state() and dlt.current.load_state() that could be used in the decorated function? It may make sense to keep some state or be able to inspect the full schema etc.
- I will probably try to wrap the sink function so that additional configuration (i.e. credentials) can be passed to it. IMO we can merge it into the destination factory, which is better than making credentials a callable
dlt/common/pipeline.py
Outdated
```diff
@@ -599,6 +602,28 @@ class StateInjectableContext(ContainerInjectableContext):
     def __init__(self, state: TPipelineState = None) -> None: ...


+@configspec
+class LoadPackageStateInjectableContext(ContainerInjectableContext):
```
cool, but why here? it is part of the package storage. and it is a big 👍 that it is decoupled from the pipeline (it will help me handle bad data, for example)
I moved it, but I am not 100% sure that it is in the right location now..
dlt/common/storages/load_package.py
Outdated
```python
class TLoadPackageState(TVersionedState, total=False):
    created: int
```
hmmm we use ISO timestamps everywhere
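For instance (a small sketch, assuming the pendulum re-export that dlt uses internally), the field could hold an ISO 8601 string instead of an int:

```python
from dlt.common import pendulum

# ISO 8601 string such as "2024-02-07T12:34:56.789012+00:00"
created = pendulum.now().to_iso8601_string()
```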
dlt/common/storages/load_package.py
Outdated
"""Timestamp when the loadpackage was created""" | ||
|
||
"""A section of state that does not participate in change merging and version control""" | ||
destinations: NotRequired[Dict[str, Dict[str, Any]]] |
a load package will be processed by just one destination. the processing is destructive (we move files around, create followup jobs etc.), so I assume we just need destination_state, which is a dict?
dlt/destinations/impl/sink/sink.py
Outdated
```python
if start_batch > 0:
    start_batch -= 1
    continue
batch = record_batch.to_pylist()
```
hmmmmm. IMO just yield arrow here. Alex, for example, is pushing pandas frames to Salesforce
done
dlt/destinations/impl/sink/sink.py
Outdated
```python
def complete_load(self, load_id: str) -> None:
    # pop all state for this load on success
    clear_loadpackage_destination_state()
```
IMO it is better if we do not delete it. You can use it, e.g., to inspect how many rows were processed.
alright, changed!
…sink_decorator (conflicts: dlt/pipeline/current.py)
@rudolfix I implemented all the changes. Since puae-jsonl files are also stored with the .jsonl extension, I had to fix a place in the loader, which does not seem super nice. If we wanted to do this properly, I think we would have puae-json (the current format, but renamed) and puae-jsonl (a new format with individually encoded lines) and save them with a different extension than jsonl. It would also be nice to have an abstracted reader class (similar to the writer) where you just stick in a file path and it emits rows or batches of rows, so that this decoding code is not in the sink destination. But we could do a later ticket for that, not sure.
Force-pushed from 97b0eac to 105569a
@rudolfix there have been no more comments here since my last changes, did you maybe forget to publish your review? Since you mentioned yesterday that you'd like to have some kind of callbacks for the start and end of a load, I also had a suggestion for special sentinels in the sink function. We could also make the decorator work for classes that then need to implement a certain interface with class methods for start and end of load etc.
Merge conflicts resolved in: dlt/common/storages/load_package.py, dlt/destinations/impl/athena/athena.py, dlt/destinations/impl/bigquery/bigquery.py, dlt/load/load.py
this is getting so good!
we'll have another round of comments. for now I was focused on generating a proper config and destination signature.
regarding tests: please implement a BigQuery insert API sink, use #1037 for reference. you can add this to the BigQuery-specific tests and to the examples. what it should do (see the sketch below):
- accept GcpCredentials as credentials + some default parameters
- insert json in batches
- BigQuery should be able to infer the schema
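A rough sketch of such a sink, not the actual test implementation: it assumes the decorator ends up named dlt.destination as discussed above, and names like dataset as well as the use of load_table_from_json with schema autodetection are assumptions:

```python
import dlt
from dlt.common.typing import TDataItems
from dlt.common.schema import TTableSchema
from dlt.sources.credentials import GcpServiceAccountCredentials
from google.cloud import bigquery


@dlt.destination(name="bigquery_insert", batch_size=100, loader_file_format="puae-jsonl")
def bigquery_insert(
    items: TDataItems,
    table: TTableSchema,
    credentials: GcpServiceAccountCredentials = dlt.secrets.value,
    dataset: str = "my_dataset",  # hypothetical default parameter
) -> None:
    client = bigquery.Client(
        project=credentials.project_id,
        credentials=credentials.to_native_credentials(),
    )
    # let BigQuery infer the table schema from the JSON rows
    job_config = bigquery.LoadJobConfig(autodetect=True)
    job = client.load_table_from_json(items, f"{dataset}.{table['name']}", job_config=job_config)
    job.result()  # raise on failure so dlt can retry the job
```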
```diff
 # folders to manage load jobs in a single load package
 TJobState = Literal["new_jobs", "failed_jobs", "started_jobs", "completed_jobs"]
 WORKING_FOLDERS: Set[TJobState] = set(get_args(TJobState))
-TLoadPackageState = Literal["new", "extracted", "normalized", "loaded", "aborted"]
+TLoadPackageStatus = Literal["new", "extracted", "normalized", "loaded", "aborted"]
```
why did it get changed? this is state, not status: state as in the state machine through which the package travels
dlt/destinations/decorators.py
Outdated
```python
from dlt.common.utils import get_callable_name


def sink(
```
we agreed to name this destination, right @VioletM?
yes, this is what the decorator is called when importing it from dlt (see the tests). I can also rename this function though.
dlt/destinations/decorators.py
Outdated
```python
) -> Any:
    def decorator(f: TSinkCallable) -> TDestinationReferenceArg:
        nonlocal name
        if name is None:
```
Or something was not pushed, or the ticket was not clear. What I'd like to do here is to take SinkClientConfiguration and use it as a base to synthesize the user config:

```python
resource_sections = (known_sections.DESTINATION, name)
# a standalone resource will prefer the existing section context when resolving config values
# this lets the source override those values and provide a common section for all config values for resources present in that source
# for an autogenerated spec, do not include defaults
conf_f = with_config(
    incr_f,
    spec=spec,
    sections=resource_sections,
    include_defaults=spec is not None,
    base_=SinkClientConfiguration,  # you need to implement this
)
# get spec for wrapped function
SPEC = get_fun_spec(conf_f)
# store the standalone resource information
_DESTINATIONS[f.__qualname__] = SourceInfo(SPEC, f, func_module)
```
Now what you should have in the SPEC:
- your base
- arguments from the decorated function that have defaults or dlt.config.value / dlt.secrets.value
- if the user uses credentials, you'll have the user credentials automatically

in essence you have a destination configuration that works the same as the other configurations (a sketch of such a decorated function is below)
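For illustration (the function and parameter names here are made up), a decorated sink whose extra parameters would be resolved from config and secrets under the destination section named after it:

```python
import dlt
from dlt.common.typing import TDataItems
from dlt.common.schema import TTableSchema


@dlt.destination(name="my_sink")
def my_sink(
    items: TDataItems,
    table: TTableSchema,
    api_key: str = dlt.secrets.value,  # hypothetical secret, resolved under [destination.my_sink]
    endpoint: str = "https://example.com/ingest",  # hypothetical default, overridable via config
) -> None:
    ...
```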
```python
@configspec
class SinkClientCredentials(CredentialsConfiguration):
```
Those are not credentials. The user will provide credentials (eventually) in the decorated function. if you need to keep a reference to the decorated function, use some other field name
Alright, moved the callable into the regular config
```python
*,
credentials: Union[SinkClientCredentials, TSinkCallable, str] = None,
loader_file_format: TLoaderFileFormat = "puae-jsonl",
batch_size: int = 10,
```
file_format and batch_size are part of the decorator, not the destination signature; remove them
I'm not quite sure what you mean. When I create a destination in the decorator, I need a way to hand over these values to the destination instance, and in all destinations this kind of stuff is done via the config, no?
dlt/destinations/impl/sink/sink.py
Outdated
```python
try:
    if self._config.batch_size == 0:
        # on batch size zero we only call the callable with the filename
        self.call_callable_with_items(self._file_path)
```
this is super good
dlt/destinations/impl/sink/sink.py
Outdated
```python
) == 0, "Batch size was changed during processing of one load package"

start_batch = start_index / self._config.batch_size
with pyarrow.parquet.ParquetFile(self._file_path) as reader:
```
mhhhh, this looks to me like file_readers, the opposite of our file_writers. so you could read parquet and jsonl (with pua) / json with them. does it make sense to extract it now?
Let's do a separate PR for this where file types are fully pluggable.
dlt/destinations/impl/sink/sink.py
Outdated
```python
for batch in self.run(current_index):
    self.call_callable_with_items(batch)
    current_index += len(batch)
    destination_state[self._storage_id] = current_index
```
are you able to store the batch number per table in the state? (you'll need to lock the state). then the user will be able to figure out whether a batch is the first batch for a given table. you already store stuff in the state (see the sketch below)
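A minimal sketch of that idea (track_batch and the state layout are hypothetical); the lock guards against sink jobs running in parallel threads:

```python
import threading
from typing import Any, Dict

_state_lock = threading.Lock()


def track_batch(destination_state: Dict[str, Any], table_name: str) -> int:
    """Increment and return the 0-based batch counter for a table; 0 means first batch."""
    with _state_lock:
        table_batches = destination_state.setdefault("table_batches", {})
        batch_no = table_batches.get(table_name, 0)
        table_batches[table_name] = batch_no + 1
        return batch_no
```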
```diff
@@ -62,6 +64,7 @@
     "resource",
     "transformer",
     "defer",
+    "destination",
```
please also import load_package() into the current module so it can be requested from within the decorated function
It is there, no?
```python
    thread_id = int(m.group(1))
else:
    thread_id = threading.get_ident()
# print(threading.currentThread().getName())
```
I'm not quite sure how this is supposed to work; in any case, it will not work to inject multiple config contexts of the same type in different threads if I keep this in. lmk.
Description
Implementation of #752
Notes: