Skip to content

Commit

Permalink
Adds resource field reference syntax to template strings
Browse files Browse the repository at this point in the history
  • Loading branch information
burnash committed Jan 14, 2025
1 parent 488bd24 commit 5c1e6b2
Show file tree
Hide file tree
Showing 10 changed files with 770 additions and 258 deletions.
16 changes: 4 additions & 12 deletions dlt/sources/_core_source_templates/rest_api_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,18 +75,10 @@ def github_source(access_token: Optional[str] = dlt.secrets.value) -> Any:
{
"name": "issue_comments",
"endpoint": {
# The placeholder {issue_number} will be resolved
# from the parent resource
"path": "issues/{issue_number}/comments",
"params": {
# The value of `issue_number` will be taken
# from the `number` field in the `issues` resource
"issue_number": {
"type": "resolve",
"resource": "issues",
"field": "number",
}
},
# The placeholder `{resources.issues.number}`
# will be replaced with the value of `number` field
# in the `issues` resource data
"path": "issues/{resources.issues.number}/comments",
},
# Include data from `id` field of the parent resource
# in the child data. The field name in the child data
Expand Down
55 changes: 42 additions & 13 deletions dlt/sources/rest_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
)
from .config_setup import (
IncrementalParam,
InterceptingProxy,
create_auth,
create_paginator,
build_resource_dependency_graph,
Expand Down Expand Up @@ -114,16 +115,18 @@ def rest_api_source(
"base_url": "https://pokeapi.co/api/v2/",
"paginator": "json_link",
},
"endpoints": {
"pokemon": {
"params": {
"limit": 100, # Default page size is 20
"resources": [
{
"name": "pokemon",
"endpoint": {
"path": "pokemon",
"params": {
"limit": 100,
},
},
"resource": {
"primary_key": "id",
}
},
},
"primary_key": "id",
}
]
})
"""
# TODO: this must be removed when TypedDicts are supported by resolve_configuration
Expand Down Expand Up @@ -311,6 +314,25 @@ def paginate_resource(
incremental_cursor_transform,
)

# Interpolate incremental object value into path and params
_incremental: Union[Incremental[Any], InterceptingProxy]
if incremental_cursor_transform:
_incremental = InterceptingProxy(
incremental_object,
incremental_cursor_transform,
{"last_value", "end_value"},
)
else:
_incremental = incremental_object

format_kwargs = {"incremental": _incremental}

path = path.format(**format_kwargs)
params = {
k: v.format(**format_kwargs) if isinstance(v, str) else v
for k, v in params.items()
}

yield from client.paginate(
method=method,
path=path,
Expand Down Expand Up @@ -368,14 +390,20 @@ def paginate_dependent_resource(
)

for item in items:
formatted_path, parent_record = process_parent_data_item(
path, item, resolved_params, include_from_parent
formatted_path, expanded_params, parent_record = process_parent_data_item(
path=path,
item=item,
params=params,
resolved_params=resolved_params,
include_from_parent=include_from_parent,
incremental=incremental_object,
incremental_value_convert=incremental_cursor_transform,
)

for child_page in client.paginate(
method=method,
path=formatted_path,
params=params,
params=expanded_params,
paginator=paginator,
data_selector=data_selector,
hooks=hooks,
Expand Down Expand Up @@ -474,7 +502,8 @@ def identity_func(x: Any) -> Any:

if transform is None:
transform = identity_func
params[incremental_param.start] = transform(incremental_object.last_value)
if incremental_param.start:
params[incremental_param.start] = transform(incremental_object.last_value)
if incremental_param.end:
params[incremental_param.end] = transform(incremental_object.end_value)
return params
Expand Down
Loading

0 comments on commit 5c1e6b2

Please sign in to comment.