Skip to content

Commit

Permalink
fixes some tests
Browse files Browse the repository at this point in the history
  • Loading branch information
rudolfix committed Apr 10, 2024
1 parent e7e5925 commit 871aa4a
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 6 deletions.
11 changes: 9 additions & 2 deletions tests/load/pipeline/test_arrow_loading.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from datetime import datetime # noqa: I251
from typing import Any, Union, List, Dict, Tuple, Literal
import os

import pytest
import numpy as np
import pyarrow as pa
import pandas as pd
import base64

import dlt
from dlt.common import pendulum
Expand Down Expand Up @@ -42,6 +42,7 @@ def test_load_arrow_item(
"redshift",
"databricks",
"synapse",
"clickhouse",
) # athena/redshift can't load TIME columns
include_binary = not (
destination_config.destination in ("redshift", "databricks")
Expand Down Expand Up @@ -102,11 +103,17 @@ def some_data():
row[i] = row[i].tobytes()

if destination_config.destination == "redshift":
# Binary columns are hex formatted in results
# Redshift needs hex string
for record in records:
if "binary" in record:
record["binary"] = record["binary"].hex()

if destination_config.destination == "clickhouse":
# Clickhouse needs base64 string
for record in records:
if "binary" in record:
record["binary"] = base64.b64encode(record["binary"]).decode("ascii")

for row in rows:
for i in range(len(row)):
if isinstance(row[i], datetime):
Expand Down
11 changes: 7 additions & 4 deletions tests/load/test_sql_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ def test_database_exceptions(client: SqlJobClientBase) -> None:
with client.sql_client.execute_query(f"DELETE FROM {qualified_name} WHERE 1=1"):
pass
assert client.sql_client.is_dbapi_exception(term_ex.value.dbapi_exception)
if client.config.destination_type != "dremio":
if client.config.destination_type not in ["dremio", "clickhouse"]:
with pytest.raises(DatabaseUndefinedRelation) as term_ex:
with client.sql_client.execute_query("DROP SCHEMA UNKNOWN"):
pass
Expand Down Expand Up @@ -630,18 +630,21 @@ def assert_load_id(sql_client: SqlClientBase[TNativeConn], load_id: str) -> None
def prepare_temp_table(client: SqlJobClientBase) -> str:
uniq_suffix = uniq_id()
table_name = f"tmp_{uniq_suffix}"
iceberg_table_suffix = ""
ddl_suffix = ""
coltype = "numeric"
if client.config.destination_type == "athena":
iceberg_table_suffix = (
ddl_suffix = (
f"LOCATION '{AWS_BUCKET}/ci/{table_name}' TBLPROPERTIES ('table_type'='ICEBERG',"
" 'format'='parquet');"
)
coltype = "bigint"
qualified_table_name = table_name
if client.config.destination_type == "clickhouse":
ddl_suffix = "ENGINE = MergeTree() ORDER BY col"
qualified_table_name = client.sql_client.make_qualified_table_name(table_name)
else:
qualified_table_name = client.sql_client.make_qualified_table_name(table_name)
client.sql_client.execute_sql(
f"CREATE TABLE {qualified_table_name} (col {coltype}) {iceberg_table_suffix};"
f"CREATE TABLE {qualified_table_name} (col {coltype}) {ddl_suffix};"
)
return table_name

0 comments on commit 871aa4a

Please sign in to comment.