docs: document that parallelized=True resources with add_limit(x) usually yield x-1 #2142

Closed
1 change: 1 addition & 0 deletions dlt/extract/resource.py
@@ -354,6 +354,7 @@ def add_limit(self: TDltResourceImpl, max_items: int) -> TDltResourceImpl: # no
1. Transformers won't be limited. They should process all the data they receive fully to avoid inconsistencies in generated datasets.
2. Each yielded item may contain several records. `add_limit` only limits the "number of yields", not the total number of records.
3. Async resources with a limit added may occasionally produce one item more than the limit on some runs. This behavior is not deterministic.
4. Parallelized sync resources with a limit added will usually produce one item less than the limit. This behavior is not deterministic.

Args:
max_items (int): The maximum number of items to yield
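
Note 2 in the docstring above distinguishes yields from records. A minimal sketch of that distinction follows, using `itertools.islice` from the standard library as a stand-in for `add_limit`; it is not dlt's implementation, and `paged_source` is a hypothetical generator invented for the illustration.

```python
from itertools import islice
from typing import Iterator, List


def paged_source(pages: int, page_size: int) -> Iterator[List[int]]:
    """Hypothetical source: yield `pages` lists, each holding `page_size` records."""
    for page in range(pages):
        start = page * page_size
        yield list(range(start, start + page_size))


# Cap the number of yields at 2, analogous in spirit to add_limit(2).
limited = list(islice(paged_source(pages=10, page_size=3), 2))

yield_count = len(limited)                          # number of yielded items (pages)
record_count = sum(len(page) for page in limited)   # number of records inside them

print(yield_count, record_count)
```

With two yields of three records each, the limit caps the pages at 2 while 6 records still pass through, which is the behavior the note warns about.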
13 changes: 13 additions & 0 deletions tests/pipeline/test_resources_evaluation.py
@@ -217,6 +217,19 @@ async def async_resource1():
    assert len(result) == 13


@pytest.mark.parametrize("parallelized", [False, True])
def test_limit_sync_resource(parallelized: bool) -> None:
    @dlt.resource(parallelized=parallelized)
    def sync_resource1():
        for i in range(1, 10):
            yield i

    limit = 5
    result = list(sync_resource1().add_limit(limit))
    allowed_result_range = range(limit - int(parallelized), limit + 1)
@joscha joscha Dec 12, 2024

This is not ideal.
Ideally, the parallelized yields would be exact. I tried to achieve that for a few hours, to no avail. The next best option would be to run this test x times with parallelized=True and then assert that the average number of yields lies below 4.5, i.e. that the resource falls one short of the limit of 5 more often than it reaches it.

    assert len(result) in allowed_result_range
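
The repeated-run check proposed in the review comment above could be sketched roughly as follows. This is not part of the PR: `mean_yield_count` and `fake_run` are hypothetical helpers, and the counts fed to `fake_run` are made up for illustration rather than measured from dlt.

```python
from statistics import mean
from typing import Callable


def mean_yield_count(run_once: Callable[[], int], runs: int = 20) -> float:
    """Call `run_once` `runs` times and return the average yield count."""
    if runs <= 0:
        raise ValueError("runs must be positive")
    return float(mean(run_once() for _ in range(runs)))


# Stand-in for a real extraction; these counts are invented, not measured.
# In the actual test, run_once would be something like:
#   lambda: len(list(sync_resource1().add_limit(limit)))
_fake_counts = iter([4, 4, 5, 4] * 5)


def fake_run() -> int:
    return next(_fake_counts)


limit = 5
average = mean_yield_count(fake_run, runs=20)

# Each run yields limit - 1 or limit items, so the average lies in that band;
# "usually one less" means it should sit below the midpoint, limit - 0.5.
assert limit - 1 <= average < limit - 0.5
```

A statistical assertion like this can still flake, so the number of runs and the threshold are judgment calls rather than values taken from the PR.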


@pytest.mark.parametrize("parallelized", [True, False])
def test_parallelized_resource(parallelized: bool) -> None:
    os.environ["EXTRACT__NEXT_ITEM_MODE"] = "fifo"