Support compression codec for azure and local #1055
Signed-off-by: Marcel Coetzee <[email protected]>
Pipboyguy committed Apr 8, 2024
1 parent 5004b5a commit a646118
Showing 5 changed files with 29 additions and 21 deletions.
2 changes: 2 additions & 0 deletions dlt/destinations/__init__.py
@@ -14,6 +14,7 @@
from dlt.destinations.impl.synapse.factory import synapse
from dlt.destinations.impl.databricks.factory import databricks
from dlt.destinations.impl.dremio.factory import dremio
from dlt.destinations.impl.clickhouse.factory import clickhouse


__all__ = [
@@ -32,5 +33,6 @@
"synapse",
"databricks",
"dremio",
"clickhouse",
"destination",
]
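The factory export above makes `clickhouse` selectable like any other dlt destination. A minimal, illustrative pipeline sketch follows; it assumes ClickHouse credentials are supplied through dlt's usual secrets/config, and the pipeline, dataset, and table names are placeholders, not part of this change.

import dlt

# Sketch only: assumes ClickHouse credentials are configured via dlt secrets/config;
# all names below are placeholders.
pipeline = dlt.pipeline(
    pipeline_name="clickhouse_demo",
    destination="clickhouse",
    dataset_name="demo_data",
)

load_info = pipeline.run(
    [{"id": 1, "value": "a"}, {"id": 2, "value": "b"}],
    table_name="items",
)
print(load_info)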
18 changes: 5 additions & 13 deletions dlt/destinations/impl/clickhouse/clickhouse.py
@@ -9,6 +9,7 @@
from jinja2 import Template

import dlt
from dlt import config
from dlt.common.configuration.specs import (
CredentialsConfiguration,
AzureCredentialsWithoutDefaults,
@@ -167,20 +168,12 @@ def __init__(
file_path, "ClickHouse loader Only supports parquet and jsonl files."
)

# if not config.get("data_writer.disable_compression"):
# raise LoadJobTerminalException(
# file_path,
# "ClickHouse loader does not support gzip compressed files. Please disable"
# " compression in the data writer configuration:"
# " https://dlthub.com/docs/reference/performance#disabling-and-enabling-file-compression.",
# )

bucket_url = urlparse(bucket_path)
bucket_scheme = bucket_url.scheme

file_extension = cast(SUPPORTED_FILE_FORMATS, file_extension)
clickhouse_format: str = FILE_FORMAT_TO_TABLE_FUNCTION_MAPPING[file_extension]
# compression = "none" if config.get("data_writer.disable_compression") else "gz"
compression = "none" if config.get("data_writer.disable_compression") else "gz"

statement: str = ""

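With the commented-out gzip guard removed, the loader now derives a codec from the writer configuration instead of rejecting compressed files: `data_writer.disable_compression` yields "none", otherwise "gz" is used. A small sketch of the values derived before the statement is built; the format mapping values are assumed from the surrounding module (not shown in this diff) and the staging path is a placeholder.

from urllib.parse import urlparse

# Assumed mapping from the surrounding module; values not shown in this diff.
FILE_FORMAT_TO_TABLE_FUNCTION_MAPPING = {"jsonl": "JSONEachRow", "parquet": "Parquet"}

bucket_path = "az://staging/demo_data/items.1234.jsonl.gz"  # placeholder staging path
bucket_scheme = urlparse(bucket_path).scheme                 # "az"
clickhouse_format = FILE_FORMAT_TO_TABLE_FUNCTION_MAPPING["jsonl"]  # "JSONEachRow"
compression = "gz"  # "none" when data_writer.disable_compression is set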
@@ -191,7 +184,6 @@ def __init__(
access_key_id = staging_credentials.aws_access_key_id
secret_access_key = staging_credentials.aws_secret_access_key
elif isinstance(staging_credentials, GcpCredentials):
# TODO: HMAC keys aren't implemented in `GcpCredentials`.
access_key_id = dlt.config["destination.filesystem.credentials.gcp_access_key_id"]
secret_access_key = dlt.config[
"destination.filesystem.credentials.gcp_secret_access_key"
@@ -233,7 +225,7 @@ def __init__(

table_function = (
"SELECT * FROM"
f" azureBlobStorage('{storage_account_url}','{container_name}','{blobpath}','{account_name}','{account_key}','{clickhouse_format}')"
f" azureBlobStorage('{storage_account_url}','{container_name}','{blobpath}','{account_name}','{account_key}','{clickhouse_format}','{compression}')"
)
statement = f"INSERT INTO {qualified_table_name} {table_function}"
elif not bucket_path:
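For Azure-staged files the codec is now forwarded as the argument that follows the format in ClickHouse's azureBlobStorage table function. An illustrative rendering of the statement built above, with placeholder identifiers and credentials:

# Placeholder values only; mirrors the f-string above.
storage_account_url = "https://myaccount.blob.core.windows.net"
container_name = "staging"
blobpath = "demo_data/items.1234.jsonl.gz"
account_name = "myaccount"
account_key = "<redacted>"
clickhouse_format = "JSONEachRow"
compression = "gz"
qualified_table_name = "demo_data.items"

table_function = (
    "SELECT * FROM"
    f" azureBlobStorage('{storage_account_url}','{container_name}','{blobpath}',"
    f"'{account_name}','{account_key}','{clickhouse_format}','{compression}')"
)
statement = f"INSERT INTO {qualified_table_name} {table_function}"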
@@ -255,7 +247,9 @@ def __init__(
settings={
"allow_experimental_lightweight_delete": 1,
"allow_experimental_object_type": 1,
"enable_http_compression": 1,
},
compression=None if compression == "none" else compression,
)
except clickhouse_connect.driver.exceptions.Error as e:
raise LoadJobTerminalException(
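For local files the codec is passed straight through to clickhouse-connect, and enable_http_compression lets the HTTP interface handle the compressed payload. A hedged usage sketch of that call path, assuming it is clickhouse-connect's insert_file helper (as the settings/compression keywords above suggest) and using placeholder connection details:

import clickhouse_connect
from clickhouse_connect.driver.tools import insert_file

# Sketch only: placeholder connection details; assumes the insert_file helper
# from clickhouse-connect, which accepts `settings` and `compression` as above.
client = clickhouse_connect.get_client(
    host="localhost", port=8123, username="default", password=""
)

insert_file(
    client,
    "items",                     # target table (placeholder)
    "/tmp/items.1234.jsonl.gz",  # gzip-compressed local staging file (placeholder)
    fmt="JSONEachRow",
    settings={
        "allow_experimental_lightweight_delete": 1,
        "allow_experimental_object_type": 1,
        "enable_http_compression": 1,
    },
    compression="gz",  # None when compression is disabled in the data writer
)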
@@ -363,8 +357,6 @@ def _get_table_update_sql(
else:
sql[0] += "\nPRIMARY KEY tuple()"

# TODO: Apply sort order and cluster key hints.

return sql

def get_storage_table(self, table_name: str) -> Tuple[bool, TTableSchemaColumns]:
1 change: 1 addition & 0 deletions dlt/destinations/impl/clickhouse/sql_client.py
@@ -134,6 +134,7 @@ def execute_query(
query = (
"set allow_experimental_lightweight_delete = 1;"
"set allow_experimental_object_type = 1;"
"set enable_http_compression= 1;"
f"{query}"
)
with self._conn.cursor() as cursor:
28 changes: 20 additions & 8 deletions tests/load/utils.py
@@ -175,8 +175,8 @@ def destinations_configs(
destination_configs += [
DestinationTestConfiguration(destination=destination)
for destination in SQL_DESTINATIONS
if destination not in ("athena", "mssql", "synapse", "databricks", "clickhouse")
if destination not in ("athena", "mssql", "synapse", "databricks", "dremio")
if destination
not in ("athena", "mssql", "synapse", "databricks", "dremio", "clickhouse")
]
destination_configs += [
DestinationTestConfiguration(destination="duckdb", file_format="parquet")
@@ -205,7 +205,17 @@ def destinations_configs(
DestinationTestConfiguration(
destination="clickhouse",
file_format="jsonl",
disable_compression=True,
)
]
destination_configs += [
DestinationTestConfiguration(
destination="clickhouse",
file_format="parquet",
)
]
destination_configs += [
DestinationTestConfiguration(
destination="clickhouse", file_format="parquet", disable_compression=True
)
]
destination_configs += [
@@ -326,47 +336,49 @@ def destinations_configs(
file_format="parquet",
bucket_url=GCS_BUCKET,
extra_info="gcs-authorization",
disable_compression=True,
),
DestinationTestConfiguration(
destination="clickhouse",
staging="filesystem",
file_format="parquet",
bucket_url=AWS_BUCKET,
extra_info="s3-authorization",
disable_compression=True,
),
DestinationTestConfiguration(
destination="clickhouse",
staging="filesystem",
file_format="parquet",
bucket_url=AZ_BUCKET,
extra_info="az-authorization",
disable_compression=True,
),
DestinationTestConfiguration(
destination="clickhouse",
staging="filesystem",
file_format="jsonl",
bucket_url=AZ_BUCKET,
extra_info="az-authorization",
disable_compression=True,
),
DestinationTestConfiguration(
destination="clickhouse",
staging="filesystem",
file_format="jsonl",
bucket_url=GCS_BUCKET,
extra_info="gcs-authorization",
disable_compression=True,
),
DestinationTestConfiguration(
destination="clickhouse",
staging="filesystem",
file_format="jsonl",
bucket_url=AWS_BUCKET,
extra_info="s3-authorization",
),
DestinationTestConfiguration(
destination="clickhouse",
staging="filesystem",
file_format="jsonl",
bucket_url=AWS_BUCKET,
disable_compression=True,
extra_info="s3-authorization",
),
DestinationTestConfiguration(
destination="dremio",
1 change: 1 addition & 0 deletions tests/utils.py
@@ -48,6 +48,7 @@
"destination",
"synapse",
"databricks",
"clickhouse",
"dremio",
}
NON_SQL_DESTINATIONS = {"filesystem", "weaviate", "dummy", "motherduck", "qdrant", "destination"}
