From c5d8709a377cd8cefc32bc5869775f8b6fb531ce Mon Sep 17 00:00:00 2001
From: Marcel Coetzee
Date: Thu, 4 Apr 2024 23:53:12 +0200
Subject: [PATCH] No staging test case #1055

Signed-off-by: Marcel Coetzee
---
 .../impl/clickhouse/clickhouse.py | 33 ++++++++++++-------
 tests/load/utils.py               |  2 --
 2 files changed, 21 insertions(+), 14 deletions(-)

diff --git a/dlt/destinations/impl/clickhouse/clickhouse.py b/dlt/destinations/impl/clickhouse/clickhouse.py
index 4d1734189b..a6b9d8da78 100644
--- a/dlt/destinations/impl/clickhouse/clickhouse.py
+++ b/dlt/destinations/impl/clickhouse/clickhouse.py
@@ -5,6 +5,7 @@
 from urllib.parse import urlparse
 
 import dlt
+from dlt import config
 from dlt.common.configuration.specs import (
     CredentialsConfiguration,
     AzureCredentialsWithoutDefaults,
@@ -34,7 +35,7 @@
     get_dedup_sort_tuple,
 )
 from dlt.common.storages import FileStorage
-from dlt.destinations.exceptions import MergeDispositionException
+from dlt.destinations.exceptions import MergeDispositionException, LoadJobTerminalException
 from dlt.destinations.impl.clickhouse import capabilities
 from dlt.destinations.impl.clickhouse.clickhouse_adapter import (
     TTableEngineType,
@@ -163,12 +164,19 @@ def __init__(
         file_extension = os.path.splitext(file_name)[1][
             1:
         ].lower()  # Remove dot (.) from file extension.
+
         if file_extension not in ["parquet", "jsonl"]:
-            raise ValueError("Clickhouse staging only supports 'parquet' and 'jsonl' file formats.")
+            raise LoadJobTerminalException(
+                file_path, "Clickhouse loader only supports parquet and jsonl files."
+            )
 
-        print("File Path:", file_path)
-        print("Table Name:", table_name)
-        print("Bucket Path:", bucket_path)
+        if not config.get("data_writer.disable_compression"):
+            raise LoadJobTerminalException(
+                file_path,
+                "Clickhouse loader does not support gzip compressed files. Please disable"
+                " compression in the data writer configuration:"
+                " https://dlthub.com/docs/reference/performance#disabling-and-enabling-file-compression.",
+            )
 
         if not bucket_path:
             # Local filesystem.
@@ -180,6 +188,7 @@
             file_extension = cast(SUPPORTED_FILE_FORMATS, file_extension)
 
         table_function: str
+        table_function = ""
 
         if bucket_scheme in ("s3", "gs", "gcs"):
             bucket_http_url = convert_storage_to_http_scheme(bucket_url)
@@ -201,17 +210,17 @@
             )
 
         elif bucket_scheme in ("az", "abfs"):
-            if not isinstance(
-                staging_credentials, AzureCredentialsWithoutDefaults
-            ):
-                # Unsigned access.
-                raise NotImplementedError(
-                    "Unsigned Azure Blob Storage access from Clickhouse isn't supported as yet."
+            if not isinstance(staging_credentials, AzureCredentialsWithoutDefaults):
+                raise LoadJobTerminalException(
+                    file_path,
+                    "Unsigned Azure Blob Storage access from Clickhouse isn't supported as yet.",
                 )
 
             # Authenticated access.
             account_name = staging_credentials.azure_storage_account_name
-            storage_account_url = f"https://{staging_credentials.azure_storage_account_name}.blob.core.windows.net"
+            storage_account_url = (
+                f"https://{staging_credentials.azure_storage_account_name}.blob.core.windows.net"
+            )
             account_key = staging_credentials.azure_storage_account_key
             container_name = bucket_url.netloc
             blobpath = bucket_url.path
diff --git a/tests/load/utils.py b/tests/load/utils.py
index 078c26bf71..93055cbd2b 100644
--- a/tests/load/utils.py
+++ b/tests/load/utils.py
@@ -201,8 +201,6 @@ def destinations_configs(
             DestinationTestConfiguration(
                 destination="clickhouse",
                 file_format="jsonl",
-                bucket_url=AWS_BUCKET,
-                extra_info="s3-authorization",
                 disable_compression=True,
             )
         ]
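
Note on the compression guard added above: the loader now rejects gzip-compressed staging files and points users at the data writer configuration. A minimal sketch of disabling compression from pipeline code follows, assuming dlt's runtime config provider accepts item assignment on dlt.config and that the config.toml section noted in the comment is the documented equivalent; both are assumptions, not part of this patch.

    import dlt
    from dlt import config

    # Assumption: item assignment on dlt.config writes to the runtime config
    # provider. The config.toml equivalent would be:
    #   [normalize.data_writer]
    #   disable_compression=true
    dlt.config["data_writer.disable_compression"] = True

    # Same key the patched loader reads before accepting a staged file;
    # truthy here means the gzip guard will not raise LoadJobTerminalException.
    assert config.get("data_writer.disable_compression")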