Skip to content

Commit

Permalink
Revert some files back to devel
Browse files Browse the repository at this point in the history
Signed-off-by: Marcel Coetzee <[email protected]>
  • Loading branch information
Pipboyguy committed Apr 6, 2024
1 parent b44bea3 commit d6309b3
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 41 deletions.
22 changes: 12 additions & 10 deletions dlt/cli/config_toml_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ def generate_typed_example(name: str, hint: AnyType) -> Any:
if sc_type == "complex":
if issubclass(inner_hint, C_Sequence):
return ["a", "b", "c"]
table = tomlkit.table(False)
table["key"] = "value"
return table
else:
table = tomlkit.table(False)
table["key"] = "value"
return table
if sc_type == "timestamp":
return pendulum.now().to_iso8601_string()
if sc_type == "date":
Expand Down Expand Up @@ -73,14 +74,15 @@ def write_value(
write_spec(inner_table, hint(), overwrite_existing)
if len(inner_table) > 0:
toml_table[name] = inner_table
elif default_value is None:
example_value = generate_typed_example(name, hint)
toml_table[name] = example_value
# tomlkit not supporting comments on boolean
if not isinstance(example_value, bool):
toml_table[name].comment("please set me up!")
else:
toml_table[name] = default_value
if default_value is None:
example_value = generate_typed_example(name, hint)
toml_table[name] = example_value
# tomlkit not supporting comments on boolean
if not isinstance(example_value, bool):
toml_table[name].comment("please set me up!")
else:
toml_table[name] = default_value


def write_spec(toml_table: TOMLTable, config: BaseConfiguration, overwrite_existing: bool) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ def parse_native_representation(self, native_value: Any) -> None:
raise InvalidConnectionString(self.__class__, native_value, self.drivername)
try:
url = make_url(native_value)
# Update only values that are not None.
# update only values that are not None
self.update({k: v for k, v in url._asdict().items() if v is not None})
if self.query is not None:
# Query may be immutable so make it mutable.
# query may be immutable so make it mutable
self.query = dict(self.query)
except Exception as e:
raise InvalidConnectionString(self.__class__, native_value, self.drivername) from e
except Exception:
raise InvalidConnectionString(self.__class__, native_value, self.drivername)

def on_resolved(self) -> None:
if self.password:
Expand Down
29 changes: 16 additions & 13 deletions dlt/common/libs/sql_alchemy.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,18 +117,21 @@ def _assert_port(cls, port: Optional[int]) -> Optional[int]:
return None
try:
return int(port)
except TypeError as e:
raise TypeError("Port argument must be an integer or None") from e
except TypeError:
raise TypeError("Port argument must be an integer or None")

@classmethod
def _assert_str(cls, v: str, paramname: str) -> str:
if not isinstance(v, str):
raise TypeError(f"{paramname} must be a string")
raise TypeError("%s must be a string" % paramname)
return v

@classmethod
def _assert_none_str(cls, v: Optional[str], paramname: str) -> Optional[str]:
return v if v is None else cls._assert_str(v, paramname)
if v is None:
return v

return cls._assert_str(v, paramname)

@classmethod
def _str_dict(
Expand Down Expand Up @@ -251,14 +254,14 @@ def update_query_pairs(

new_query: Mapping[str, Union[str, Sequence[str]]]
if append:
new_query = {
k: (
tuple(to_list(existing_query[k]) + to_list(new_keys[k]))
if k in existing_query
else new_keys[k]
)
for k in new_keys
}
new_query = {}

for k in new_keys:
if k in existing_query:
new_query[k] = tuple(to_list(existing_query[k]) + to_list(new_keys[k]))
else:
new_query[k] = new_keys[k]

new_query.update(
{k: existing_query[k] for k in set(existing_query).difference(new_keys)}
)
Expand All @@ -280,7 +283,7 @@ def update_query_dict(

def render_as_string(self, hide_password: bool = True) -> str:
"""Render this `URL` object as a string."""
s = f"{self.drivername}://"
s = self.drivername + "://"
if self.username is not None:
s += quote(self.username, safe=" +")
if self.password is not None:
Expand Down
11 changes: 6 additions & 5 deletions dlt/destinations/impl/bigquery/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,9 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) ->
if insert_api == "streaming":
if table["write_disposition"] != "append":
raise DestinationTerminalException(
(
"BigQuery streaming insert can only be used with `append` write_disposition, while "
f'the given resource has `{table["write_disposition"]}`.'
)
"BigQuery streaming insert can only be used with `append`"
" write_disposition, while the given resource has"
f" `{table['write_disposition']}`."
)
if file_path.endswith(".jsonl"):
job_cls = DestinationJsonlLoadJob
Expand Down Expand Up @@ -364,7 +363,9 @@ def prepare_load_table(

def _get_column_def_sql(self, column: TColumnSchema, table_format: TTableFormat = None) -> str:
name = self.capabilities.escape_identifier(column["name"])
column_def_sql = f"{name} {self.type_mapper.to_db_type(column, table_format)} {self._gen_not_null(column.get('nullable', True))}"
column_def_sql = (
f"{name} {self.type_mapper.to_db_type(column, table_format)} {self._gen_not_null(column.get('nullable', True))}"
)
if column.get(ROUND_HALF_EVEN_HINT, False):
column_def_sql += " OPTIONS (rounding_mode='ROUND_HALF_EVEN')"
if column.get(ROUND_HALF_AWAY_FROM_ZERO_HINT, False):
Expand Down
6 changes: 2 additions & 4 deletions dlt/destinations/impl/bigquery/bigquery_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,8 @@ def bigquery_adapter(
if insert_api is not None:
if insert_api == "streaming" and data.write_disposition != "append":
raise ValueError(
(
"BigQuery streaming insert can only be used with `append` write_disposition, while "
f"the given resource has `{data.write_disposition}`."
)
"BigQuery streaming insert can only be used with `append` write_disposition, while "
f"the given resource has `{data.write_disposition}`."
)
additional_table_hints |= {"x-insert-api": insert_api} # type: ignore[operator]

Expand Down
5 changes: 2 additions & 3 deletions dlt/destinations/impl/clickhouse/clickhouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@
FILE_FORMAT_TO_TABLE_FUNCTION_MAPPING,
SUPPORTED_FILE_FORMATS,
)
from dlt.destinations.insert_job_client import InsertValuesJobClient
from dlt.destinations.job_client_impl import (
SqlJobClientBase,
SqlJobClientWithStaging,
)
from dlt.destinations.job_impl import NewReferenceJob, EmptyLoadJob
from dlt.destinations.sql_jobs import SqlMergeJob
Expand Down Expand Up @@ -187,7 +187,6 @@ def __init__(
clickhouse_format: str = FILE_FORMAT_TO_TABLE_FUNCTION_MAPPING[file_extension]
# compression = "none" if config.get("data_writer.disable_compression") else "gz"

table_function: str = ""
statement: str = ""

if bucket_scheme in ("s3", "gs", "gcs"):
Expand Down Expand Up @@ -445,7 +444,7 @@ def gen_merge_sql(
return sql


class ClickhouseClient(InsertValuesJobClient, SupportsStagingDestination):
class ClickhouseClient(SqlJobClientWithStaging, SupportsStagingDestination):
capabilities: ClassVar[DestinationCapabilitiesContext] = capabilities()

def __init__(
Expand Down
4 changes: 2 additions & 2 deletions dlt/destinations/job_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class NewReferenceJob(NewLoadJobImpl):
def __init__(
self, file_name: str, status: TLoadJobState, exception: str = None, remote_path: str = None
) -> None:
file_name = f"{os.path.splitext(file_name)[0]}.reference"
file_name = os.path.splitext(file_name)[0] + ".reference"
super().__init__(file_name, status, exception)
self._remote_path = remote_path
self._save_text_file(remote_path)
Expand All @@ -68,7 +68,7 @@ def is_reference_job(file_path: str) -> bool:
@staticmethod
def resolve_reference(file_path: str) -> str:
with open(file_path, "r+", encoding="utf-8") as f:
# Reading from a file.
# Reading from a file
return f.read()


Expand Down

0 comments on commit d6309b3

Please sign in to comment.