Skip to content

Commit

Permalink
Revert non-applicable changes #1055
Browse files Browse the repository at this point in the history
Signed-off-by: Marcel Coetzee <[email protected]>
  • Loading branch information
Pipboyguy committed Apr 3, 2024
1 parent 6bded9a commit eca4d2d
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 34 deletions.
4 changes: 2 additions & 2 deletions dlt/destinations/impl/clickhouse/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
def capabilities() -> DestinationCapabilitiesContext:
caps = DestinationCapabilitiesContext()
# Clickhouse only supports loading from staged files on s3 for now.
caps.preferred_loader_file_format = "insert_values"
caps.preferred_loader_file_format = "jsonl"
caps.supported_loader_file_formats = ["parquet", "jsonl", "insert_values"]
caps.preferred_staging_file_format = "parquet"
caps.preferred_staging_file_format = "jsonl"
caps.supported_staging_file_formats = ["parquet", "jsonl"]

caps.escape_identifier = escape_clickhouse_identifier
Expand Down
64 changes: 35 additions & 29 deletions dlt/destinations/impl/clickhouse/clickhouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@
FILE_FORMAT_TO_TABLE_FUNCTION_MAPPING,
SUPPORTED_FILE_FORMATS,
)
from dlt.destinations.insert_job_client import InsertValuesJobClient
from dlt.destinations.job_client_impl import (
SqlJobClientWithStaging,
SqlJobClientBase,
)
from dlt.destinations.job_impl import NewReferenceJob, EmptyLoadJob
Expand Down Expand Up @@ -365,7 +365,7 @@ def gen_merge_sql(
return sql


class ClickhouseClient(SqlJobClientWithStaging, SupportsStagingDestination):
class ClickhouseClient(InsertValuesJobClient, SupportsStagingDestination):
capabilities: ClassVar[DestinationCapabilitiesContext] = capabilities()

def __init__(
Expand All @@ -381,15 +381,46 @@ def __init__(
self.active_hints = deepcopy(HINT_TO_CLICKHOUSE_ATTR)
self.type_mapper = ClickhouseTypeMapper(self.capabilities)

def _create_merge_followup_jobs(self, table_chain: Sequence[TTableSchema]) -> List[NewLoadJob]:
    """Create the follow-up job that merges a chain of staged tables."""
    merge_job = ClickhouseMergeJob.from_table_chain(table_chain, self.sql_client)
    return [merge_job]

def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = None) -> str:
    """Render the SQL fragment defining a single column.

    The primary key and sort order are declared outside the column
    specification, so those hints are excluded here.

    Args:
        c: Column schema (name, type, nullability, hints).
        table_format: Unused; kept for interface compatibility.

    Returns:
        A string like ``"name" Nullable(String) <hints>`` (trailing
        whitespace stripped when no hints apply).
    """
    # Iterating the hint mapping directly already guarantees membership,
    # so the original redundant `hint in self.active_hints` check and the
    # explicit `.keys()` call are dropped.
    hints_str = " ".join(
        self.active_hints[hint]
        for hint in self.active_hints
        if hint not in ("primary_key", "sort") and c.get(hint, False) is True
    )

    # ALTER TABLE statements only accept the `Nullable(...)` type wrapper
    # (not NULL / NOT NULL modifiers), so nullability is encoded in the type.
    # Compute the db type once instead of twice.
    db_type = self.type_mapper.to_db_type(c)
    if c.get("nullable", True):
        db_type = f"Nullable({db_type})"

    return f"{self.capabilities.escape_identifier(c['name'])} {db_type} {hints_str}".strip()

def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob:
return super().start_file_load(table, file_path, load_id) or ClickhouseLoadJob(
job = super().start_file_load(table, file_path, load_id) or ClickhouseLoadJob(
file_path,
table["name"],
self.sql_client,
staging_credentials=(
self.config.staging_config.credentials if self.config.staging_config else None
),
)
if not job:
assert NewReferenceJob.is_reference_job(
file_path
), "Clickhouse must use staging to load files."
return job

def _get_table_update_sql(
self, table_name: str, new_columns: Sequence[TColumnSchema], generate_alter: bool
Expand Down Expand Up @@ -419,29 +450,6 @@ def _get_table_update_sql(

return sql

def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = None) -> str:
    """Render the SQL fragment defining a single column.

    The primary key and sort order are declared outside the column
    specification, so those hints are excluded here.

    Args:
        c: Column schema (name, type, nullability, hints).
        table_format: Unused; kept for interface compatibility.

    Returns:
        A string like ``"name" Nullable(String) <hints>`` (trailing
        whitespace stripped when no hints apply).
    """
    # Iterating the hint mapping directly already guarantees membership,
    # so the original redundant `hint in self.active_hints` check and the
    # explicit `.keys()` call are dropped.
    hints_str = " ".join(
        self.active_hints[hint]
        for hint in self.active_hints
        if hint not in ("primary_key", "sort") and c.get(hint, False) is True
    )

    # ALTER TABLE statements only accept the `Nullable(...)` type wrapper
    # (not NULL / NOT NULL modifiers), so nullability is encoded in the type.
    # Compute the db type once instead of twice.
    db_type = self.type_mapper.to_db_type(c)
    if c.get("nullable", True):
        db_type = f"Nullable({db_type})"

    return f"{self.capabilities.escape_identifier(c['name'])} {db_type} {hints_str}".strip()

def get_storage_table(self, table_name: str) -> Tuple[bool, TTableSchemaColumns]:
fields = self._get_storage_table_query_columns()
db_params = self.sql_client.make_qualified_table_name(table_name, escape=False).split(
Expand Down Expand Up @@ -471,6 +479,7 @@ def get_storage_table(self, table_name: str) -> Tuple[bool, TTableSchemaColumns]
return True, schema_table

# Clickhouse fields are not nullable by default.

@staticmethod
def _gen_not_null(v: bool) -> str:
# We use the `Nullable` modifier instead of NULL / NOT NULL modifiers to cater for ALTER statement.
Expand All @@ -483,6 +492,3 @@ def _from_db_type(

def restore_file_load(self, file_path: str) -> LoadJob:
    """Restore a previously started load job; reported as already completed."""
    completed_job = EmptyLoadJob.from_file_path(file_path, "completed")
    return completed_job

def _create_merge_followup_jobs(self, table_chain: Sequence[TTableSchema]) -> List[NewLoadJob]:
    """Create the follow-up job that merges a chain of staged tables."""
    merge_job = ClickhouseMergeJob.from_table_chain(table_chain, self.sql_client)
    return [merge_job]
5 changes: 2 additions & 3 deletions dlt/destinations/impl/clickhouse/sql_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,10 @@ def open_connection(self) -> clickhouse_driver.dbapi.connection.Connection:
dsn=self.credentials.to_native_representation()
)
with self._conn.cursor() as cur:
# Set session settings. There doesn't seem to be a way to set these
# without using the library's top-level, non-dbapi2 client.
# Toggle experimental settings.
# These are necessary for nested datatypes and other operations to work.
cur.execute("set allow_experimental_object_type = 1")
cur.execute("set allow_experimental_lightweight_delete = 1")

return self._conn

@raise_open_connection_error
Expand Down
15 changes: 15 additions & 0 deletions dlt/sources/helpers/rest_client/paginators.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from abc import ABC, abstractmethod
from typing import Optional
from urllib.parse import urlparse, urljoin

from dlt.sources.helpers.requests import Response, Request
from dlt.common import jsonpath
Expand Down Expand Up @@ -85,6 +86,14 @@ def update_state(self, response: Response) -> None:
if total is None:
raise ValueError(f"Total count not found in response for {self.__class__.__name__}")

try:
total = int(total)
except ValueError:
raise ValueError(
f"Total count is not an integer in response for {self.__class__.__name__}. "
f"Expected an integer, got {total}"
)

self.offset += self.limit

if self.offset >= total:
Expand All @@ -100,6 +109,12 @@ def update_request(self, request: Request) -> None:

class BaseNextUrlPaginator(BasePaginator):
def update_request(self, request: Request) -> None:
    """Point the request at the next page, resolving relative references.

    A scheme-less next reference is resolved against the current request
    URL before being applied.
    """
    next_ref = self.next_reference
    if next_ref and not urlparse(next_ref).scheme:
        next_ref = urljoin(request.url, next_ref)
        self.next_reference = next_ref
    request.url = next_ref


Expand Down

0 comments on commit eca4d2d

Please sign in to comment.