Dataset prefix and dataset-table separator #1055
Signed-off-by: Marcel Coetzee <[email protected]>
Pipboyguy committed Apr 5, 2024
1 parent 939bd35 commit b554693
Showing 6 changed files with 206 additions and 34 deletions.
4 changes: 2 additions & 2 deletions dlt/destinations/impl/clickhouse/__init__.py
@@ -8,8 +8,8 @@
 def capabilities() -> DestinationCapabilitiesContext:
     caps = DestinationCapabilitiesContext()
     # Clickhouse only supports loading from staged files on s3 for now.
-    caps.preferred_loader_file_format = "insert_values"
-    caps.supported_loader_file_formats = ["insert_values"]
+    caps.preferred_loader_file_format = "jsonl"
+    caps.supported_loader_file_formats = ["parquet", "jsonl"]
     caps.preferred_staging_file_format = "jsonl"
     caps.supported_staging_file_formats = ["parquet", "jsonl"]

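With this change the ClickHouse destination loads jsonl by default and accepts parquet directly. A minimal pipeline sketch, assuming a configured ClickHouse destination (the pipeline, dataset, and table names are hypothetical):

import dlt

# "jsonl" is now the default loader format; "parquet" can be
# requested explicitly per run via loader_file_format.
pipeline = dlt.pipeline(
    pipeline_name="clickhouse_demo",  # hypothetical
    destination="clickhouse",
    dataset_name="analytics",
)
info = pipeline.run(
    [{"id": 1}, {"id": 2}],
    table_name="events",
    loader_file_format="parquet",
)
print(info)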
34 changes: 24 additions & 10 deletions dlt/destinations/impl/clickhouse/clickhouse.py
@@ -4,6 +4,8 @@
 from typing import ClassVar, Optional, Dict, List, Sequence, cast, Tuple
 from urllib.parse import urlparse

+import clickhouse_connect
+from clickhouse_connect.driver.tools import insert_file
 from jinja2 import Template

 import dlt
@@ -182,11 +184,11 @@ def __init__(
         bucket_scheme = bucket_url.scheme

         file_extension = cast(SUPPORTED_FILE_FORMATS, file_extension)
-        clickhouse_format = FILE_FORMAT_TO_TABLE_FUNCTION_MAPPING[file_extension]
+        clickhouse_format: str = FILE_FORMAT_TO_TABLE_FUNCTION_MAPPING[file_extension]
         # compression = "none" if config.get("data_writer.disable_compression") else "gz"

-        table_function: str
-        table_function = ""
+        table_function: str = ""
+        statement: str = ""

         if bucket_scheme in ("s3", "gs", "gcs"):
             bucket_http_url = convert_storage_to_http_scheme(bucket_url)
@@ -217,6 +219,7 @@ def __init__(
                 secret_access_key=secret_access_key,
                 clickhouse_format=clickhouse_format,
             ).strip()
+            statement = f"INSERT INTO {qualified_table_name} {table_function}"

         elif bucket_scheme in ("az", "abfs"):
             if not isinstance(staging_credentials, AzureCredentialsWithoutDefaults):
@@ -238,22 +241,33 @@ def __init__(
                 "SELECT * FROM"
                 f" azureBlobStorage('{storage_account_url}','{container_name}','{blobpath}','{account_name}','{account_key}','{clickhouse_format}')"
             )
+            statement = f"INSERT INTO {qualified_table_name} {table_function}"
         elif not bucket_path:
             # Local filesystem.
-            raise LoadJobTerminalException(
-                file_path,
-                "Cannot load from local file. Clickhouse does not support loading from local files."
-                " Configure staging with an s3, gcs or azure storage bucket.",
-            )
+            with clickhouse_connect.get_client(
+                host=client.credentials.host,
+                port=client.credentials.port,
+                database=client.credentials.database,
+                user_name=client.credentials.username,
+                password=client.credentials.password,
+                secure=bool(client.credentials.secure),
+            ) as clickhouse_connect_client:
+                insert_file(
+                    clickhouse_connect_client,
+                    qualified_table_name,
+                    file_path,
+                    fmt=clickhouse_format,
+                    database=client.database_name,
+                )
+            statement = ""
         else:
             raise LoadJobTerminalException(
                 file_path,
                 f"Clickhouse loader does not support '{bucket_scheme}' filesystem.",
             )

-        print(table_function)
         with client.begin_transaction():
-            client.execute_sql(f"""INSERT INTO {qualified_table_name} {table_function}""")
+            client.execute_sql(statement)

     def state(self) -> TLoadJobState:
         return "completed"
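For the staged branches, the assembled statement follows ClickHouse's insert-from-table-function pattern. A rough Python rendering for an s3-staged jsonl file (the bucket URL, credentials, and the jsonl-to-JSONEachRow mapping are assumptions, not read from the Jinja template above):

# All values are hypothetical; the real statement is rendered with Jinja.
qualified_table_name = "default.__analytics___events"
bucket_http_url = "https://example-bucket.s3.amazonaws.com/1712300000.jsonl"
table_function = (
    f"SELECT * FROM s3('{bucket_http_url}',"
    " 'ACCESS_KEY_ID', 'SECRET_ACCESS_KEY', 'JSONEachRow')"
)
statement = f"INSERT INTO {qualified_table_name} {table_function}"
# -> INSERT INTO default.__analytics___events SELECT * FROM s3(...)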
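The new local-file branch goes through clickhouse-connect's insert_file helper instead of a SQL statement. A standalone sketch of that call, with hypothetical connection details and file name (clickhouse-connect speaks ClickHouse's HTTP protocol, so the port is typically 8123 rather than the native 9000):

import clickhouse_connect
from clickhouse_connect.driver.tools import insert_file

# Hypothetical connection; username, password, and database are placeholders.
client = clickhouse_connect.get_client(
    host="localhost",
    port=8123,
    username="default",
    password="",
    database="default",
)
# Stream a local file into an existing table; fmt takes a ClickHouse
# format name such as "JSONEachRow" or "Parquet".
insert_file(client, "events", "/tmp/events.jsonl", fmt="JSONEachRow")
client.close()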
43 changes: 24 additions & 19 deletions dlt/destinations/impl/clickhouse/sql_client.py
@@ -13,6 +13,7 @@
 from clickhouse_driver.dbapi import OperationalError  # type: ignore[import-untyped]
 from clickhouse_driver.dbapi.extras import DictCursor  # type: ignore[import-untyped]

+import dlt
 from dlt.common.destination import DestinationCapabilitiesContext
 from dlt.destinations.exceptions import (
     DatabaseUndefinedRelation,
@@ -34,6 +35,10 @@
 TRANSACTIONS_UNSUPPORTED_WARNING_MESSAGE = (
     "Clickhouse does not support transactions! Each statement is auto-committed separately."
 )
+DATASET_PREFIX = dlt.config["destination.clickhouse.credentials.dataset_prefix"] or "__"
+DATASET_TABLE_SEPARATOR = (
+    dlt.config["destination.clickhouse.credentials.dataset_table_separator"] or "___"
+)


class ClickhouseDBApiCursorImpl(DBApiCursorImpl):
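The two module constants above read from dlt's configuration and fall back to "__" and "___". Assuming dlt's usual environment-variable convention (config sections upper-cased and joined with double underscores), they could be overridden like this; a hedged sketch, where the section path simply mirrors the config keys used above:

import os

# Hypothetical overrides; set before the pipeline runs so that
# dlt.config resolves them instead of the "__" / "___" defaults.
os.environ["DESTINATION__CLICKHOUSE__CREDENTIALS__DATASET_PREFIX"] = "_dlt_"
os.environ["DESTINATION__CLICKHOUSE__CREDENTIALS__DATASET_TABLE_SEPARATOR"] = "__"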
@@ -146,28 +151,28 @@ def execute_query(
             yield ClickhouseDBApiCursorImpl(cursor)  # type: ignore[abstract]

     def fully_qualified_dataset_name(self, escape: bool = True) -> str:
-        database_name = (
-            self.capabilities.escape_identifier(self.database_name)
-            if escape
-            else self.database_name
-        )
-        dataset_name = (
-            self.capabilities.escape_identifier(self.dataset_name) if escape else self.dataset_name
-        )
+        if escape:
+            database_name = self.capabilities.escape_identifier(self.database_name)
+            dataset_name = self.capabilities.escape_identifier(
+                f"{DATASET_PREFIX}{self.dataset_name}"
+            )
+        else:
+            database_name = self.database_name
+            dataset_name = f"{DATASET_PREFIX}{self.dataset_name}"
         return f"{database_name}.{dataset_name}"

     def make_qualified_table_name(self, table_name: str, escape: bool = True) -> str:
-        database_name = (
-            self.capabilities.escape_identifier(self.database_name)
-            if escape
-            else self.database_name
-        )
-        dataset_table_name = (
-            self.capabilities.escape_identifier(f"{self.dataset_name}_{table_name}")
-            if escape
-            else f"{self.dataset_name}_{table_name}"
-        )
-        return f"{database_name}.{dataset_table_name}"
+        if escape:
+            database_name = self.capabilities.escape_identifier(self.database_name)
+            dataset_and_table = self.capabilities.escape_identifier(
+                f"{DATASET_PREFIX}{self.dataset_name}{DATASET_TABLE_SEPARATOR}{table_name}"
+            )
+        else:
+            database_name = self.database_name
+            dataset_and_table = (
+                f"{DATASET_PREFIX}{self.dataset_name}{DATASET_TABLE_SEPARATOR}{table_name}"
+            )
+        return f"{database_name}.{dataset_and_table}"

     @classmethod
     def _make_database_exception(cls, ex: Exception) -> Exception:  # type: ignore[return]
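With the default prefix "__" and separator "___", a dataset and its tables collapse into ClickHouse's single database/table namespace. A plain-Python illustration of the naming scheme (database, dataset, and table names are hypothetical, and identifier escaping is left out):

DATASET_PREFIX = "__"
DATASET_TABLE_SEPARATOR = "___"

database_name = "default"
dataset_name = "analytics"
table_name = "events"

# fully_qualified_dataset_name, unescaped form
print(f"{database_name}.{DATASET_PREFIX}{dataset_name}")
# -> default.__analytics

# make_qualified_table_name: dataset and table share one identifier,
# since ClickHouse has no schema level between database and table
print(f"{database_name}.{DATASET_PREFIX}{dataset_name}{DATASET_TABLE_SEPARATOR}{table_name}")
# -> default.__analytics___events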