From a6461185605a04445603abd34f6c60826d098b71 Mon Sep 17 00:00:00 2001
From: Marcel Coetzee
Date: Mon, 8 Apr 2024 22:02:45 +0200
Subject: [PATCH] Support compression codec for azure and local #1055

Signed-off-by: Marcel Coetzee
---
 dlt/destinations/__init__.py          |  2 ++
 .../impl/clickhouse/clickhouse.py     | 18 ++++--------
 .../impl/clickhouse/sql_client.py     |  1 +
 tests/load/utils.py                   | 28 +++++++++++++------
 tests/utils.py                        |  1 +
 5 files changed, 29 insertions(+), 21 deletions(-)

diff --git a/dlt/destinations/__init__.py b/dlt/destinations/__init__.py
index 13b7f7ed99..302de24a6b 100644
--- a/dlt/destinations/__init__.py
+++ b/dlt/destinations/__init__.py
@@ -14,6 +14,7 @@
 from dlt.destinations.impl.synapse.factory import synapse
 from dlt.destinations.impl.databricks.factory import databricks
 from dlt.destinations.impl.dremio.factory import dremio
+from dlt.destinations.impl.clickhouse.factory import clickhouse


 __all__ = [
@@ -32,5 +33,6 @@
     "synapse",
     "databricks",
     "dremio",
+    "clickhouse",
     "destination",
 ]
diff --git a/dlt/destinations/impl/clickhouse/clickhouse.py b/dlt/destinations/impl/clickhouse/clickhouse.py
index 20304bf284..c1bb9223c0 100644
--- a/dlt/destinations/impl/clickhouse/clickhouse.py
+++ b/dlt/destinations/impl/clickhouse/clickhouse.py
@@ -9,6 +9,7 @@
 from jinja2 import Template

 import dlt
+from dlt import config
 from dlt.common.configuration.specs import (
     CredentialsConfiguration,
     AzureCredentialsWithoutDefaults,
@@ -167,20 +168,12 @@ def __init__(
                 file_path, "ClickHouse loader Only supports parquet and jsonl files."
             )

-        # if not config.get("data_writer.disable_compression"):
-        #     raise LoadJobTerminalException(
-        #         file_path,
-        #         "ClickHouse loader does not support gzip compressed files. Please disable"
-        #         " compression in the data writer configuration:"
-        #         " https://dlthub.com/docs/reference/performance#disabling-and-enabling-file-compression.",
-        #     )
-
         bucket_url = urlparse(bucket_path)
         bucket_scheme = bucket_url.scheme

         file_extension = cast(SUPPORTED_FILE_FORMATS, file_extension)
         clickhouse_format: str = FILE_FORMAT_TO_TABLE_FUNCTION_MAPPING[file_extension]
-        # compression = "none" if config.get("data_writer.disable_compression") else "gz"
+        compression = "none" if config.get("data_writer.disable_compression") else "gz"

         statement: str = ""

@@ -191,7 +184,6 @@ def __init__(
             access_key_id = staging_credentials.aws_access_key_id
             secret_access_key = staging_credentials.aws_secret_access_key
         elif isinstance(staging_credentials, GcpCredentials):
-            # TODO: HMAC keys aren't implemented in `GcpCredentials`.
             access_key_id = dlt.config["destination.filesystem.credentials.gcp_access_key_id"]
             secret_access_key = dlt.config[
                 "destination.filesystem.credentials.gcp_secret_access_key"
@@ -233,7 +225,7 @@ def __init__(

             table_function = (
                 "SELECT * FROM"
-                f" azureBlobStorage('{storage_account_url}','{container_name}','{blobpath}','{account_name}','{account_key}','{clickhouse_format}')"
+                f" azureBlobStorage('{storage_account_url}','{container_name}','{blobpath}','{account_name}','{account_key}','{clickhouse_format}','{compression}')"
             )
             statement = f"INSERT INTO {qualified_table_name} {table_function}"
         elif not bucket_path:
@@ -255,7 +247,9 @@ def __init__(
                 settings={
                     "allow_experimental_lightweight_delete": 1,
                     "allow_experimental_object_type": 1,
+                    "enable_http_compression": 1,
                 },
+                compression=None if compression == "none" else compression,
             )
         except clickhouse_connect.driver.exceptions.Error as e:
             raise LoadJobTerminalException(
@@ -363,8 +357,6 @@ def _get_table_update_sql(
         else:
             sql[0] += "\nPRIMARY KEY tuple()"

-        # TODO: Apply sort order and cluster key hints.
-
         return sql

     def get_storage_table(self, table_name: str) -> Tuple[bool, TTableSchemaColumns]:
diff --git a/dlt/destinations/impl/clickhouse/sql_client.py b/dlt/destinations/impl/clickhouse/sql_client.py
index 184205da78..0af8933ae7 100644
--- a/dlt/destinations/impl/clickhouse/sql_client.py
+++ b/dlt/destinations/impl/clickhouse/sql_client.py
@@ -134,6 +134,7 @@ def execute_query(
         query = (
             "set allow_experimental_lightweight_delete = 1;"
             "set allow_experimental_object_type = 1;"
+            "set enable_http_compression = 1;"
             f"{query}"
         )
         with self._conn.cursor() as cursor:
diff --git a/tests/load/utils.py b/tests/load/utils.py
index 947b6dbe3d..9ebdb0f8fa 100644
--- a/tests/load/utils.py
+++ b/tests/load/utils.py
@@ -175,8 +175,8 @@ def destinations_configs(
     destination_configs += [
         DestinationTestConfiguration(destination=destination)
         for destination in SQL_DESTINATIONS
-        if destination not in ("athena", "mssql", "synapse", "databricks", "clickhouse")
-        if destination not in ("athena", "mssql", "synapse", "databricks", "dremio")
+        if destination
+        not in ("athena", "mssql", "synapse", "databricks", "dremio", "clickhouse")
     ]
     destination_configs += [
         DestinationTestConfiguration(destination="duckdb", file_format="parquet")
@@ -205,7 +205,17 @@ def destinations_configs(
         DestinationTestConfiguration(
             destination="clickhouse",
             file_format="jsonl",
-            disable_compression=True,
+        )
+    ]
+    destination_configs += [
+        DestinationTestConfiguration(
+            destination="clickhouse",
+            file_format="parquet",
+        )
+    ]
+    destination_configs += [
+        DestinationTestConfiguration(
+            destination="clickhouse", file_format="parquet", disable_compression=True
         )
     ]
     destination_configs += [
@@ -326,7 +336,6 @@ def destinations_configs(
             file_format="parquet",
             bucket_url=GCS_BUCKET,
             extra_info="gcs-authorization",
-            disable_compression=True,
         ),
         DestinationTestConfiguration(
             destination="clickhouse",
@@ -334,7 +343,6 @@ def destinations_configs(
             file_format="parquet",
             bucket_url=AWS_BUCKET,
             extra_info="s3-authorization",
-            disable_compression=True,
         ),
         DestinationTestConfiguration(
             destination="clickhouse",
@@ -342,7 +350,6 @@ def destinations_configs(
             file_format="parquet",
             bucket_url=AZ_BUCKET,
             extra_info="az-authorization",
-            disable_compression=True,
         ),
         DestinationTestConfiguration(
             destination="clickhouse",
@@ -350,7 +357,6 @@ def destinations_configs(
             file_format="jsonl",
             bucket_url=AZ_BUCKET,
             extra_info="az-authorization",
-            disable_compression=True,
         ),
         DestinationTestConfiguration(
             destination="clickhouse",
@@ -358,7 +364,6 @@ def destinations_configs(
             file_format="jsonl",
             bucket_url=GCS_BUCKET,
             extra_info="gcs-authorization",
-            disable_compression=True,
         ),
         DestinationTestConfiguration(
             destination="clickhouse",
@@ -366,7 +371,14 @@ def destinations_configs(
             file_format="jsonl",
             bucket_url=AWS_BUCKET,
             extra_info="s3-authorization",
+        ),
+        DestinationTestConfiguration(
+            destination="clickhouse",
+            staging="filesystem",
+            file_format="jsonl",
+            bucket_url=AWS_BUCKET,
             disable_compression=True,
+            extra_info="s3-authorization",
         ),
         DestinationTestConfiguration(
             destination="dremio",
diff --git a/tests/utils.py b/tests/utils.py
index 410c2363d3..710e95458d 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -48,6 +48,7 @@
     "destination",
     "synapse",
     "databricks",
+    "clickhouse",
     "dremio",
 }
 NON_SQL_DESTINATIONS = {"filesystem", "weaviate", "dummy", "motherduck", "qdrant", "destination"}
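
Usage sketch (commentary after the diff, ignored by git apply): the `data_writer.disable_compression` key read above is dlt's documented data-writer option (see the performance docs URL cited in the removed comment). A minimal way to exercise both branches of the new `compression = "none" if ... else "gz"` line from a pipeline script; the env-var spelling follows dlt's standard section layout, and the pipeline and table names here are hypothetical:

    import os

    import dlt

    # With compression left enabled (the default), jsonl load files are gzipped
    # and the loader passes "gz" to azureBlobStorage() / the clickhouse-connect
    # client; with the setting below it passes "none" instead.
    os.environ["NORMALIZE__DATA_WRITER__DISABLE_COMPRESSION"] = "true"

    pipeline = dlt.pipeline(
        pipeline_name="clickhouse_compression_smoke",  # hypothetical name
        destination="clickhouse",
        staging="filesystem",  # assumes staging credentials are configured
    )
    info = pipeline.run(
        [{"id": 1, "payload": "one uncompressed row"}],
        table_name="compression_smoke",  # hypothetical table
        loader_file_format="jsonl",
    )
    print(info)

Running this once with and once without the environment variable covers the compressed and uncompressed load paths that the test matrix changes above also exercise.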