From de66efa91488071a07817dcb0c66b5477990126a Mon Sep 17 00:00:00 2001
From: Marcel Coetzee
Date: Sun, 17 Mar 2024 23:33:37 +0200
Subject: [PATCH] Improve Clickhouse loader code and update comments #1055

Signed-off-by: Marcel Coetzee
---
 dlt/common/destination/reference.py           |  24 +--
 dlt/destinations/impl/clickhouse/__init__.py  |   2 +-
 .../impl/clickhouse/clickhouse.py             | 137 +++++++++++++++---
 .../impl/clickhouse/configuration.py          |   6 +-
 dlt/destinations/job_impl.py                  |   4 +-
 5 files changed, 136 insertions(+), 37 deletions(-)

diff --git a/dlt/common/destination/reference.py b/dlt/common/destination/reference.py
index d89047a946..d564da567a 100644
--- a/dlt/common/destination/reference.py
+++ b/dlt/common/destination/reference.py
@@ -201,22 +201,22 @@ def __init__(


 class LoadJob:
-    """Represents a job that loads a single file
+    """Represents a job that loads a single file.

-    Each job starts in "running" state and ends in one of terminal states: "retry", "failed" or "completed".
-    Each job is uniquely identified by a file name. The file is guaranteed to exist in "running" state. In terminal state, the file may not be present.
-    In "running" state, the loader component periodically gets the state via `status()` method. When terminal state is reached, load job is discarded and not called again.
+    Each job starts in "running" state and ends in one of the terminal states: "retry", "failed" or "completed".
+    A filename uniquely identifies each job. The file is guaranteed to exist in "running" state. In terminal state, the file may not be present.
+    In "running" state, the loader component periodically gets the state via `status()` method. When terminal state is reached, a load job is discarded and not called again.
     `exception` method is called to get error information in "failed" and "retry" states.

     The `__init__` method is responsible to put the Job in "running" state. It may raise `LoadClientTerminalException` and `LoadClientTransientException` to
-    immediately transition job into "failed" or "retry" state respectively.
+    immediately transition a job into "failed" or "retry" state respectively.
     """

     def __init__(self, file_name: str) -> None:
         """
-        File name is also a job id (or job id is deterministically derived) so it must be globally unique
+        Filename is a job ID (or job ID is deterministically derived), so it must be globally unique.
         """
-        # ensure file name
+        # Ensure filename.
         assert file_name == FileStorage.get_file_name_from_file_path(file_name)
         self._file_name = file_name
         self._parsed_file_name = ParsedLoadJobFileName.parse(file_name)
@@ -231,7 +231,7 @@ def file_name(self) -> str:
         return self._file_name

     def job_id(self) -> str:
-        """The job id that is derived from the file name and does not changes during job lifecycle"""
+        """The job ID that is derived from the filename and does not change during job lifecycle."""
         return self._parsed_file_name.job_id()

     def job_file_info(self) -> ParsedLoadJobFileName:
@@ -239,7 +239,7 @@ def job_file_info(self) -> ParsedLoadJobFileName:

     @abstractmethod
     def exception(self) -> str:
-        """The exception associated with failed or retry states"""
+        """The exception associated with failed or retry states."""
         pass


@@ -248,15 +248,15 @@ class NewLoadJob(LoadJob):

     @abstractmethod
     def new_file_path(self) -> str:
-        """Path to a newly created temporary job file. If empty, no followup job should be created"""
+        """Path to a newly created temporary job file. If empty, no followup job should be created."""
         pass


 class FollowupJob:
-    """Adds a trait that allows to create a followup job"""
+    """Adds a trait that allows to create a followup job."""

     def create_followup_jobs(self, final_state: TLoadJobState) -> List[NewLoadJob]:
-        """Return list of new jobs. `final_state` is state to which this job transits"""
+        """Return list of new jobs. `final_state` is state to which this job transits."""
         return []


diff --git a/dlt/destinations/impl/clickhouse/__init__.py b/dlt/destinations/impl/clickhouse/__init__.py
index 8613a29cb2..88ea37d014 100644
--- a/dlt/destinations/impl/clickhouse/__init__.py
+++ b/dlt/destinations/impl/clickhouse/__init__.py
@@ -7,7 +7,7 @@ def capabilities() -> DestinationCapabilitiesContext:
     caps = DestinationCapabilitiesContext()
     caps.preferred_loader_file_format = "jsonl"
-    caps.supported_loader_file_formats = ["jsonl", "parquet", "arrow", "insert_values"]
+    caps.supported_loader_file_formats = ["jsonl", "parquet", "arrow"]
     caps.preferred_staging_file_format = "jsonl"
     caps.supported_staging_file_formats = ["jsonl", "parquet", "arrow"]

diff --git a/dlt/destinations/impl/clickhouse/clickhouse.py b/dlt/destinations/impl/clickhouse/clickhouse.py
index ee021bfe46..a4874e4e87 100644
--- a/dlt/destinations/impl/clickhouse/clickhouse.py
+++ b/dlt/destinations/impl/clickhouse/clickhouse.py
@@ -1,10 +1,23 @@
 from copy import deepcopy
 from typing import ClassVar, Optional, Dict, List, Sequence
+from urllib.parse import urlparse, urlunparse

+from dlt.common.configuration.specs import (
+    CredentialsConfiguration,
+    AwsCredentialsWithoutDefaults,
+    AzureCredentialsWithoutDefaults,
+)
 from dlt.common.destination import DestinationCapabilitiesContext
-from dlt.common.destination.reference import SupportsStagingDestination, TLoadJobState
+from dlt.common.destination.reference import (
+    SupportsStagingDestination,
+    TLoadJobState,
+    FollowupJob,
+    LoadJob,
+)
 from dlt.common.schema import Schema, TColumnSchema
 from dlt.common.schema.typing import TTableFormat, TTableSchema, TColumnHint, TColumnType
+from dlt.common.storages import FileStorage
+from dlt.destinations.exceptions import LoadJobTerminalException
 from dlt.destinations.impl.clickhouse import capabilities
 from dlt.destinations.impl.clickhouse.clickhouse_adapter import (
     TTableEngineType,
@@ -12,15 +25,13 @@
 )
 from dlt.destinations.impl.clickhouse.configuration import (
     ClickhouseClientConfiguration,
-    ClickhouseCredentials,
 )
 from dlt.destinations.impl.clickhouse.sql_client import ClickhouseSqlClient
 from dlt.destinations.job_client_impl import (
     SqlJobClientWithStaging,
-    CopyRemoteFileLoadJob,
     SqlJobClientBase,
 )
-from dlt.destinations.sql_jobs import SqlMergeJob
+from dlt.destinations.job_impl import NewReferenceJob, EmptyLoadJob
 from dlt.destinations.type_mapping import TypeMapper


@@ -75,25 +86,97 @@ def from_db_type(
         return super().from_db_type(db_type, precision, scale)


-class ClickhouseCopyFileLoadJob(CopyRemoteFileLoadJob):
+class ClickhouseLoadJob(LoadJob, FollowupJob):
     def __init__(
         self,
-        table: TTableSchema,
         file_path: str,
-        sql_client: ClickhouseSqlClient,
-        staging_credentials: Optional[ClickhouseCredentials] = None,
-        staging_iam_role: str = None,
+        table_name: str,
+        load_id: str,
+        client: ClickhouseSqlClient,
+        staging_credentials: Optional[CredentialsConfiguration] = None,
     ) -> None:
-        self._staging_iam_role = staging_iam_role
-        super().__init__(table, file_path, sql_client, staging_credentials)
+        file_name = FileStorage.get_file_name_from_file_path(file_path)
+        super().__init__(file_name)

-    def exception(self) -> str:
-        pass
+        qualified_table_name = client.make_qualified_table_name(table_name)
+
+        bucket_path: str = (
+            NewReferenceJob.resolve_reference(file_path)
+            if NewReferenceJob.is_reference_job(file_path)
+            else ""
+        )
+        file_name = (
+            FileStorage.get_file_name_from_file_path(bucket_path) if bucket_path else file_name
+        )
+        credentials_clause = ""
+        files_clause = ""
+
+        if bucket_path:
+            bucket_url = urlparse(bucket_path)
+            bucket_scheme = bucket_url.scheme
+            # Referencing an external s3/azure stage does not require explicit AWS credentials.
+            if (
+                bucket_scheme == "s3"
+                and staging_credentials
+                and isinstance(staging_credentials, AwsCredentialsWithoutDefaults)
+            ):
+                credentials_clause = f"""CREDENTIALS=(AWS_KEY_ID='{staging_credentials.aws_access_key_id}' AWS_SECRET_KEY='{staging_credentials.aws_secret_access_key}')"""
+                from_clause = f"FROM '{bucket_path}'"
+            elif (
+                bucket_scheme in ["az", "abfs"]
+                and staging_credentials
+                and isinstance(staging_credentials, AzureCredentialsWithoutDefaults)
+            ):
+                # Explicit azure credentials are needed to load from bucket without a named stage
+                credentials_clause = f"CREDENTIALS=(AZURE_SAS_TOKEN='?{staging_credentials.azure_storage_sas_token}')"
+                # Converts an az://<container_name>/<path> to azure://<storage_account_name>.blob.core.windows.net/<container_name>/<path> as required by Clickhouse.
+                _path = f"/{bucket_url.netloc}{bucket_url.path}"
+                bucket_path = urlunparse(
+                    bucket_url._replace(
+                        scheme="azure",
+                        netloc=f"{staging_credentials.azure_storage_account_name}.blob.core.windows.net",
+                        path=_path,
+                    )
+                )
+                from_clause = f"FROM '{bucket_path}'"
+            else:
+                # Ensure that gcs bucket path starts with gcs://; this is a requirement of Clickhouse.
+                bucket_path = bucket_path.replace("gs://", "gcs://")
+                from_clause = f"FROM @{stage_name}/"
+                files_clause = f"FILES = ('{urlparse(bucket_path).path.lstrip('/')}')"
+        else:
+            # This means we have a local file.
+            if not stage_name:
+                # Use implicit table stage by default: "SCHEMA_NAME"."%TABLE_NAME".
+                stage_name = client.make_qualified_table_name(f"%{table_name}")
+            stage_file_path = f'@{stage_name}/"{load_id}"/{file_name}'
+            from_clause = f"FROM {stage_file_path}"
+
+        # Decide on source format, stage_file_path will either be a local file or a bucket path.
+        source_format = "( TYPE = 'JSON', BINARY_FORMAT = 'BASE64' )"
+        if file_name.endswith("parquet"):
+            source_format = "(TYPE = 'PARQUET', BINARY_AS_TEXT = FALSE)"


-class ClickhouseMergeJob(SqlMergeJob):
-    def __init__(self, file_name: str, status: TLoadJobState):
-        super().__init__(file_name, status)
+        with client.begin_transaction():
+            # PUT and COPY in one transaction if local file, otherwise only copy.
+            if not bucket_path:
+                client.execute_sql(
+                    f'PUT file://{file_path} @{stage_name}/"{load_id}" OVERWRITE = TRUE,'
+                    " AUTO_COMPRESS = FALSE"
+                )
+            client.execute_sql(f"""COPY INTO {qualified_table_name}
+                {from_clause}
+                {files_clause}
+                {credentials_clause}
+                FILE_FORMAT = {source_format}
+                MATCH_BY_COLUMN_NAME='CASE_INSENSITIVE'
+                """)
+
+    def state(self) -> TLoadJobState:
+        return "completed"
+
+    def exception(self) -> str:
+        raise NotImplementedError()


 class ClickhouseClient(SqlJobClientWithStaging, SupportsStagingDestination):
@@ -104,15 +187,26 @@ def __init__(
         schema: Schema,
         config: ClickhouseClientConfiguration,
     ) -> None:
-        self.config: ClickhouseClientConfiguration = config
         # TODO: There are no schemas in Clickhouse. No point in having schemas, only dataset names and table names for example "dataset1_mytable".
-        self.sql_client = ClickhouseSqlClient(
-            self.config.normalize_dataset_name(self.schema), self.config.credentials
+        self.sql_client: ClickhouseSqlClient = ClickhouseSqlClient(
+            config.normalize_dataset_name(schema), config.credentials
         )
-        super().__init__(schema, self.config, self.sql_client)
+        super().__init__(schema, config, self.sql_client)
+        self.config: ClickhouseClientConfiguration = config
         self.active_hints = deepcopy(HINT_TO_CLICKHOUSE_ATTR) if self.config.create_indexes else {}
         self.type_mapper = ClickhouseTypeMapper(self.capabilities)

+    def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob:
+        return super().start_file_load(table, file_path, load_id) or ClickhouseLoadJob(
+            file_path,
+            table["name"],
+            load_id,
+            self.sql_client,
+            staging_credentials=(
+                self.config.staging_config.credentials if self.config.staging_config else None
+            ),
+        )
+
     def _get_table_update_sql(
         self, table_name: str, new_columns: Sequence[TColumnSchema], generate_alter: bool
     ) -> List[str]:
@@ -161,3 +255,6 @@ def _from_db_type(
         self, ch_t: str, precision: Optional[int], scale: Optional[int]
     ) -> TColumnType:
         return self.type_mapper.from_db_type(ch_t, precision, scale)
+
+    def restore_file_load(self, file_path: str) -> LoadJob:
+        return EmptyLoadJob.from_file_path(file_path, "completed")
diff --git a/dlt/destinations/impl/clickhouse/configuration.py b/dlt/destinations/impl/clickhouse/configuration.py
index 359f08e47e..e8d01ba4b0 100644
--- a/dlt/destinations/impl/clickhouse/configuration.py
+++ b/dlt/destinations/impl/clickhouse/configuration.py
@@ -1,4 +1,4 @@
-from typing import ClassVar, List, Any, Final, TYPE_CHECKING
+from typing import ClassVar, List, Any, Final, TYPE_CHECKING, Optional

 from dlt.common.configuration import configspec
 from dlt.common.configuration.specs import ConnectionStringCredentials
@@ -63,7 +63,7 @@ class ClickhouseClientConfiguration(DestinationClientDwhWithStagingConfiguration
     # but they do not enforce uniqueness constraints. It permits duplicate values even for the primary key
     # columns within the same granule.
     # See: https://clickhouse.com/docs/en/optimize/sparse-primary-indexes
-    create_indexes: bool = False
+    create_indexes: bool = True
     """Whether `primary_key` column hint is applied. Note that Clickhouse has no unique constraint, and primary keys don't guarantee uniqueness."""

@@ -83,6 +83,8 @@ def __init__(
             credentials: ClickhouseCredentials = None,
             dataset_name: str = None,
             create_indexes: bool = False,
+            stage_name: str = None,
+            keep_staged_files: bool = True,
             destination_name: str = None,
             environment: str = None
         ) -> None:
diff --git a/dlt/destinations/job_impl.py b/dlt/destinations/job_impl.py
index 7a6b98544c..057f5d606d 100644
--- a/dlt/destinations/job_impl.py
+++ b/dlt/destinations/job_impl.py
@@ -46,7 +46,7 @@ class NewReferenceJob(NewLoadJobImpl):
     def __init__(
         self, file_name: str, status: TLoadJobState, exception: str = None, remote_path: str = None
     ) -> None:
-        file_name = os.path.splitext(file_name)[0] + ".reference"
+        file_name = f"{os.path.splitext(file_name)[0]}.reference"
         super().__init__(file_name, status, exception)
         self._remote_path = remote_path
         self._save_text_file(remote_path)
@@ -58,5 +58,5 @@ def is_reference_job(file_path: str) -> bool:
     @staticmethod
     def resolve_reference(file_path: str) -> str:
         with open(file_path, "r+", encoding="utf-8") as f:
-            # Reading from a file
+            # Reading from a file.
             return f.read()
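
Note on ClickhouseLoadJob (a reviewer-style sketch, not part of the diff above): COPY INTO, PUT and the @{stage_name} syntax are Snowflake constructs, and stage_name is referenced in the new load job without ever being defined. ClickHouse has no stages; it reads staged files from object storage through table functions such as s3(). The snippet below is a minimal sketch of how the bucket branch could build a ClickHouse-native statement instead; the helper name build_s3_insert and the way credentials reach it are assumptions made for illustration, not behaviour introduced by this patch.

    from typing import Optional

    from dlt.common.configuration.specs import AwsCredentialsWithoutDefaults


    def build_s3_insert(
        qualified_table_name: str,
        bucket_path: str,
        file_format: str,  # e.g. "Parquet" or "JSONEachRow"
        staging_credentials: Optional[AwsCredentialsWithoutDefaults] = None,
    ) -> str:
        """Build an INSERT ... SELECT statement that reads a staged file via the s3() table function."""
        # NOTE: s3() expects an HTTP(S) endpoint URL, so an s3://bucket/key reference would
        # first need to be rewritten to the bucket's HTTPS URL.
        if staging_credentials:
            source = (
                f"s3('{bucket_path}', '{staging_credentials.aws_access_key_id}', "
                f"'{staging_credentials.aws_secret_access_key}', '{file_format}')"
            )
        else:
            # Public bucket, or credentials configured on the ClickHouse server side.
            source = f"s3('{bucket_path}', '{file_format}')"
        return f"INSERT INTO {qualified_table_name} SELECT * FROM {source}"

A statement built this way could be passed to client.execute_sql() in place of the COPY INTO block, with local files handled separately through the client library's own insert mechanism.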