Skip to content

Commit

Permalink
allows async generator items to be evaluated in add_limit
Browse files Browse the repository at this point in the history
  • Loading branch information
rudolfix committed Jan 26, 2024
1 parent 614b80b commit fb9c564
Showing 1 changed file with 7 additions and 3 deletions.
10 changes: 7 additions & 3 deletions dlt/extract/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,22 +306,26 @@ def add_limit(self, max_items: int) -> "DltResource": # noqa: A003

def _gen_wrap(gen: TPipeStep) -> TPipeStep:
"""Wrap a generator to take the first `max_items` records"""
nonlocal max_items
count = 0
is_async_gen = False
if inspect.isfunction(gen):
gen = gen()

# wrap async gen already here
if isinstance(gen, AsyncIterator):
gen = wrap_async_iterator(gen)
is_async_gen = True

try:
for i in gen: # type: ignore # TODO: help me fix this later
yield i
if i is not None:
count += 1
if count == max_items:
return
# async gen yields awaitable so we must count one awaitable more
# so the previous one is evaluated and yielded.
# new awaitable will be cancelled
if count == max_items + int(is_async_gen):
return
finally:
if inspect.isgenerator(gen):
gen.close()
Expand Down

0 comments on commit fb9c564

Please sign in to comment.