Skip to content

Commit

Permalink
fixes has_dataset, recognizes more exceptions
Browse files (browse the repository at this point in the history)
  • Loading branch information
rudolfix committed Apr 10, 2024
1 parent 463ca1d commit e7e5925
Showing 1 changed file with 21 additions and 15 deletions.
36 changes: 21 additions & 15 deletions dlt/destinations/impl/clickhouse/sql_client.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -3,6 +3,7 @@
Iterator,
AnyStr,
Any,
List,
Optional,
Sequence,
ClassVar,
Expand Down Expand Up @@ -44,7 +45,7 @@ class ClickHouseDBApiCursorImpl(DBApiCursorImpl):
class ClickHouseSqlClient(
SqlClientBase[clickhouse_driver.dbapi.connection.Connection], DBTransaction
):
dbapi: ClassVar[DBApi] = clickhouse_driver.dbapi.connection.Connection
dbapi: ClassVar[DBApi] = clickhouse_driver.dbapi
capabilities: ClassVar[DestinationCapabilitiesContext] = capabilities()

def __init__(self, dataset_name: str, credentials: ClickHouseCredentials) -> None:
Expand All @@ -54,7 +55,7 @@ def __init__(self, dataset_name: str, credentials: ClickHouseCredentials) -> None:
self.database_name = credentials.database

def has_dataset(self) -> bool:
return super().has_dataset()
return len(self._list_tables()) > 0

def open_connection(self) -> clickhouse_driver.dbapi.connection.Connection:
self._conn = clickhouse_driver.dbapi.connect(
Expand Down Expand Up @@ -98,7 +99,17 @@ def create_dataset(self) -> None:
def drop_dataset(self) -> None:
# Since ClickHouse doesn't have schemas, we need to drop all tables in our virtual schema,
# or collection of tables, that has the `dataset_name` as a prefix.
to_drop_results = self.execute_sql(
to_drop_results = self._list_tables()
for table in to_drop_results:
# The "DROP TABLE" clause is discarded if we allow clickhouse_driver to handle parameter substitution.
# This is because the driver incorrectly substitutes the entire query string, causing the "DROP TABLE" keyword to be omitted.
# To resolve this, we are forced to provide the full query string here.
self.execute_sql(
f"""DROP TABLE {self.capabilities.escape_identifier(self.database_name)}.{self.capabilities.escape_identifier(table)} SYNC"""
)

def _list_tables(self) -> List[str]:
rows = self.execute_sql(
"""
SELECT name
FROM system.tables
Expand All @@ -110,14 +121,7 @@ def drop_dataset(self) -> None:
f"{self.dataset_name}%",
),
)
for to_drop_result in to_drop_results:
table = to_drop_result[0]
# The "DROP TABLE" clause is discarded if we allow clickhouse_driver to handle parameter substitution.
# This is because the driver incorrectly substitutes the entire query string, causing the "DROP TABLE" keyword to be omitted.
# To resolve this, we are forced to provide the full query string here.
self.execute_sql(
f"""DROP TABLE {self.capabilities.escape_identifier(self.database_name)}.{self.capabilities.escape_identifier(table)} SYNC"""
)
return [row[0] for row in rows]

@contextmanager
@raise_database_error
Expand Down Expand Up @@ -172,12 +176,14 @@ def make_qualified_table_name(self, table_name: str, escape: bool = True) -> str
return f"{database_name}.{dataset_and_table}"

@classmethod
def _make_database_exception(cls, ex: Exception) -> Exception: # type: ignore[return]
def _make_database_exception(cls, ex: Exception) -> Exception:
if isinstance(ex, clickhouse_driver.dbapi.errors.OperationalError):
if "Code: 57." in str(ex) or "Code: 82." in str(ex):
raise DatabaseTerminalException(ex)
if "Code: 57." in str(ex) or "Code: 82." in str(ex) or "Code: 47." in str(ex):
return DatabaseTerminalException(ex)
elif "Code: 60." in str(ex) or "Code: 81." in str(ex):
raise DatabaseUndefinedRelation(ex)
return DatabaseUndefinedRelation(ex)
else:
return DatabaseTransientException(ex)
elif isinstance(
ex,
(
Expand Down

0 comments on commit e7e5925

Please sign in to comment.