Skip to content

Commit

Permalink
adds sources at the end of pipe, closes generators before futures so …
Browse files Browse the repository at this point in the history
…wrapped generators are notified
  • Loading branch information
rudolfix committed Jan 26, 2024
1 parent 05d0c55 commit 614b80b
Showing 1 changed file with 29 additions and 33 deletions.
62 changes: 29 additions & 33 deletions dlt/extract/pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -515,9 +515,9 @@ def __init__(
self._thread_pool: ThreadPoolExecutor = None
self._sources = sources
self._futures: List[FuturePipeItem] = []
self._next_item_mode = next_item_mode
self._next_item_mode: TPipeNextItemMode = next_item_mode
self._initial_sources_count = len(sources)
self._current_source_index: int = -1
self._current_source_index: int = 0

@classmethod
@with_config(spec=PipeIteratorConfiguration)
Expand Down Expand Up @@ -582,6 +582,8 @@ def _fork_pipeline(pipe: Pipe) -> None:
if not any(i.pipe == pipe for i in sources):
sources.append(SourcePipeItem(pipe.gen, 0, pipe, None))

# reverse pipes for current mode, as we start processing from the back
pipes.reverse()
for pipe in pipes:
_fork_pipeline(pipe)

Expand Down Expand Up @@ -614,16 +616,15 @@ def __next__(self) -> PipeItem:
# if item is iterator, then add it as a new source
if isinstance(item, Iterator):
# print(f"adding iterable {item}")
self._sources.insert(
0, SourcePipeItem(item, pipe_item.step, pipe_item.pipe, pipe_item.meta)
self._sources.append(
SourcePipeItem(item, pipe_item.step, pipe_item.pipe, pipe_item.meta)
)
pipe_item = None
continue

# handle async iterator items as new source
if isinstance(item, AsyncIterator):
self._sources.insert(
0,
self._sources.append(
SourcePipeItem(
wrap_async_iterator(item), pipe_item.step, pipe_item.pipe, pipe_item.meta
),
Expand Down Expand Up @@ -701,19 +702,18 @@ def close(self) -> None:
def stop_background_loop(loop: asyncio.AbstractEventLoop) -> None:
loop.stop()

# stop all futures
for f, _, _, _ in self._futures:
if not f.done():
f.cancel()
self._futures.clear()

# close all generators
for gen, _, _, _ in self._sources:
if inspect.isgenerator(gen):
gen.close()
self._sources.clear()

# print("stopping loop")
# stop all futures
for f, _, _, _ in self._futures:
if not f.done():
f.cancel()

# let tasks cancel
if self._async_pool:
# wait for all async generators to be closed
future = asyncio.run_coroutine_threadsafe(
Expand All @@ -730,6 +730,8 @@ def stop_background_loop(loop: asyncio.AbstractEventLoop) -> None:
self._thread_pool.shutdown(wait=True)
self._thread_pool = None

self._futures.clear()

def _ensure_async_pool(self) -> asyncio.AbstractEventLoop:
# lazily create the async pool in a separate thread
if self._async_pool:
Expand Down Expand Up @@ -814,23 +816,18 @@ def _get_source_item(self) -> ResolvablePipeItem:
# no more sources to iterate
if sources_count == 0:
return None
# while we have more new sources than available future slots, we do strict fifo where we
# only ever check the first source, this is to prevent an uncontrolled number of sources
# being created in certain scenarios
force_strict_fifo = (sources_count - self._initial_sources_count) >= self.max_parallel_items
try:
# always reset to start of list for fifo mode
if self._next_item_mode == "fifo":
self._current_source_index = -1
first_evaluated_index = -1
first_evaluated_index: int = None
# always reset to the end of the list in fifo mode; also take into account that new sources can be added:
# if too many new sources are added, we switch to fifo so as not to exhaust them
if (
self._next_item_mode == "fifo"
or (sources_count - self._initial_sources_count) >= self.max_parallel_items
):
self._current_source_index = sources_count - 1
else:
self._current_source_index = (self._current_source_index - 1) % sources_count
while True:
# in strict fifo mode we never check more than the top most source
if force_strict_fifo:
print("strict")
self._current_source_index = 0
else:
self._current_source_index = (self._current_source_index + 1) % sources_count
print(self._current_source_index)
# if we have checked all sources once and all returned None, then we can sleep a bit
if self._current_source_index == first_evaluated_index:
sleep(self.futures_poll_interval)
Expand All @@ -849,17 +846,16 @@ def _get_source_item(self) -> ResolvablePipeItem:
else:
return ResolvablePipeItem(item, step, pipe, meta)
# remember the first evaluated index
if first_evaluated_index == -1:
if first_evaluated_index is None:
first_evaluated_index = self._current_source_index
# always go round robin if None was returned
self._current_source_index = (self._current_source_index - 1) % sources_count
except StopIteration:
# remove empty iterator and try another source
self._sources.pop(self._current_source_index)
# decrease initial source count if we popped an initial source
if self._current_source_index >= abs(self._initial_sources_count - sources_count):
if self._current_source_index < self._initial_sources_count:
self._initial_sources_count -= 1
# we need to decrease the index to keep the round robin order
if self._next_item_mode == "round_robin":
self._current_source_index -= 1
return self._get_source_item()
except (PipelineException, ExtractorException, DltSourceException, PipeException):
raise
Expand Down

0 comments on commit 614b80b

Please sign in to comment.