From 0369496f392d23171fe33e9f7536478fc852bbba Mon Sep 17 00:00:00 2001 From: Dave Date: Wed, 3 Apr 2024 15:09:41 +0200 Subject: [PATCH 01/27] clean some stuff --- dlt/destinations/impl/weaviate/weaviate_client.py | 11 ----------- dlt/destinations/job_client_impl.py | 9 --------- 2 files changed, 20 deletions(-) diff --git a/dlt/destinations/impl/weaviate/weaviate_client.py b/dlt/destinations/impl/weaviate/weaviate_client.py index 6486a75e6e..380f81bd4e 100644 --- a/dlt/destinations/impl/weaviate/weaviate_client.py +++ b/dlt/destinations/impl/weaviate/weaviate_client.py @@ -521,17 +521,6 @@ def get_stored_state(self, pipeline_name: str) -> Optional[StateInfo]: state["dlt_load_id"] = state.pop("_dlt_load_id") return StateInfo(**state) - # def get_stored_states(self, state_table: str) -> List[StateInfo]: - # state_records = self.get_records(state_table, - # sort={ - # "path": ["created_at"], - # "order": "desc" - # }, properties=self.state_properties) - - # for state in state_records: - # state["dlt_load_id"] = state.pop("_dlt_load_id") - # return [StateInfo(**state) for state in state_records] - def get_stored_schema(self) -> Optional[StorageSchemaInfo]: """Retrieves newest schema from destination storage""" try: diff --git a/dlt/destinations/job_client_impl.py b/dlt/destinations/job_client_impl.py index ea0d10d11d..bd2092d525 100644 --- a/dlt/destinations/job_client_impl.py +++ b/dlt/destinations/job_client_impl.py @@ -371,15 +371,6 @@ def get_stored_state(self, pipeline_name: str) -> StateInfo: return None return StateInfo(row[0], row[1], row[2], row[3], pendulum.instance(row[4])) - # def get_stored_states(self, state_table: str) -> List[StateInfo]: - # """Loads list of compressed states from destination storage, optionally filtered by pipeline name""" - # query = f"SELECT {self.STATE_TABLE_COLUMNS} FROM {state_table} AS s ORDER BY created_at DESC" - # result: List[StateInfo] = [] - # with self.sql_client.execute_query(query) as cur: - # for row in cur.fetchall(): - # result.append(StateInfo(row[0], row[1], row[2], row[3], pendulum.instance(row[4]))) - # return result - def get_stored_schema_by_hash(self, version_hash: str) -> StorageSchemaInfo: name = self.sql_client.make_qualified_table_name(self.schema.version_table_name) query = f"SELECT {self.version_table_schema_columns} FROM {name} WHERE version_hash = %s;" From 9a87f0f0e51d882a19bcc9e8bf2e29b2caa77359 Mon Sep 17 00:00:00 2001 From: Dave Date: Wed, 3 Apr 2024 18:19:12 +0200 Subject: [PATCH 02/27] first messy version of filesystem state sync --- dlt/common/destination/reference.py | 1 + .../impl/filesystem/filesystem.py | 112 +++++++++++++++++- dlt/pipeline/pipeline.py | 1 - dlt/pipeline/state_sync.py | 15 ++- fs_testing_pipe.py | 20 ++++ 5 files changed, 137 insertions(+), 12 deletions(-) create mode 100644 fs_testing_pipe.py diff --git a/dlt/common/destination/reference.py b/dlt/common/destination/reference.py index ddcc5d1146..c71532fb70 100644 --- a/dlt/common/destination/reference.py +++ b/dlt/common/destination/reference.py @@ -427,6 +427,7 @@ def get_stored_schema(self) -> Optional[StorageSchemaInfo]: @abstractmethod def get_stored_schema_by_hash(self, version_hash: str) -> StorageSchemaInfo: + """retrieves the stored schema by hash""" pass @abstractmethod diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py index 33a597f915..61516d5e30 100644 --- a/dlt/destinations/impl/filesystem/filesystem.py +++ b/dlt/destinations/impl/filesystem/filesystem.py @@ -1,9 +1,12 @@ import 
posixpath import os from types import TracebackType -from typing import ClassVar, List, Type, Iterable, Set, Iterator +from typing import ClassVar, List, Type, Iterable, Set, Iterator, Optional, Tuple from fsspec import AbstractFileSystem from contextlib import contextmanager +from dlt.common import json, pendulum +import base64 +import re from dlt.common import logger from dlt.common.schema import Schema, TSchemaTables, TTableSchema @@ -16,6 +19,10 @@ JobClientBase, FollowupJob, WithStagingDataset, + WithStateSync, + StorageSchemaInfo, + StateInfo, + DoNothingJob ) from dlt.destinations.job_impl import EmptyLoadJob @@ -87,7 +94,7 @@ def create_followup_jobs(self, final_state: TLoadJobState) -> List[NewLoadJob]: return jobs -class FilesystemClient(JobClientBase, WithStagingDataset): +class FilesystemClient(JobClientBase, WithStagingDataset, WithStateSync): """filesystem client storing jobs in memory""" capabilities: ClassVar[DestinationCapabilitiesContext] = capabilities() @@ -167,15 +174,67 @@ def initialize_storage(self, truncate_tables: Iterable[str] = None) -> None: " should be created previously!" ) + def _get_schema_file_name(self, hash: str) -> Tuple[str, str]: + """gets tuple of dir and fullpath for schema file for a given hash""" + dir = f"{self.dataset_path}/{self.schema.version_table_name}" + # remove all special chars from hash + safe_hash = "".join([c for c in hash if re.match(r"\w", c)]) + path = f"{dir}/{self.schema.name}__{safe_hash}.jsonl" + return dir, path + def update_stored_schema( self, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None ) -> TSchemaTables: # create destination dirs for all tables + # TODO we should only create dirs for datatables dirs_to_create = self._get_table_dirs(only_tables or self.schema.tables.keys()) for directory in dirs_to_create: self.fs_client.makedirs(directory, exist_ok=True) + + # get paths + dir, current_path = self._get_schema_file_name("current") + _, hash_path = self._get_schema_file_name(self.schema.stored_version_hash) + + # TODO: duplicate of weaviate implementation, should be abstracted out + version_info = { + "version_hash": self.schema.stored_version_hash, + "schema_name": self.schema.name, + "version": self.schema.version, + "engine_version": self.schema.ENGINE_VERSION, + "inserted_at": pendulum.now().isoformat(), + "schema": json.dumps(self.schema.to_dict()), + } + + # we always keep tabs on what the current schema is + self.fs_client.makedirs(dir, exist_ok=True) # may not be needed.. 
+ self.fs_client.write_text(current_path, json.dumps(version_info), "utf-8") + self.fs_client.write_text(hash_path, json.dumps(version_info), "utf-8") + return expected_update + def get_stored_schema(self) -> Optional[StorageSchemaInfo]: + """Retrieves newest schema from destination storage""" + return self.get_stored_schema_by_hash("current") + + def get_stored_schema_by_hash(self, version_hash: str) -> Optional[StorageSchemaInfo]: + """retrieves the stored schema by hash""" + _, filepath = self._get_schema_file_name(version_hash) + if not self.fs_client.exists(filepath): + return None + schema_info = json.loads(self.fs_client.read_text(filepath)) + schema_info["inserted_at"] = pendulum.parse(schema_info["inserted_at"]) + return StorageSchemaInfo(**schema_info) + + def get_stored_state(self, pipeline_name: str) -> Optional[StateInfo]: + """Loads compressed state from destination storage""" + dir = f"{self.dataset_path}/{self.schema.state_table_name}" + current_file_name = f"{dir}/{pipeline_name}_current.jsonl" + if not self.fs_client.exists(current_file_name): + return None + state_json = json.loads(self.fs_client.read_text(current_file_name)) + state_json.pop("version_hash") + return StateInfo(**state_json) + def _get_table_dirs(self, table_names: Iterable[str]) -> Set[str]: """Gets unique directories where table data is stored.""" table_dirs: Set[str] = set() @@ -192,6 +251,11 @@ def is_storage_initialized(self) -> bool: return self.fs_client.isdir(self.dataset_path) # type: ignore[no-any-return] def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob: + + # do not load state file the regular way + if table["name"] == self.schema.state_table_name: + return DoNothingJob(file_path) + cls = FollowupFilesystemJob if self.config.as_staging else LoadFilesystemJob return cls( file_path, @@ -205,10 +269,46 @@ def restore_file_load(self, file_path: str) -> LoadJob: return EmptyLoadJob.from_file_path(file_path, "completed") def complete_load(self, load_id: str) -> None: - schema_name = self.schema.name - table_name = self.schema.loads_table_name - file_name = f"{schema_name}.{table_name}.{load_id}" - self.fs_client.touch(posixpath.join(self.dataset_path, file_name)) + + # write entry to state "table" + from dlt import current + from dlt.pipeline.state_sync import state_doc + + # get the state from the current pipeline + pipeline = current.pipeline() + state = pipeline._get_state() + doc = state_doc(state) + + # convert pendulum now to iso timestamp + doc["created_at"] = doc["created_at"].isoformat() + + dir = f"{self.dataset_path}/{self.schema.state_table_name}" + safe_hash = "".join([c for c in doc["version_hash"] if re.match(r"\w", c)]) + hash_file_name = f"{dir}/{pipeline.pipeline_name}_{safe_hash}.jsonl" + current_file_name = f"{dir}/{pipeline.pipeline_name}_current.jsonl" + + self.fs_client.makedirs(dir, exist_ok=True) # may not be needed.. + self.fs_client.write_text(hash_file_name, json.dumps(doc), "utf-8") + self.fs_client.write_text(current_file_name, json.dumps(doc), "utf-8") + + # write entry to load "table" + dir = f"{self.dataset_path}/{self.schema.loads_table_name}" + file_name = f"{self.schema.name}.{load_id}" + filepath = f"{dir}/{file_name}" + + # TODO: this is also duplicate across all destinations. DRY this. 
+ load_data = { + "load_id": load_id, + "schema_name": self.schema.name, + "status": 0, + "inserted_at": pendulum.now().isoformat(), + "schema_version_hash": self.schema.version_hash, + } + + self.fs_client.makedirs(dir, exist_ok=True) # may not be needed.. + self.fs_client.write_text(filepath, json.dumps(load_data), "utf-8") + + def __enter__(self) -> "FilesystemClient": return self diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index de1f7afced..c46e012ab5 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -713,7 +713,6 @@ def sync_destination( remote_state["schema_names"], always_download=True ) # TODO: we should probably wipe out pipeline here - # if we didn't full refresh schemas, get only missing schemas if restored_schemas is None: restored_schemas = self._get_schemas_from_destination( diff --git a/dlt/pipeline/state_sync.py b/dlt/pipeline/state_sync.py index 5366b9c46d..c5f2448d61 100644 --- a/dlt/pipeline/state_sync.py +++ b/dlt/pipeline/state_sync.py @@ -115,11 +115,11 @@ def migrate_pipeline_state( return cast(TPipelineState, state) -def state_resource(state: TPipelineState) -> DltResource: - state = copy(state) - state.pop("_local") +def state_doc(state: TPipelineState) -> DictStrAny: + doc = copy(state) + doc.pop("_local") state_str = compress_state(state) - state_doc = { + doc = { "version": state["_state_version"], "engine_version": state["_state_engine_version"], "pipeline_name": state["pipeline_name"], @@ -127,8 +127,13 @@ def state_resource(state: TPipelineState) -> DltResource: "created_at": pendulum.now(), "version_hash": state["_version_hash"], } + return doc + + +def state_resource(state: TPipelineState) -> DltResource: + doc = state_doc(state) return dlt.resource( - [state_doc], name=STATE_TABLE_NAME, write_disposition="append", columns=STATE_TABLE_COLUMNS + [doc], name=STATE_TABLE_NAME, write_disposition="append", columns=STATE_TABLE_COLUMNS ) diff --git a/fs_testing_pipe.py b/fs_testing_pipe.py new file mode 100644 index 0000000000..a3d3275f71 --- /dev/null +++ b/fs_testing_pipe.py @@ -0,0 +1,20 @@ +import dlt +import os + +if __name__ == "__main__": + os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] = "file://my_files" + os.environ["DATA_WRITER__DISABLE_COMPRESSION"] = "TRUE" + + # resource with incremental for testing restoring of pipeline state + @dlt.resource(name="my_table") + def my_resouce(id=dlt.sources.incremental("id")): + yield from [ + {"id": 1}, + {"id": 2}, + {"id": 3}, + {"id": 4}, + {"id": 5} + ] + + pipe = dlt.pipeline(pipeline_name="dave", destination="filesystem") + pipe.run(my_resouce(), table_name="my_table") #, loader_file_format="parquet") From f6d5c9ccd451074a9c12680267e9674e652c2836 Mon Sep 17 00:00:00 2001 From: Dave Date: Wed, 3 Apr 2024 18:42:38 +0200 Subject: [PATCH 03/27] clean up a bit --- .../impl/filesystem/filesystem.py | 190 ++++++++++-------- 1 file changed, 105 insertions(+), 85 deletions(-) diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py index 61516d5e30..388ed3fc69 100644 --- a/dlt/destinations/impl/filesystem/filesystem.py +++ b/dlt/destinations/impl/filesystem/filesystem.py @@ -5,7 +5,8 @@ from fsspec import AbstractFileSystem from contextlib import contextmanager from dlt.common import json, pendulum -import base64 +from dlt.common.typing import DictStrAny + import re from dlt.common import logger @@ -22,7 +23,7 @@ WithStateSync, StorageSchemaInfo, StateInfo, - DoNothingJob + DoNothingJob, ) from dlt.destinations.job_impl 
import EmptyLoadJob @@ -174,14 +175,6 @@ def initialize_storage(self, truncate_tables: Iterable[str] = None) -> None: " should be created previously!" ) - def _get_schema_file_name(self, hash: str) -> Tuple[str, str]: - """gets tuple of dir and fullpath for schema file for a given hash""" - dir = f"{self.dataset_path}/{self.schema.version_table_name}" - # remove all special chars from hash - safe_hash = "".join([c for c in hash if re.match(r"\w", c)]) - path = f"{dir}/{self.schema.name}__{safe_hash}.jsonl" - return dir, path - def update_stored_schema( self, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None ) -> TSchemaTables: @@ -191,50 +184,11 @@ def update_stored_schema( for directory in dirs_to_create: self.fs_client.makedirs(directory, exist_ok=True) - # get paths - dir, current_path = self._get_schema_file_name("current") - _, hash_path = self._get_schema_file_name(self.schema.stored_version_hash) - - # TODO: duplicate of weaviate implementation, should be abstracted out - version_info = { - "version_hash": self.schema.stored_version_hash, - "schema_name": self.schema.name, - "version": self.schema.version, - "engine_version": self.schema.ENGINE_VERSION, - "inserted_at": pendulum.now().isoformat(), - "schema": json.dumps(self.schema.to_dict()), - } - - # we always keep tabs on what the current schema is - self.fs_client.makedirs(dir, exist_ok=True) # may not be needed.. - self.fs_client.write_text(current_path, json.dumps(version_info), "utf-8") - self.fs_client.write_text(hash_path, json.dumps(version_info), "utf-8") + # write schema to destination + self.store_current_schema() return expected_update - def get_stored_schema(self) -> Optional[StorageSchemaInfo]: - """Retrieves newest schema from destination storage""" - return self.get_stored_schema_by_hash("current") - - def get_stored_schema_by_hash(self, version_hash: str) -> Optional[StorageSchemaInfo]: - """retrieves the stored schema by hash""" - _, filepath = self._get_schema_file_name(version_hash) - if not self.fs_client.exists(filepath): - return None - schema_info = json.loads(self.fs_client.read_text(filepath)) - schema_info["inserted_at"] = pendulum.parse(schema_info["inserted_at"]) - return StorageSchemaInfo(**schema_info) - - def get_stored_state(self, pipeline_name: str) -> Optional[StateInfo]: - """Loads compressed state from destination storage""" - dir = f"{self.dataset_path}/{self.schema.state_table_name}" - current_file_name = f"{dir}/{pipeline_name}_current.jsonl" - if not self.fs_client.exists(current_file_name): - return None - state_json = json.loads(self.fs_client.read_text(current_file_name)) - state_json.pop("version_hash") - return StateInfo(**state_json) - def _get_table_dirs(self, table_names: Iterable[str]) -> Set[str]: """Gets unique directories where table data is stored.""" table_dirs: Set[str] = set() @@ -251,8 +205,8 @@ def is_storage_initialized(self) -> bool: return self.fs_client.isdir(self.dataset_path) # type: ignore[no-any-return] def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob: - - # do not load state file the regular way + # skip the state table, we create a jsonl file in the complete_load + # step if table["name"] == self.schema.state_table_name: return DoNothingJob(file_path) @@ -268,34 +222,31 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> def restore_file_load(self, file_path: str) -> LoadJob: return EmptyLoadJob.from_file_path(file_path, "completed") - def complete_load(self, load_id: 
str) -> None: + def __enter__(self) -> "FilesystemClient": + return self - # write entry to state "table" - from dlt import current - from dlt.pipeline.state_sync import state_doc + def __exit__( + self, exc_type: Type[BaseException], exc_val: BaseException, exc_tb: TracebackType + ) -> None: + pass - # get the state from the current pipeline - pipeline = current.pipeline() - state = pipeline._get_state() - doc = state_doc(state) + def should_load_data_to_staging_dataset(self, table: TTableSchema) -> bool: + return False - # convert pendulum now to iso timestamp - doc["created_at"] = doc["created_at"].isoformat() - - dir = f"{self.dataset_path}/{self.schema.state_table_name}" - safe_hash = "".join([c for c in doc["version_hash"] if re.match(r"\w", c)]) - hash_file_name = f"{dir}/{pipeline.pipeline_name}_{safe_hash}.jsonl" - current_file_name = f"{dir}/{pipeline.pipeline_name}_current.jsonl" + # + # state stuff + # - self.fs_client.makedirs(dir, exist_ok=True) # may not be needed.. - self.fs_client.write_text(hash_file_name, json.dumps(doc), "utf-8") - self.fs_client.write_text(current_file_name, json.dumps(doc), "utf-8") + def _write_to_json_file(self, filepath: str, data: DictStrAny) -> None: + dirname = os.path.dirname(filepath) + self.fs_client.makedirs(dirname, exist_ok=True) + self.fs_client.write_text(filepath, json.dumps(data), "utf-8") - # write entry to load "table" - dir = f"{self.dataset_path}/{self.schema.loads_table_name}" - file_name = f"{self.schema.name}.{load_id}" - filepath = f"{dir}/{file_name}" + def complete_load(self, load_id: str) -> None: + # store current state + self.store_current_state() + # write entry to load "table" # TODO: this is also duplicate across all destinations. DRY this. load_data = { "load_id": load_id, @@ -304,19 +255,88 @@ def complete_load(self, load_id: str) -> None: "inserted_at": pendulum.now().isoformat(), "schema_version_hash": self.schema.version_hash, } + filepath = ( + f"{self.dataset_path}/{self.schema.loads_table_name}/{self.schema.name}.{load_id}.jsonl" + ) - self.fs_client.makedirs(dir, exist_ok=True) # may not be needed.. 
- self.fs_client.write_text(filepath, json.dumps(load_data), "utf-8") + self._write_to_json_file(filepath, load_data) + # + # state read/write + # + def _get_state_file_name(self, pipeline_name: str, hash: str) -> Tuple[str, str]: + """gets tuple of dir and fullpath for schema file for a given hash""" + safe_hash = "".join( + [c for c in hash if re.match(r"\w", c)] + ) # remove all special chars from hash + return ( + f"{self.dataset_path}/{self.schema.state_table_name}/{pipeline_name}__{safe_hash}.jsonl" + ) - def __enter__(self) -> "FilesystemClient": - return self + def store_current_state(self) -> None: + # get state doc from current pipeline + from dlt import current + from dlt.pipeline.state_sync import state_doc - def __exit__( - self, exc_type: Type[BaseException], exc_val: BaseException, exc_tb: TracebackType - ) -> None: - pass + pipeline = current.pipeline() + state = pipeline._get_state() + doc = state_doc(state) - def should_load_data_to_staging_dataset(self, table: TTableSchema) -> bool: - return False + # get paths + current_path = self._get_state_file_name(pipeline.pipeline_name, "current") + hash_path = self._get_state_file_name( + pipeline.pipeline_name, self.schema.stored_version_hash + ) + + # write + self._write_to_json_file(current_path, doc) + self._write_to_json_file(hash_path, doc) + + def get_stored_state(self, pipeline_name: str) -> Optional[StateInfo]: + """Loads compressed state from destination storage""" + file_name = self._get_state_file_name(pipeline_name, "current") + if self.fs_client.exists(file_name): + state_json = json.loads(self.fs_client.read_text(file_name)) + state_json.pop("version_hash") + return StateInfo(**state_json) + + # + # Schema read/write + # + + def _get_schema_file_name(self, hash: str) -> Tuple[str, str]: + """gets tuple of dir and fullpath for schema file for a given hash""" + safe_hash = "".join( + [c for c in hash if re.match(r"\w", c)] + ) # remove all special chars from hash + return f"{self.dataset_path}/{self.schema.version_table_name}/{self.schema.name}__{safe_hash}.jsonl" + + def get_stored_schema(self) -> Optional[StorageSchemaInfo]: + """Retrieves newest schema from destination storage""" + return self.get_stored_schema_by_hash("current") + + def get_stored_schema_by_hash(self, version_hash: str) -> Optional[StorageSchemaInfo]: + """retrieves the stored schema by hash""" + filepath = self._get_schema_file_name(version_hash) + if self.fs_client.exists(filepath): + return StorageSchemaInfo(**json.loads(self.fs_client.read_text(filepath))) + + def store_current_schema(self) -> None: + # get paths + current_path = self._get_schema_file_name("current") + hash_path = self._get_schema_file_name(self.schema.stored_version_hash) + + # TODO: duplicate of weaviate implementation, should be abstracted out + version_info = { + "version_hash": self.schema.stored_version_hash, + "schema_name": self.schema.name, + "version": self.schema.version, + "engine_version": self.schema.ENGINE_VERSION, + "inserted_at": pendulum.now(), + "schema": json.dumps(self.schema.to_dict()), + } + + # we always keep tabs on what the current schema is + self._write_to_json_file(current_path, version_info) + self._write_to_json_file(hash_path, version_info) From d58a38b17bfd0c440f1e0a72addf5ce6ff368df1 Mon Sep 17 00:00:00 2001 From: Dave Date: Thu, 4 Apr 2024 11:01:34 +0200 Subject: [PATCH 04/27] fix bug in state sync --- dlt/pipeline/state_sync.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dlt/pipeline/state_sync.py 
b/dlt/pipeline/state_sync.py index c5f2448d61..d4f87cd765 100644 --- a/dlt/pipeline/state_sync.py +++ b/dlt/pipeline/state_sync.py @@ -116,8 +116,8 @@ def migrate_pipeline_state( def state_doc(state: TPipelineState) -> DictStrAny: - doc = copy(state) - doc.pop("_local") + state = copy(state) + state.pop("_local") state_str = compress_state(state) doc = { "version": state["_state_version"], From 2913c33a79c418d750249b00f304800a014cd86b Mon Sep 17 00:00:00 2001 From: Dave Date: Thu, 4 Apr 2024 12:12:22 +0200 Subject: [PATCH 05/27] enable state tests for all bucket providers --- .../impl/filesystem/filesystem.py | 31 +++++++++------ tests/load/pipeline/test_restore_state.py | 38 +++++++++++++------ 2 files changed, 47 insertions(+), 22 deletions(-) diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py index 388ed3fc69..4d371e6ac5 100644 --- a/dlt/destinations/impl/filesystem/filesystem.py +++ b/dlt/destinations/impl/filesystem/filesystem.py @@ -25,7 +25,7 @@ StateInfo, DoNothingJob, ) - +from dlt.common.destination.exceptions import DestinationUndefinedEntity from dlt.destinations.job_impl import EmptyLoadJob from dlt.destinations.impl.filesystem import capabilities from dlt.destinations.impl.filesystem.configuration import FilesystemDestinationClientConfiguration @@ -179,10 +179,13 @@ def update_stored_schema( self, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None ) -> TSchemaTables: # create destination dirs for all tables - # TODO we should only create dirs for datatables - dirs_to_create = self._get_table_dirs(only_tables or self.schema.tables.keys()) - for directory in dirs_to_create: + table_names = only_tables or self.schema.tables.keys() + dirs_to_create = self._get_table_dirs(table_names) + for tables_name, directory in zip(table_names, dirs_to_create): self.fs_client.makedirs(directory, exist_ok=True) + # we need to mark the folders of the data tables as initialized + if tables_name in self.schema.dlt_table_names(): + self.fs_client.touch(f"{directory}/init") # write schema to destination self.store_current_schema() @@ -205,8 +208,7 @@ def is_storage_initialized(self) -> bool: return self.fs_client.isdir(self.dataset_path) # type: ignore[no-any-return] def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob: - # skip the state table, we create a jsonl file in the complete_load - # step + # skip the state table, we create a jsonl file in the complete_load step if table["name"] == self.schema.state_table_name: return DoNothingJob(file_path) @@ -238,8 +240,6 @@ def should_load_data_to_staging_dataset(self, table: TTableSchema) -> bool: # def _write_to_json_file(self, filepath: str, data: DictStrAny) -> None: - dirname = os.path.dirname(filepath) - self.fs_client.makedirs(dirname, exist_ok=True) self.fs_client.write_text(filepath, json.dumps(data), "utf-8") def complete_load(self, load_id: str) -> None: @@ -294,10 +294,15 @@ def store_current_state(self) -> None: self._write_to_json_file(hash_path, doc) def get_stored_state(self, pipeline_name: str) -> Optional[StateInfo]: + # raise if dir not initialized + filepath = self._get_state_file_name(pipeline_name, "current") + dirname = os.path.dirname(filepath) + if not self.fs_client.isdir(dirname): + raise DestinationUndefinedEntity({"dir": dirname}) + """Loads compressed state from destination storage""" - file_name = self._get_state_file_name(pipeline_name, "current") - if self.fs_client.exists(file_name): - state_json = 
json.loads(self.fs_client.read_text(file_name)) + if self.fs_client.exists(filepath): + state_json = json.loads(self.fs_client.read_text(filepath)) state_json.pop("version_hash") return StateInfo(**state_json) @@ -319,6 +324,10 @@ def get_stored_schema(self) -> Optional[StorageSchemaInfo]: def get_stored_schema_by_hash(self, version_hash: str) -> Optional[StorageSchemaInfo]: """retrieves the stored schema by hash""" filepath = self._get_schema_file_name(version_hash) + # raise if dir not initialized + dirname = os.path.dirname(filepath) + if not self.fs_client.isdir(dirname): + raise DestinationUndefinedEntity({"dir": dirname}) if self.fs_client.exists(filepath): return StorageSchemaInfo(**json.loads(self.fs_client.read_text(filepath))) diff --git a/tests/load/pipeline/test_restore_state.py b/tests/load/pipeline/test_restore_state.py index e50654adcc..3bea83f949 100644 --- a/tests/load/pipeline/test_restore_state.py +++ b/tests/load/pipeline/test_restore_state.py @@ -42,7 +42,10 @@ def duckdb_pipeline_location() -> None: @pytest.mark.parametrize( "destination_config", destinations_configs( - default_staging_configs=True, default_sql_configs=True, default_vector_configs=True + default_staging_configs=True, + default_sql_configs=True, + default_vector_configs=True, + all_buckets_filesystem_configs=True, ), ids=lambda x: x.name, ) @@ -61,8 +64,9 @@ def test_restore_state_utils(destination_config: DestinationTestConfiguration) - load_pipeline_state_from_destination(p.pipeline_name, job_client) # sync the schema p.sync_schema() - exists, _ = job_client.get_storage_table(schema.version_table_name) - assert exists is True + # check if schema exists + stored_schema = job_client.get_stored_schema() + assert stored_schema is not None # dataset exists, still no table with pytest.raises(DestinationUndefinedEntity): load_pipeline_state_from_destination(p.pipeline_name, job_client) @@ -85,8 +89,8 @@ def test_restore_state_utils(destination_config: DestinationTestConfiguration) - # then in database. parquet is created in schema order and in Redshift it must exactly match the order. 
# schema.bump_version() p.sync_schema() - exists, _ = job_client.get_storage_table(schema.state_table_name) - assert exists is True + stored_schema = job_client.get_stored_schema() + assert stored_schema is not None # table is there but no state assert load_pipeline_state_from_destination(p.pipeline_name, job_client) is None # extract state @@ -179,7 +183,9 @@ def test_silently_skip_on_invalid_credentials( @pytest.mark.parametrize( "destination_config", - destinations_configs(default_sql_configs=True, default_vector_configs=True), + destinations_configs( + default_sql_configs=True, default_vector_configs=True, all_buckets_filesystem_configs=True + ), ids=lambda x: x.name, ) @pytest.mark.parametrize("use_single_dataset", [True, False]) @@ -262,7 +268,9 @@ def _make_dn_name(schema_name: str) -> str: @pytest.mark.parametrize( "destination_config", - destinations_configs(default_sql_configs=True, default_vector_configs=True), + destinations_configs( + default_sql_configs=True, default_vector_configs=True, all_buckets_filesystem_configs=True + ), ids=lambda x: x.name, ) def test_restore_state_pipeline(destination_config: DestinationTestConfiguration) -> None: @@ -386,7 +394,9 @@ def some_data(): @pytest.mark.parametrize( "destination_config", - destinations_configs(default_sql_configs=True, default_vector_configs=True), + destinations_configs( + default_sql_configs=True, default_vector_configs=True, all_buckets_filesystem_configs=True + ), ids=lambda x: x.name, ) def test_ignore_state_unfinished_load(destination_config: DestinationTestConfiguration) -> None: @@ -416,7 +426,9 @@ def complete_package_mock(self, load_id: str, schema: Schema, aborted: bool = Fa @pytest.mark.parametrize( "destination_config", - destinations_configs(default_sql_configs=True, default_vector_configs=True), + destinations_configs( + default_sql_configs=True, default_vector_configs=True, all_buckets_filesystem_configs=True + ), ids=lambda x: x.name, ) def test_restore_schemas_while_import_schemas_exist( @@ -502,7 +514,9 @@ def test_restore_change_dataset_and_destination(destination_name: str) -> None: @pytest.mark.parametrize( "destination_config", - destinations_configs(default_sql_configs=True, default_vector_configs=True), + destinations_configs( + default_sql_configs=True, default_vector_configs=True, all_buckets_filesystem_configs=True + ), ids=lambda x: x.name, ) def test_restore_state_parallel_changes(destination_config: DestinationTestConfiguration) -> None: @@ -608,7 +622,9 @@ def some_data(param: str) -> Any: @pytest.mark.parametrize( "destination_config", - destinations_configs(default_sql_configs=True, default_vector_configs=True), + destinations_configs( + default_sql_configs=True, default_vector_configs=True, all_buckets_filesystem_configs=True + ), ids=lambda x: x.name, ) def test_reset_pipeline_on_deleted_dataset( From e32ad957da4b3ea1a61ad2bf7dc7f03bc43a6318 Mon Sep 17 00:00:00 2001 From: Dave Date: Thu, 4 Apr 2024 13:21:35 +0200 Subject: [PATCH 06/27] do not store state to uninitialized dataset folders --- dlt/destinations/impl/filesystem/filesystem.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py index 4d371e6ac5..fb01fde6c2 100644 --- a/dlt/destinations/impl/filesystem/filesystem.py +++ b/dlt/destinations/impl/filesystem/filesystem.py @@ -196,9 +196,13 @@ def _get_table_dirs(self, table_names: Iterable[str]) -> Set[str]: """Gets unique directories where table data is 
stored.""" table_dirs: Set[str] = set() for table_name in table_names: - table_prefix = self.table_prefix_layout.format( - schema_name=self.schema.name, table_name=table_name - ) + # dlt tables do not respect layout (for now) + if table_name in self.schema.dlt_table_names(): + table_prefix = posixpath.join(table_name, "") + else: + table_prefix = self.table_prefix_layout.format( + schema_name=self.schema.name, table_name=table_name + ) destination_dir = posixpath.join(self.dataset_path, table_prefix) # extract the path component table_dirs.add(os.path.dirname(destination_dir)) @@ -240,6 +244,9 @@ def should_load_data_to_staging_dataset(self, table: TTableSchema) -> bool: # def _write_to_json_file(self, filepath: str, data: DictStrAny) -> None: + dirname = os.path.dirname(filepath) + if not self.fs_client.isdir(dirname): + return self.fs_client.write_text(filepath, json.dumps(data), "utf-8") def complete_load(self, load_id: str) -> None: From cd21ff652270cd01e2bb4028b88c4adbbb83aaae Mon Sep 17 00:00:00 2001 From: Dave Date: Thu, 4 Apr 2024 13:34:33 +0200 Subject: [PATCH 07/27] fix linter errors --- .../impl/filesystem/filesystem.py | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py index fb01fde6c2..e5407c7972 100644 --- a/dlt/destinations/impl/filesystem/filesystem.py +++ b/dlt/destinations/impl/filesystem/filesystem.py @@ -1,7 +1,7 @@ import posixpath import os from types import TracebackType -from typing import ClassVar, List, Type, Iterable, Set, Iterator, Optional, Tuple +from typing import ClassVar, List, Type, Iterable, Set, Iterator, Optional from fsspec import AbstractFileSystem from contextlib import contextmanager from dlt.common import json, pendulum @@ -272,10 +272,10 @@ def complete_load(self, load_id: str) -> None: # state read/write # - def _get_state_file_name(self, pipeline_name: str, hash: str) -> Tuple[str, str]: - """gets tuple of dir and fullpath for schema file for a given hash""" + def _get_state_file_name(self, pipeline_name: str, version_hash: str) -> str: + """gets full path for schema file for a given hash""" safe_hash = "".join( - [c for c in hash if re.match(r"\w", c)] + [c for c in version_hash if re.match(r"\w", c)] ) # remove all special chars from hash return ( f"{self.dataset_path}/{self.schema.state_table_name}/{pipeline_name}__{safe_hash}.jsonl" @@ -286,7 +286,7 @@ def store_current_state(self) -> None: from dlt import current from dlt.pipeline.state_sync import state_doc - pipeline = current.pipeline() + pipeline = current.pipeline() # type: ignore state = pipeline._get_state() doc = state_doc(state) @@ -313,14 +313,16 @@ def get_stored_state(self, pipeline_name: str) -> Optional[StateInfo]: state_json.pop("version_hash") return StateInfo(**state_json) + return None + # # Schema read/write # - def _get_schema_file_name(self, hash: str) -> Tuple[str, str]: - """gets tuple of dir and fullpath for schema file for a given hash""" + def _get_schema_file_name(self, version_hash: str) -> str: + """gets full path for schema file for a given hash""" safe_hash = "".join( - [c for c in hash if re.match(r"\w", c)] + [c for c in version_hash if re.match(r"\w", c)] ) # remove all special chars from hash return f"{self.dataset_path}/{self.schema.version_table_name}/{self.schema.name}__{safe_hash}.jsonl" @@ -338,6 +340,8 @@ def get_stored_schema_by_hash(self, version_hash: str) -> Optional[StorageSchema if self.fs_client.exists(filepath): 
return StorageSchemaInfo(**json.loads(self.fs_client.read_text(filepath))) + return None + def store_current_schema(self) -> None: # get paths current_path = self._get_schema_file_name("current") From 6b7c16d6c6d8d4d687090485930e0dcca4c7ee48 Mon Sep 17 00:00:00 2001 From: Dave Date: Thu, 4 Apr 2024 15:05:03 +0200 Subject: [PATCH 08/27] get current pipeline from pipeline context --- dlt/destinations/impl/filesystem/filesystem.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py index e5407c7972..ba1077d678 100644 --- a/dlt/destinations/impl/filesystem/filesystem.py +++ b/dlt/destinations/impl/filesystem/filesystem.py @@ -283,11 +283,12 @@ def _get_state_file_name(self, pipeline_name: str, version_hash: str) -> str: def store_current_state(self) -> None: # get state doc from current pipeline - from dlt import current + from dlt.common.configuration.container import Container + from dlt.common.pipeline import PipelineContext from dlt.pipeline.state_sync import state_doc - pipeline = current.pipeline() # type: ignore - state = pipeline._get_state() + pipeline = Container()[PipelineContext].pipeline() + state = pipeline.state doc = state_doc(state) # get paths From 95cc882ef49f27dc6b3eca90ea4891340292a0aa Mon Sep 17 00:00:00 2001 From: Dave Date: Thu, 4 Apr 2024 15:38:23 +0200 Subject: [PATCH 09/27] fix bug in filesystem table init --- dlt/destinations/impl/filesystem/filesystem.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py index ba1077d678..4352725992 100644 --- a/dlt/destinations/impl/filesystem/filesystem.py +++ b/dlt/destinations/impl/filesystem/filesystem.py @@ -185,6 +185,7 @@ def update_stored_schema( self.fs_client.makedirs(directory, exist_ok=True) # we need to mark the folders of the data tables as initialized if tables_name in self.schema.dlt_table_names(): + print(directory + " " + tables_name) self.fs_client.touch(f"{directory}/init") # write schema to destination @@ -192,9 +193,9 @@ def update_stored_schema( return expected_update - def _get_table_dirs(self, table_names: Iterable[str]) -> Set[str]: + def _get_table_dirs(self, table_names: Iterable[str]) -> List[str]: """Gets unique directories where table data is stored.""" - table_dirs: Set[str] = set() + table_dirs: List[str] = [] for table_name in table_names: # dlt tables do not respect layout (for now) if table_name in self.schema.dlt_table_names(): @@ -205,7 +206,7 @@ def _get_table_dirs(self, table_names: Iterable[str]) -> Set[str]: ) destination_dir = posixpath.join(self.dataset_path, table_prefix) # extract the path component - table_dirs.add(os.path.dirname(destination_dir)) + table_dirs.append(os.path.dirname(destination_dir)) return table_dirs def is_storage_initialized(self) -> bool: From 15ac9bf81490c4e52967e2fab5c7e70b13d4d84e Mon Sep 17 00:00:00 2001 From: Dave Date: Mon, 15 Apr 2024 12:36:16 +0200 Subject: [PATCH 10/27] update testing pipe --- fs_testing_pipe.py | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/fs_testing_pipe.py b/fs_testing_pipe.py index a3d3275f71..bf5a67f4ab 100644 --- a/fs_testing_pipe.py +++ b/fs_testing_pipe.py @@ -1,7 +1,11 @@ import dlt import os +import shutil +import random if __name__ == "__main__": + + shutil.rmtree("./my_files", ignore_errors=True) os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] = 
"file://my_files" os.environ["DATA_WRITER__DISABLE_COMPRESSION"] = "TRUE" @@ -15,6 +19,21 @@ def my_resouce(id=dlt.sources.incremental("id")): {"id": 4}, {"id": 5} ] + + pipeline_name = f"dave_{random.randint(0, 10000000)}" - pipe = dlt.pipeline(pipeline_name="dave", destination="filesystem") + pipe = dlt.pipeline(pipeline_name=pipeline_name, destination="filesystem") pipe.run(my_resouce(), table_name="my_table") #, loader_file_format="parquet") + + # resource with incremental for testing restoring of pipeline state + @dlt.resource(name="my_table") + def updated_resouce(id=dlt.sources.incremental("id")): + yield from [ + {"id": 1}, + {"id": 2}, + {"id": 3}, + {"id": 4}, + {"id": 5}, + {"id": 6} + ] + pipe.run(updated_resouce(), table_name="my_table") #, loader_file_format="parquet") From a6ce1b12e00e7c4589fbb360645a8dbda2a62253 Mon Sep 17 00:00:00 2001 From: Dave Date: Mon, 15 Apr 2024 12:48:05 +0200 Subject: [PATCH 11/27] move away from "current" file, rather iterator bucket path contents --- dlt/common/destination/reference.py | 6 +- .../impl/destination/destination.py | 7 +- dlt/destinations/impl/dummy/dummy.py | 7 +- .../impl/filesystem/filesystem.py | 120 +++++++++++------- dlt/destinations/impl/qdrant/qdrant_client.py | 7 +- .../impl/weaviate/weaviate_client.py | 7 +- dlt/destinations/job_client_impl.py | 7 +- dlt/load/load.py | 2 + dlt/load/utils.py | 7 +- fs_testing_pipe.py | 4 +- tests/utils.py | 2 + 11 files changed, 114 insertions(+), 62 deletions(-) diff --git a/dlt/common/destination/reference.py b/dlt/common/destination/reference.py index 67ffececca..738a11c069 100644 --- a/dlt/common/destination/reference.py +++ b/dlt/common/destination/reference.py @@ -273,7 +273,10 @@ def drop_storage(self) -> None: pass def update_stored_schema( - self, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None + self, + load_id: str = None, + only_tables: Iterable[str] = None, + expected_update: TSchemaTables = None, ) -> Optional[TSchemaTables]: """Updates storage to the current schema. @@ -281,6 +284,7 @@ def update_stored_schema( destination has single writer and no other processes modify the schema. Args: + load_id (str, optional): Load id during which the schema is updated only_tables (Sequence[str], optional): Updates only listed tables. Defaults to None. 
expected_update (TSchemaTables, optional): Update that is expected to be applied to the destination Returns: diff --git a/dlt/destinations/impl/destination/destination.py b/dlt/destinations/impl/destination/destination.py index 4d0f081aa6..4a526720d2 100644 --- a/dlt/destinations/impl/destination/destination.py +++ b/dlt/destinations/impl/destination/destination.py @@ -47,9 +47,12 @@ def drop_storage(self) -> None: pass def update_stored_schema( - self, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None + self, + load_id: str = None, + only_tables: Iterable[str] = None, + expected_update: TSchemaTables = None, ) -> Optional[TSchemaTables]: - return super().update_stored_schema(only_tables, expected_update) + return super().update_stored_schema(load_id, only_tables, expected_update) def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob: # skip internal tables and remove columns from schema if so configured diff --git a/dlt/destinations/impl/dummy/dummy.py b/dlt/destinations/impl/dummy/dummy.py index bafac210cc..60c759f5d7 100644 --- a/dlt/destinations/impl/dummy/dummy.py +++ b/dlt/destinations/impl/dummy/dummy.py @@ -127,9 +127,12 @@ def drop_storage(self) -> None: pass def update_stored_schema( - self, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None + self, + load_id: str = None, + only_tables: Iterable[str] = None, + expected_update: TSchemaTables = None, ) -> Optional[TSchemaTables]: - applied_update = super().update_stored_schema(only_tables, expected_update) + applied_update = super().update_stored_schema(load_id, only_tables, expected_update) if self.config.fail_schema_update: raise DestinationTransientException( "Raise on schema update due to fail_schema_update config flag" diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py index 4352725992..0f8d85b26d 100644 --- a/dlt/destinations/impl/filesystem/filesystem.py +++ b/dlt/destinations/impl/filesystem/filesystem.py @@ -1,7 +1,7 @@ import posixpath import os from types import TracebackType -from typing import ClassVar, List, Type, Iterable, Set, Iterator, Optional +from typing import ClassVar, List, Type, Iterable, Set, Iterator, Optional, Tuple from fsspec import AbstractFileSystem from contextlib import contextmanager from dlt.common import json, pendulum @@ -33,6 +33,9 @@ from dlt.destinations import path_utils +INIT_FILE_NAME = "init" + + class LoadFilesystemJob(LoadJob): def __init__( self, @@ -176,7 +179,10 @@ def initialize_storage(self, truncate_tables: Iterable[str] = None) -> None: ) def update_stored_schema( - self, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None + self, + load_id: str = None, + only_tables: Iterable[str] = None, + expected_update: TSchemaTables = None, ) -> TSchemaTables: # create destination dirs for all tables table_names = only_tables or self.schema.tables.keys() @@ -185,11 +191,10 @@ def update_stored_schema( self.fs_client.makedirs(directory, exist_ok=True) # we need to mark the folders of the data tables as initialized if tables_name in self.schema.dlt_table_names(): - print(directory + " " + tables_name) - self.fs_client.touch(f"{directory}/init") + self.fs_client.touch(posixpath.join(directory, INIT_FILE_NAME)) # write schema to destination - self.store_current_schema() + self.store_current_schema(load_id or "1") return expected_update @@ -206,7 +211,7 @@ def _get_table_dirs(self, table_names: Iterable[str]) -> List[str]: ) destination_dir 
= posixpath.join(self.dataset_path, table_prefix) # extract the path component - table_dirs.append(os.path.dirname(destination_dir)) + table_dirs.append(posixpath.dirname(destination_dir)) return table_dirs def is_storage_initialized(self) -> bool: @@ -245,14 +250,27 @@ def should_load_data_to_staging_dataset(self, table: TTableSchema) -> bool: # def _write_to_json_file(self, filepath: str, data: DictStrAny) -> None: - dirname = os.path.dirname(filepath) + dirname = posixpath.dirname(filepath) if not self.fs_client.isdir(dirname): return self.fs_client.write_text(filepath, json.dumps(data), "utf-8") + def _to_path_safe_string(self, s: str) -> str: + return "".join([c for c in s if re.match(r"\w", c)]) if s else None + + def _list_dlt_dir(self, dirname: str) -> Iterator[Tuple[str, List[str]]]: + if not self.fs_client.exists(posixpath.join(dirname, INIT_FILE_NAME)): + raise DestinationUndefinedEntity({"dir": dirname}) + for filepath in self.fs_client.listdir(dirname, detail=False): + filename = os.path.splitext(os.path.basename(filepath))[0] + fileparts = filename.split("__") + if len(fileparts) != 3: + continue + yield filepath, fileparts + def complete_load(self, load_id: str) -> None: # store current state - self.store_current_state() + self.store_current_state(load_id) # write entry to load "table" # TODO: this is also duplicate across all destinations. DRY this. @@ -263,9 +281,7 @@ def complete_load(self, load_id: str) -> None: "inserted_at": pendulum.now().isoformat(), "schema_version_hash": self.schema.version_hash, } - filepath = ( - f"{self.dataset_path}/{self.schema.loads_table_name}/{self.schema.name}.{load_id}.jsonl" - ) + filepath = f"{self.dataset_path}/{self.schema.loads_table_name}/{self.schema.name}__{load_id}.jsonl" self._write_to_json_file(filepath, load_data) @@ -273,16 +289,11 @@ def complete_load(self, load_id: str) -> None: # state read/write # - def _get_state_file_name(self, pipeline_name: str, version_hash: str) -> str: + def _get_state_file_name(self, pipeline_name: str, version_hash: str, load_id: str) -> str: """gets full path for schema file for a given hash""" - safe_hash = "".join( - [c for c in version_hash if re.match(r"\w", c)] - ) # remove all special chars from hash - return ( - f"{self.dataset_path}/{self.schema.state_table_name}/{pipeline_name}__{safe_hash}.jsonl" - ) + return f"{self.dataset_path}/{self.schema.state_table_name}/{pipeline_name}__{load_id}__{self._to_path_safe_string(version_hash)}.jsonl" - def store_current_state(self) -> None: + def store_current_state(self, load_id: str) -> None: # get state doc from current pipeline from dlt.common.configuration.container import Container from dlt.common.pipeline import PipelineContext @@ -293,25 +304,28 @@ def store_current_state(self) -> None: doc = state_doc(state) # get paths - current_path = self._get_state_file_name(pipeline.pipeline_name, "current") hash_path = self._get_state_file_name( - pipeline.pipeline_name, self.schema.stored_version_hash + pipeline.pipeline_name, self.schema.stored_version_hash, load_id ) # write - self._write_to_json_file(current_path, doc) self._write_to_json_file(hash_path, doc) def get_stored_state(self, pipeline_name: str) -> Optional[StateInfo]: - # raise if dir not initialized - filepath = self._get_state_file_name(pipeline_name, "current") - dirname = os.path.dirname(filepath) - if not self.fs_client.isdir(dirname): - raise DestinationUndefinedEntity({"dir": dirname}) + # get base dir + dirname = posixpath.dirname(self._get_state_file_name(pipeline_name, "", 
"")) + + # search newest state + selected_path = None + newest_load_id = "0" + for filepath, fileparts in self._list_dlt_dir(dirname): + if fileparts[0] == pipeline_name and fileparts[1] > newest_load_id: + newest_load_id = fileparts[1] + selected_path = filepath """Loads compressed state from destination storage""" - if self.fs_client.exists(filepath): - state_json = json.loads(self.fs_client.read_text(filepath)) + if selected_path: + state_json = json.loads(self.fs_client.read_text(selected_path)) state_json.pop("version_hash") return StateInfo(**state_json) @@ -321,33 +335,46 @@ def get_stored_state(self, pipeline_name: str) -> Optional[StateInfo]: # Schema read/write # - def _get_schema_file_name(self, version_hash: str) -> str: + def _get_schema_file_name(self, version_hash: str, load_id: str) -> str: """gets full path for schema file for a given hash""" - safe_hash = "".join( - [c for c in version_hash if re.match(r"\w", c)] - ) # remove all special chars from hash - return f"{self.dataset_path}/{self.schema.version_table_name}/{self.schema.name}__{safe_hash}.jsonl" + return f"{self.dataset_path}/{self.schema.version_table_name}/{self.schema.name}__{load_id}__{self._to_path_safe_string(version_hash)}.jsonl" def get_stored_schema(self) -> Optional[StorageSchemaInfo]: """Retrieves newest schema from destination storage""" - return self.get_stored_schema_by_hash("current") + return self._get_stored_schema_by_hash_or_newest() def get_stored_schema_by_hash(self, version_hash: str) -> Optional[StorageSchemaInfo]: - """retrieves the stored schema by hash""" - filepath = self._get_schema_file_name(version_hash) - # raise if dir not initialized - dirname = os.path.dirname(filepath) - if not self.fs_client.isdir(dirname): - raise DestinationUndefinedEntity({"dir": dirname}) - if self.fs_client.exists(filepath): - return StorageSchemaInfo(**json.loads(self.fs_client.read_text(filepath))) + return self._get_stored_schema_by_hash_or_newest(version_hash) + + def _get_stored_schema_by_hash_or_newest( + self, version_hash: str = None + ) -> Optional[StorageSchemaInfo]: + """Get the schema by supplied hash, falls back to getting the newest version matching the existing schema name""" + version_hash = self._to_path_safe_string(version_hash) + dirname = posixpath.dirname(self._get_schema_file_name("", "")) + # find newest schema for pipeline or by version hash + selected_path = None + newest_load_id = "0" + for filepath, fileparts in self._list_dlt_dir(dirname): + if ( + not version_hash + and fileparts[0] == self.schema.name + and fileparts[1] > newest_load_id + ): + newest_load_id = fileparts[1] + selected_path = filepath + elif fileparts[2] == version_hash: + selected_path = filepath + break + + if selected_path: + return StorageSchemaInfo(**json.loads(self.fs_client.read_text(selected_path))) return None - def store_current_schema(self) -> None: + def store_current_schema(self, load_id: str) -> None: # get paths - current_path = self._get_schema_file_name("current") - hash_path = self._get_schema_file_name(self.schema.stored_version_hash) + hash_path = self._get_schema_file_name(self.schema.stored_version_hash, load_id) # TODO: duplicate of weaviate implementation, should be abstracted out version_info = { @@ -360,5 +387,4 @@ def store_current_schema(self) -> None: } # we always keep tabs on what the current schema is - self._write_to_json_file(current_path, version_info) self._write_to_json_file(hash_path, version_info) diff --git a/dlt/destinations/impl/qdrant/qdrant_client.py 
b/dlt/destinations/impl/qdrant/qdrant_client.py index 5a5e5f8cfd..970580eb51 100644 --- a/dlt/destinations/impl/qdrant/qdrant_client.py +++ b/dlt/destinations/impl/qdrant/qdrant_client.py @@ -283,9 +283,12 @@ def _delete_sentinel_collection(self) -> None: self.db_client.delete_collection(self.sentinel_collection) def update_stored_schema( - self, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None + self, + load_id: str = None, + only_tables: Iterable[str] = None, + expected_update: TSchemaTables = None, ) -> Optional[TSchemaTables]: - super().update_stored_schema(only_tables, expected_update) + super().update_stored_schema(load_id, only_tables, expected_update) applied_update: TSchemaTables = {} schema_info = self.get_stored_schema_by_hash(self.schema.stored_version_hash) if schema_info is None: diff --git a/dlt/destinations/impl/weaviate/weaviate_client.py b/dlt/destinations/impl/weaviate/weaviate_client.py index bdf2fa0b2b..3bbc2a8c7d 100644 --- a/dlt/destinations/impl/weaviate/weaviate_client.py +++ b/dlt/destinations/impl/weaviate/weaviate_client.py @@ -424,9 +424,12 @@ def _delete_sentinel_class(self) -> None: @wrap_weaviate_error def update_stored_schema( - self, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None + self, + load_id: str = None, + only_tables: Iterable[str] = None, + expected_update: TSchemaTables = None, ) -> Optional[TSchemaTables]: - super().update_stored_schema(only_tables, expected_update) + super().update_stored_schema(load_id, only_tables, expected_update) # Retrieve the schema from Weaviate applied_update: TSchemaTables = {} try: diff --git a/dlt/destinations/job_client_impl.py b/dlt/destinations/job_client_impl.py index 043b309b1e..cfb95b0af2 100644 --- a/dlt/destinations/job_client_impl.py +++ b/dlt/destinations/job_client_impl.py @@ -180,9 +180,12 @@ def is_storage_initialized(self) -> bool: return self.sql_client.has_dataset() def update_stored_schema( - self, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None + self, + load_id: str = None, + only_tables: Iterable[str] = None, + expected_update: TSchemaTables = None, ) -> Optional[TSchemaTables]: - super().update_stored_schema(only_tables, expected_update) + super().update_stored_schema(load_id, only_tables, expected_update) applied_update: TSchemaTables = {} schema_info = self.get_stored_schema_by_hash(self.schema.stored_version_hash) if schema_info is None: diff --git a/dlt/load/load.py b/dlt/load/load.py index c5790d467b..4da64e472a 100644 --- a/dlt/load/load.py +++ b/dlt/load/load.py @@ -369,6 +369,7 @@ def load_single_package(self, load_id: str, schema: Schema) -> None: if isinstance(job_client, WithStagingDataset) else None ), + load_id=load_id, ) # init staging client @@ -385,6 +386,7 @@ def load_single_package(self, load_id: str, schema: Schema) -> None: expected_update, job_client.should_truncate_table_before_load_on_staging_destination, job_client.should_load_data_to_staging_dataset_on_staging_destination, + load_id=load_id, ) self.load_storage.commit_schema_update(load_id, applied_update) diff --git a/dlt/load/utils.py b/dlt/load/utils.py index 067ae33613..1635d21efe 100644 --- a/dlt/load/utils.py +++ b/dlt/load/utils.py @@ -66,6 +66,7 @@ def init_client( expected_update: TSchemaTables, truncate_filter: Callable[[TTableSchema], bool], load_staging_filter: Callable[[TTableSchema], bool], + load_id: str = None, ) -> TSchemaTables: """Initializes destination storage including staging dataset if supported @@ -97,7 +98,7 @@ def 
init_client( ) applied_update = _init_dataset_and_update_schema( - job_client, expected_update, tables_with_jobs | dlt_tables, truncate_tables + job_client, expected_update, tables_with_jobs | dlt_tables, truncate_tables, load_id=load_id ) # update the staging dataset if client supports this @@ -117,6 +118,7 @@ def init_client( staging_tables | {schema.version_table_name}, # keep only schema version staging_tables, # all eligible tables must be also truncated staging_info=True, + load_id=load_id, ) return applied_update @@ -128,6 +130,7 @@ def _init_dataset_and_update_schema( update_tables: Iterable[str], truncate_tables: Iterable[str] = None, staging_info: bool = False, + load_id: str = None, ) -> TSchemaTables: staging_text = "for staging dataset" if staging_info else "" logger.info( @@ -140,7 +143,7 @@ def _init_dataset_and_update_schema( f" {staging_text}" ) applied_update = job_client.update_stored_schema( - only_tables=update_tables, expected_update=expected_update + load_id=load_id, only_tables=update_tables, expected_update=expected_update ) logger.info( f"Client for {job_client.config.destination_type} will truncate tables {staging_text}" diff --git a/fs_testing_pipe.py b/fs_testing_pipe.py index bf5a67f4ab..7f95f1f389 100644 --- a/fs_testing_pipe.py +++ b/fs_testing_pipe.py @@ -5,7 +5,7 @@ if __name__ == "__main__": - shutil.rmtree("./my_files", ignore_errors=True) + # shutil.rmtree("./my_files", ignore_errors=True) os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] = "file://my_files" os.environ["DATA_WRITER__DISABLE_COMPRESSION"] = "TRUE" @@ -20,7 +20,7 @@ def my_resouce(id=dlt.sources.incremental("id")): {"id": 5} ] - pipeline_name = f"dave_{random.randint(0, 10000000)}" + pipeline_name = f"dave" pipe = dlt.pipeline(pipeline_name=pipeline_name, destination="filesystem") pipe.run(my_resouce(), table_name="my_table") #, loader_file_format="parquet") diff --git a/tests/utils.py b/tests/utils.py index 1ccb7fc5e4..69338b2f72 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -62,6 +62,8 @@ # filter out active destinations for current tests ACTIVE_DESTINATIONS = set(dlt.config.get("ACTIVE_DESTINATIONS", list) or IMPLEMENTED_DESTINATIONS) +ACTIVE_DESTINATIONS = {"filesystem"} + ACTIVE_SQL_DESTINATIONS = SQL_DESTINATIONS.intersection(ACTIVE_DESTINATIONS) ACTIVE_NON_SQL_DESTINATIONS = NON_SQL_DESTINATIONS.intersection(ACTIVE_DESTINATIONS) From bce2837c49bec57d01233de07afe327e235e39c6 Mon Sep 17 00:00:00 2001 From: Dave Date: Mon, 15 Apr 2024 16:49:02 +0200 Subject: [PATCH 12/27] store pipeline state in load package state and send to filesystem destination from there --- dlt/common/storages/load_package.py | 8 +++ .../impl/filesystem/filesystem.py | 50 ++++++++++--------- dlt/extract/__init__.py | 2 +- dlt/extract/extract.py | 15 +++++- dlt/extract/extractors.py | 9 +++- dlt/extract/resource.py | 16 ++++++ dlt/load/load.py | 11 ++-- dlt/pipeline/current.py | 1 + dlt/pipeline/mark.py | 1 + dlt/pipeline/state_sync.py | 3 +- tests/load/utils.py | 2 + 11 files changed, 86 insertions(+), 32 deletions(-) diff --git a/dlt/common/storages/load_package.py b/dlt/common/storages/load_package.py index 1c76fd39cd..8870024de9 100644 --- a/dlt/common/storages/load_package.py +++ b/dlt/common/storages/load_package.py @@ -61,6 +61,8 @@ class TLoadPackageState(TVersionedState, total=False): """A section of state that does not participate in change merging and version control""" destination_state: NotRequired[Dict[str, Any]] """private space for destinations to store state relevant only to the load 
package""" + source_state: NotRequired[Dict[str, Any]] + """private space for source to store state relevant only to the load package, currently used for storing pipeline state""" class TLoadPackage(TypedDict, total=False): @@ -689,6 +691,12 @@ def destination_state() -> DictStrAny: return lp["state"].setdefault("destination_state", {}) +def load_package_source_state() -> DictStrAny: + """Get segment of load package state that is specific to the current destination.""" + lp = load_package() + return lp["state"].setdefault("source_state", {}) + + def clear_destination_state(commit: bool = True) -> None: """Clear segment of load package state that is specific to the current destination. Optionally commit to load package.""" lp = load_package() diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py index 0f8d85b26d..a8cd3e9422 100644 --- a/dlt/destinations/impl/filesystem/filesystem.py +++ b/dlt/destinations/impl/filesystem/filesystem.py @@ -194,12 +194,12 @@ def update_stored_schema( self.fs_client.touch(posixpath.join(directory, INIT_FILE_NAME)) # write schema to destination - self.store_current_schema(load_id or "1") + self._store_current_schema(load_id or "1") return expected_update def _get_table_dirs(self, table_names: Iterable[str]) -> List[str]: - """Gets unique directories where table data is stored.""" + """Gets directories where table data is stored.""" table_dirs: List[str] = [] for table_name in table_names: # dlt tables do not respect layout (for now) @@ -268,10 +268,7 @@ def _list_dlt_dir(self, dirname: str) -> Iterator[Tuple[str, List[str]]]: continue yield filepath, fileparts - def complete_load(self, load_id: str) -> None: - # store current state - self.store_current_state(load_id) - + def _store_load(self, load_id: str) -> None: # write entry to load "table" # TODO: this is also duplicate across all destinations. DRY this. 
load_data = { @@ -282,9 +279,13 @@ def complete_load(self, load_id: str) -> None: "schema_version_hash": self.schema.version_hash, } filepath = f"{self.dataset_path}/{self.schema.loads_table_name}/{self.schema.name}__{load_id}.jsonl" - self._write_to_json_file(filepath, load_data) + def complete_load(self, load_id: str) -> None: + # store current state + self._store_current_state(load_id) + self._store_load(load_id) + # # state read/write # @@ -293,19 +294,20 @@ def _get_state_file_name(self, pipeline_name: str, version_hash: str, load_id: s """gets full path for schema file for a given hash""" return f"{self.dataset_path}/{self.schema.state_table_name}/{pipeline_name}__{load_id}__{self._to_path_safe_string(version_hash)}.jsonl" - def store_current_state(self, load_id: str) -> None: + def _store_current_state(self, load_id: str) -> None: # get state doc from current pipeline - from dlt.common.configuration.container import Container - from dlt.common.pipeline import PipelineContext - from dlt.pipeline.state_sync import state_doc + from dlt.pipeline.current import load_package_source_state + from dlt.pipeline.state_sync import LOAD_PACKAGE_STATE_KEY + + doc = load_package_source_state().get(LOAD_PACKAGE_STATE_KEY, {}) - pipeline = Container()[PipelineContext].pipeline() - state = pipeline.state - doc = state_doc(state) + if not doc: + return # get paths + pipeline_name = doc["pipeline_name"] hash_path = self._get_state_file_name( - pipeline.pipeline_name, self.schema.stored_version_hash, load_id + pipeline_name, self.schema.stored_version_hash, load_id ) # write @@ -323,7 +325,7 @@ def get_stored_state(self, pipeline_name: str) -> Optional[StateInfo]: newest_load_id = fileparts[1] selected_path = filepath - """Loads compressed state from destination storage""" + # Load compressed state from destination if selected_path: state_json = json.loads(self.fs_client.read_text(selected_path)) state_json.pop("version_hash") @@ -339,13 +341,6 @@ def _get_schema_file_name(self, version_hash: str, load_id: str) -> str: """gets full path for schema file for a given hash""" return f"{self.dataset_path}/{self.schema.version_table_name}/{self.schema.name}__{load_id}__{self._to_path_safe_string(version_hash)}.jsonl" - def get_stored_schema(self) -> Optional[StorageSchemaInfo]: - """Retrieves newest schema from destination storage""" - return self._get_stored_schema_by_hash_or_newest() - - def get_stored_schema_by_hash(self, version_hash: str) -> Optional[StorageSchemaInfo]: - return self._get_stored_schema_by_hash_or_newest(version_hash) - def _get_stored_schema_by_hash_or_newest( self, version_hash: str = None ) -> Optional[StorageSchemaInfo]: @@ -372,7 +367,7 @@ def _get_stored_schema_by_hash_or_newest( return None - def store_current_schema(self, load_id: str) -> None: + def _store_current_schema(self, load_id: str) -> None: # get paths hash_path = self._get_schema_file_name(self.schema.stored_version_hash, load_id) @@ -388,3 +383,10 @@ def store_current_schema(self, load_id: str) -> None: # we always keep tabs on what the current schema is self._write_to_json_file(hash_path, version_info) + + def get_stored_schema(self) -> Optional[StorageSchemaInfo]: + """Retrieves newest schema from destination storage""" + return self._get_stored_schema_by_hash_or_newest() + + def get_stored_schema_by_hash(self, version_hash: str) -> Optional[StorageSchemaInfo]: + return self._get_stored_schema_by_hash_or_newest(version_hash) diff --git a/dlt/extract/__init__.py b/dlt/extract/__init__.py index 03b2e59539..7e0dd3d0fc 
100644 --- a/dlt/extract/__init__.py +++ b/dlt/extract/__init__.py @@ -1,4 +1,4 @@ -from dlt.extract.resource import DltResource, with_table_name, with_hints +from dlt.extract.resource import DltResource, with_table_name, with_hints, with_package_state from dlt.extract.hints import make_hints from dlt.extract.source import DltSource from dlt.extract.decorators import source, resource, transformer, defer diff --git a/dlt/extract/extract.py b/dlt/extract/extract.py index cc2b03c50b..f07c4ccbc6 100644 --- a/dlt/extract/extract.py +++ b/dlt/extract/extract.py @@ -27,7 +27,12 @@ TWriteDispositionConfig, ) from dlt.common.storages import NormalizeStorageConfiguration, LoadPackageInfo, SchemaStorage -from dlt.common.storages.load_package import ParsedLoadJobFileName +from dlt.common.storages.load_package import ( + ParsedLoadJobFileName, + LoadPackageStateInjectableContext, +) + + from dlt.common.utils import get_callable_name, get_full_class_name from dlt.extract.decorators import SourceInjectableContext, SourceSchemaInjectableContext @@ -367,7 +372,13 @@ def extract( load_id = self.extract_storage.create_load_package(source.discover_schema()) with Container().injectable_context( SourceSchemaInjectableContext(source.schema) - ), Container().injectable_context(SourceInjectableContext(source)): + ), Container().injectable_context( + SourceInjectableContext(source) + ), Container().injectable_context( + LoadPackageStateInjectableContext( + storage=self.extract_storage.new_packages, load_id=load_id + ) + ): # inject the config section with the current source name with inject_section( ConfigSectionContext( diff --git a/dlt/extract/extractors.py b/dlt/extract/extractors.py index b4afc5b1f8..86888fed0a 100644 --- a/dlt/extract/extractors.py +++ b/dlt/extract/extractors.py @@ -19,7 +19,7 @@ TPartialTableSchema, ) from dlt.extract.hints import HintsMeta -from dlt.extract.resource import DltResource +from dlt.extract.resource import DltResource, LoadPackageStateMeta from dlt.extract.items import TableNameMeta from dlt.extract.storage import ExtractorItemStorage @@ -88,6 +88,13 @@ def write_items(self, resource: DltResource, items: TDataItems, meta: Any) -> No meta = TableNameMeta(meta.hints["name"]) # type: ignore[arg-type] self._reset_contracts_cache() + # if we have a load package state meta, store to load package + if isinstance(meta, LoadPackageStateMeta): + from dlt.pipeline.current import load_package_source_state, commit_load_package_state + + load_package_source_state()[meta.state_key_name] = items + commit_load_package_state() + if table_name := self._get_static_table_name(resource, meta): # write item belonging to table with static name self._write_to_static_table(resource, table_name, items, meta) diff --git a/dlt/extract/resource.py b/dlt/extract/resource.py index 4776158bbb..87e4fd7f76 100644 --- a/dlt/extract/resource.py +++ b/dlt/extract/resource.py @@ -76,6 +76,22 @@ def with_hints( return DataItemWithMeta(HintsMeta(hints, create_table_variant), item) +class LoadPackageStateMeta: + __slots__ = "state_key_name" + + def __init__(self, state_key_name: str) -> None: + self.state_key_name = state_key_name + + +def with_package_state(item: TDataItems, state_key_name: str) -> DataItemWithMeta: + """Marks `item` to also be inserted into the package state. + + Will create a separate variant of hints for a table if `name` is provided in `hints` and `create_table_variant` is set. 
+ + """ + return DataItemWithMeta(LoadPackageStateMeta(state_key_name), item) + + class DltResource(Iterable[TDataItem], DltResourceHints): """Implements dlt resource. Contains a data pipe that wraps a generating item and table schema that can be adjusted""" diff --git a/dlt/load/load.py b/dlt/load/load.py index 4da64e472a..7bedb3dfa6 100644 --- a/dlt/load/load.py +++ b/dlt/load/load.py @@ -341,7 +341,13 @@ def complete_package(self, load_id: str, schema: Schema, aborted: bool = False) # do not commit load id for aborted packages if not aborted: with self.get_destination_client(schema) as job_client: - job_client.complete_load(load_id) + with Container().injectable_context( + LoadPackageStateInjectableContext( + storage=self.load_storage.normalized_packages, + load_id=load_id, + ) + ): + job_client.complete_load(load_id) self.load_storage.complete_load_package(load_id, aborted) # collect package info self._loaded_packages.append(self.load_storage.get_load_package_info(load_id)) @@ -471,10 +477,9 @@ def run(self, pool: Optional[Executor]) -> TRunMetrics: schema = self.load_storage.normalized_packages.load_schema(load_id) logger.info(f"Loaded schema name {schema.name} and version {schema.stored_version}") - container = Container() # get top load id and mark as being processed with self.collector(f"Load {schema.name} in {load_id}"): - with container.injectable_context( + with Container().injectable_context( LoadPackageStateInjectableContext( storage=self.load_storage.normalized_packages, load_id=load_id, diff --git a/dlt/pipeline/current.py b/dlt/pipeline/current.py index 25fd398623..4bbe74a123 100644 --- a/dlt/pipeline/current.py +++ b/dlt/pipeline/current.py @@ -7,6 +7,7 @@ load_package, commit_load_package_state, destination_state, + load_package_source_state, clear_destination_state, ) from dlt.extract.decorators import get_source_schema, get_source diff --git a/dlt/pipeline/mark.py b/dlt/pipeline/mark.py index 3956d9bbe2..0b753539be 100644 --- a/dlt/pipeline/mark.py +++ b/dlt/pipeline/mark.py @@ -3,5 +3,6 @@ with_table_name, with_hints, make_hints, + with_package_state, materialize_schema_item as materialize_table_schema, ) diff --git a/dlt/pipeline/state_sync.py b/dlt/pipeline/state_sync.py index bc9e35bafe..70a58d1f98 100644 --- a/dlt/pipeline/state_sync.py +++ b/dlt/pipeline/state_sync.py @@ -22,6 +22,7 @@ ) PIPELINE_STATE_ENGINE_VERSION = 4 +LOAD_PACKAGE_STATE_KEY = "pipeline_state" # state table columns STATE_TABLE_COLUMNS: TTableSchemaColumns = { @@ -109,7 +110,7 @@ def state_doc(state: TPipelineState) -> DictStrAny: def state_resource(state: TPipelineState) -> DltResource: - doc = state_doc(state) + doc = dlt.mark.with_package_state(state_doc(state), LOAD_PACKAGE_STATE_KEY) return dlt.resource( [doc], name=STATE_TABLE_NAME, write_disposition="append", columns=STATE_TABLE_COLUMNS ) diff --git a/tests/load/utils.py b/tests/load/utils.py index 110c2b433d..487fd588be 100644 --- a/tests/load/utils.py +++ b/tests/load/utils.py @@ -64,6 +64,8 @@ "r2", ] +ALL_FILESYSTEM_DRIVERS = ["memory", "file"] + # Filter out buckets not in all filesystem drivers WITH_GDRIVE_BUCKETS = [GCS_BUCKET, AWS_BUCKET, FILE_BUCKET, MEMORY_BUCKET, AZ_BUCKET, GDRIVE_BUCKET] WITH_GDRIVE_BUCKETS = [ From 40f1f3ea377ce390d01973e961213e6284fb32c3 Mon Sep 17 00:00:00 2001 From: Dave Date: Mon, 15 Apr 2024 17:35:19 +0200 Subject: [PATCH 13/27] fix tests for changed number of files in filesystem destination --- .../impl/filesystem/filesystem.py | 9 ++++-- .../load/filesystem/test_filesystem_client.py | 6 ++++ 
.../load/pipeline/test_filesystem_pipeline.py | 10 +++++-- .../load/pipeline/test_replace_disposition.py | 30 ++++++++----------- tests/pipeline/utils.py | 9 ------ tests/utils.py | 2 +- 6 files changed, 34 insertions(+), 32 deletions(-) diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py index a8cd3e9422..8e3f65f4de 100644 --- a/dlt/destinations/impl/filesystem/filesystem.py +++ b/dlt/destinations/impl/filesystem/filesystem.py @@ -368,8 +368,13 @@ def _get_stored_schema_by_hash_or_newest( return None def _store_current_schema(self, load_id: str) -> None: + # check if schema with hash exists + current_hash = self.schema.stored_version_hash + if self._get_stored_schema_by_hash_or_newest(current_hash): + return + # get paths - hash_path = self._get_schema_file_name(self.schema.stored_version_hash, load_id) + filepath = self._get_schema_file_name(self.schema.stored_version_hash, load_id) # TODO: duplicate of weaviate implementation, should be abstracted out version_info = { @@ -382,7 +387,7 @@ def _store_current_schema(self, load_id: str) -> None: } # we always keep tabs on what the current schema is - self._write_to_json_file(hash_path, version_info) + self._write_to_json_file(filepath, version_info) def get_stored_schema(self) -> Optional[StorageSchemaInfo]: """Retrieves newest schema from destination storage""" diff --git a/tests/load/filesystem/test_filesystem_client.py b/tests/load/filesystem/test_filesystem_client.py index 5d6dbe33ef..1cc0b61856 100644 --- a/tests/load/filesystem/test_filesystem_client.py +++ b/tests/load/filesystem/test_filesystem_client.py @@ -129,6 +129,9 @@ def test_replace_write_disposition(layout: str, default_buckets_env: str) -> Non for basedir, _dirs, files in client.fs_client.walk( client.dataset_path, detail=False, refresh=True ): + # remove internal paths + if "_dlt" in basedir: + continue for f in files: paths.append(posixpath.join(basedir, f)) ls = set(paths) @@ -166,6 +169,9 @@ def test_append_write_disposition(layout: str, default_buckets_env: str) -> None for basedir, _dirs, files in client.fs_client.walk( client.dataset_path, detail=False, refresh=True ): + # remove internal paths + if "_dlt" in basedir: + continue for f in files: paths.append(posixpath.join(basedir, f)) assert list(sorted(paths)) == expected_files diff --git a/tests/load/pipeline/test_filesystem_pipeline.py b/tests/load/pipeline/test_filesystem_pipeline.py index d4e8777d28..0d859df3b0 100644 --- a/tests/load/pipeline/test_filesystem_pipeline.py +++ b/tests/load/pipeline/test_filesystem_pipeline.py @@ -86,11 +86,15 @@ def some_source(): for job in pkg.jobs["completed_jobs"]: assert_file_matches(layout, job, pkg.load_id, client) - complete_fn = f"{client.schema.name}.{LOADS_TABLE_NAME}.%s" + complete_fn = f"{client.schema.name}__%s.jsonl" # Test complete_load markers are saved - assert client.fs_client.isfile(posixpath.join(client.dataset_path, complete_fn % load_id1)) - assert client.fs_client.isfile(posixpath.join(client.dataset_path, complete_fn % load_id2)) + assert client.fs_client.isfile( + posixpath.join(client.dataset_path, client.schema.loads_table_name, complete_fn % load_id1) + ) + assert client.fs_client.isfile( + posixpath.join(client.dataset_path, client.schema.loads_table_name, complete_fn % load_id2) + ) # Force replace pipeline.run(some_source(), write_disposition="replace") diff --git a/tests/load/pipeline/test_replace_disposition.py b/tests/load/pipeline/test_replace_disposition.py index 6efde6e019..09a746433f 
100644 --- a/tests/load/pipeline/test_replace_disposition.py +++ b/tests/load/pipeline/test_replace_disposition.py @@ -41,8 +41,6 @@ def test_replace_disposition( # make duckdb to reuse database in working folder os.environ["DESTINATION__DUCKDB__CREDENTIALS"] = "duckdb:///test_replace_disposition.duckdb" - # TODO: start storing _dlt_loads with right json content - increase_loads = lambda x: x if destination_config.destination == "filesystem" else x + 1 increase_state_loads = lambda info: len( [ job @@ -52,11 +50,9 @@ def test_replace_disposition( ] ) - # filesystem does not have versions and child tables + # filesystem does not have child tables, prepend defaults def norm_table_counts(counts: Dict[str, int], *child_tables: str) -> Dict[str, int]: - if destination_config.destination != "filesystem": - return counts - return {**{"_dlt_version": 0}, **{t: 0 for t in child_tables}, **counts} + return {**{t: 0 for t in child_tables}, **counts} dataset_name = "test_replace_strategies_ds" + uniq_id() pipeline = destination_config.setup_pipeline( @@ -108,8 +104,8 @@ def append_items(): assert_load_info(info) # count state records that got extracted state_records = increase_state_loads(info) - dlt_loads: int = increase_loads(0) - dlt_versions: int = increase_loads(0) + dlt_loads: int = 1 + dlt_versions: int = 1 # second run with higher offset so we can check the results offset = 1000 @@ -118,11 +114,11 @@ def append_items(): ) assert_load_info(info) state_records += increase_state_loads(info) - dlt_loads = increase_loads(dlt_loads) + dlt_loads += 1 # we should have all items loaded table_counts = load_table_counts(pipeline, *pipeline.default_schema.tables.keys()) - assert norm_table_counts(table_counts) == { + assert table_counts == { "append_items": 24, # loaded twice "items": 120, "items__sub_items": 240, @@ -166,7 +162,7 @@ def load_items_none(): ) assert_load_info(info) state_records += increase_state_loads(info) - dlt_loads = increase_loads(dlt_loads) + dlt_loads += 1 # table and child tables should be cleared table_counts = load_table_counts(pipeline, *pipeline.default_schema.tables.keys()) @@ -200,8 +196,8 @@ def load_items_none(): assert_load_info(info) new_state_records = increase_state_loads(info) assert new_state_records == 1 - dlt_loads = increase_loads(dlt_loads) - dlt_versions = increase_loads(dlt_versions) + dlt_loads += 1 + dlt_versions += 1 # check trace assert pipeline_2.last_trace.last_normalize_info.row_counts == { "items_copy": 120, @@ -214,18 +210,18 @@ def load_items_none(): assert_load_info(info) new_state_records = increase_state_loads(info) assert new_state_records == 0 - dlt_loads = increase_loads(dlt_loads) + dlt_loads += 1 # new pipeline table_counts = load_table_counts(pipeline_2, *pipeline_2.default_schema.tables.keys()) - assert norm_table_counts(table_counts) == { + assert table_counts == { "append_items": 48, "items_copy": 120, "items_copy__sub_items": 240, "items_copy__sub_items__sub_sub_items": 120, "_dlt_pipeline_state": state_records + 1, "_dlt_loads": dlt_loads, - "_dlt_version": increase_loads(dlt_versions), + "_dlt_version": dlt_versions + 1, } # check trace assert pipeline_2.last_trace.last_normalize_info.row_counts == { @@ -243,7 +239,7 @@ def load_items_none(): "items__sub_items__sub_sub_items": 0, "_dlt_pipeline_state": state_records + 1, "_dlt_loads": dlt_loads, # next load - "_dlt_version": increase_loads(dlt_versions), # new table name -> new schema + "_dlt_version": dlt_versions + 1, # new table name -> new schema } diff --git 
a/tests/pipeline/utils.py b/tests/pipeline/utils.py index c4e1f5314b..99312bf5dd 100644 --- a/tests/pipeline/utils.py +++ b/tests/pipeline/utils.py @@ -96,11 +96,6 @@ def load_file(path: str, file: str) -> Tuple[str, List[Dict[str, Any]]]: # table name will be last element of path table_name = path.split("/")[-1] - - # skip loads table - if table_name == "_dlt_loads": - return table_name, [] - full_path = posixpath.join(path, file) # load jsonl @@ -157,10 +152,6 @@ def load_files(p: dlt.Pipeline, *table_names: str) -> Dict[str, List[Dict[str, A else: result[table_name] = items - # loads file is special case - if LOADS_TABLE_NAME in table_names and file.find(".{LOADS_TABLE_NAME}."): - result[LOADS_TABLE_NAME] = [] - return result diff --git a/tests/utils.py b/tests/utils.py index 69338b2f72..6defd08f5a 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -62,7 +62,7 @@ # filter out active destinations for current tests ACTIVE_DESTINATIONS = set(dlt.config.get("ACTIVE_DESTINATIONS", list) or IMPLEMENTED_DESTINATIONS) -ACTIVE_DESTINATIONS = {"filesystem"} +ACTIVE_DESTINATIONS = {"filesystem", "duckdb"} ACTIVE_SQL_DESTINATIONS = SQL_DESTINATIONS.intersection(ACTIVE_DESTINATIONS) ACTIVE_NON_SQL_DESTINATIONS = NON_SQL_DESTINATIONS.intersection(ACTIVE_DESTINATIONS) From 5e8c233e59e8340b3451f4d53841a7dd92bb088b Mon Sep 17 00:00:00 2001 From: Dave Date: Mon, 15 Apr 2024 17:37:28 +0200 Subject: [PATCH 14/27] remove dev code --- fs_testing_pipe.py | 39 --------------------------------------- tests/load/utils.py | 2 -- tests/utils.py | 2 -- 3 files changed, 43 deletions(-) delete mode 100644 fs_testing_pipe.py diff --git a/fs_testing_pipe.py b/fs_testing_pipe.py deleted file mode 100644 index 7f95f1f389..0000000000 --- a/fs_testing_pipe.py +++ /dev/null @@ -1,39 +0,0 @@ -import dlt -import os -import shutil -import random - -if __name__ == "__main__": - - # shutil.rmtree("./my_files", ignore_errors=True) - os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] = "file://my_files" - os.environ["DATA_WRITER__DISABLE_COMPRESSION"] = "TRUE" - - # resource with incremental for testing restoring of pipeline state - @dlt.resource(name="my_table") - def my_resouce(id=dlt.sources.incremental("id")): - yield from [ - {"id": 1}, - {"id": 2}, - {"id": 3}, - {"id": 4}, - {"id": 5} - ] - - pipeline_name = f"dave" - - pipe = dlt.pipeline(pipeline_name=pipeline_name, destination="filesystem") - pipe.run(my_resouce(), table_name="my_table") #, loader_file_format="parquet") - - # resource with incremental for testing restoring of pipeline state - @dlt.resource(name="my_table") - def updated_resouce(id=dlt.sources.incremental("id")): - yield from [ - {"id": 1}, - {"id": 2}, - {"id": 3}, - {"id": 4}, - {"id": 5}, - {"id": 6} - ] - pipe.run(updated_resouce(), table_name="my_table") #, loader_file_format="parquet") diff --git a/tests/load/utils.py b/tests/load/utils.py index 487fd588be..110c2b433d 100644 --- a/tests/load/utils.py +++ b/tests/load/utils.py @@ -64,8 +64,6 @@ "r2", ] -ALL_FILESYSTEM_DRIVERS = ["memory", "file"] - # Filter out buckets not in all filesystem drivers WITH_GDRIVE_BUCKETS = [GCS_BUCKET, AWS_BUCKET, FILE_BUCKET, MEMORY_BUCKET, AZ_BUCKET, GDRIVE_BUCKET] WITH_GDRIVE_BUCKETS = [ diff --git a/tests/utils.py b/tests/utils.py index 6defd08f5a..1ccb7fc5e4 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -62,8 +62,6 @@ # filter out active destinations for current tests ACTIVE_DESTINATIONS = set(dlt.config.get("ACTIVE_DESTINATIONS", list) or IMPLEMENTED_DESTINATIONS) -ACTIVE_DESTINATIONS = {"filesystem", 
"duckdb"} - ACTIVE_SQL_DESTINATIONS = SQL_DESTINATIONS.intersection(ACTIVE_DESTINATIONS) ACTIVE_NON_SQL_DESTINATIONS = NON_SQL_DESTINATIONS.intersection(ACTIVE_DESTINATIONS) From e7e0192546f81b15ea378c24fc01188859e4291f Mon Sep 17 00:00:00 2001 From: Dave Date: Mon, 15 Apr 2024 18:20:53 +0200 Subject: [PATCH 15/27] create init file also to mark datasets --- dlt/common/storages/load_package.py | 2 +- dlt/destinations/impl/filesystem/filesystem.py | 9 +++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/dlt/common/storages/load_package.py b/dlt/common/storages/load_package.py index 8870024de9..6d4f4a0521 100644 --- a/dlt/common/storages/load_package.py +++ b/dlt/common/storages/load_package.py @@ -692,7 +692,7 @@ def destination_state() -> DictStrAny: def load_package_source_state() -> DictStrAny: - """Get segment of load package state that is specific to the current destination.""" + """Get segment of load package state that is specific to the sources.""" lp = load_package() return lp["state"].setdefault("source_state", {}) diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py index 8e3f65f4de..e057fbc52f 100644 --- a/dlt/destinations/impl/filesystem/filesystem.py +++ b/dlt/destinations/impl/filesystem/filesystem.py @@ -178,6 +178,10 @@ def initialize_storage(self, truncate_tables: Iterable[str] = None) -> None: " should be created previously!" ) + # we mark the storage folder as initialized + self.fs_client.makedirs(self.dataset_path, exist_ok=True) + self.fs_client.touch(posixpath.join(self.dataset_path, INIT_FILE_NAME)) + def update_stored_schema( self, load_id: str = None, @@ -215,7 +219,7 @@ def _get_table_dirs(self, table_names: Iterable[str]) -> List[str]: return table_dirs def is_storage_initialized(self) -> bool: - return self.fs_client.isdir(self.dataset_path) # type: ignore[no-any-return] + return self.fs_client.exists(posixpath.join(self.dataset_path, INIT_FILE_NAME)) # type: ignore[no-any-return] def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob: # skip the state table, we create a jsonl file in the complete_load step @@ -363,8 +367,9 @@ def _get_stored_schema_by_hash_or_newest( break if selected_path: + print("got state") return StorageSchemaInfo(**json.loads(self.fs_client.read_text(selected_path))) - + print("no state") return None def _store_current_schema(self, load_id: str) -> None: From 7cd51b46989aadebd9109820c5055785ea2e2e0d Mon Sep 17 00:00:00 2001 From: Dave Date: Tue, 16 Apr 2024 11:03:20 +0200 Subject: [PATCH 16/27] fix tests to respect new init file change filesystem to fallback, to old state loading when used as staging destination --- dlt/destinations/impl/filesystem/filesystem.py | 12 +++++++++--- tests/load/filesystem/test_filesystem_client.py | 6 +++++- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py index e057fbc52f..55e19d3cb1 100644 --- a/dlt/destinations/impl/filesystem/filesystem.py +++ b/dlt/destinations/impl/filesystem/filesystem.py @@ -197,8 +197,9 @@ def update_stored_schema( if tables_name in self.schema.dlt_table_names(): self.fs_client.touch(posixpath.join(directory, INIT_FILE_NAME)) - # write schema to destination - self._store_current_schema(load_id or "1") + # don't store schema when used as staging + if not self.config.as_staging: + self._store_current_schema(load_id or "1") return expected_update @@ -223,7 +224,9 @@ def 
is_storage_initialized(self) -> bool: def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob: # skip the state table, we create a jsonl file in the complete_load step - if table["name"] == self.schema.state_table_name: + # this does not apply to scenarios where we are using filesystem as staging + # where we want to load the state the regular way + if table["name"] == self.schema.state_table_name and not self.config.as_staging: return DoNothingJob(file_path) cls = FollowupFilesystemJob if self.config.as_staging else LoadFilesystemJob @@ -299,6 +302,9 @@ def _get_state_file_name(self, pipeline_name: str, version_hash: str, load_id: s return f"{self.dataset_path}/{self.schema.state_table_name}/{pipeline_name}__{load_id}__{self._to_path_safe_string(version_hash)}.jsonl" def _store_current_state(self, load_id: str) -> None: + # don't save the state this way when used as staging + if self.config.as_staging: + return # get state doc from current pipeline from dlt.pipeline.current import load_package_source_state from dlt.pipeline.state_sync import LOAD_PACKAGE_STATE_KEY diff --git a/tests/load/filesystem/test_filesystem_client.py b/tests/load/filesystem/test_filesystem_client.py index 1cc0b61856..9ec1d88541 100644 --- a/tests/load/filesystem/test_filesystem_client.py +++ b/tests/load/filesystem/test_filesystem_client.py @@ -130,9 +130,11 @@ def test_replace_write_disposition(layout: str, default_buckets_env: str) -> Non client.dataset_path, detail=False, refresh=True ): # remove internal paths - if "_dlt" in basedir: + if "_dlt" in basedir or "init" in basedir: continue for f in files: + if f == "init": + continue paths.append(posixpath.join(basedir, f)) ls = set(paths) assert ls == {job_2_load_1_path, job_1_load_2_path} @@ -173,5 +175,7 @@ def test_append_write_disposition(layout: str, default_buckets_env: str) -> None if "_dlt" in basedir: continue for f in files: + if f == "init": + continue paths.append(posixpath.join(basedir, f)) assert list(sorted(paths)) == expected_files From c40660082ebffe7b9e83214602bc2a9cde7790b2 Mon Sep 17 00:00:00 2001 From: Dave Date: Tue, 16 Apr 2024 11:17:22 +0200 Subject: [PATCH 17/27] update filesystem docs --- .../docs/dlt-ecosystem/destinations/filesystem.md | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/docs/website/docs/dlt-ecosystem/destinations/filesystem.md b/docs/website/docs/dlt-ecosystem/destinations/filesystem.md index a8b2b084b9..6fe47bbc6d 100644 --- a/docs/website/docs/dlt-ecosystem/destinations/filesystem.md +++ b/docs/website/docs/dlt-ecosystem/destinations/filesystem.md @@ -238,8 +238,7 @@ to work, you have to - have a separator after the table_name placeholder Please note: -- `dlt` will not dump the current schema content to the bucket -- `dlt` will mark complete loads by creating an empty file that corresponds to `_dlt_loads` table. For example, if `chess._dlt_loads.1685299832` file is present in dataset folders, you can be sure that all files for the load package `1685299832` are completely loaded +- `dlt` will mark complete loads by creating a json file in the `./_dlt_loads` folders that corresponds to the`_dlt_loads` table. 
For example, if `chess__1685299832.jsonl` file is present in the loads folder, you can be sure that all files for the load package `1685299832` are completely loaded ## Supported file formats You can choose the following file formats: @@ -249,7 +248,13 @@ You can choose the following file formats: ## Syncing of `dlt` state -This destination does not support restoring the `dlt` state. You can change that by requesting the [feature](https://github.com/dlt-hub/dlt/issues/new/choose) or contributing to the core library 😄 -You can however easily [backup and restore the pipeline working folder](https://gist.github.com/rudolfix/ee6e16d8671f26ac4b9ffc915ad24b6e) - reusing the bucket and credentials used to store files. +This destination fully supports [dlt state sync](../../general-usage/state#syncing-state-with-destination). To this end, special folders and files that will be created at your destination which hold information about your pipeline state, schemas and completed loads. These folders DO NOT respect your +settings in the layout section. When using filesystem as a staging destination, not all of these folders are created, as the state and schemas are +managed in the regular way by the final destination you have configured. + +You will also notice `init` files being present in the root folder and the special `dlt` folders. In the absence of the concepts of schemas and tables +in blob storages and directories, `dlt` uses these special files to harmonize the behavior of the `filesystem` destination with the other implemented destinations. + + From f0635b2076887e52f1a4b243d4ff5f1fc9838254 Mon Sep 17 00:00:00 2001 From: Dave Date: Tue, 16 Apr 2024 11:34:31 +0200 Subject: [PATCH 18/27] fix incoming tests of placeholders --- tests/load/pipeline/test_filesystem_pipeline.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tests/load/pipeline/test_filesystem_pipeline.py b/tests/load/pipeline/test_filesystem_pipeline.py index f83b846f78..63c90d91ef 100644 --- a/tests/load/pipeline/test_filesystem_pipeline.py +++ b/tests/load/pipeline/test_filesystem_pipeline.py @@ -248,6 +248,9 @@ def count(*args, **kwargs) -> Any: expected_files = set() known_files = set() for basedir, _dirs, files in client.fs_client.walk(client.dataset_path): # type: ignore[attr-defined] + # strip out special tables + if "_dlt" in basedir: + continue for file in files: if file.endswith("jsonl"): expected_files.add(os.path.join(basedir, file)) @@ -255,6 +258,9 @@ def count(*args, **kwargs) -> Any: for load_package in load_info.load_packages: for load_info in load_package.jobs["completed_jobs"]: # type: ignore[assignment] job_info = ParsedLoadJobFileName.parse(load_info.file_path) # type: ignore[attr-defined] + # state file gets loaded a differentn way + if job_info.table_name == "_dlt_pipeline_state": + continue path = create_path( layout, file_name=job_info.file_name(), @@ -270,6 +276,7 @@ def count(*args, **kwargs) -> Any: known_files.add(full_path) assert expected_files == known_files + assert known_files # 6 is because simple_row contains two rows # and in this test scenario we have 3 callbacks assert call_count >= 6 From bdaf094d4d3d33ea1ea2e188cd12f53b73d1acba Mon Sep 17 00:00:00 2001 From: Dave Date: Tue, 16 Apr 2024 12:11:27 +0200 Subject: [PATCH 19/27] small fixes --- dlt/destinations/impl/filesystem/filesystem.py | 3 +-- tests/load/pipeline/test_filesystem_pipeline.py | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/dlt/destinations/impl/filesystem/filesystem.py 
b/dlt/destinations/impl/filesystem/filesystem.py index f79faff903..e6fc1de83f 100644 --- a/dlt/destinations/impl/filesystem/filesystem.py +++ b/dlt/destinations/impl/filesystem/filesystem.py @@ -372,9 +372,8 @@ def _get_stored_schema_by_hash_or_newest( break if selected_path: - print("got state") return StorageSchemaInfo(**json.loads(self.fs_client.read_text(selected_path))) - print("no state") + return None def _store_current_schema(self, load_id: str) -> None: diff --git a/tests/load/pipeline/test_filesystem_pipeline.py b/tests/load/pipeline/test_filesystem_pipeline.py index 63c90d91ef..9330503985 100644 --- a/tests/load/pipeline/test_filesystem_pipeline.py +++ b/tests/load/pipeline/test_filesystem_pipeline.py @@ -252,7 +252,7 @@ def count(*args, **kwargs) -> Any: if "_dlt" in basedir: continue for file in files: - if file.endswith("jsonl"): + if ".jsonl" in file: expected_files.add(os.path.join(basedir, file)) for load_package in load_info.load_packages: @@ -272,7 +272,7 @@ def count(*args, **kwargs) -> Any: ) full_path = os.path.join(client.dataset_path, path) # type: ignore[attr-defined] assert os.path.exists(full_path) - if full_path.endswith("jsonl"): + if ".jsonl" in full_path: known_files.add(full_path) assert expected_files == known_files From a09f896289dc5dbf8d1d0007f7cee5307849f1b1 Mon Sep 17 00:00:00 2001 From: Dave Date: Tue, 16 Apr 2024 13:52:56 +0200 Subject: [PATCH 20/27] adds some tests for filesystem state also fixes table count loading to work for all bucket destinations --- .../impl/filesystem/filesystem.py | 3 + .../load/pipeline/test_filesystem_pipeline.py | 149 +++++++++++++++++- tests/pipeline/utils.py | 47 +++--- 3 files changed, 175 insertions(+), 24 deletions(-) diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py index e6fc1de83f..8c4c0ca5b1 100644 --- a/dlt/destinations/impl/filesystem/filesystem.py +++ b/dlt/destinations/impl/filesystem/filesystem.py @@ -313,6 +313,9 @@ def _store_current_state(self, load_id: str) -> None: if not doc: return + # this is not done in other destinations... 
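On the read side, get_stored_state picks the newest state file by comparing the load id embedded in the file name; a compact standalone sketch of that selection under the "<pipeline_name>__<load_id>__<hash>.jsonl" naming (function and variable names are illustrative, not from the patch):

from typing import Iterable, Optional

def newest_state_file(files: Iterable[str], pipeline_name: str) -> Optional[str]:
    # load ids are timestamp strings, so lexicographic comparison tracks creation order
    newest_load_id, selected = "0", None
    for path in files:
        name = path.rsplit("/", 1)[-1].rsplit(".", 1)[0]
        parts = name.split("__")
        if len(parts) != 3:
            continue
        if parts[0] == pipeline_name and parts[1] > newest_load_id:
            newest_load_id, selected = parts[1], path
    return selected
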
+ doc["dlt_load_id"] = load_id + # get paths pipeline_name = doc["pipeline_name"] hash_path = self._get_state_file_name( diff --git a/tests/load/pipeline/test_filesystem_pipeline.py b/tests/load/pipeline/test_filesystem_pipeline.py index 9330503985..37afbcb07a 100644 --- a/tests/load/pipeline/test_filesystem_pipeline.py +++ b/tests/load/pipeline/test_filesystem_pipeline.py @@ -2,7 +2,7 @@ import os import posixpath from pathlib import Path -from typing import Any, Callable +from typing import Any, Callable, List, Dict, cast import dlt import pytest @@ -19,6 +19,8 @@ from tests.common.utils import load_json_case from tests.utils import ALL_TEST_DATA_ITEM_FORMATS, TestDataItemFormat, skip_if_not_active from dlt.destinations.path_utils import create_path +from tests.load.pipeline.utils import destinations_configs, DestinationTestConfiguration +from tests.load.pipeline.utils import load_table_counts skip_if_not_active("filesystem") @@ -280,3 +282,148 @@ def count(*args, **kwargs) -> Any: # 6 is because simple_row contains two rows # and in this test scenario we have 3 callbacks assert call_count >= 6 + + +@pytest.mark.parametrize( + "destination_config", + destinations_configs(all_buckets_filesystem_configs=True), + ids=lambda x: x.name, +) +def test_state_files(destination_config: DestinationTestConfiguration) -> None: + def _collect_files(p) -> List[str]: + client = p.destination_client() + found = [] + for basedir, _dirs, files in client.fs_client.walk(client.dataset_path): + for file in files: + found.append(os.path.join(basedir, file).replace(client.dataset_path, "")) + return found + + def _collect_table_counts(p) -> Dict[str, int]: + return load_table_counts( + p, "items", "items2", "items3", "_dlt_loads", "_dlt_version", "_dlt_pipeline_state" + ) + + # generate 4 loads from 2 pipelines, store load ids + p1 = destination_config.setup_pipeline("p1", dataset_name="layout_test") + p2 = destination_config.setup_pipeline("p2", dataset_name="layout_test") + c1 = cast(FilesystemClient, p1.destination_client()) + c2 = cast(FilesystemClient, p2.destination_client()) + + # first two loads + p1.run([1, 2, 3], table_name="items").loads_ids[0] + load_id_2_1 = p2.run([4, 5, 6], table_name="items").loads_ids[0] + assert _collect_table_counts(p1) == { + "items": 6, + "_dlt_loads": 2, + "_dlt_pipeline_state": 2, + "_dlt_version": 2, + } + sc1_old = c1.get_stored_schema() + sc2_old = c2.get_stored_schema() + s1_old = c1.get_stored_state("p1") + s2_old = c1.get_stored_state("p2") + + created_files = _collect_files(p1) + # 4 init files, 2 item files, 2 load files, 2 state files, 2 version files + assert len(created_files) == 12 + + # second two loads + @dlt.resource(table_name="items2") + def some_data(): + dlt.current.resource_state()["state"] = {"some": "state"} + yield from [1, 2, 3] + + load_id_1_2 = p1.run(some_data(), table_name="items2").loads_ids[ + 0 + ] # force state and migration bump here + p2.run([4, 5, 6], table_name="items").loads_ids[0] # no migration here + + # 4 loads for 2 pipelines, one schema and state change on p2 changes so 3 versions and 3 states + assert _collect_table_counts(p1) == { + "items": 9, + "items2": 3, + "_dlt_loads": 4, + "_dlt_pipeline_state": 3, + "_dlt_version": 3, + } + + # test accessors for state + s1 = c1.get_stored_state("p1") + s2 = c1.get_stored_state("p2") + assert s1.dlt_load_id == load_id_1_2 # second load + assert s2.dlt_load_id == load_id_2_1 # first load + assert s1_old.version != s1.version + assert s2_old.version == s2.version + + # test accessors for 
schema + sc1 = c1.get_stored_schema() + sc2 = c2.get_stored_schema() + assert sc1.version_hash != sc1_old.version_hash + assert sc2.version_hash == sc2_old.version_hash + assert sc1.version_hash != sc2.version_hash + + assert not c1.get_stored_schema_by_hash("blah") + assert c2.get_stored_schema_by_hash(sc1_old.version_hash) + + created_files = _collect_files(p1) + # 4 init files, 4 item files, 4 load files, 3 state files, 3 version files + assert len(created_files) == 18 + + # drop it + p1.destination_client().drop_storage() + created_files = _collect_files(p1) + assert len(created_files) == 0 + + +@pytest.mark.parametrize( + "destination_config", + destinations_configs(all_buckets_filesystem_configs=True), + ids=lambda x: x.name, +) +def test_knows_dataset_state(destination_config: DestinationTestConfiguration) -> None: + # check if pipeline knows initializisation state of dataset + p1 = destination_config.setup_pipeline("p1", dataset_name="layout_test") + assert not p1.destination_client().is_storage_initialized() + p1.run([1, 2, 3], table_name="items") + assert p1.destination_client().is_storage_initialized() + p1.destination_client().drop_storage() + assert not p1.destination_client().is_storage_initialized() + + +@pytest.mark.parametrize( + "destination_config", + destinations_configs(all_buckets_filesystem_configs=True), + ids=lambda x: x.name, +) +@pytest.mark.parametrize("restore", [True, False]) +def test_simple_incremental( + destination_config: DestinationTestConfiguration, + restore: bool, +) -> None: + os.environ["RESTORE_FROM_DESTINATION"] = str(restore) + + p = destination_config.setup_pipeline("p1", dataset_name="incremental_test") + + @dlt.resource(name="items") + def my_resource(prim_key=dlt.sources.incremental("id")): + yield from [ + {"id": 1}, + {"id": 2}, + ] + + @dlt.resource(name="items") + def my_resource_inc(prim_key=dlt.sources.incremental("id")): + yield from [ + {"id": 1}, + {"id": 2}, + {"id": 3}, + {"id": 4}, + ] + + p.run(my_resource) + p._wipe_working_folder() + + p = destination_config.setup_pipeline("p1", dataset_name="incremental_test") + p.run(my_resource_inc) + + assert load_table_counts(p, "items") == {"items": 4 if restore else 6} diff --git a/tests/pipeline/utils.py b/tests/pipeline/utils.py index 99312bf5dd..bb5cbd3930 100644 --- a/tests/pipeline/utils.py +++ b/tests/pipeline/utils.py @@ -80,7 +80,7 @@ def assert_data_table_counts(p: dlt.Pipeline, expected_counts: DictStrAny) -> No ), f"Table counts do not match, expected {expected_counts}, got {table_counts}" -def load_file(path: str, file: str) -> Tuple[str, List[Dict[str, Any]]]: +def load_file(fs_client, path: str, file: str) -> Tuple[str, List[Dict[str, Any]]]: """ util function to load a filesystem destination file and return parsed content values may not be cast to the right type, especially for insert_values, please @@ -100,17 +100,18 @@ def load_file(path: str, file: str) -> Tuple[str, List[Dict[str, Any]]]: # load jsonl if ext == "jsonl": - with open(full_path, "rU", encoding="utf-8") as f: - for line in f: + file_text = fs_client.read_text(full_path) + for line in file_text.split("\n"): + if line: result.append(json.loads(line)) # load insert_values (this is a bit volatile if the exact format of the source file changes) elif ext == "insert_values": - with open(full_path, "rU", encoding="utf-8") as f: - lines = f.readlines() - # extract col names - cols = lines[0][15:-2].split(",") - for line in lines[2:]: + file_text = fs_client.read_text(full_path) + lines = file_text.split("\n") + 
cols = lines[0][15:-2].split(",") + for line in lines[2:]: + if line: values = line[1:-3].split(",") result.append(dict(zip(cols, values))) @@ -118,20 +119,20 @@ def load_file(path: str, file: str) -> Tuple[str, List[Dict[str, Any]]]: elif ext == "parquet": import pyarrow.parquet as pq - with open(full_path, "rb") as f: - table = pq.read_table(f) - cols = table.column_names - count = 0 - for column in table: - column_name = cols[count] - item_count = 0 - for item in column.to_pylist(): - if len(result) <= item_count: - result.append({column_name: item}) - else: - result[item_count][column_name] = item - item_count += 1 - count += 1 + file_bytes = fs_client.read_bytes() + table = pq.read_table(file_bytes) + cols = table.column_names + count = 0 + for column in table: + column_name = cols[count] + item_count = 0 + for item in column.to_pylist(): + if len(result) <= item_count: + result.append({column_name: item}) + else: + result[item_count][column_name] = item + item_count += 1 + count += 1 return table_name, result @@ -144,7 +145,7 @@ def load_files(p: dlt.Pipeline, *table_names: str) -> Dict[str, List[Dict[str, A client.dataset_path, detail=False, refresh=True ): for file in files: - table_name, items = load_file(basedir, file) + table_name, items = load_file(client.fs_client, basedir, file) if table_name not in table_names: continue if table_name in result: From fce47c6c03a7d8e914f7f1d4f03636cdfb18a8e3 Mon Sep 17 00:00:00 2001 From: Dave Date: Tue, 16 Apr 2024 15:08:44 +0200 Subject: [PATCH 21/27] fix test helper --- tests/pipeline/utils.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/pipeline/utils.py b/tests/pipeline/utils.py index bb5cbd3930..036154b582 100644 --- a/tests/pipeline/utils.py +++ b/tests/pipeline/utils.py @@ -3,6 +3,7 @@ import pytest import random from os import environ +import io import dlt from dlt.common import json, sleep @@ -119,8 +120,8 @@ def load_file(fs_client, path: str, file: str) -> Tuple[str, List[Dict[str, Any] elif ext == "parquet": import pyarrow.parquet as pq - file_bytes = fs_client.read_bytes() - table = pq.read_table(file_bytes) + file_bytes = fs_client.read_bytes(full_path) + table = pq.read_table(io.BytesIO(file_bytes)) cols = table.column_names count = 0 for column in table: From 0d5423cf83f7d6beccceffb5ce4ed5c10e5dc104 Mon Sep 17 00:00:00 2001 From: Dave Date: Tue, 16 Apr 2024 15:56:54 +0200 Subject: [PATCH 22/27] save schema with timestamp instead of load_id --- dlt/common/destination/reference.py | 2 -- dlt/destinations/impl/destination/destination.py | 3 +-- dlt/destinations/impl/dummy/dummy.py | 3 +-- dlt/destinations/impl/filesystem/filesystem.py | 10 +++++----- dlt/destinations/impl/qdrant/qdrant_client.py | 3 +-- dlt/destinations/impl/weaviate/weaviate_client.py | 3 +-- dlt/destinations/job_client_impl.py | 3 +-- dlt/load/load.py | 2 -- dlt/load/utils.py | 7 ++----- 9 files changed, 12 insertions(+), 24 deletions(-) diff --git a/dlt/common/destination/reference.py b/dlt/common/destination/reference.py index 738a11c069..5422414cf3 100644 --- a/dlt/common/destination/reference.py +++ b/dlt/common/destination/reference.py @@ -274,7 +274,6 @@ def drop_storage(self) -> None: def update_stored_schema( self, - load_id: str = None, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None, ) -> Optional[TSchemaTables]: @@ -284,7 +283,6 @@ def update_stored_schema( destination has single writer and no other processes modify the schema. 
Args: - load_id (str, optional): Load id during which the schema is updated only_tables (Sequence[str], optional): Updates only listed tables. Defaults to None. expected_update (TSchemaTables, optional): Update that is expected to be applied to the destination Returns: diff --git a/dlt/destinations/impl/destination/destination.py b/dlt/destinations/impl/destination/destination.py index 4a526720d2..69d1d1d98a 100644 --- a/dlt/destinations/impl/destination/destination.py +++ b/dlt/destinations/impl/destination/destination.py @@ -48,11 +48,10 @@ def drop_storage(self) -> None: def update_stored_schema( self, - load_id: str = None, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None, ) -> Optional[TSchemaTables]: - return super().update_stored_schema(load_id, only_tables, expected_update) + return super().update_stored_schema(only_tables, expected_update) def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob: # skip internal tables and remove columns from schema if so configured diff --git a/dlt/destinations/impl/dummy/dummy.py b/dlt/destinations/impl/dummy/dummy.py index 4650a7651c..16affbc164 100644 --- a/dlt/destinations/impl/dummy/dummy.py +++ b/dlt/destinations/impl/dummy/dummy.py @@ -127,11 +127,10 @@ def drop_storage(self) -> None: def update_stored_schema( self, - load_id: str = None, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None, ) -> Optional[TSchemaTables]: - applied_update = super().update_stored_schema(load_id, only_tables, expected_update) + applied_update = super().update_stored_schema(only_tables, expected_update) if self.config.fail_schema_update: raise DestinationTransientException( "Raise on schema update due to fail_schema_update config flag" diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py index 8c4c0ca5b1..8edfa7f744 100644 --- a/dlt/destinations/impl/filesystem/filesystem.py +++ b/dlt/destinations/impl/filesystem/filesystem.py @@ -10,7 +10,7 @@ import re import dlt -from dlt.common import logger +from dlt.common import logger, time from dlt.common.schema import Schema, TSchemaTables, TTableSchema from dlt.common.storages import FileStorage, fsspec_from_config from dlt.common.destination import DestinationCapabilitiesContext @@ -183,7 +183,6 @@ def initialize_storage(self, truncate_tables: Iterable[str] = None) -> None: def update_stored_schema( self, - load_id: str = None, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None, ) -> TSchemaTables: @@ -198,7 +197,7 @@ def update_stored_schema( # don't store schema when used as staging if not self.config.as_staging: - self._store_current_schema(load_id or "1") + self._store_current_schema() return expected_update @@ -379,14 +378,15 @@ def _get_stored_schema_by_hash_or_newest( return None - def _store_current_schema(self, load_id: str) -> None: + def _store_current_schema(self) -> None: # check if schema with hash exists current_hash = self.schema.stored_version_hash if self._get_stored_schema_by_hash_or_newest(current_hash): return # get paths - filepath = self._get_schema_file_name(self.schema.stored_version_hash, load_id) + schema_id = str(time.precise_time()) + filepath = self._get_schema_file_name(self.schema.stored_version_hash, schema_id) # TODO: duplicate of weaviate implementation, should be abstracted out version_info = { diff --git a/dlt/destinations/impl/qdrant/qdrant_client.py b/dlt/destinations/impl/qdrant/qdrant_client.py index 970580eb51..9898b28c86 
100644 --- a/dlt/destinations/impl/qdrant/qdrant_client.py +++ b/dlt/destinations/impl/qdrant/qdrant_client.py @@ -284,11 +284,10 @@ def _delete_sentinel_collection(self) -> None: def update_stored_schema( self, - load_id: str = None, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None, ) -> Optional[TSchemaTables]: - super().update_stored_schema(load_id, only_tables, expected_update) + super().update_stored_schema(only_tables, expected_update) applied_update: TSchemaTables = {} schema_info = self.get_stored_schema_by_hash(self.schema.stored_version_hash) if schema_info is None: diff --git a/dlt/destinations/impl/weaviate/weaviate_client.py b/dlt/destinations/impl/weaviate/weaviate_client.py index 3bbc2a8c7d..2d75ca0809 100644 --- a/dlt/destinations/impl/weaviate/weaviate_client.py +++ b/dlt/destinations/impl/weaviate/weaviate_client.py @@ -425,11 +425,10 @@ def _delete_sentinel_class(self) -> None: @wrap_weaviate_error def update_stored_schema( self, - load_id: str = None, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None, ) -> Optional[TSchemaTables]: - super().update_stored_schema(load_id, only_tables, expected_update) + super().update_stored_schema(only_tables, expected_update) # Retrieve the schema from Weaviate applied_update: TSchemaTables = {} try: diff --git a/dlt/destinations/job_client_impl.py b/dlt/destinations/job_client_impl.py index cfb95b0af2..5838ab2ab7 100644 --- a/dlt/destinations/job_client_impl.py +++ b/dlt/destinations/job_client_impl.py @@ -181,11 +181,10 @@ def is_storage_initialized(self) -> bool: def update_stored_schema( self, - load_id: str = None, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None, ) -> Optional[TSchemaTables]: - super().update_stored_schema(load_id, only_tables, expected_update) + super().update_stored_schema(only_tables, expected_update) applied_update: TSchemaTables = {} schema_info = self.get_stored_schema_by_hash(self.schema.stored_version_hash) if schema_info is None: diff --git a/dlt/load/load.py b/dlt/load/load.py index 7bedb3dfa6..66ddb1c308 100644 --- a/dlt/load/load.py +++ b/dlt/load/load.py @@ -375,7 +375,6 @@ def load_single_package(self, load_id: str, schema: Schema) -> None: if isinstance(job_client, WithStagingDataset) else None ), - load_id=load_id, ) # init staging client @@ -392,7 +391,6 @@ def load_single_package(self, load_id: str, schema: Schema) -> None: expected_update, job_client.should_truncate_table_before_load_on_staging_destination, job_client.should_load_data_to_staging_dataset_on_staging_destination, - load_id=load_id, ) self.load_storage.commit_schema_update(load_id, applied_update) diff --git a/dlt/load/utils.py b/dlt/load/utils.py index 1635d21efe..067ae33613 100644 --- a/dlt/load/utils.py +++ b/dlt/load/utils.py @@ -66,7 +66,6 @@ def init_client( expected_update: TSchemaTables, truncate_filter: Callable[[TTableSchema], bool], load_staging_filter: Callable[[TTableSchema], bool], - load_id: str = None, ) -> TSchemaTables: """Initializes destination storage including staging dataset if supported @@ -98,7 +97,7 @@ def init_client( ) applied_update = _init_dataset_and_update_schema( - job_client, expected_update, tables_with_jobs | dlt_tables, truncate_tables, load_id=load_id + job_client, expected_update, tables_with_jobs | dlt_tables, truncate_tables ) # update the staging dataset if client supports this @@ -118,7 +117,6 @@ def init_client( staging_tables | {schema.version_table_name}, # keep only schema version staging_tables, # all eligible 
tables must be also truncated staging_info=True, - load_id=load_id, ) return applied_update @@ -130,7 +128,6 @@ def _init_dataset_and_update_schema( update_tables: Iterable[str], truncate_tables: Iterable[str] = None, staging_info: bool = False, - load_id: str = None, ) -> TSchemaTables: staging_text = "for staging dataset" if staging_info else "" logger.info( @@ -143,7 +140,7 @@ def _init_dataset_and_update_schema( f" {staging_text}" ) applied_update = job_client.update_stored_schema( - load_id=load_id, only_tables=update_tables, expected_update=expected_update + only_tables=update_tables, expected_update=expected_update ) logger.info( f"Client for {job_client.config.destination_type} will truncate tables {staging_text}" From b2b591303c9a650644b11da4017f1af257a22489 Mon Sep 17 00:00:00 2001 From: Dave Date: Wed, 17 Apr 2024 12:03:09 +0200 Subject: [PATCH 23/27] pr fixes and move pipeline state saving to committing of extracted packages --- dlt/common/storages/load_package.py | 22 ++++++---- .../impl/filesystem/filesystem.py | 43 ++++++++++++------- dlt/extract/__init__.py | 2 +- dlt/extract/extract.py | 9 +++- dlt/extract/extractors.py | 9 +--- dlt/extract/resource.py | 16 ------- dlt/pipeline/current.py | 1 - dlt/pipeline/mark.py | 1 - dlt/pipeline/pipeline.py | 11 ++--- dlt/pipeline/state_sync.py | 16 ++++--- .../load/filesystem/test_filesystem_client.py | 7 +-- .../load/pipeline/test_filesystem_pipeline.py | 7 ++- 12 files changed, 75 insertions(+), 69 deletions(-) diff --git a/dlt/common/storages/load_package.py b/dlt/common/storages/load_package.py index 6d4f4a0521..1752039775 100644 --- a/dlt/common/storages/load_package.py +++ b/dlt/common/storages/load_package.py @@ -54,15 +54,27 @@ """Loader file formats with internal job types""" +class TPipelineStateDoc(TypedDict, total=False): + """Corresponds to the StateInfo Tuple""" + + version: int + engine_version: int + pipeline_name: str + state: str + version_hash: str + created_at: datetime.datetime + dlt_load_id: NotRequired[str] + + class TLoadPackageState(TVersionedState, total=False): created_at: DateTime """Timestamp when the load package was created""" + pipeline_state: NotRequired[TPipelineStateDoc] + """Pipeline state, added at the end of the extraction phase""" """A section of state that does not participate in change merging and version control""" destination_state: NotRequired[Dict[str, Any]] """private space for destinations to store state relevant only to the load package""" - source_state: NotRequired[Dict[str, Any]] - """private space for source to store state relevant only to the load package, currently used for storing pipeline state""" class TLoadPackage(TypedDict, total=False): @@ -691,12 +703,6 @@ def destination_state() -> DictStrAny: return lp["state"].setdefault("destination_state", {}) -def load_package_source_state() -> DictStrAny: - """Get segment of load package state that is specific to the sources.""" - lp = load_package() - return lp["state"].setdefault("source_state", {}) - - def clear_destination_state(commit: bool = True) -> None: """Clear segment of load package state that is specific to the current destination. 
Optionally commit to load package.""" lp = load_package() diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py index 8edfa7f744..5dae4bf295 100644 --- a/dlt/destinations/impl/filesystem/filesystem.py +++ b/dlt/destinations/impl/filesystem/filesystem.py @@ -1,7 +1,8 @@ import posixpath import os +import base64 from types import TracebackType -from typing import ClassVar, List, Type, Iterable, Set, Iterator, Optional, Tuple +from typing import ClassVar, List, Type, Iterable, Set, Iterator, Optional, Tuple, cast from fsspec import AbstractFileSystem from contextlib import contextmanager from dlt.common import json, pendulum @@ -35,6 +36,7 @@ INIT_FILE_NAME = "init" +FILENAME_SEPARATOR = "__" class LoadFilesystemJob(LoadJob): @@ -261,14 +263,15 @@ def _write_to_json_file(self, filepath: str, data: DictStrAny) -> None: self.fs_client.write_text(filepath, json.dumps(data), "utf-8") def _to_path_safe_string(self, s: str) -> str: - return "".join([c for c in s if re.match(r"\w", c)]) if s else None + """for base64 strings""" + return base64.b64decode(s).hex() if s else None def _list_dlt_dir(self, dirname: str) -> Iterator[Tuple[str, List[str]]]: if not self.fs_client.exists(posixpath.join(dirname, INIT_FILE_NAME)): raise DestinationUndefinedEntity({"dir": dirname}) - for filepath in self.fs_client.listdir(dirname, detail=False): + for filepath in self.fs_client.ls(dirname, detail=False, refresh=True): filename = os.path.splitext(os.path.basename(filepath))[0] - fileparts = filename.split("__") + fileparts = filename.split(FILENAME_SEPARATOR) if len(fileparts) != 3: continue yield filepath, fileparts @@ -283,7 +286,11 @@ def _store_load(self, load_id: str) -> None: "inserted_at": pendulum.now().isoformat(), "schema_version_hash": self.schema.version_hash, } - filepath = f"{self.dataset_path}/{self.schema.loads_table_name}/{self.schema.name}__{load_id}.jsonl" + filepath = posixpath.join( + self.dataset_path, + self.schema.loads_table_name, + f"{self.schema.name}{FILENAME_SEPARATOR}{load_id}.jsonl", + ) self._write_to_json_file(filepath, load_data) def complete_load(self, load_id: str) -> None: @@ -297,32 +304,32 @@ def complete_load(self, load_id: str) -> None: def _get_state_file_name(self, pipeline_name: str, version_hash: str, load_id: str) -> str: """gets full path for schema file for a given hash""" - return f"{self.dataset_path}/{self.schema.state_table_name}/{pipeline_name}__{load_id}__{self._to_path_safe_string(version_hash)}.jsonl" + return posixpath.join( + self.dataset_path, + self.schema.state_table_name, + f"{pipeline_name}{FILENAME_SEPARATOR}{load_id}{FILENAME_SEPARATOR}{self._to_path_safe_string(version_hash)}.jsonl", + ) def _store_current_state(self, load_id: str) -> None: # don't save the state this way when used as staging if self.config.as_staging: return # get state doc from current pipeline - from dlt.pipeline.current import load_package_source_state - from dlt.pipeline.state_sync import LOAD_PACKAGE_STATE_KEY + from dlt.pipeline.current import load_package - doc = load_package_source_state().get(LOAD_PACKAGE_STATE_KEY, {}) + pipeline_state_doc = load_package()["state"].get("pipeline_state") - if not doc: + if not pipeline_state_doc: return - # this is not done in other destinations... 
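The _to_path_safe_string change above notes in its docstring that it operates on base64 strings; a tiny usage sketch with a made-up hash value, showing why the hex form is safe in file names while raw base64 is not:

import base64

sample_hash = base64.b64encode(b"\x01\x7f\xff").decode()  # "AX//" - base64 may contain "/" and "+"
path_safe = base64.b64decode(sample_hash).hex()           # "017fff" - hex only uses [0-9a-f]
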
- doc["dlt_load_id"] = load_id - # get paths - pipeline_name = doc["pipeline_name"] + pipeline_name = pipeline_state_doc["pipeline_name"] hash_path = self._get_state_file_name( pipeline_name, self.schema.stored_version_hash, load_id ) # write - self._write_to_json_file(hash_path, doc) + self._write_to_json_file(hash_path, cast(DictStrAny, pipeline_state_doc)) def get_stored_state(self, pipeline_name: str) -> Optional[StateInfo]: # get base dir @@ -350,7 +357,11 @@ def get_stored_state(self, pipeline_name: str) -> Optional[StateInfo]: def _get_schema_file_name(self, version_hash: str, load_id: str) -> str: """gets full path for schema file for a given hash""" - return f"{self.dataset_path}/{self.schema.version_table_name}/{self.schema.name}__{load_id}__{self._to_path_safe_string(version_hash)}.jsonl" + return posixpath.join( + self.dataset_path, + self.schema.version_table_name, + f"{self.schema.name}{FILENAME_SEPARATOR}{load_id}{FILENAME_SEPARATOR}{self._to_path_safe_string(version_hash)}.jsonl", + ) def _get_stored_schema_by_hash_or_newest( self, version_hash: str = None diff --git a/dlt/extract/__init__.py b/dlt/extract/__init__.py index 7e0dd3d0fc..03b2e59539 100644 --- a/dlt/extract/__init__.py +++ b/dlt/extract/__init__.py @@ -1,4 +1,4 @@ -from dlt.extract.resource import DltResource, with_table_name, with_hints, with_package_state +from dlt.extract.resource import DltResource, with_table_name, with_hints from dlt.extract.hints import make_hints from dlt.extract.source import DltSource from dlt.extract.decorators import source, resource, transformer, defer diff --git a/dlt/extract/extract.py b/dlt/extract/extract.py index f07c4ccbc6..159a5e7c23 100644 --- a/dlt/extract/extract.py +++ b/dlt/extract/extract.py @@ -17,6 +17,7 @@ WithStepInfo, reset_resource_state, ) +from dlt.common.typing import DictStrAny from dlt.common.runtime import signals from dlt.common.runtime.collector import Collector, NULL_COLLECTOR from dlt.common.schema import Schema, utils @@ -30,6 +31,7 @@ from dlt.common.storages.load_package import ( ParsedLoadJobFileName, LoadPackageStateInjectableContext, + TPipelineStateDoc, ) @@ -400,10 +402,13 @@ def extract( ) return load_id - def commit_packages(self) -> None: - """Commits all extracted packages to normalize storage""" + def commit_packages(self, pipline_state_doc: TPipelineStateDoc = None) -> None: + """Commits all extracted packages to normalize storage, and adds the pipeline state to the load package""" # commit load packages for load_id, metrics in self._load_id_metrics.items(): + package_state = self.extract_storage.new_packages.get_load_package_state(load_id) + package_state["pipeline_state"] = pipline_state_doc + self.extract_storage.new_packages.save_load_package_state(load_id, package_state) self.extract_storage.commit_new_load_package( load_id, self.schema_storage[metrics[0]["schema_name"]] ) diff --git a/dlt/extract/extractors.py b/dlt/extract/extractors.py index 86888fed0a..b4afc5b1f8 100644 --- a/dlt/extract/extractors.py +++ b/dlt/extract/extractors.py @@ -19,7 +19,7 @@ TPartialTableSchema, ) from dlt.extract.hints import HintsMeta -from dlt.extract.resource import DltResource, LoadPackageStateMeta +from dlt.extract.resource import DltResource from dlt.extract.items import TableNameMeta from dlt.extract.storage import ExtractorItemStorage @@ -88,13 +88,6 @@ def write_items(self, resource: DltResource, items: TDataItems, meta: Any) -> No meta = TableNameMeta(meta.hints["name"]) # type: ignore[arg-type] self._reset_contracts_cache() - # if we have a 
load package state meta, store to load package - if isinstance(meta, LoadPackageStateMeta): - from dlt.pipeline.current import load_package_source_state, commit_load_package_state - - load_package_source_state()[meta.state_key_name] = items - commit_load_package_state() - if table_name := self._get_static_table_name(resource, meta): # write item belonging to table with static name self._write_to_static_table(resource, table_name, items, meta) diff --git a/dlt/extract/resource.py b/dlt/extract/resource.py index 87e4fd7f76..4776158bbb 100644 --- a/dlt/extract/resource.py +++ b/dlt/extract/resource.py @@ -76,22 +76,6 @@ def with_hints( return DataItemWithMeta(HintsMeta(hints, create_table_variant), item) -class LoadPackageStateMeta: - __slots__ = "state_key_name" - - def __init__(self, state_key_name: str) -> None: - self.state_key_name = state_key_name - - -def with_package_state(item: TDataItems, state_key_name: str) -> DataItemWithMeta: - """Marks `item` to also be inserted into the package state. - - Will create a separate variant of hints for a table if `name` is provided in `hints` and `create_table_variant` is set. - - """ - return DataItemWithMeta(LoadPackageStateMeta(state_key_name), item) - - class DltResource(Iterable[TDataItem], DltResourceHints): """Implements dlt resource. Contains a data pipe that wraps a generating item and table schema that can be adjusted""" diff --git a/dlt/pipeline/current.py b/dlt/pipeline/current.py index 4bbe74a123..25fd398623 100644 --- a/dlt/pipeline/current.py +++ b/dlt/pipeline/current.py @@ -7,7 +7,6 @@ load_package, commit_load_package_state, destination_state, - load_package_source_state, clear_destination_state, ) from dlt.extract.decorators import get_source_schema, get_source diff --git a/dlt/pipeline/mark.py b/dlt/pipeline/mark.py index 0b753539be..3956d9bbe2 100644 --- a/dlt/pipeline/mark.py +++ b/dlt/pipeline/mark.py @@ -3,6 +3,5 @@ with_table_name, with_hints, make_hints, - with_package_state, materialize_schema_item as materialize_table_schema, ) diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index 5d719882a5..2b6d5c7a85 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -138,6 +138,7 @@ mark_state_extracted, migrate_pipeline_state, state_resource, + state_doc, default_pipeline_state, ) from dlt.pipeline.warnings import credentials_argument_deprecated @@ -427,13 +428,13 @@ def extract( raise SourceExhausted(source.name) self._extract_source(extract_step, source, max_parallel_items, workers) # extract state + state = None if self.config.restore_from_destination: # this will update state version hash so it will not be extracted again by with_state_sync - self._bump_version_and_extract_state( - self._container[StateInjectableContext].state, True, extract_step - ) - # commit load packages - extract_step.commit_packages() + state = self._container[StateInjectableContext].state + self._bump_version_and_extract_state(state, True, extract_step) + # commit load packages with state + extract_step.commit_packages(state_doc(state) if state else None) return self._get_step_info(extract_step) except Exception as exc: # emit step info diff --git a/dlt/pipeline/state_sync.py b/dlt/pipeline/state_sync.py index 70a58d1f98..e5d937d05a 100644 --- a/dlt/pipeline/state_sync.py +++ b/dlt/pipeline/state_sync.py @@ -5,7 +5,7 @@ from dlt.common.pendulum import pendulum from dlt.common.typing import DictStrAny from dlt.common.schema.typing import STATE_TABLE_NAME, TTableSchemaColumns -from dlt.common.destination.reference 
import WithStateSync, Destination +from dlt.common.destination.reference import WithStateSync, Destination, StateInfo from dlt.common.versioned_state import ( generate_state_version_hash, bump_state_version_if_modified, @@ -14,7 +14,7 @@ decompress_state, ) from dlt.common.pipeline import TPipelineState - +from dlt.common.storages.load_package import TPipelineStateDoc from dlt.extract import DltResource from dlt.pipeline.exceptions import ( @@ -94,11 +94,11 @@ def migrate_pipeline_state( return cast(TPipelineState, state) -def state_doc(state: TPipelineState) -> DictStrAny: +def state_doc(state: TPipelineState, load_id: str = None) -> TPipelineStateDoc: state = copy(state) state.pop("_local") state_str = compress_state(state) - doc = { + doc: TPipelineStateDoc = { "version": state["_state_version"], "engine_version": state["_state_engine_version"], "pipeline_name": state["pipeline_name"], @@ -106,13 +106,17 @@ def state_doc(state: TPipelineState) -> DictStrAny: "created_at": pendulum.now(), "version_hash": state["_version_hash"], } + if load_id: + doc["dlt_load_id"] = load_id return doc def state_resource(state: TPipelineState) -> DltResource: - doc = dlt.mark.with_package_state(state_doc(state), LOAD_PACKAGE_STATE_KEY) return dlt.resource( - [doc], name=STATE_TABLE_NAME, write_disposition="append", columns=STATE_TABLE_COLUMNS + [state_doc(state)], + name=STATE_TABLE_NAME, + write_disposition="append", + columns=STATE_TABLE_COLUMNS, ) diff --git a/tests/load/filesystem/test_filesystem_client.py b/tests/load/filesystem/test_filesystem_client.py index 65dd947367..41b98bdec3 100644 --- a/tests/load/filesystem/test_filesystem_client.py +++ b/tests/load/filesystem/test_filesystem_client.py @@ -9,6 +9,7 @@ from dlt.destinations.impl.filesystem.filesystem import ( FilesystemDestinationClientConfiguration, + INIT_FILE_NAME, ) from dlt.destinations.path_utils import create_path, prepare_datetime_params @@ -156,10 +157,10 @@ def test_replace_write_disposition(layout: str, default_buckets_env: str) -> Non client.dataset_path, detail=False, refresh=True ): # remove internal paths - if "_dlt" in basedir or "init" in basedir: + if "_dlt" in basedir: continue for f in files: - if f == "init": + if f == INIT_FILE_NAME: continue paths.append(posixpath.join(basedir, f)) @@ -222,7 +223,7 @@ def test_append_write_disposition(layout: str, default_buckets_env: str) -> None if "_dlt" in basedir: continue for f in files: - if f == "init": + if f == INIT_FILE_NAME: continue paths.append(posixpath.join(basedir, f)) assert list(sorted(paths)) == expected_files diff --git a/tests/load/pipeline/test_filesystem_pipeline.py b/tests/load/pipeline/test_filesystem_pipeline.py index 37afbcb07a..20f326b160 100644 --- a/tests/load/pipeline/test_filesystem_pipeline.py +++ b/tests/load/pipeline/test_filesystem_pipeline.py @@ -19,8 +19,11 @@ from tests.common.utils import load_json_case from tests.utils import ALL_TEST_DATA_ITEM_FORMATS, TestDataItemFormat, skip_if_not_active from dlt.destinations.path_utils import create_path -from tests.load.pipeline.utils import destinations_configs, DestinationTestConfiguration -from tests.load.pipeline.utils import load_table_counts +from tests.load.pipeline.utils import ( + destinations_configs, + DestinationTestConfiguration, + load_table_counts, +) skip_if_not_active("filesystem") From cd4dd2378beeae203808fe80057c53147837106b Mon Sep 17 00:00:00 2001 From: Dave Date: Wed, 17 Apr 2024 12:16:39 +0200 Subject: [PATCH 24/27] ensure pipeline state is only saved to load package if it has 
 changed

---
 dlt/extract/extract.py                    |  7 ++++---
 dlt/pipeline/pipeline.py                  | 16 ++++++++++------
 dlt/pipeline/state_sync.py                | 16 ++++++++++------
 tests/load/pipeline/test_restore_state.py |  2 +-
 4 files changed, 25 insertions(+), 16 deletions(-)

diff --git a/dlt/extract/extract.py b/dlt/extract/extract.py
index 159a5e7c23..d4298f2f6b 100644
--- a/dlt/extract/extract.py
+++ b/dlt/extract/extract.py
@@ -406,9 +406,10 @@ def commit_packages(self, pipline_state_doc: TPipelineStateDoc = None) -> None:
         """Commits all extracted packages to normalize storage, and adds the pipeline state to the load package"""
         # commit load packages
         for load_id, metrics in self._load_id_metrics.items():
-            package_state = self.extract_storage.new_packages.get_load_package_state(load_id)
-            package_state["pipeline_state"] = pipline_state_doc
-            self.extract_storage.new_packages.save_load_package_state(load_id, package_state)
+            if pipline_state_doc:
+                package_state = self.extract_storage.new_packages.get_load_package_state(load_id)
+                package_state["pipeline_state"] = {**pipline_state_doc, "dlt_load_id": load_id}
+                self.extract_storage.new_packages.save_load_package_state(load_id, package_state)
             self.extract_storage.commit_new_load_package(
                 load_id, self.schema_storage[metrics[0]["schema_name"]]
             )
diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py
index 2b6d5c7a85..1ba7896e1c 100644
--- a/dlt/pipeline/pipeline.py
+++ b/dlt/pipeline/pipeline.py
@@ -63,6 +63,7 @@
     LoadJobInfo,
     LoadPackageInfo,
 )
+from dlt.common.storages.load_package import TPipelineStateDoc
 from dlt.common.destination import (
     DestinationCapabilitiesContext,
     merge_caps_file_formats,
@@ -428,13 +429,14 @@ def extract(
                     raise SourceExhausted(source.name)
                 self._extract_source(extract_step, source, max_parallel_items, workers)
             # extract state
-            state = None
+            state: TPipelineStateDoc = None
             if self.config.restore_from_destination:
                 # this will update state version hash so it will not be extracted again by with_state_sync
-                state = self._container[StateInjectableContext].state
-                self._bump_version_and_extract_state(state, True, extract_step)
+                state = self._bump_version_and_extract_state(
+                    self._container[StateInjectableContext].state, True, extract_step
+                )
             # commit load packages with state
-            extract_step.commit_packages(state_doc(state) if state else None)
+            extract_step.commit_packages(state)
             return self._get_step_info(extract_step)
         except Exception as exc:
             # emit step info
@@ -1513,7 +1515,7 @@ def _props_to_state(self, state: TPipelineState) -> TPipelineState:
 
     def _bump_version_and_extract_state(
         self, state: TPipelineState, extract_state: bool, extract: Extract = None
-    ) -> None:
+    ) -> TPipelineStateDoc:
         """Merges existing state into `state` and extracts state using `storage` if extract_state is True.
 
         Storage will be created on demand. In that case the extracted package will be immediately committed.
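A minimal sketch of the flow these hunks wire together (variable names such as `state` and `extract_` are placeholders, not part of the patch): `state_resource()` now returns the `TPipelineStateDoc` alongside the resource, `_bump_version_and_extract_state()` passes it back to the caller, and `Extract.commit_packages()` copies it into each committed load package under the `pipeline_state` key together with the `dlt_load_id`.

    # sketch only: shows where the state doc travels, assuming a TPipelineState
    # `state` and an Extract instance `extract_` already exist in scope
    from dlt.pipeline.state_sync import state_resource

    resource, doc = state_resource(state)  # doc is the TPipelineStateDoc written to the state table
    # ... `resource` is extracted into the load package as usual ...
    extract_.commit_packages(doc)  # doc is stored in every committed package state
    # destinations can later read it while loading:
    #   from dlt.pipeline.current import load_package
    #   state_doc = load_package()["state"].get("pipeline_state")  # includes "dlt_load_id"
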
@@ -1521,7 +1523,7 @@ def _bump_version_and_extract_state( _, hash_, _ = bump_pipeline_state_version_if_modified(self._props_to_state(state)) should_extract = hash_ != state["_local"].get("_last_extracted_hash") if should_extract and extract_state: - data = state_resource(state) + data, doc = state_resource(state) extract_ = extract or Extract( self._schema_storage, self._normalize_storage_config(), original_data=data ) @@ -1533,6 +1535,8 @@ def _bump_version_and_extract_state( # commit only if we created storage if not extract: extract_.commit_packages() + return doc + return None def _list_schemas_sorted(self) -> List[str]: """Lists schema names sorted to have deterministic state""" diff --git a/dlt/pipeline/state_sync.py b/dlt/pipeline/state_sync.py index e5d937d05a..41009f2909 100644 --- a/dlt/pipeline/state_sync.py +++ b/dlt/pipeline/state_sync.py @@ -111,12 +111,16 @@ def state_doc(state: TPipelineState, load_id: str = None) -> TPipelineStateDoc: return doc -def state_resource(state: TPipelineState) -> DltResource: - return dlt.resource( - [state_doc(state)], - name=STATE_TABLE_NAME, - write_disposition="append", - columns=STATE_TABLE_COLUMNS, +def state_resource(state: TPipelineState) -> Tuple[DltResource, TPipelineStateDoc]: + doc = state_doc(state) + return ( + dlt.resource( + [doc], + name=STATE_TABLE_NAME, + write_disposition="append", + columns=STATE_TABLE_COLUMNS, + ), + doc, ) diff --git a/tests/load/pipeline/test_restore_state.py b/tests/load/pipeline/test_restore_state.py index ffd921f8a8..6518ca46ae 100644 --- a/tests/load/pipeline/test_restore_state.py +++ b/tests/load/pipeline/test_restore_state.py @@ -76,7 +76,7 @@ def test_restore_state_utils(destination_config: DestinationTestConfiguration) - initial_state["_local"]["_last_extracted_at"] = pendulum.now() initial_state["_local"]["_last_extracted_hash"] = initial_state["_version_hash"] # add _dlt_id and _dlt_load_id - resource = state_resource(initial_state) + resource, _ = state_resource(initial_state) resource.apply_hints( columns={ "_dlt_id": {"name": "_dlt_id", "data_type": "text", "nullable": False}, From c8b3429538c4aae162fd12ffd24556c4680c59b5 Mon Sep 17 00:00:00 2001 From: Dave Date: Wed, 17 Apr 2024 13:30:18 +0200 Subject: [PATCH 25/27] adds missing state injection into state package --- dlt/pipeline/pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index 1ba7896e1c..e1821a9ac8 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -1534,7 +1534,7 @@ def _bump_version_and_extract_state( mark_state_extracted(state, hash_) # commit only if we created storage if not extract: - extract_.commit_packages() + extract_.commit_packages(doc) return doc return None From 6522f87a5b19d4186acf815352c841a35eb0eca4 Mon Sep 17 00:00:00 2001 From: Dave Date: Wed, 17 Apr 2024 14:02:01 +0200 Subject: [PATCH 26/27] fix athena iceberg locations --- dlt/destinations/impl/athena/athena.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dlt/destinations/impl/athena/athena.py b/dlt/destinations/impl/athena/athena.py index 1beb249386..25152ec06f 100644 --- a/dlt/destinations/impl/athena/athena.py +++ b/dlt/destinations/impl/athena/athena.py @@ -367,7 +367,7 @@ def _get_table_update_sql( if is_iceberg: sql.append(f"""CREATE TABLE {qualified_table_name} ({columns}) - LOCATION '{location}' + LOCATION '{location.rstrip('/')}' TBLPROPERTIES ('table_type'='ICEBERG', 'format'='parquet');""") elif table_format == "jsonl": 
sql.append(f"""CREATE EXTERNAL TABLE {qualified_table_name} From de34a48e430df1e7d62a738695098e9761dff576 Mon Sep 17 00:00:00 2001 From: Dave Date: Wed, 17 Apr 2024 17:35:02 +0200 Subject: [PATCH 27/27] fix google drive filesystem with missing argument --- dlt/common/storages/fsspecs/google_drive.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dlt/common/storages/fsspecs/google_drive.py b/dlt/common/storages/fsspecs/google_drive.py index 1be862668c..258a8622d1 100644 --- a/dlt/common/storages/fsspecs/google_drive.py +++ b/dlt/common/storages/fsspecs/google_drive.py @@ -237,7 +237,7 @@ def export(self, path: str, mime_type: str) -> Any: fileId=file_id, mimeType=mime_type, supportsAllDrives=True ).execute() - def ls(self, path: str, detail: Optional[bool] = False) -> Any: + def ls(self, path: str, detail: Optional[bool] = False, refresh: Optional[bool] = False) -> Any: """List files in a directory. Args: