diff --git a/dlt/destinations/impl/clickhouse/clickhouse.py b/dlt/destinations/impl/clickhouse/clickhouse.py
index d8a56eb9ff..0170ac3d3b 100644
--- a/dlt/destinations/impl/clickhouse/clickhouse.py
+++ b/dlt/destinations/impl/clickhouse/clickhouse.py
@@ -1,3 +1,4 @@
+import os
 from copy import deepcopy
 from typing import ClassVar, Optional, Dict, List, Sequence
 from urllib.parse import urlparse
@@ -5,6 +6,7 @@
 from dlt.common.configuration.specs import (
     CredentialsConfiguration,
     AwsCredentialsWithoutDefaults,
+    AzureCredentialsWithoutDefaults,
 )
 from dlt.common.destination import DestinationCapabilitiesContext
 from dlt.common.destination.reference import (
@@ -26,9 +28,8 @@
 )
 from dlt.destinations.impl.clickhouse.sql_client import ClickhouseSqlClient
 from dlt.destinations.impl.clickhouse.utils import (
-    convert_storage_url_to_http_url,
+    convert_storage_to_http_scheme,
     render_s3_table_function,
-    render_azure_blob_storage_table_function,
 )
 from dlt.destinations.job_client_impl import (
     SqlJobClientWithStaging,
@@ -94,7 +95,6 @@ def __init__(
         self,
         file_path: str,
         table_name: str,
-        load_id: str,
         client: ClickhouseSqlClient,
         staging_credentials: Optional[CredentialsConfiguration] = None,
     ) -> None:
@@ -111,54 +111,63 @@ def __init__(
         file_name = (
             FileStorage.get_file_name_from_file_path(bucket_path) if bucket_path else file_name
         )
+        file_extension = os.path.splitext(file_name)[1][1:].lower()
+        if file_extension not in ["parquet", "jsonl"]:
+            raise ValueError("Clickhouse staging only supports 'parquet' and 'jsonl' file formats.")
 
-        if bucket_path:
-            bucket_url = urlparse(bucket_path)
-            bucket_http_url = convert_storage_url_to_http_url(bucket_url)
-            bucket_scheme = bucket_url.scheme
-
-            table_function: str
-
-            if bucket_scheme in ("s3", "gs", "gcs"):
-                if isinstance(staging_credentials, AwsCredentialsWithoutDefaults):
-                    # Authenticated access.
-                    table_function = render_s3_table_function(
-                        bucket_http_url,
-                        staging_credentials.aws_secret_access_key,
-                        staging_credentials.aws_secret_access_key,
-                    )
-                else:
-                    # Unsigned access.
-                    table_function = render_s3_table_function(bucket_http_url)
-            elif bucket_scheme in ("az", "abfs"):
-                if isinstance(staging_credentials, AwsCredentialsWithoutDefaults):
-                    # Authenticated access.
-                    table_function = render_azure_blob_storage_table_function(
-                        bucket_http_url,
-                        staging_credentials.aws_secret_access_key,
-                        staging_credentials.aws_secret_access_key,
-                    )
-                else:
-                    # Unsigned access.
-                    table_function = render_azure_blob_storage_table_function(bucket_http_url)
-            else:
-                # Local file.
-                raise NotImplementedError
 
+        if not bucket_path:
+            # Local filesystem.
+            raise NotImplementedError("Only object storage is supported.")
 
-        with client.begin_transaction():
-            # PUT and COPY in one transaction if local file, otherwise only copy.
-            if not bucket_path:
-                client.execute_sql(
-                    f'PUT file://{file_path} @{stage_name}/"{load_id}" OVERWRITE = TRUE,'
-                    " AUTO_COMPRESS = FALSE"
+        bucket_url = urlparse(bucket_path)
+        bucket_scheme = bucket_url.scheme
+
+        table_function: str
+
+        if bucket_scheme in ("s3", "gs", "gcs"):
+            bucket_http_url = convert_storage_to_http_scheme(bucket_url)
+
+            table_function = (
+                render_s3_table_function(
+                    bucket_http_url,
+                    staging_credentials.aws_access_key_id,
+                    staging_credentials.aws_secret_access_key,
+                    file_format=file_extension,  # type: ignore[arg-type]
+                )
+                if isinstance(staging_credentials, AwsCredentialsWithoutDefaults)
+                else render_s3_table_function(
+                    bucket_http_url,
+                    file_format=file_extension,  # type: ignore[arg-type]
+                )
+            )
+        elif bucket_scheme in ("az", "abfs"):
+            if isinstance(staging_credentials, AzureCredentialsWithoutDefaults):
+                # Authenticated access.
+                account_name = staging_credentials.azure_storage_account_name
+                storage_account_url = (
+                    f"https://{staging_credentials.azure_storage_account_name}.blob.core.windows.net"
                 )
-                client.execute_sql(f"""COPY INTO {qualified_table_name}
-                    {from_clause}
-                    {files_clause}
-                    {credentials_clause}
-                    FILE_FORMAT = {source_format}
-                    MATCH_BY_COLUMN_NAME='CASE_INSENSITIVE'
-                    """)
+                account_key = staging_credentials.azure_storage_account_key
+                container_name = bucket_url.netloc
+                blobpath = bucket_url.path
+
+                format_mapping = {"jsonl": "JSONEachRow", "parquet": "Parquet"}
+                clickhouse_format = format_mapping[file_extension]
+
+                table_function = (
+                    f"azureBlobStorage('{storage_account_url}','{container_name}','{blobpath}',"
+                    f"'{account_name}','{account_key}','{clickhouse_format}')"
+                )
+
+            else:
+                # Unsigned access.
+                raise NotImplementedError(
+                    "Unsigned Azure Blob Storage access from Clickhouse isn't supported yet."
+ ) + + with client.begin_transaction(): + client.execute_sql( + f"""INSERT INTO {qualified_table_name} SELECT * FROM {table_function}""" + ) def state(self) -> TLoadJobState: return "completed" @@ -188,7 +197,6 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> return super().start_file_load(table, file_path, load_id) or ClickhouseLoadJob( file_path, table["name"], - load_id, self.sql_client, staging_credentials=( self.config.staging_config.credentials if self.config.staging_config else None diff --git a/dlt/destinations/impl/clickhouse/utils.py b/dlt/destinations/impl/clickhouse/utils.py index 543a07753b..6297712943 100644 --- a/dlt/destinations/impl/clickhouse/utils.py +++ b/dlt/destinations/impl/clickhouse/utils.py @@ -1,5 +1,5 @@ from typing import Union, Optional, Literal -from urllib.parse import urlparse, ParseResult +from urllib.parse import urlparse, ParseResult, urlunparse from jinja2 import Template @@ -7,7 +7,7 @@ S3_TABLE_FUNCTION_FILE_FORMATS = Literal["jsonl", "parquet"] -def convert_storage_url_to_http_url( +def convert_storage_to_http_scheme( url: Union[str, ParseResult], use_https: bool = False, endpoint: str = None, region: str = None ) -> str: try: @@ -33,11 +33,9 @@ def convert_storage_url_to_http_url( "gs": "storage.googleapis.com", "gcs": "storage.googleapis.com", } - domain = storage_domains[parsed_url.scheme] return f"{protocol}://{bucket_name}.{domain}/{object_key}" - except Exception as e: raise Exception(f"Error converting storage URL to HTTP protocol: '{url}'") from e @@ -54,7 +52,9 @@ def render_s3_table_function( format_mapping = {"jsonl": "JSONEachRow", "parquet": "Parquet"} clickhouse_format = format_mapping[file_format] - template = Template("""s3('{{ url }}'{% if access_key_id and secret_access_key %},'{{ access_key_id }}','{{ secret_access_key }}'{% else %},NOSIGN{% endif %},'{{ clickhouse_format }}')""") + template = Template( + """s3('{{ url }}'{% if access_key_id and secret_access_key %},'{{ access_key_id }}','{{ secret_access_key }}'{% else %},NOSIGN{% endif %},'{{ clickhouse_format }}')""" + ) return template.render( url=url, @@ -62,7 +62,3 @@ def render_s3_table_function( secret_access_key=secret_access_key, clickhouse_format=clickhouse_format, ).strip() - - -def render_azure_blob_storage_table_function(): - raise NotImplementedError diff --git a/tests/load/clickhouse/test_utls.py b/tests/load/clickhouse/test_utls.py index 5176899775..4d672fb7de 100644 --- a/tests/load/clickhouse/test_utls.py +++ b/tests/load/clickhouse/test_utls.py @@ -1,7 +1,7 @@ import pytest from dlt.destinations.impl.clickhouse.utils import ( - convert_storage_url_to_http_url, + convert_storage_to_http_scheme, render_s3_table_function, ) @@ -9,44 +9,44 @@ def test_convert_s3_url_to_http() -> None: s3_url: str = "s3://my-bucket/path/to/file.txt" expected_http_url: str = "http://my-bucket.s3.amazonaws.com/path/to/file.txt" - assert convert_storage_url_to_http_url(s3_url) == expected_http_url + assert convert_storage_to_http_scheme(s3_url) == expected_http_url def test_convert_s3_url_to_https() -> None: s3_url: str = "s3://my-bucket/path/to/file.txt" expected_https_url: str = "https://my-bucket.s3.amazonaws.com/path/to/file.txt" - assert convert_storage_url_to_http_url(s3_url, use_https=True) == expected_https_url + assert convert_storage_to_http_scheme(s3_url, use_https=True) == expected_https_url def test_convert_gs_url_to_http() -> None: gs_url: str = "gs://my-bucket/path/to/file.txt" expected_http_url: str = 
"http://my-bucket.storage.googleapis.com/path/to/file.txt" - assert convert_storage_url_to_http_url(gs_url) == expected_http_url + assert convert_storage_to_http_scheme(gs_url) == expected_http_url gcs_url = "gcs://my-bucket/path/to/file.txt" expected_http_url = "http://my-bucket.storage.googleapis.com/path/to/file.txt" - assert convert_storage_url_to_http_url(gcs_url) == expected_http_url + assert convert_storage_to_http_scheme(gcs_url) == expected_http_url def test_convert_gs_url_to_https() -> None: gs_url: str = "gs://my-bucket/path/to/file.txt" expected_https_url: str = "https://my-bucket.storage.googleapis.com/path/to/file.txt" - assert convert_storage_url_to_http_url(gs_url, use_https=True) == expected_https_url + assert convert_storage_to_http_scheme(gs_url, use_https=True) == expected_https_url gcs_url = "gcs://my-bucket/path/to/file.txt" expected_https_url = "https://my-bucket.storage.googleapis.com/path/to/file.txt" - assert convert_storage_url_to_http_url(gcs_url, use_https=True) == expected_https_url + assert convert_storage_to_http_scheme(gcs_url, use_https=True) == expected_https_url def test_convert_s3_url_to_http_with_region() -> None: s3_url: str = "s3://my-bucket/path/to/file.txt" expected_http_url: str = "http://my-bucket.s3-us-west-2.amazonaws.com/path/to/file.txt" - assert convert_storage_url_to_http_url(s3_url, region="us-west-2") == expected_http_url + assert convert_storage_to_http_scheme(s3_url, region="us-west-2") == expected_http_url def test_convert_s3_url_to_https_with_region() -> None: s3_url: str = "s3://my-bucket/path/to/file.txt" expected_https_url: str = "https://my-bucket.s3-us-east-1.amazonaws.com/path/to/file.txt" assert ( - convert_storage_url_to_http_url(s3_url, use_https=True, region="us-east-1") + convert_storage_to_http_scheme(s3_url, use_https=True, region="us-east-1") == expected_https_url ) @@ -55,7 +55,7 @@ def test_convert_s3_url_to_http_with_endpoint() -> None: s3_url: str = "s3://my-bucket/path/to/file.txt" expected_http_url: str = "http://my-bucket.s3.custom-endpoint.com/path/to/file.txt" assert ( - convert_storage_url_to_http_url(s3_url, endpoint="s3.custom-endpoint.com") + convert_storage_to_http_scheme(s3_url, endpoint="s3.custom-endpoint.com") == expected_http_url ) @@ -64,7 +64,7 @@ def test_convert_s3_url_to_https_with_endpoint() -> None: s3_url: str = "s3://my-bucket/path/to/file.txt" expected_https_url: str = "https://my-bucket.s3.custom-endpoint.com/path/to/file.txt" assert ( - convert_storage_url_to_http_url(s3_url, use_https=True, endpoint="s3.custom-endpoint.com") + convert_storage_to_http_scheme(s3_url, use_https=True, endpoint="s3.custom-endpoint.com") == expected_https_url ) @@ -73,12 +73,12 @@ def test_convert_gs_url_to_http_with_endpoint() -> None: gs_url: str = "gs://my-bucket/path/to/file.txt" expected_http_url: str = "http://my-bucket.custom-endpoint.com/path/to/file.txt" assert ( - convert_storage_url_to_http_url(gs_url, endpoint="custom-endpoint.com") == expected_http_url + convert_storage_to_http_scheme(gs_url, endpoint="custom-endpoint.com") == expected_http_url ) gcs_url = "gcs://my-bucket/path/to/file.txt" expected_http_url = "http://my-bucket.custom-endpoint.com/path/to/file.txt" assert ( - convert_storage_url_to_http_url(gcs_url, endpoint="custom-endpoint.com") + convert_storage_to_http_scheme(gcs_url, endpoint="custom-endpoint.com") == expected_http_url ) @@ -87,13 +87,13 @@ def test_convert_gs_url_to_https_with_endpoint() -> None: gs_url: str = "gs://my-bucket/path/to/file.txt" expected_https_url: str = 
"https://my-bucket.custom-endpoint.com/path/to/file.txt" assert ( - convert_storage_url_to_http_url(gs_url, use_https=True, endpoint="custom-endpoint.com") + convert_storage_to_http_scheme(gs_url, use_https=True, endpoint="custom-endpoint.com") == expected_https_url ) gcs_url = "gcs://my-bucket/path/to/file.txt" expected_https_url = "https://my-bucket.custom-endpoint.com/path/to/file.txt" assert ( - convert_storage_url_to_http_url(gcs_url, use_https=True, endpoint="custom-endpoint.com") + convert_storage_to_http_scheme(gcs_url, use_https=True, endpoint="custom-endpoint.com") == expected_https_url ) @@ -129,6 +129,7 @@ def test_render_without_credentials() -> None: assert render_s3_table_function(url, file_format=file_format) == expected_output # type: ignore[arg-type] + def test_render_invalid_file_format() -> None: url = "https://example.com/data.unknown" access_key_id = "test_access_key" @@ -141,7 +142,7 @@ def test_render_invalid_file_format() -> None: def test_invalid_url_format() -> None: with pytest.raises(Exception) as exc_info: - convert_storage_url_to_http_url("invalid-url") + convert_storage_to_http_scheme("invalid-url") assert str(exc_info.value) == "Error converting storage URL to HTTP protocol: 'invalid-url'"