filesystem state sync #1184

Merged 30 commits on Apr 17, 2024

Changes from 12 commits
0369496  clean some stuff (sh-rp, Apr 3, 2024)
9a87f0f  first messy version of filesystem state sync (sh-rp, Apr 3, 2024)
f6d5c9c  clean up a bit (sh-rp, Apr 3, 2024)
d58a38b  fix bug in state sync (sh-rp, Apr 4, 2024)
2913c33  enable state tests for all bucket providers (sh-rp, Apr 4, 2024)
e32ad95  do not store state to uninitialized dataset folders (sh-rp, Apr 4, 2024)
cd21ff6  fix linter errors (sh-rp, Apr 4, 2024)
6b7c16d  get current pipeline from pipeline context (sh-rp, Apr 4, 2024)
95cc882  fix bug in filesystem table init (sh-rp, Apr 4, 2024)
b5eb47d  Merge branch 'devel' into d#/filesystem_state_sync (sh-rp, Apr 15, 2024)
15ac9bf  update testing pipe (sh-rp, Apr 15, 2024)
a6ce1b1  move away from "current" file, rather iterator bucket path contents (sh-rp, Apr 15, 2024)
bce2837  store pipeline state in load package state and send to filesystem des… (sh-rp, Apr 15, 2024)
40f1f3e  fix tests for changed number of files in filesystem destination (sh-rp, Apr 15, 2024)
5e8c233  remove dev code (sh-rp, Apr 15, 2024)
e7e0192  create init file also to mark datasets (sh-rp, Apr 15, 2024)
7cd51b4  fix tests to respect new init file (sh-rp, Apr 16, 2024)
c406600  update filesystem docs (sh-rp, Apr 16, 2024)
0c52fcd  Merge branch 'devel' into d#/filesystem_state_sync (sh-rp, Apr 16, 2024)
f0635b2  fix incoming tests of placeholders (sh-rp, Apr 16, 2024)
bdaf094  small fixes (sh-rp, Apr 16, 2024)
a09f896  adds some tests for filesystem state (sh-rp, Apr 16, 2024)
fce47c6  fix test helper (sh-rp, Apr 16, 2024)
0d5423c  save schema with timestamp instead of load_id (sh-rp, Apr 16, 2024)
b2b5913  pr fixes and move pipeline state saving to committing of extracted pa… (sh-rp, Apr 17, 2024)
cd4dd23  ensure pipeline state is only saved to load package if it has changed (sh-rp, Apr 17, 2024)
c8b3429  adds missing state injection into state package (sh-rp, Apr 17, 2024)
6522f87  fix athena iceberg locations (sh-rp, Apr 17, 2024)
de34a48  fix google drive filesystem with missing argument (sh-rp, Apr 17, 2024)
abfc170  Merge branch 'devel' into d#/filesystem_state_sync (sh-rp, Apr 17, 2024)
7 changes: 6 additions & 1 deletion dlt/common/destination/reference.py
@@ -273,14 +273,18 @@ def drop_storage(self) -> None:
pass

def update_stored_schema(
self, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None
self,
load_id: str = None,
only_tables: Iterable[str] = None,
expected_update: TSchemaTables = None,
) -> Optional[TSchemaTables]:
"""Updates storage to the current schema.

Implementations should not assume that `expected_update` is the exact difference between destination state and the self.schema. This is only the case if
destination has single writer and no other processes modify the schema.

Args:
load_id (str, optional): Load id during which the schema is updated
only_tables (Sequence[str], optional): Updates only listed tables. Defaults to None.
expected_update (TSchemaTables, optional): Update that is expected to be applied to the destination
Returns:
@@ -434,6 +438,7 @@ def get_stored_schema(self) -> Optional[StorageSchemaInfo]:

@abstractmethod
def get_stored_schema_by_hash(self, version_hash: str) -> StorageSchemaInfo:
"""retrieves the stored schema by hash"""
pass

@abstractmethod
7 changes: 5 additions & 2 deletions dlt/destinations/impl/destination/destination.py
@@ -47,9 +47,12 @@ def drop_storage(self) -> None:
pass

def update_stored_schema(
self, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None
self,
load_id: str = None,
only_tables: Iterable[str] = None,
expected_update: TSchemaTables = None,
) -> Optional[TSchemaTables]:
return super().update_stored_schema(only_tables, expected_update)
return super().update_stored_schema(load_id, only_tables, expected_update)

def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob:
# skip internal tables and remove columns from schema if so configured
7 changes: 5 additions & 2 deletions dlt/destinations/impl/dummy/dummy.py
@@ -127,9 +127,12 @@ def drop_storage(self) -> None:
pass

def update_stored_schema(
self, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None
self,
load_id: str = None,
only_tables: Iterable[str] = None,
expected_update: TSchemaTables = None,
) -> Optional[TSchemaTables]:
applied_update = super().update_stored_schema(only_tables, expected_update)
applied_update = super().update_stored_schema(load_id, only_tables, expected_update)
if self.config.fail_schema_update:
raise DestinationTransientException(
"Raise on schema update due to fail_schema_update config flag"
204 changes: 186 additions & 18 deletions dlt/destinations/impl/filesystem/filesystem.py
@@ -1,9 +1,13 @@
import posixpath
import os
from types import TracebackType
from typing import ClassVar, List, Type, Iterable, Set, Iterator
from typing import ClassVar, List, Type, Iterable, Set, Iterator, Optional, Tuple
from fsspec import AbstractFileSystem
from contextlib import contextmanager
from dlt.common import json, pendulum
from dlt.common.typing import DictStrAny

import re

from dlt.common import logger
from dlt.common.schema import Schema, TSchemaTables, TTableSchema
@@ -16,15 +20,22 @@
JobClientBase,
FollowupJob,
WithStagingDataset,
WithStateSync,
StorageSchemaInfo,
StateInfo,
DoNothingJob,
)

from dlt.common.destination.exceptions import DestinationUndefinedEntity
from dlt.destinations.job_impl import EmptyLoadJob
from dlt.destinations.impl.filesystem import capabilities
from dlt.destinations.impl.filesystem.configuration import FilesystemDestinationClientConfiguration
from dlt.destinations.job_impl import NewReferenceJob
from dlt.destinations import path_utils


INIT_FILE_NAME = "init"


class LoadFilesystemJob(LoadJob):
def __init__(
self,
@@ -87,7 +98,7 @@ def create_followup_jobs(self, final_state: TLoadJobState) -> List[NewLoadJob]:
return jobs


class FilesystemClient(JobClientBase, WithStagingDataset):
class FilesystemClient(JobClientBase, WithStagingDataset, WithStateSync):
"""filesystem client storing jobs in memory"""

capabilities: ClassVar[DestinationCapabilitiesContext] = capabilities()
@@ -168,30 +179,49 @@ def initialize_storage(self, truncate_tables: Iterable[str] = None) -> None:
)

def update_stored_schema(
self, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None
self,
load_id: str = None,
only_tables: Iterable[str] = None,
expected_update: TSchemaTables = None,
) -> TSchemaTables:
# create destination dirs for all tables
dirs_to_create = self._get_table_dirs(only_tables or self.schema.tables.keys())
for directory in dirs_to_create:
table_names = only_tables or self.schema.tables.keys()
dirs_to_create = self._get_table_dirs(table_names)
for tables_name, directory in zip(table_names, dirs_to_create):
self.fs_client.makedirs(directory, exist_ok=True)
# we need to mark the folders of the data tables as initialized
if tables_name in self.schema.dlt_table_names():
self.fs_client.touch(posixpath.join(directory, INIT_FILE_NAME))

# write schema to destination
self.store_current_schema(load_id or "1")
Collaborator Author: When "sync_destination" is called, we are not inside the context of a load. I am not quite sure how to handle this case. I first just did not store the schema, but there is a test that verifies that there is a schema in the destination after "sync_destination" is called on a pipeline with nothing in the versions folder. Either we change the tests or think of some default value. I am not sure.

Collaborator Author: We can also find the newest load_id for this schema present and increase it by one, but that also does not feel right.

Collaborator Author: Or we use a current timestamp, then it should be in line with the other destinations.

Collaborator Author: I have changed it to be like this though. For lineage purposes it would be interesting to also have the load_id (if available) in the file/table, but for now it is in line with the other destinations.
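
The timestamp fallback described in the last comment could look roughly like this (a sketch of the idea only; the helper name is illustrative, not the merged code):

from dlt.common import pendulum

def schema_file_id(load_id: str = None) -> str:
    # illustrative helper: inside a load we have a load_id; outside of one
    # (e.g. pipeline.sync_destination()) fall back to the current timestamp
    # so schema files still sort chronologically
    return load_id or str(int(pendulum.now().timestamp()))

Commit 0d5423c ("save schema with timestamp instead of load_id") suggests this is the direction the PR eventually took.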


return expected_update

def _get_table_dirs(self, table_names: Iterable[str]) -> Set[str]:
def _get_table_dirs(self, table_names: Iterable[str]) -> List[str]:
"""Gets unique directories where table data is stored."""
table_dirs: Set[str] = set()
table_dirs: List[str] = []
for table_name in table_names:
table_prefix = self.table_prefix_layout.format(
schema_name=self.schema.name, table_name=table_name
)
# dlt tables do not respect layout (for now)
if table_name in self.schema.dlt_table_names():
Collaborator: I think it is totally fine and should stay like that (we need to document this).
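
In practice that means dlt system tables land under a fixed prefix directly below the dataset path, independent of the configured layout; assuming the standard dlt system table names, roughly:

# <dataset_path>/_dlt_loads/...            (loads table)
# <dataset_path>/_dlt_version/...          (schema versions table)
# <dataset_path>/_dlt_pipeline_state/...   (pipeline state table)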

table_prefix = posixpath.join(table_name, "")
else:
table_prefix = self.table_prefix_layout.format(
schema_name=self.schema.name, table_name=table_name
)
destination_dir = posixpath.join(self.dataset_path, table_prefix)
# extract the path component
table_dirs.add(os.path.dirname(destination_dir))
table_dirs.append(posixpath.dirname(destination_dir))
return table_dirs

def is_storage_initialized(self) -> bool:
return self.fs_client.isdir(self.dataset_path) # type: ignore[no-any-return]

def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob:
# skip the state table, we create a jsonl file in the complete_load step
if table["name"] == self.schema.state_table_name:
return DoNothingJob(file_path)

cls = FollowupFilesystemJob if self.config.as_staging else LoadFilesystemJob
return cls(
file_path,
@@ -204,12 +234,6 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) ->
def restore_file_load(self, file_path: str) -> LoadJob:
return EmptyLoadJob.from_file_path(file_path, "completed")

def complete_load(self, load_id: str) -> None:
schema_name = self.schema.name
table_name = self.schema.loads_table_name
file_name = f"{schema_name}.{table_name}.{load_id}"
self.fs_client.touch(posixpath.join(self.dataset_path, file_name))

def __enter__(self) -> "FilesystemClient":
return self

@@ -220,3 +244,147 @@ def __exit__(

def should_load_data_to_staging_dataset(self, table: TTableSchema) -> bool:
return False

#
# state stuff
#

def _write_to_json_file(self, filepath: str, data: DictStrAny) -> None:
dirname = posixpath.dirname(filepath)
if not self.fs_client.isdir(dirname):
return
self.fs_client.write_text(filepath, json.dumps(data), "utf-8")

def _to_path_safe_string(self, s: str) -> str:
return "".join([c for c in s if re.match(r"\w", c)]) if s else None
Contributor: Should we instead use c in string.ascii_letters?

In [2]: import string
In [3]: string.ascii_letters
Out[3]: 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'

Collaborator Author: Changed it to a hex string now.
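
A minimal sketch of the hex-string approach mentioned here, assuming the hash is simply hex-encoded UTF-8 (the final helper in the PR may differ):

def to_path_safe_string(s: str) -> str:
    # hex-encode the hash so file names contain only [0-9a-f] characters
    return s.encode("utf-8").hex() if s else None

# to_path_safe_string("abc") -> "616263"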


def _list_dlt_dir(self, dirname: str) -> Iterator[Tuple[str, List[str]]]:
if not self.fs_client.exists(posixpath.join(dirname, INIT_FILE_NAME)):
raise DestinationUndefinedEntity({"dir": dirname})
for filepath in self.fs_client.listdir(dirname, detail=False):
Contributor: Is listdir just an alias for fs_client.ls, or is it implementing some additional behavior?

Collaborator: When listing directories, always request a refresh!

all_files = self.fs_client.ls(truncate_dir, detail=False, refresh=True)

IMO this is the only command you should use. fsspec is a mess and this one is proven to work. Please replace all commands that list.

Collaborator Author: OK, done!

Collaborator Author: Maybe we could have our own fs client wrapper that only exposes the stuff we think is reliable.

Contributor: +1 on having our own abstraction with things we really need.
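
A minimal sketch of such a wrapper, assuming it only needs to expose a reliable, cache-bypassing listing (class and method names are illustrative):

from typing import List
from fsspec import AbstractFileSystem

class ReliableFsClient:
    """Thin wrapper exposing only the fsspec calls considered reliable."""

    def __init__(self, fs: AbstractFileSystem) -> None:
        self._fs = fs

    def list_files(self, path: str) -> List[str]:
        # always bypass fsspec's directory cache when listing
        return self._fs.ls(path, detail=False, refresh=True)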

filename = os.path.splitext(os.path.basename(filepath))[0]
fileparts = filename.split("__")
if len(fileparts) != 3:
continue
yield filepath, fileparts

def complete_load(self, load_id: str) -> None:
# store current state
self.store_current_state(load_id)

# write entry to load "table"
# TODO: this is also duplicate across all destinations. DRY this.
load_data = {
"load_id": load_id,
"schema_name": self.schema.name,
"status": 0,
"inserted_at": pendulum.now().isoformat(),
"schema_version_hash": self.schema.version_hash,
}
filepath = f"{self.dataset_path}/{self.schema.loads_table_name}/{self.schema.name}__{load_id}.jsonl"
Contributor: Should we use path.join instead?
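
The path.join variant suggested here would look roughly like the following inside complete_load (a sketch, not necessarily the code that was merged):

filepath = posixpath.join(
    self.dataset_path,
    self.schema.loads_table_name,
    f"{self.schema.name}__{load_id}.jsonl",
)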


self._write_to_json_file(filepath, load_data)

#
# state read/write
#

def _get_state_file_name(self, pipeline_name: str, version_hash: str, load_id: str) -> str:
"""gets full path for schema file for a given hash"""
return f"{self.dataset_path}/{self.schema.state_table_name}/{pipeline_name}__{load_id}__{self._to_path_safe_string(version_hash)}.jsonl"
Contributor: Should we use path.join here as well?

Collaborator: This is only used to "encode" the hash. Why not convert the hash to hex? Still not perfect but less hacky.


def store_current_state(self, load_id: str) -> None:
# get state doc from current pipeline
from dlt.common.configuration.container import Container
from dlt.common.pipeline import PipelineContext
from dlt.pipeline.state_sync import state_doc

pipeline = Container()[PipelineContext].pipeline()
state = pipeline.state
doc = state_doc(state)

# get paths
hash_path = self._get_state_file_name(
pipeline.pipeline_name, self.schema.stored_version_hash, load_id
)

# write
self._write_to_json_file(hash_path, doc)

def get_stored_state(self, pipeline_name: str) -> Optional[StateInfo]:
"""Loads compressed state from destination storage"""
# get base dir
dirname = posixpath.dirname(self._get_state_file_name(pipeline_name, "", ""))

# search newest state
selected_path = None
newest_load_id = "0"
for filepath, fileparts in self._list_dlt_dir(dirname):
if fileparts[0] == pipeline_name and fileparts[1] > newest_load_id:
newest_load_id = fileparts[1]
selected_path = filepath

"""Loads compressed state from destination storage"""
if selected_path:
state_json = json.loads(self.fs_client.read_text(selected_path))
state_json.pop("version_hash")
return StateInfo(**state_json)

return None

#
# Schema read/write
#

def _get_schema_file_name(self, version_hash: str, load_id: str) -> str:
"""gets full path for schema file for a given hash"""
return f"{self.dataset_path}/{self.schema.version_table_name}/{self.schema.name}__{load_id}__{self._to_path_safe_string(version_hash)}.jsonl"

def get_stored_schema(self) -> Optional[StorageSchemaInfo]:
"""Retrieves newest schema from destination storage"""
return self._get_stored_schema_by_hash_or_newest()

def get_stored_schema_by_hash(self, version_hash: str) -> Optional[StorageSchemaInfo]:
return self._get_stored_schema_by_hash_or_newest(version_hash)

def _get_stored_schema_by_hash_or_newest(
self, version_hash: str = None
) -> Optional[StorageSchemaInfo]:
"""Get the schema by supplied hash, falls back to getting the newest version matching the existing schema name"""
version_hash = self._to_path_safe_string(version_hash)
dirname = posixpath.dirname(self._get_schema_file_name("", ""))
# find newest schema for pipeline or by version hash
selected_path = None
newest_load_id = "0"
for filepath, fileparts in self._list_dlt_dir(dirname):
if (
not version_hash
and fileparts[0] == self.schema.name
and fileparts[1] > newest_load_id
):
newest_load_id = fileparts[1]
selected_path = filepath
elif fileparts[2] == version_hash:
selected_path = filepath
break

if selected_path:
return StorageSchemaInfo(**json.loads(self.fs_client.read_text(selected_path)))

return None

def store_current_schema(self, load_id: str) -> None:
# get paths
hash_path = self._get_schema_file_name(self.schema.stored_version_hash, load_id)

# TODO: duplicate of weaviate implementation, should be abstracted out
version_info = {
"version_hash": self.schema.stored_version_hash,
"schema_name": self.schema.name,
"version": self.schema.version,
"engine_version": self.schema.ENGINE_VERSION,
"inserted_at": pendulum.now(),
"schema": json.dumps(self.schema.to_dict()),
}

# we always keep tabs on what the current schema is
self._write_to_json_file(hash_path, version_info)
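
Taken together, these changes let a filesystem destination restore pipeline state and schemas, e.g. on a fresh machine. A rough usage sketch (bucket URL, pipeline and dataset names are illustrative):

import os
import dlt

os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] = "file:///tmp/dlt_bucket"

pipeline = dlt.pipeline(
    pipeline_name="fs_state_demo",
    destination="filesystem",
    dataset_name="demo_data",
)
pipeline.run([{"id": 1}, {"id": 2}], table_name="items")

# with this PR, syncing pulls state and schema back from the bucket
# instead of leaving the destination opaque to the pipeline
pipeline.sync_destination()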
7 changes: 5 additions & 2 deletions dlt/destinations/impl/qdrant/qdrant_client.py
@@ -283,9 +283,12 @@ def _delete_sentinel_collection(self) -> None:
self.db_client.delete_collection(self.sentinel_collection)

def update_stored_schema(
self, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None
self,
load_id: str = None,
only_tables: Iterable[str] = None,
expected_update: TSchemaTables = None,
) -> Optional[TSchemaTables]:
super().update_stored_schema(only_tables, expected_update)
super().update_stored_schema(load_id, only_tables, expected_update)
applied_update: TSchemaTables = {}
schema_info = self.get_stored_schema_by_hash(self.schema.stored_version_hash)
if schema_info is None:
18 changes: 5 additions & 13 deletions dlt/destinations/impl/weaviate/weaviate_client.py
@@ -424,9 +424,12 @@ def _delete_sentinel_class(self) -> None:

@wrap_weaviate_error
def update_stored_schema(
self, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None
self,
load_id: str = None,
only_tables: Iterable[str] = None,
expected_update: TSchemaTables = None,
) -> Optional[TSchemaTables]:
super().update_stored_schema(only_tables, expected_update)
super().update_stored_schema(load_id, only_tables, expected_update)
# Retrieve the schema from Weaviate
applied_update: TSchemaTables = {}
try:
@@ -524,17 +527,6 @@ def get_stored_state(self, pipeline_name: str) -> Optional[StateInfo]:
state["dlt_load_id"] = state.pop("_dlt_load_id")
return StateInfo(**state)

# def get_stored_states(self, state_table: str) -> List[StateInfo]:
# state_records = self.get_records(state_table,
# sort={
# "path": ["created_at"],
# "order": "desc"
# }, properties=self.state_properties)

# for state in state_records:
# state["dlt_load_id"] = state.pop("_dlt_load_id")
# return [StateInfo(**state) for state in state_records]

def get_stored_schema(self) -> Optional[StorageSchemaInfo]:
"""Retrieves newest schema from destination storage"""
try: