diff --git a/dlt/destinations/impl/clickhouse/__init__.py b/dlt/destinations/impl/clickhouse/__init__.py
index c621f8db43..035e799d38 100644
--- a/dlt/destinations/impl/clickhouse/__init__.py
+++ b/dlt/destinations/impl/clickhouse/__init__.py
@@ -8,9 +8,9 @@ def capabilities() -> DestinationCapabilitiesContext:
     caps = DestinationCapabilitiesContext()
 
     # Clickhouse only supports loading from staged files on s3 for now.
-    caps.preferred_loader_file_format = "insert_values"
+    caps.preferred_loader_file_format = "jsonl"
     caps.supported_loader_file_formats = ["parquet", "jsonl", "insert_values"]
-    caps.preferred_staging_file_format = "parquet"
+    caps.preferred_staging_file_format = "jsonl"
     caps.supported_staging_file_formats = ["parquet", "jsonl"]
 
     caps.escape_identifier = escape_clickhouse_identifier
diff --git a/dlt/destinations/impl/clickhouse/clickhouse.py b/dlt/destinations/impl/clickhouse/clickhouse.py
index ae2f8a10e1..4910b9c0f6 100644
--- a/dlt/destinations/impl/clickhouse/clickhouse.py
+++ b/dlt/destinations/impl/clickhouse/clickhouse.py
@@ -49,8 +49,8 @@
     FILE_FORMAT_TO_TABLE_FUNCTION_MAPPING,
     SUPPORTED_FILE_FORMATS,
 )
+from dlt.destinations.insert_job_client import InsertValuesJobClient
 from dlt.destinations.job_client_impl import (
-    SqlJobClientWithStaging,
     SqlJobClientBase,
 )
 from dlt.destinations.job_impl import NewReferenceJob, EmptyLoadJob
