
Commit c5d8709: No staging test case #1055

Pipboyguy committed Apr 4, 2024 (1 parent: 79d9b80)
Signed-off-by: Marcel Coetzee <[email protected]>
Showing 2 changed files with 21 additions and 14 deletions.
33 changes: 21 additions & 12 deletions dlt/destinations/impl/clickhouse/clickhouse.py
@@ -5,6 +5,7 @@
 from urllib.parse import urlparse

 import dlt
+from dlt import config
 from dlt.common.configuration.specs import (
     CredentialsConfiguration,
     AzureCredentialsWithoutDefaults,
@@ -34,7 +35,7 @@
     get_dedup_sort_tuple,
 )
 from dlt.common.storages import FileStorage
-from dlt.destinations.exceptions import MergeDispositionException
+from dlt.destinations.exceptions import MergeDispositionException, LoadJobTerminalException
 from dlt.destinations.impl.clickhouse import capabilities
 from dlt.destinations.impl.clickhouse.clickhouse_adapter import (
     TTableEngineType,
@@ -163,12 +164,19 @@ def __init__(
         file_extension = os.path.splitext(file_name)[1][
             1:
         ].lower()  # Remove dot (.) from file extension.
+
         if file_extension not in ["parquet", "jsonl"]:
-            raise ValueError("Clickhouse staging only supports 'parquet' and 'jsonl' file formats.")
+            raise LoadJobTerminalException(
+                file_path, "Clickhouse loader only supports parquet and jsonl files."
+            )

-        print("File Path:", file_path)
-        print("Table Name:", table_name)
-        print("Bucket Path:", bucket_path)
+        if not config.get("data_writer.disable_compression"):
+            raise LoadJobTerminalException(
+                file_path,
+                "Clickhouse loader does not support gzip compressed files. Please disable"
+                " compression in the data writer configuration:"
+                " https://dlthub.com/docs/reference/performance#disabling-and-enabling-file-compression.",
+            )

         if not bucket_path:
             # Local filesystem.
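
A note on the new guard: it reads the data_writer.disable_compression key through dlt's config, so writer compression must be turned off before loading into ClickHouse. A minimal sketch of one way to do that, assuming dlt's standard environment-variable provider (the double-underscore name mirrors the dotted key; config.toml works as well, per the docs link in the error message):

import os

# Assumption: dlt's env provider maps "data_writer.disable_compression"
# onto this variable name, satisfying the guard added above.
os.environ["DATA_WRITER__DISABLE_COMPRESSION"] = "true"
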
@@ -180,6 +188,7 @@ def __init__(
         file_extension = cast(SUPPORTED_FILE_FORMATS, file_extension)
         table_function: str

+        table_function = ""
         if bucket_scheme in ("s3", "gs", "gcs"):
             bucket_http_url = convert_storage_to_http_scheme(bucket_url)

@@ -201,17 +210,17 @@ def __init__(
             )

         elif bucket_scheme in ("az", "abfs"):
-            if not isinstance(
-                staging_credentials, AzureCredentialsWithoutDefaults
-            ):
-                # Unsigned access.
-                raise NotImplementedError(
-                    "Unsigned Azure Blob Storage access from Clickhouse isn't supported as yet."
+            if not isinstance(staging_credentials, AzureCredentialsWithoutDefaults):
+                raise LoadJobTerminalException(
+                    file_path,
+                    "Unsigned Azure Blob Storage access from Clickhouse isn't supported as yet.",
                 )

             # Authenticated access.
             account_name = staging_credentials.azure_storage_account_name
-            storage_account_url = f"https://{staging_credentials.azure_storage_account_name}.blob.core.windows.net"
+            storage_account_url = (
+                f"https://{staging_credentials.azure_storage_account_name}.blob.core.windows.net"
+            )
             account_key = staging_credentials.azure_storage_account_key
             container_name = bucket_url.netloc
             blobpath = bucket_url.path
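
Below the fold, the values gathered in this hunk feed ClickHouse's azureBlobStorage table function. A hedged illustration of how they line up, assuming ClickHouse's documented argument order (storage_account_url, container_name, blobpath, account_name, account_key, format) and a hypothetical clickhouse_format variable; the commit's actual string construction is not shown here:

# Illustrative sketch only, not the committed code.
table_function = (
    "azureBlobStorage("
    f"'{storage_account_url}','{container_name}','{blobpath}',"
    f"'{account_name}','{account_key}','{clickhouse_format}')"
)
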
2 changes: 0 additions & 2 deletions tests/load/utils.py
@@ -201,8 +201,6 @@ def destinations_configs(
         DestinationTestConfiguration(
             destination="clickhouse",
             file_format="jsonl",
-            bucket_url=AWS_BUCKET,
-            extra_info="s3-authorization",
             disable_compression=True,
         )
     ]
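
Removing bucket_url=AWS_BUCKET and extra_info="s3-authorization" makes this the no-staging case the commit title names: jsonl loaded straight from the local filesystem, with compression disabled as the new guard requires. A hypothetical minimal pipeline exercising the same path (names and data are illustrative; ClickHouse credentials are assumed to be configured separately):

import os
import dlt

# Same env-var assumption as noted after the compression guard above.
os.environ["DATA_WRITER__DISABLE_COMPRESSION"] = "true"

pipeline = dlt.pipeline(destination="clickhouse", dataset_name="no_staging_demo")
# loader_file_format="jsonl" mirrors the test configuration's file_format.
pipeline.run([{"id": 1}, {"id": 2}], table_name="items", loader_file_format="jsonl")
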
