Source code for sloths.ext.concurrent

"""
Extensions to work with :py:mod:`concurrent.futures` executors.
"""

from __future__ import annotations

import contextlib
from collections import deque
from concurrent.futures.thread import ThreadPoolExecutor
from itertools import islice
from time import perf_counter
from typing import TYPE_CHECKING, Literal, ParamSpec, TypeVar

if TYPE_CHECKING:
    from collections.abc import Callable, Iterable
    from concurrent.futures import Executor, Future


P = ParamSpec("P")
T = TypeVar("T")
U = TypeVar("U")


if TYPE_CHECKING:
    from collections.abc import Iterable


[docs] def imap_with_executor( iterable: Iterable[T], fn: Callable[[T], U], *, executor: Executor, timeout_seconds: float | None = None, prefetch: int = 128, greedy: bool = False, ) -> Iterable[U]: """ Lazy version of :py:class:`concurrent.futures.Executor.map`. Execute an element-wise transform within an execurtor and return values in order. :py:class:`concurrent.futures.Executor.map` consumes the source iterator early which is not desirable when working with lazy iterator. This is a version which does not consume the entire source iterator at once and maintains the order of the source iterator. Exceptions will interrupt the iteration. :param iterable: The source iterable :param fn: The mapper function to apply element-wise. :param executor: The :py:class:`concurrent.futures.Executor` to use. :param timeout_seconds: The overall timeout for the entire operation. :param prefetch: Control the number of items to read from the source stream and feed into the executor on startup. Must be 1 or higher. :param greedy: Control whether we pull from the source iterable when any task completes or only when the head task completes. The latter means this will only consume the source up last completed task (or ``prefetch`` if it's higher) but risk keeping the executor idle if the head task is slower than average. The former will maximise keeping the executor fed but will possibly consume past the head task in case of short circuit. """ from concurrent.futures import wait if timeout_seconds is not None: if timeout_seconds < 0: msg = "`timeout_seconds` must be at least 0" raise ValueError(msg) end_time = timeout_seconds + perf_counter() else: end_time = None if prefetch < 1: msg = "`prefetch` must be at least 1" raise ValueError(msg) args = iter(iterable) futures = deque(executor.submit(fn, x) for x in islice(args, prefetch)) try: while True: if futures: if greedy: # To keep the executor fed as much as possible at the cost # of possibly consuming the source values while we may not # need the results. wait_timeout = ( end_time - perf_counter() if end_time is not None else None ) done, _ = wait( [*futures], timeout=wait_timeout, return_when="FIRST_COMPLETED", ) with contextlib.suppress(StopIteration): for x in islice(args, len(done)): futures.append(executor.submit(fn, x)) wait_timeout = ( end_time - perf_counter() if end_time is not None else None ) yield futures.popleft().result(timeout=wait_timeout) if not greedy: with contextlib.suppress(StopIteration): # This may not be optimal in term of keeping the # executor fed. We're only consuming from the source # iterator one by one as we complete in order, so in the # worse case where the first future resolves last we'd # only feed the prefetch number into the executor and # then wait. futures.append(executor.submit(fn, next(args))) else: return finally: for f in futures: f.cancel()
[docs] def imap_with_executor_as_completed( iterable: Iterable[T], fn: Callable[[T], U], *, executor: Executor, timeout_seconds: float | None = None, prefetch: int = 128, ) -> Iterable[U]: """ Lazy version of :py:func:`concurrent.futures.as_completed`. Execute an element-wise transform within an execurtor and return values as they complete. :py:func:`concurrent.futures.as_completed` consumes the source iterator early which is not desirable when working with lazy iterator. This is a version which does not consume the entire source iterator at once and also yields futures in the order they complete. Exceptions will interrupt the iteration. :param iterable: The source iterable :param fn: The mapper function to apply element-wise. :param executor: The :py:class:`concurrent.futures.Executor` to use. :param timeout_seconds: The overall timeout for the entire operation. :param prefetch: Control the number of items to read from the source stream and feed into the executor on startup. Must be 1 or higher. """ from concurrent.futures import wait if timeout_seconds is not None: if timeout_seconds < 0: msg = "`timeout_seconds` must be at least 0" raise ValueError(msg) end_time = timeout_seconds + perf_counter() else: end_time = None if prefetch < 1: msg = "`prefetch` must be at least 1" raise ValueError(msg) args = iter(iterable) pending: set[Future[U]] = set() finished: list[Future[U]] = [] def _submit(x: T) -> None: f = executor.submit(fn, x) pending.add(f) f.add_done_callback(finished.append) for x in islice(args, prefetch): _submit(x) try: while True: wait_timeout = ( end_time - perf_counter() if end_time is not None else None ) _, pending = wait( pending, timeout=wait_timeout, return_when="FIRST_COMPLETED", ) done = [*finished] finished.clear() with contextlib.suppress(StopIteration): for x in islice(args, len(done)): _submit(x) for x in done: yield x.result() if not pending: return finally: for f in pending: f.cancel()
[docs] def threaded_map( it: Iterable[T], fn: Callable[[T], U], max_workers: int, timeout_seconds: float | None = None, prefetch: int | None = None, mode: Literal["ordered", "as_completed"] = "ordered", ) -> Iterable[U]: """ Threaded element-wise transform. This instantiates a :py:class:`~concurrent.futures.ThreadPoolExecutor` behind the scenes. :param iterable: The source iterable :param fn: The mapper function to apply element-wise. :param max_workers: The number of workers to use. :param timeout_seconds: The overall timeout for the entire operation. :param prefetch: Control the number of items to read from the source stream and feed into the executor on startup. Must be 1 or higher, defaults to ``max_workers``. :param ordered: If ``True`` (default) the the results will be returned in the order of the source iterator regardless of their order of execution by the threadpool. .. warning:: If ``False`` then worst case scenario (first element resolves last) will buffer all results in memory which may not be acceptable. If this is likely to happen; consumer can chunk or rate limit their input to create an implicit ceiling to the memory usage. Usage: >>> from sloths import Stream >>> def proc(n): ... # Do something blocking ... return n + 1 >>> Stream(range(10)).pipe(threaded_map, proc, max_workers=4).collect() [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] """ with ThreadPoolExecutor(max_workers=max_workers) as e: if mode == "ordered": yield from imap_with_executor( it, fn, timeout_seconds=timeout_seconds, executor=e, prefetch=prefetch or max_workers, ) else: yield from imap_with_executor_as_completed( it, fn, timeout_seconds=timeout_seconds, executor=e, prefetch=prefetch or max_workers, )