@@ -365,7 +365,7 @@ def gen_merge_sql(
         return sql
 
 
-class ClickhouseClient(SqlJobClientWithStaging, SupportsStagingDestination):
+class ClickhouseClient(InsertValuesJobClient, SupportsStagingDestination):
    capabilities: ClassVar[DestinationCapabilitiesContext] = capabilities()
 
     def __init__(
@@ -381,15 +381,47 @@ def __init__(
         self.active_hints = deepcopy(HINT_TO_CLICKHOUSE_ATTR)
         self.type_mapper = ClickhouseTypeMapper(self.capabilities)
 
+    def _create_merge_followup_jobs(self, table_chain: Sequence[TTableSchema]) -> List[NewLoadJob]:
+        return [ClickhouseMergeJob.from_table_chain(table_chain, self.sql_client)]
+
+    def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = None) -> str:
+        # Build column definition.
+        # The primary key and sort order definition is defined outside column specification.
+        hints_str = " ".join(
+            self.active_hints.get(hint)
+            for hint in self.active_hints.keys()
+            if c.get(hint, False) is True
+            and hint not in ("primary_key", "sort")
+            and hint in self.active_hints
+        )
+
+        # Alter table statements only accept `Nullable` modifiers.
+        type_with_nullability_modifier = (
+            f"Nullable({self.type_mapper.to_db_type(c)})"
+            if c.get("nullable", True)
+            else self.type_mapper.to_db_type(c)
+        )
+
+        return (
+            f"{self.capabilities.escape_identifier(c['name'])} {type_with_nullability_modifier} {hints_str}"
+            .strip()
+        )
+
     def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob:
-        return super().start_file_load(table, file_path, load_id) or ClickhouseLoadJob(
-            file_path,
-            table["name"],
-            self.sql_client,
-            staging_credentials=(
-                self.config.staging_config.credentials if self.config.staging_config else None
-            ),
-        )
+        job = super().start_file_load(table, file_path, load_id)
+        if not job:
+            assert NewReferenceJob.is_reference_job(
+                file_path
+            ), "Clickhouse must use staging to load files."
+            job = ClickhouseLoadJob(
+                file_path,
+                table["name"],
+                self.sql_client,
+                staging_credentials=(
+                    self.config.staging_config.credentials if self.config.staging_config else None
+                ),
+            )
+        return job
 
     def _get_table_update_sql(
         self, table_name: str, new_columns: Sequence[TColumnSchema], generate_alter: bool
@@ -419,29 +451,6 @@ def _get_table_update_sql(
 
         return sql
 
-    def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = None) -> str:
-        # Build column definition.
-        # The primary key and sort order definition is defined outside column specification.
-        hints_str = " ".join(
-            self.active_hints.get(hint)
-            for hint in self.active_hints.keys()
-            if c.get(hint, False) is True
-            and hint not in ("primary_key", "sort")
-            and hint in self.active_hints
-        )
-
-        # Alter table statements only accept `Nullable` modifiers.
-        type_with_nullability_modifier = (
-            f"Nullable({self.type_mapper.to_db_type(c)})"
-            if c.get("nullable", True)
-            else self.type_mapper.to_db_type(c)
-        )
-
-        return (
-            f"{self.capabilities.escape_identifier(c['name'])} {type_with_nullability_modifier} {hints_str}"
-            .strip()
-        )
-
     def get_storage_table(self, table_name: str) -> Tuple[bool, TTableSchemaColumns]:
         fields = self._get_storage_table_query_columns()
         db_params = self.sql_client.make_qualified_table_name(table_name, escape=False).split(
@@ -471,6 +480,7 @@ def get_storage_table(self, table_name: str) -> Tuple[bool, TTableSchemaColumns]:
 
         return True, schema_table
 
     # Clickhouse fields are not nullable by default.
+    @staticmethod
     def _gen_not_null(v: bool) -> str:
         # We use the `Nullable` modifier instead of NULL / NOT NULL modifiers to cater for ALTER statement.
@@ -483,6 +493,3 @@ def _from_db_type(
 
     def restore_file_load(self, file_path: str) -> LoadJob:
         return EmptyLoadJob.from_file_path(file_path, "completed")
-
-    def _create_merge_followup_jobs(self, table_chain: Sequence[TTableSchema]) -> List[NewLoadJob]:
-        return [ClickhouseMergeJob.from_table_chain(table_chain, self.sql_client)]
diff --git a/dlt/destinations/impl/clickhouse/sql_client.py b/dlt/destinations/impl/clickhouse/sql_client.py
index 6641f7b752..74d0b217a0 100644
--- a/dlt/destinations/impl/clickhouse/sql_client.py
+++ b/dlt/destinations/impl/clickhouse/sql_client.py
@@ -57,11 +57,10 @@ def open_connection(self) -> clickhouse_driver.dbapi.connection.Connection:
             dsn=self.credentials.to_native_representation()
         )
         with self._conn.cursor() as cur:
-            # Set session settings. There doesn't seem to be a way to set these
-            # without using the library's top-level, non-dbapi2 client.
+            # Toggle experimental settings.
+            # These are necessary for nested datatypes and other operations to work.
             cur.execute("set allow_experimental_object_type = 1")
             cur.execute("set allow_experimental_lightweight_delete = 1")
-
         return self._conn
 
     @raise_open_connection_error
diff --git a/dlt/sources/helpers/rest_client/paginators.py b/dlt/sources/helpers/rest_client/paginators.py
index 65605b7dee..11a28c22ea 100644
--- a/dlt/sources/helpers/rest_client/paginators.py
+++ b/dlt/sources/helpers/rest_client/paginators.py
@@ -1,5 +1,6 @@
 from abc import ABC, abstractmethod
 from typing import Optional
+from urllib.parse import urlparse, urljoin
 
 from dlt.sources.helpers.requests import Response, Request
 from dlt.common import jsonpath
@@ -85,6 +86,14 @@ def update_state(self, response: Response) -> None:
         if total is None:
             raise ValueError(f"Total count not found in response for {self.__class__.__name__}")
 
+        try:
+            total = int(total)
+        except ValueError:
+            raise ValueError(
+                f"Total count is not an integer in response for {self.__class__.__name__}. "
" + f"Expected an integer, got {total}" + ) + self.offset += self.limit if self.offset >= total: @@ -100,6 +109,12 @@ def update_request(self, request: Request) -> None: class BaseNextUrlPaginator(BasePaginator): def update_request(self, request: Request) -> None: + # Handle relative URLs + if self.next_reference: + parsed_url = urlparse(self.next_reference) + if not parsed_url.scheme: + self.next_reference = urljoin(request.url, self.next_reference) + request.url = self.next_reference