Skip to content

Commit

Permalink
Only keep non-default config args in factory
Browse files Browse the repository at this point in the history
  • Loading branch information
steinitzu committed Nov 15, 2023
1 parent 0d4ad49 commit 85c2b86
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 9 deletions.
6 changes: 2 additions & 4 deletions dlt/common/destination/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,19 +356,17 @@ class Destination(ABC, Generic[TDestinationConfig, TDestinationClient]):
with credentials and other config params.
"""
config_params: Optional[Dict[str, Any]] = None
initial_config: DestinationClientConfiguration

def __init__(self, **kwargs: Any) -> None:
# Create initial unresolved destination config
# Argument defaults are filtered out here because we only want arguments passed explicitly
# to supersede config from the environment or pipeline args
sig = inspect.signature(self.__class__)
params = sig.parameters
config_args = {
self.config_params = {
k: v for k, v in kwargs.items()
if k not in params or v != params[k].default
}
self.initial_config = self.spec(**config_args)

@property
@abstractmethod
Expand Down Expand Up @@ -443,7 +441,7 @@ def client(self, schema: Schema, initial_config: TDestinationConfig = config.val
cfg = self.spec(
**dict(
initial_config,
**{k: v for k, v in self.initial_config.items() if v is not None}
**{k: v for k, v in self.config_params.items() if v is not None}
)
)
return self.client_class(schema, self.configuration(cfg))
Expand Down
9 changes: 4 additions & 5 deletions dlt/load/load.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,23 +47,22 @@ def __init__(
self.initial_client_config = initial_client_config
self.initial_staging_client_config = initial_staging_client_config
self.destination = destination
self.capabilities = destination.capabilities
self.capabilities = destination.capabilities()
self.staging_destination = staging_destination
self.pool = NullExecutor()
self.load_storage: LoadStorage = self.create_storage(is_storage_owner)
self._processed_load_ids: Dict[str, str] = {}
"""Load ids to dataset name"""


def create_storage(self, is_storage_owner: bool) -> LoadStorage:
supported_file_formats = self.capabilities().supported_loader_file_formats
supported_file_formats = self.capabilities.supported_loader_file_formats
if self.staging_destination:
supported_file_formats = self.staging_destination.capabilities().supported_loader_file_formats + ["reference"]
if isinstance(self.get_destination_client(Schema("test")), WithStagingDataset):
supported_file_formats += ["sql"]
load_storage = LoadStorage(
is_storage_owner,
self.capabilities().preferred_loader_file_format,
self.capabilities.preferred_loader_file_format,
supported_file_formats,
config=self.config._load_storage_config
)
Expand Down Expand Up @@ -99,7 +98,7 @@ def w_spool_job(self: "Load", file_path: str, load_id: str, schema: Schema) -> O
with (self.get_staging_destination_client(schema) if is_staging_destination_job else job_client) as client:
job_info = self.load_storage.parse_job_file_name(file_path)
if job_info.file_format not in self.load_storage.supported_file_formats:
raise LoadClientUnsupportedFileFormats(job_info.file_format, self.capabilities().supported_loader_file_formats, file_path)
raise LoadClientUnsupportedFileFormats(job_info.file_format, self.capabilities.supported_loader_file_formats, file_path)
logger.info(f"Will load file {file_path} with table name {job_info.table_name}")
table = client.get_load_table(job_info.table_name)
if table["write_disposition"] not in ["append", "replace", "merge"]:
Expand Down

0 comments on commit 85c2b86

Please sign in to comment.