Improve Clickhouse loader code and update comments #1055
Signed-off-by: Marcel Coetzee <[email protected]>
Pipboyguy committed Mar 17, 2024
1 parent 0af45b9 commit de66efa
Showing 5 changed files with 136 additions and 37 deletions.
24 changes: 12 additions & 12 deletions dlt/common/destination/reference.py
@@ -201,22 +201,22 @@ def __init__(


class LoadJob:
"""Represents a job that loads a single file
"""Represents a job that loads a single file.
Each job starts in "running" state and ends in one of terminal states: "retry", "failed" or "completed".
Each job is uniquely identified by a file name. The file is guaranteed to exist in "running" state. In terminal state, the file may not be present.
In "running" state, the loader component periodically gets the state via `status()` method. When terminal state is reached, load job is discarded and not called again.
Each job starts in "running" state and ends in one of the terminal states: "retry", "failed" or "completed".
A filename uniquely identifies each job. The file is guaranteed to exist in "running" state. In terminal state, the file may not be present.
In "running" state, the loader component periodically gets the state via `status()` method. When terminal state is reached, a load job is discarded and not called again.
`exception` method is called to get error information in "failed" and "retry" states.
The `__init__` method is responsible to put the Job in "running" state. It may raise `LoadClientTerminalException` and `LoadClientTransientException` to
immediately transition job into "failed" or "retry" state respectively.
immediately transition a job into "failed" or "retry" state respectively.
"""

def __init__(self, file_name: str) -> None:
"""
-        File name is also a job id (or job id is deterministically derived) so it must be globally unique
+        The filename is also the job ID (or the job ID is deterministically derived from it), so it must be globally unique.
        """
-        # ensure file name
+        # Ensure `file_name` is a bare filename, not a path.
assert file_name == FileStorage.get_file_name_from_file_path(file_name)
self._file_name = file_name
self._parsed_file_name = ParsedLoadJobFileName.parse(file_name)
@@ -231,15 +231,15 @@ def file_name(self) -> str:
return self._file_name

def job_id(self) -> str:
"""The job id that is derived from the file name and does not changes during job lifecycle"""
"""The job ID that is derived from the filename and does not change during job lifecycle."""
return self._parsed_file_name.job_id()

def job_file_info(self) -> ParsedLoadJobFileName:
return self._parsed_file_name

@abstractmethod
def exception(self) -> str:
"""The exception associated with failed or retry states"""
"""The exception associated with failed or retry states."""
pass


@@ -248,15 +248,15 @@ class NewLoadJob(LoadJob):

@abstractmethod
def new_file_path(self) -> str:
"""Path to a newly created temporary job file. If empty, no followup job should be created"""
"""Path to a newly created temporary job file. If empty, no followup job should be created."""
pass


class FollowupJob:
"""Adds a trait that allows to create a followup job"""
"""Adds a trait that allows to create a followup job."""

def create_followup_jobs(self, final_state: TLoadJobState) -> List[NewLoadJob]:
"""Return list of new jobs. `final_state` is state to which this job transits"""
"""Return list of new jobs. `final_state` is state to which this job transits."""
return []
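To make the lifecycle described above concrete, here is a minimal sketch (not part of this commit) of a trivial job that completes immediately. It assumes `state()` is the abstract status hook the loader polls (the hook `ClickhouseLoadJob` below implements), and the file name is hypothetical, in the dotted `table_name.file_id.retry_count.extension` layout that `ParsedLoadJobFileName` parses.

```python
from typing import List

from dlt.common.destination.reference import FollowupJob, LoadJob, NewLoadJob, TLoadJobState


class NoOpLoadJob(LoadJob, FollowupJob):
    """A job that is already done when it starts; it never retries or fails."""

    def __init__(self, file_name: str) -> None:
        # Per the docstring above, __init__ must leave the job in "running" state.
        super().__init__(file_name)

    def state(self) -> TLoadJobState:
        # The loader polls this until a terminal state is returned.
        return "completed"

    def exception(self) -> str:
        # Only meaningful in "failed"/"retry" states, which this job never enters.
        raise NotImplementedError()

    def create_followup_jobs(self, final_state: TLoadJobState) -> List[NewLoadJob]:
        # No follow-up work needed.
        return []


# Hypothetical file name in the dotted layout ParsedLoadJobFileName expects:
job = NoOpLoadJob("events.12ab34cd.0.jsonl")
assert job.state() == "completed"
```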


2 changes: 1 addition & 1 deletion dlt/destinations/impl/clickhouse/__init__.py
@@ -7,7 +7,7 @@ def capabilities() -> DestinationCapabilitiesContext:
caps = DestinationCapabilitiesContext()

caps.preferred_loader_file_format = "jsonl"
-    caps.supported_loader_file_formats = ["jsonl", "parquet", "arrow", "insert_values"]
+    caps.supported_loader_file_formats = ["jsonl", "parquet", "arrow"]
caps.preferred_staging_file_format = "jsonl"
caps.supported_staging_file_formats = ["jsonl", "parquet", "arrow"]
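With "insert_values" dropped, a pipeline targeting this destination must pick one of the three remaining formats. A hedged usage sketch (pipeline, dataset, and table names are hypothetical, and it assumes the destination is registered under the "clickhouse" name):

```python
import dlt

pipeline = dlt.pipeline(
    pipeline_name="clickhouse_demo",  # hypothetical
    destination="clickhouse",
    dataset_name="demo_data",  # hypothetical
)

# "jsonl" is the preferred format above; "parquet" and "arrow" also work,
# while "insert_values" would now be rejected by the capabilities check.
info = pipeline.run(
    [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}],
    table_name="items",
    loader_file_format="jsonl",
)
```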

137 changes: 117 additions & 20 deletions dlt/destinations/impl/clickhouse/clickhouse.py
@@ -1,26 +1,37 @@
from copy import deepcopy
from typing import ClassVar, Optional, Dict, List, Sequence
from urllib.parse import urlparse, urlunparse

from dlt.common.configuration.specs import (
CredentialsConfiguration,
AwsCredentialsWithoutDefaults,
AzureCredentialsWithoutDefaults,
)
from dlt.common.destination import DestinationCapabilitiesContext
-from dlt.common.destination.reference import SupportsStagingDestination, TLoadJobState
+from dlt.common.destination.reference import (
+    SupportsStagingDestination,
+    TLoadJobState,
+    FollowupJob,
+    LoadJob,
+)
from dlt.common.schema import Schema, TColumnSchema
from dlt.common.schema.typing import TTableFormat, TTableSchema, TColumnHint, TColumnType
from dlt.common.storages import FileStorage
from dlt.destinations.exceptions import LoadJobTerminalException
from dlt.destinations.impl.clickhouse import capabilities
from dlt.destinations.impl.clickhouse.clickhouse_adapter import (
TTableEngineType,
TABLE_ENGINE_TYPE_HINT,
)
from dlt.destinations.impl.clickhouse.configuration import (
ClickhouseClientConfiguration,
ClickhouseCredentials,
)
from dlt.destinations.impl.clickhouse.sql_client import ClickhouseSqlClient
from dlt.destinations.job_client_impl import (
SqlJobClientWithStaging,
CopyRemoteFileLoadJob,
SqlJobClientBase,
)
from dlt.destinations.sql_jobs import SqlMergeJob
from dlt.destinations.job_impl import NewReferenceJob, EmptyLoadJob
from dlt.destinations.type_mapping import TypeMapper


@@ -75,25 +75,97 @@ def from_db_type(
return super().from_db_type(db_type, precision, scale)


-class ClickhouseCopyFileLoadJob(CopyRemoteFileLoadJob):
+class ClickhouseLoadJob(LoadJob, FollowupJob):
    def __init__(
        self,
-        table: TTableSchema,
        file_path: str,
-        sql_client: ClickhouseSqlClient,
-        staging_credentials: Optional[ClickhouseCredentials] = None,
-        staging_iam_role: str = None,
+        table_name: str,
+        load_id: str,
+        client: ClickhouseSqlClient,
+        staging_credentials: Optional[CredentialsConfiguration] = None,
+        stage_name: Optional[str] = None,  # named stage referenced below; falls back to the implicit table stage
    ) -> None:
-        self._staging_iam_role = staging_iam_role
-        super().__init__(table, file_path, sql_client, staging_credentials)
-
-    def exception(self) -> str:
-        pass
+        file_name = FileStorage.get_file_name_from_file_path(file_path)
+        super().__init__(file_name)
+
+        qualified_table_name = client.make_qualified_table_name(table_name)

bucket_path: str = (
NewReferenceJob.resolve_reference(file_path)
if NewReferenceJob.is_reference_job(file_path)
else ""
)
file_name = (
FileStorage.get_file_name_from_file_path(bucket_path) if bucket_path else file_name
)
credentials_clause = ""
files_clause = ""

if bucket_path:
bucket_url = urlparse(bucket_path)
bucket_scheme = bucket_url.scheme
# Referencing an external s3/azure stage does not require explicit AWS credentials.
if (
bucket_scheme == "s3"
and staging_credentials
and isinstance(staging_credentials, AwsCredentialsWithoutDefaults)
):
credentials_clause = f"""CREDENTIALS=(AWS_KEY_ID='{staging_credentials.aws_access_key_id}' AWS_SECRET_KEY='{staging_credentials.aws_secret_access_key}')"""
from_clause = f"FROM '{bucket_path}'"
elif (
bucket_scheme in ["az", "abfs"]
and staging_credentials
and isinstance(staging_credentials, AzureCredentialsWithoutDefaults)
):
                # Explicit Azure credentials are needed to load from a bucket without a named stage.
credentials_clause = f"CREDENTIALS=(AZURE_SAS_TOKEN='?{staging_credentials.azure_storage_sas_token}')"
# Converts an az://<container_name>/<path> to azure://<storage_account_name>.blob.core.windows.net/<container_name>/<path> as required by Clickhouse.
_path = f"/{bucket_url.netloc}{bucket_url.path}"
bucket_path = urlunparse(
bucket_url._replace(
scheme="azure",
netloc=f"{staging_credentials.azure_storage_account_name}.blob.core.windows.net",
path=_path,
)
)
from_clause = f"FROM '{bucket_path}'"
else:
# Ensure that gcs bucket path starts with gcs://; this is a requirement of Clickhouse.
bucket_path = bucket_path.replace("gs://", "gcs://")
from_clause = f"FROM @{stage_name}/"
files_clause = f"FILES = ('{urlparse(bucket_path).path.lstrip('/')}')"
else:
# This means we have a local file.
if not stage_name:
# Use implicit table stage by default: "SCHEMA_NAME"."%TABLE_NAME".
stage_name = client.make_qualified_table_name(f"%{table_name}")
stage_file_path = f'@{stage_name}/"{load_id}"/{file_name}'
from_clause = f"FROM {stage_file_path}"

        # Decide on the source format; stage_file_path is either a local file or a bucket path.
source_format = "( TYPE = 'JSON', BINARY_FORMAT = 'BASE64' )"
if file_name.endswith("parquet"):
source_format = "(TYPE = 'PARQUET', BINARY_AS_TEXT = FALSE)"

-class ClickhouseMergeJob(SqlMergeJob):
-    def __init__(self, file_name: str, status: TLoadJobState):
-        super().__init__(file_name, status)
with client.begin_transaction():
# PUT and COPY in one transaction if local file, otherwise only copy.
if not bucket_path:
client.execute_sql(
f'PUT file://{file_path} @{stage_name}/"{load_id}" OVERWRITE = TRUE,'
" AUTO_COMPRESS = FALSE"
)
client.execute_sql(f"""COPY INTO {qualified_table_name}
{from_clause}
{files_clause}
{credentials_clause}
FILE_FORMAT = {source_format}
MATCH_BY_COLUMN_NAME='CASE_INSENSITIVE'
""")

def state(self) -> TLoadJobState:
return "completed"

def exception(self) -> str:
raise NotImplementedError()
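The az:// to azure:// conversion above is easy to verify in isolation. A small standalone sketch with a hypothetical storage account and container:

```python
from urllib.parse import urlparse, urlunparse

# Hypothetical bucket path and storage account, mirroring the rewrite above.
url = urlparse("az://my-container/load_1234/events.jsonl")
rewritten = urlunparse(
    url._replace(
        scheme="azure",
        netloc="myaccount.blob.core.windows.net",
        # The original netloc (the container) moves into the path.
        path=f"/{url.netloc}{url.path}",
    )
)
assert rewritten == "azure://myaccount.blob.core.windows.net/my-container/load_1234/events.jsonl"
```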


class ClickhouseClient(SqlJobClientWithStaging, SupportsStagingDestination):
@@ -104,15 +104,26 @@ def __init__(
schema: Schema,
config: ClickhouseClientConfiguration,
) -> None:
-        self.config: ClickhouseClientConfiguration = config
-        self.sql_client = ClickhouseSqlClient(
-            self.config.normalize_dataset_name(self.schema), self.config.credentials
+        # TODO: There are no schemas in Clickhouse, only dataset and table names, for example "dataset1_mytable".
+        self.sql_client: ClickhouseSqlClient = ClickhouseSqlClient(
+            config.normalize_dataset_name(schema), config.credentials
        )
-        super().__init__(schema, self.config, self.sql_client)
+        super().__init__(schema, config, self.sql_client)
+        self.config: ClickhouseClientConfiguration = config
self.active_hints = deepcopy(HINT_TO_CLICKHOUSE_ATTR) if self.config.create_indexes else {}
self.type_mapper = ClickhouseTypeMapper(self.capabilities)

def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob:
return super().start_file_load(table, file_path, load_id) or ClickhouseLoadJob(
file_path,
table["name"],
load_id,
self.sql_client,
staging_credentials=(
self.config.staging_config.credentials if self.config.staging_config else None
),
)

def _get_table_update_sql(
self, table_name: str, new_columns: Sequence[TColumnSchema], generate_alter: bool
) -> List[str]:
@@ -161,3 +255,6 @@ def _from_db_type(
self, ch_t: str, precision: Optional[int], scale: Optional[int]
) -> TColumnType:
return self.type_mapper.from_db_type(ch_t, precision, scale)

def restore_file_load(self, file_path: str) -> LoadJob:
return EmptyLoadJob.from_file_path(file_path, "completed")
6 changes: 4 additions & 2 deletions dlt/destinations/impl/clickhouse/configuration.py
@@ -1,4 +1,4 @@
-from typing import ClassVar, List, Any, Final, TYPE_CHECKING
+from typing import ClassVar, List, Any, Final, TYPE_CHECKING, Optional

from dlt.common.configuration import configspec
from dlt.common.configuration.specs import ConnectionStringCredentials
@@ -63,7 +63,7 @@ class ClickhouseClientConfiguration(DestinationClientDwhWithStagingConfiguration
    # but they do not enforce uniqueness constraints; duplicate values are permitted even for primary key
    # columns within the same granule.
# See: https://clickhouse.com/docs/en/optimize/sparse-primary-indexes
-    create_indexes: bool = False
+    create_indexes: bool = True
"""Whether `primary_key` column hint is applied. Note that Clickhouse has no unique constraint,
and primary keys don't guarantee uniqueness."""
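To illustrate the caveat: a MergeTree key only orders the sparse index, so inserts that repeat a key value succeed. A hypothetical sketch (not from this commit; table and values invented, statements shown as strings):

```python
# ClickHouse accepts both inserts below even though they repeat the same key
# value, because the MergeTree key enforces ordering, not uniqueness.
DDL = """
CREATE TABLE demo_dataset_items (
    id Int64,
    name String
)
ENGINE = MergeTree
ORDER BY id  -- doubles as the primary key when PRIMARY KEY is omitted
"""
INSERTS = """
INSERT INTO demo_dataset_items VALUES (1, 'first');
INSERT INTO demo_dataset_items VALUES (1, 'duplicate');  -- accepted, no error
"""
```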

@@ -83,6 +83,8 @@ def __init__(
credentials: ClickhouseCredentials = None,
dataset_name: str = None,
create_indexes: bool = False,
+        stage_name: str = None,
+        keep_staged_files: bool = True,
destination_name: str = None,
environment: str = None
) -> None:
4 changes: 2 additions & 2 deletions dlt/destinations/job_impl.py
@@ -46,7 +46,7 @@ class NewReferenceJob(NewLoadJobImpl):
def __init__(
self, file_name: str, status: TLoadJobState, exception: str = None, remote_path: str = None
) -> None:
-        file_name = os.path.splitext(file_name)[0] + ".reference"
+        file_name = f"{os.path.splitext(file_name)[0]}.reference"
super().__init__(file_name, status, exception)
self._remote_path = remote_path
self._save_text_file(remote_path)
@@ -58,5 +58,5 @@ def is_reference_job(file_path: str) -> bool:
@staticmethod
def resolve_reference(file_path: str) -> str:
with open(file_path, "r+", encoding="utf-8") as f:
-            # Reading from a file
+            # Read the remote path stored in the reference file.
return f.read()
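For context, a reference job's payload is just the remote path written as text, which `resolve_reference` reads back. A minimal round-trip sketch (paths hypothetical):

```python
import os
import tempfile

# The ".reference" file stores nothing but the remote path of the actual file.
remote_path = "s3://my-bucket/load_1234/events.12ab34cd.0.jsonl"  # hypothetical
with tempfile.TemporaryDirectory() as tmp:
    ref_path = os.path.join(tmp, "events.12ab34cd.0.reference")
    with open(ref_path, "w", encoding="utf-8") as f:
        f.write(remote_path)
    # Mirrors NewReferenceJob.resolve_reference().
    with open(ref_path, "r", encoding="utf-8") as f:
        assert f.read() == remote_path
```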
