def _gen_wrap(gen: TPipeStep) -> TPipeStep:
    """Wrap a generator to take the first `max_items` records.

    `max_items` is read from the enclosing `add_limit` scope. It is only
    read, never rebound, so no `nonlocal` declaration is required.

    Args:
        gen: A generator function (called here without arguments to obtain
            the generator), an `AsyncIterator` (converted with
            `wrap_async_iterator`), or any other iterable.

    Yields:
        Items of `gen`, unchanged. `None` items are passed through but do
        not count towards the limit.
    """
    # A zero limit must produce an empty generator. Without this guard the
    # equality check in the loop can never match on a sync source, because
    # `count` is already 1 the first time it is compared.
    if max_items == 0:
        if inspect.isgenerator(gen):
            gen.close()
        return

    count = 0
    is_async_gen = False
    if inspect.isfunction(gen):
        gen = gen()

    # wrap async gen already here
    if isinstance(gen, AsyncIterator):
        gen = wrap_async_iterator(gen)
        is_async_gen = True

    try:
        for i in gen:  # type: ignore # TODO: help me fix this later
            yield i
            if i is not None:
                count += 1
                # An async gen yields awaitables, so one extra awaitable must
                # be counted: requesting it lets the previous one be evaluated
                # and yielded. The extra awaitable itself will be cancelled.
                # NOTE: the comparison is `==` on purpose; a negative
                # `max_items` never matches and therefore imposes no limit.
                if count == max_items + int(is_async_gen):
                    return
    finally:
        # release the source generator even when the consumer stops early
        if inspect.isgenerator(gen):
            gen.close()