Refactor URL conversion and staging for Clickhouse
Signed-off-by: Marcel Coetzee <[email protected]>
Pipboyguy committed Mar 18, 2024
1 parent 57ceeee commit 6cdf086
Showing 3 changed files with 79 additions and 74 deletions.
106 changes: 57 additions & 49 deletions dlt/destinations/impl/clickhouse/clickhouse.py
@@ -1,10 +1,12 @@
import os
from copy import deepcopy
from typing import ClassVar, Optional, Dict, List, Sequence
from urllib.parse import urlparse

from dlt.common.configuration.specs import (
    CredentialsConfiguration,
    AwsCredentialsWithoutDefaults,
    AzureCredentialsWithoutDefaults,
)
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.destination.reference import (
@@ -26,9 +28,8 @@
)
from dlt.destinations.impl.clickhouse.sql_client import ClickhouseSqlClient
from dlt.destinations.impl.clickhouse.utils import (
    convert_storage_url_to_http_url,
    convert_storage_to_http_scheme,
    render_s3_table_function,
    render_azure_blob_storage_table_function,
)
from dlt.destinations.job_client_impl import (
SqlJobClientWithStaging,
@@ -94,7 +95,6 @@ def __init__(
    self,
    file_path: str,
    table_name: str,
    load_id: str,
    client: ClickhouseSqlClient,
    staging_credentials: Optional[CredentialsConfiguration] = None,
) -> None:
@@ -111,54 +111,63 @@ def __init__(
file_name = (
    FileStorage.get_file_name_from_file_path(bucket_path) if bucket_path else file_name
)
file_extension = os.path.splitext(file_name)[1][1:].lower()
if file_extension not in ["parquet", "jsonl"]:
    raise ValueError("Clickhouse staging only supports 'parquet' and 'jsonl' file formats.")

if bucket_path:
    bucket_url = urlparse(bucket_path)
    bucket_http_url = convert_storage_url_to_http_url(bucket_url)
    bucket_scheme = bucket_url.scheme

    table_function: str

    if bucket_scheme in ("s3", "gs", "gcs"):
        if isinstance(staging_credentials, AwsCredentialsWithoutDefaults):
            # Authenticated access.
            table_function = render_s3_table_function(
                bucket_http_url,
                staging_credentials.aws_secret_access_key,
                staging_credentials.aws_secret_access_key,
            )
        else:
            # Unsigned access.
            table_function = render_s3_table_function(bucket_http_url)
    elif bucket_scheme in ("az", "abfs"):
        if isinstance(staging_credentials, AwsCredentialsWithoutDefaults):
            # Authenticated access.
            table_function = render_azure_blob_storage_table_function(
                bucket_http_url,
                staging_credentials.aws_secret_access_key,
                staging_credentials.aws_secret_access_key,
            )
        else:
            # Unsigned access.
            table_function = render_azure_blob_storage_table_function(bucket_http_url)
    else:
        # Local file.
        raise NotImplementedError
if not bucket_path:
    # Local filesystem.
    raise NotImplementedError("Only object storage is supported.")

with client.begin_transaction():
    # PUT and COPY in one transaction if local file, otherwise only copy.
    if not bucket_path:
        client.execute_sql(
            f'PUT file://{file_path} @{stage_name}/"{load_id}" OVERWRITE = TRUE,'
            " AUTO_COMPRESS = FALSE"
        )
    client.execute_sql(f"""COPY INTO {qualified_table_name}
        {from_clause}
        {files_clause}
        {credentials_clause}
        FILE_FORMAT = {source_format}
        MATCH_BY_COLUMN_NAME='CASE_INSENSITIVE'
        """)

bucket_url = urlparse(bucket_path)
bucket_scheme = bucket_url.scheme

table_function: str

if bucket_scheme in ("s3", "gs", "gcs"):
    bucket_http_url = convert_storage_to_http_scheme(bucket_url)

    table_function = (
        render_s3_table_function(
            bucket_http_url,
            staging_credentials.aws_access_key_id,
            staging_credentials.aws_secret_access_key,
            file_format=file_extension,  # type: ignore[arg-type]
        )
        if isinstance(staging_credentials, AwsCredentialsWithoutDefaults)
        else render_s3_table_function(
            bucket_http_url,
            file_format=file_extension,  # type: ignore[arg-type]
        )
    )
elif bucket_scheme in ("az", "abfs"):
    if isinstance(staging_credentials, AzureCredentialsWithoutDefaults):
        # Authenticated access.
        account_name = staging_credentials.azure_storage_account_name
        storage_account_url = (
            f"{staging_credentials.azure_storage_account_name}.blob.core.windows.net"
        )
        account_key = staging_credentials.azure_storage_sas_token
        container_name = bucket_url.netloc
        blobpath = bucket_url.path

        format_mapping = {"jsonl": "JSONEachRow", "parquet": "Parquet"}
        clickhouse_format = format_mapping[file_extension]

        table_function = (
            f"azureBlobStorage('{storage_account_url}','{container_name}',"
            f"'{blobpath}','{account_name}','{account_key}','{clickhouse_format}')"
        )
    else:
        # Unsigned access.
        raise NotImplementedError(
            "Unsigned Azure Blob Storage access from Clickhouse isn't supported yet."
        )

with client.begin_transaction():
    client.execute_sql(
        f"""INSERT INTO {qualified_table_name} SELECT * FROM {table_function}"""
    )

def state(self) -> TLoadJobState:
return "completed"
@@ -188,7 +197,6 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) ->
return super().start_file_load(table, file_path, load_id) or ClickhouseLoadJob(
    file_path,
    table["name"],
    load_id,
    self.sql_client,
    staging_credentials=(
        self.config.staging_config.credentials if self.config.staging_config else None
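For context, a minimal sketch of how the refactored pieces compose for an S3-staged Parquet file. The bucket, path, and table names here are hypothetical, and unsigned (NOSIGN) access is assumed; the expected strings follow from the helpers and tests in this commit.

from urllib.parse import urlparse

from dlt.destinations.impl.clickhouse.utils import (
    convert_storage_to_http_scheme,
    render_s3_table_function,
)

# Hypothetical staged file on S3, read with unsigned (NOSIGN) access.
bucket_url = urlparse("s3://my-bucket/load/events.parquet")
bucket_http_url = convert_storage_to_http_scheme(bucket_url, use_https=True)
table_function = render_s3_table_function(bucket_http_url, file_format="parquet")

# The load job wraps the table function in a plain INSERT ... SELECT.
sql = f"INSERT INTO my_db.events SELECT * FROM {table_function}"
# sql is now:
# INSERT INTO my_db.events SELECT * FROM s3('https://my-bucket.s3.amazonaws.com/load/events.parquet',NOSIGN,'Parquet')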
14 changes: 5 additions & 9 deletions dlt/destinations/impl/clickhouse/utils.py
@@ -1,13 +1,13 @@
from typing import Union, Optional, Literal
from urllib.parse import urlparse, ParseResult
from urllib.parse import urlparse, ParseResult, urlunparse

from jinja2 import Template


S3_TABLE_FUNCTION_FILE_FORMATS = Literal["jsonl", "parquet"]


def convert_storage_url_to_http_url(
def convert_storage_to_http_scheme(
url: Union[str, ParseResult], use_https: bool = False, endpoint: Optional[str] = None, region: Optional[str] = None
) -> str:
try:
@@ -33,11 +33,9 @@ def convert_storage_url_to_http_url(
"gs": "storage.googleapis.com",
"gcs": "storage.googleapis.com",
}

domain = storage_domains[parsed_url.scheme]

return f"{protocol}://{bucket_name}.{domain}/{object_key}"

except Exception as e:
raise Exception(f"Error converting storage URL to HTTP protocol: '{url}'") from e
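To illustrate the renamed helper, a quick usage sketch; the bucket and key are made up, and the expected outputs match the tests further down.

from dlt.destinations.impl.clickhouse.utils import convert_storage_to_http_scheme

# s3:// URLs map to the bucket's public AWS endpoint; gs://gcs:// map to GCS.
print(convert_storage_to_http_scheme("s3://my-bucket/path/to/file.txt"))
# http://my-bucket.s3.amazonaws.com/path/to/file.txt
print(convert_storage_to_http_scheme("gs://my-bucket/data.jsonl", use_https=True))
# https://my-bucket.storage.googleapis.com/data.jsonl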

@@ -54,15 +52,13 @@ def render_s3_table_function(
format_mapping = {"jsonl": "JSONEachRow", "parquet": "Parquet"}
clickhouse_format = format_mapping[file_format]

template = Template("""s3('{{ url }}'{% if access_key_id and secret_access_key %},'{{ access_key_id }}','{{ secret_access_key }}'{% else %},NOSIGN{% endif %},'{{ clickhouse_format }}')""")
template = Template(
"""s3('{{ url }}'{% if access_key_id and secret_access_key %},'{{ access_key_id }}','{{ secret_access_key }}'{% else %},NOSIGN{% endif %},'{{ clickhouse_format }}')"""
)

return template.render(
url=url,
access_key_id=access_key_id,
secret_access_key=secret_access_key,
clickhouse_format=clickhouse_format,
).strip()


def render_azure_blob_storage_table_function():
raise NotImplementedError
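And a sketch of the credentialed branch of render_s3_table_function. The key values are placeholders, and the keyword names are assumed from the template variables rendered above.

from dlt.destinations.impl.clickhouse.utils import render_s3_table_function

# With both keys supplied, the Jinja template renders an authenticated s3() call.
print(
    render_s3_table_function(
        "https://my-bucket.s3.amazonaws.com/data.jsonl",
        access_key_id="my_access_key",
        secret_access_key="my_secret_key",
        file_format="jsonl",
    )
)
# s3('https://my-bucket.s3.amazonaws.com/data.jsonl','my_access_key','my_secret_key','JSONEachRow')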
33 changes: 17 additions & 16 deletions tests/load/clickhouse/test_utls.py
@@ -1,52 +1,52 @@
import pytest

from dlt.destinations.impl.clickhouse.utils import (
convert_storage_url_to_http_url,
convert_storage_to_http_scheme,
render_s3_table_function,
)


def test_convert_s3_url_to_http() -> None:
s3_url: str = "s3://my-bucket/path/to/file.txt"
expected_http_url: str = "http://my-bucket.s3.amazonaws.com/path/to/file.txt"
assert convert_storage_url_to_http_url(s3_url) == expected_http_url
assert convert_storage_to_http_scheme(s3_url) == expected_http_url


def test_convert_s3_url_to_https() -> None:
s3_url: str = "s3://my-bucket/path/to/file.txt"
expected_https_url: str = "https://my-bucket.s3.amazonaws.com/path/to/file.txt"
assert convert_storage_url_to_http_url(s3_url, use_https=True) == expected_https_url
assert convert_storage_to_http_scheme(s3_url, use_https=True) == expected_https_url


def test_convert_gs_url_to_http() -> None:
gs_url: str = "gs://my-bucket/path/to/file.txt"
expected_http_url: str = "http://my-bucket.storage.googleapis.com/path/to/file.txt"
assert convert_storage_url_to_http_url(gs_url) == expected_http_url
assert convert_storage_to_http_scheme(gs_url) == expected_http_url
gcs_url = "gcs://my-bucket/path/to/file.txt"
expected_http_url = "http://my-bucket.storage.googleapis.com/path/to/file.txt"
assert convert_storage_url_to_http_url(gcs_url) == expected_http_url
assert convert_storage_to_http_scheme(gcs_url) == expected_http_url


def test_convert_gs_url_to_https() -> None:
gs_url: str = "gs://my-bucket/path/to/file.txt"
expected_https_url: str = "https://my-bucket.storage.googleapis.com/path/to/file.txt"
assert convert_storage_url_to_http_url(gs_url, use_https=True) == expected_https_url
assert convert_storage_to_http_scheme(gs_url, use_https=True) == expected_https_url
gcs_url = "gcs://my-bucket/path/to/file.txt"
expected_https_url = "https://my-bucket.storage.googleapis.com/path/to/file.txt"
assert convert_storage_url_to_http_url(gcs_url, use_https=True) == expected_https_url
assert convert_storage_to_http_scheme(gcs_url, use_https=True) == expected_https_url


def test_convert_s3_url_to_http_with_region() -> None:
s3_url: str = "s3://my-bucket/path/to/file.txt"
expected_http_url: str = "http://my-bucket.s3-us-west-2.amazonaws.com/path/to/file.txt"
assert convert_storage_url_to_http_url(s3_url, region="us-west-2") == expected_http_url
assert convert_storage_to_http_scheme(s3_url, region="us-west-2") == expected_http_url


def test_convert_s3_url_to_https_with_region() -> None:
s3_url: str = "s3://my-bucket/path/to/file.txt"
expected_https_url: str = "https://my-bucket.s3-us-east-1.amazonaws.com/path/to/file.txt"
assert (
convert_storage_url_to_http_url(s3_url, use_https=True, region="us-east-1")
convert_storage_to_http_scheme(s3_url, use_https=True, region="us-east-1")
== expected_https_url
)

@@ -55,7 +55,7 @@ def test_convert_s3_url_to_http_with_endpoint() -> None:
s3_url: str = "s3://my-bucket/path/to/file.txt"
expected_http_url: str = "http://my-bucket.s3.custom-endpoint.com/path/to/file.txt"
assert (
convert_storage_url_to_http_url(s3_url, endpoint="s3.custom-endpoint.com")
convert_storage_to_http_scheme(s3_url, endpoint="s3.custom-endpoint.com")
== expected_http_url
)

@@ -64,7 +64,7 @@ def test_convert_s3_url_to_https_with_endpoint() -> None:
s3_url: str = "s3://my-bucket/path/to/file.txt"
expected_https_url: str = "https://my-bucket.s3.custom-endpoint.com/path/to/file.txt"
assert (
convert_storage_url_to_http_url(s3_url, use_https=True, endpoint="s3.custom-endpoint.com")
convert_storage_to_http_scheme(s3_url, use_https=True, endpoint="s3.custom-endpoint.com")
== expected_https_url
)

@@ -73,12 +73,12 @@ def test_convert_gs_url_to_http_with_endpoint() -> None:
gs_url: str = "gs://my-bucket/path/to/file.txt"
expected_http_url: str = "http://my-bucket.custom-endpoint.com/path/to/file.txt"
assert (
convert_storage_url_to_http_url(gs_url, endpoint="custom-endpoint.com") == expected_http_url
convert_storage_to_http_scheme(gs_url, endpoint="custom-endpoint.com") == expected_http_url
)
gcs_url = "gcs://my-bucket/path/to/file.txt"
expected_http_url = "http://my-bucket.custom-endpoint.com/path/to/file.txt"
assert (
convert_storage_url_to_http_url(gcs_url, endpoint="custom-endpoint.com")
convert_storage_to_http_scheme(gcs_url, endpoint="custom-endpoint.com")
== expected_http_url
)

@@ -87,13 +87,13 @@ def test_convert_gs_url_to_https_with_endpoint() -> None:
gs_url: str = "gs://my-bucket/path/to/file.txt"
expected_https_url: str = "https://my-bucket.custom-endpoint.com/path/to/file.txt"
assert (
convert_storage_url_to_http_url(gs_url, use_https=True, endpoint="custom-endpoint.com")
convert_storage_to_http_scheme(gs_url, use_https=True, endpoint="custom-endpoint.com")
== expected_https_url
)
gcs_url = "gcs://my-bucket/path/to/file.txt"
expected_https_url = "https://my-bucket.custom-endpoint.com/path/to/file.txt"
assert (
convert_storage_url_to_http_url(gcs_url, use_https=True, endpoint="custom-endpoint.com")
convert_storage_to_http_scheme(gcs_url, use_https=True, endpoint="custom-endpoint.com")
== expected_https_url
)

@@ -129,6 +129,7 @@ def test_render_without_credentials() -> None:
assert render_s3_table_function(url, file_format=file_format) == expected_output # type: ignore[arg-type]



def test_render_invalid_file_format() -> None:
url = "https://example.com/data.unknown"
access_key_id = "test_access_key"
@@ -141,7 +142,7 @@

def test_invalid_url_format() -> None:
with pytest.raises(Exception) as exc_info:
convert_storage_url_to_http_url("invalid-url")
convert_storage_to_http_scheme("invalid-url")
assert str(exc_info.value) == "Error converting storage URL to HTTP protocol: 'invalid-url'"


