diff --git a/dlt/cli/config_toml_writer.py b/dlt/cli/config_toml_writer.py index 97a23fa0ef..8cf831d725 100644 --- a/dlt/cli/config_toml_writer.py +++ b/dlt/cli/config_toml_writer.py @@ -36,9 +36,10 @@ def generate_typed_example(name: str, hint: AnyType) -> Any: if sc_type == "complex": if issubclass(inner_hint, C_Sequence): return ["a", "b", "c"] - table = tomlkit.table(False) - table["key"] = "value" - return table + else: + table = tomlkit.table(False) + table["key"] = "value" + return table if sc_type == "timestamp": return pendulum.now().to_iso8601_string() if sc_type == "date": @@ -73,14 +74,15 @@ def write_value( write_spec(inner_table, hint(), overwrite_existing) if len(inner_table) > 0: toml_table[name] = inner_table - elif default_value is None: - example_value = generate_typed_example(name, hint) - toml_table[name] = example_value - # tomlkit not supporting comments on boolean - if not isinstance(example_value, bool): - toml_table[name].comment("please set me up!") else: - toml_table[name] = default_value + if default_value is None: + example_value = generate_typed_example(name, hint) + toml_table[name] = example_value + # tomlkit not supporting comments on boolean + if not isinstance(example_value, bool): + toml_table[name].comment("please set me up!") + else: + toml_table[name] = default_value def write_spec(toml_table: TOMLTable, config: BaseConfiguration, overwrite_existing: bool) -> None: diff --git a/dlt/common/configuration/specs/connection_string_credentials.py b/dlt/common/configuration/specs/connection_string_credentials.py index 21e635a07c..2691c5d886 100644 --- a/dlt/common/configuration/specs/connection_string_credentials.py +++ b/dlt/common/configuration/specs/connection_string_credentials.py @@ -29,13 +29,13 @@ def parse_native_representation(self, native_value: Any) -> None: raise InvalidConnectionString(self.__class__, native_value, self.drivername) try: url = make_url(native_value) - # Update only values that are not None. + # update only values that are not None self.update({k: v for k, v in url._asdict().items() if v is not None}) if self.query is not None: - # Query may be immutable so make it mutable. + # query may be immutable so make it mutable self.query = dict(self.query) - except Exception as e: - raise InvalidConnectionString(self.__class__, native_value, self.drivername) from e + except Exception: + raise InvalidConnectionString(self.__class__, native_value, self.drivername) def on_resolved(self) -> None: if self.password: diff --git a/dlt/common/libs/sql_alchemy.py b/dlt/common/libs/sql_alchemy.py index a8797d1cb5..2f3b51ec0d 100644 --- a/dlt/common/libs/sql_alchemy.py +++ b/dlt/common/libs/sql_alchemy.py @@ -117,18 +117,21 @@ def _assert_port(cls, port: Optional[int]) -> Optional[int]: return None try: return int(port) - except TypeError as e: - raise TypeError("Port argument must be an integer or None") from e + except TypeError: + raise TypeError("Port argument must be an integer or None") @classmethod def _assert_str(cls, v: str, paramname: str) -> str: if not isinstance(v, str): - raise TypeError(f"{paramname} must be a string") + raise TypeError("%s must be a string" % paramname) return v @classmethod def _assert_none_str(cls, v: Optional[str], paramname: str) -> Optional[str]: - return v if v is None else cls._assert_str(v, paramname) + if v is None: + return v + + return cls._assert_str(v, paramname) @classmethod def _str_dict( @@ -251,14 +254,14 @@ def update_query_pairs( new_query: Mapping[str, Union[str, Sequence[str]]] if append: - new_query = { - k: ( - tuple(to_list(existing_query[k]) + to_list(new_keys[k])) - if k in existing_query - else new_keys[k] - ) - for k in new_keys - } + new_query = {} + + for k in new_keys: + if k in existing_query: + new_query[k] = tuple(to_list(existing_query[k]) + to_list(new_keys[k])) + else: + new_query[k] = new_keys[k] + new_query.update( {k: existing_query[k] for k in set(existing_query).difference(new_keys)} ) @@ -280,7 +283,7 @@ def update_query_dict( def render_as_string(self, hide_password: bool = True) -> str: """Render this `URL` object as a string.""" - s = f"{self.drivername}://" + s = self.drivername + "://" if self.username is not None: s += quote(self.username, safe=" +") if self.password is not None: diff --git a/dlt/destinations/impl/bigquery/bigquery.py b/dlt/destinations/impl/bigquery/bigquery.py index 279917d3a0..b2e53f9734 100644 --- a/dlt/destinations/impl/bigquery/bigquery.py +++ b/dlt/destinations/impl/bigquery/bigquery.py @@ -232,10 +232,9 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> if insert_api == "streaming": if table["write_disposition"] != "append": raise DestinationTerminalException( - ( - "BigQuery streaming insert can only be used with `append` write_disposition, while " - f'the given resource has `{table["write_disposition"]}`.' - ) + "BigQuery streaming insert can only be used with `append`" + " write_disposition, while the given resource has" + f" `{table['write_disposition']}`." ) if file_path.endswith(".jsonl"): job_cls = DestinationJsonlLoadJob @@ -364,7 +363,9 @@ def prepare_load_table( def _get_column_def_sql(self, column: TColumnSchema, table_format: TTableFormat = None) -> str: name = self.capabilities.escape_identifier(column["name"]) - column_def_sql = f"{name} {self.type_mapper.to_db_type(column, table_format)} {self._gen_not_null(column.get('nullable', True))}" + column_def_sql = ( + f"{name} {self.type_mapper.to_db_type(column, table_format)} {self._gen_not_null(column.get('nullable', True))}" + ) if column.get(ROUND_HALF_EVEN_HINT, False): column_def_sql += " OPTIONS (rounding_mode='ROUND_HALF_EVEN')" if column.get(ROUND_HALF_AWAY_FROM_ZERO_HINT, False): diff --git a/dlt/destinations/impl/bigquery/bigquery_adapter.py b/dlt/destinations/impl/bigquery/bigquery_adapter.py index 8943b0da79..6b3ef32b0f 100644 --- a/dlt/destinations/impl/bigquery/bigquery_adapter.py +++ b/dlt/destinations/impl/bigquery/bigquery_adapter.py @@ -153,10 +153,8 @@ def bigquery_adapter( if insert_api is not None: if insert_api == "streaming" and data.write_disposition != "append": raise ValueError( - ( - "BigQuery streaming insert can only be used with `append` write_disposition, while " - f"the given resource has `{data.write_disposition}`." - ) + "BigQuery streaming insert can only be used with `append` write_disposition, while " + f"the given resource has `{data.write_disposition}`." ) additional_table_hints |= {"x-insert-api": insert_api} # type: ignore[operator] diff --git a/dlt/destinations/impl/clickhouse/clickhouse.py b/dlt/destinations/impl/clickhouse/clickhouse.py index 362e26a4d1..ca9dc49e66 100644 --- a/dlt/destinations/impl/clickhouse/clickhouse.py +++ b/dlt/destinations/impl/clickhouse/clickhouse.py @@ -53,9 +53,9 @@ FILE_FORMAT_TO_TABLE_FUNCTION_MAPPING, SUPPORTED_FILE_FORMATS, ) -from dlt.destinations.insert_job_client import InsertValuesJobClient from dlt.destinations.job_client_impl import ( SqlJobClientBase, + SqlJobClientWithStaging, ) from dlt.destinations.job_impl import NewReferenceJob, EmptyLoadJob from dlt.destinations.sql_jobs import SqlMergeJob @@ -187,7 +187,6 @@ def __init__( clickhouse_format: str = FILE_FORMAT_TO_TABLE_FUNCTION_MAPPING[file_extension] # compression = "none" if config.get("data_writer.disable_compression") else "gz" - table_function: str = "" statement: str = "" if bucket_scheme in ("s3", "gs", "gcs"): @@ -445,7 +444,7 @@ def gen_merge_sql( return sql -class ClickhouseClient(InsertValuesJobClient, SupportsStagingDestination): +class ClickhouseClient(SqlJobClientWithStaging, SupportsStagingDestination): capabilities: ClassVar[DestinationCapabilitiesContext] = capabilities() def __init__( diff --git a/dlt/destinations/job_impl.py b/dlt/destinations/job_impl.py index b5238ea4ba..8e017fc791 100644 --- a/dlt/destinations/job_impl.py +++ b/dlt/destinations/job_impl.py @@ -56,7 +56,7 @@ class NewReferenceJob(NewLoadJobImpl): def __init__( self, file_name: str, status: TLoadJobState, exception: str = None, remote_path: str = None ) -> None: - file_name = f"{os.path.splitext(file_name)[0]}.reference" + file_name = os.path.splitext(file_name)[0] + ".reference" super().__init__(file_name, status, exception) self._remote_path = remote_path self._save_text_file(remote_path) @@ -68,7 +68,7 @@ def is_reference_job(file_path: str) -> bool: @staticmethod def resolve_reference(file_path: str) -> str: with open(file_path, "r+", encoding="utf-8") as f: - # Reading from a file. + # Reading from a file return f.read()