sloths.ext.concurrent

Extensions to work with concurrent.futures executors.

Functions

imap_with_executor(→ collections.abc.Iterable[U])

Lazy version of concurrent.futures.Executor.map.

imap_with_executor_as_completed(...)

Lazy version of concurrent.futures.as_completed().

threaded_map(→ collections.abc.Iterable[U])

Threaded element-wise transform.

Module Contents

sloths.ext.concurrent.imap_with_executor(iterable: collections.abc.Iterable[T], fn: collections.abc.Callable[[T], U], *, executor: concurrent.futures.Executor, timeout_seconds: float | None = None, prefetch: int = 128, greedy: bool = False) → collections.abc.Iterable[U]

Lazy version of concurrent.futures.Executor.map.

Execute an element-wise transform within an executor and return values in order.

concurrent.futures.Executor.map consumes the entire source iterator up front, which is not desirable when working with lazy iterators. This version consumes the source iterator incrementally and preserves its order.
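The eager behaviour of concurrent.futures.Executor.map can be observed with the standard library alone. In this illustrative snippet, counting_source is a throwaway test fixture (not part of sloths) that records how many items have been pulled from a lazy source before any result is requested:

```python
from concurrent.futures import ThreadPoolExecutor

pulled = []


def counting_source(n):
    # A lazy source that records every item handed to its consumer.
    for i in range(n):
        pulled.append(i)
        yield i


with ThreadPoolExecutor(max_workers=2) as executor:
    results = executor.map(lambda x: x * 10, counting_source(100))
    # No result has been requested yet, but all 100 items were already
    # pulled from the source and submitted as tasks.
    print(len(pulled))  # 100
    first = next(results)  # 0
```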

Exceptions will interrupt the iteration.

Parameters:
  • iterable – The source iterable

  • fn – The mapper function to apply element-wise.

  • executor – The concurrent.futures.Executor to use.

  • timeout_seconds – The overall timeout for the entire operation.

  • prefetch – Control the number of items to read from the source stream and feed into the executor on startup. Must be 1 or higher.

  • greedy

    Control whether to pull from the source iterable when any task completes (greedy=True) or only when the head task completes (greedy=False, the default).

    The latter only consumes the source up to the last completed task (or up to prefetch items, if that is higher), but risks leaving the executor idle when the head task is slower than average. The former keeps the executor as busy as possible, but may consume the source past the head task if the consumer short-circuits.
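As a rough illustration of how a bounded, order-preserving map with prefetch and greedy might work, here is a minimal standard-library sketch. It is not the sloths implementation: the name lazy_ordered_map is hypothetical, and the refill policy (one new source item per completed task) is an assumption based on the parameter descriptions above.

```python
from __future__ import annotations

import collections
import itertools
import time
from concurrent.futures import FIRST_COMPLETED, Executor, ThreadPoolExecutor, wait
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def lazy_ordered_map(
    iterable: Iterable[T],
    fn: Callable[[T], U],
    *,
    executor: Executor,
    timeout_seconds: float | None = None,
    prefetch: int = 128,
    greedy: bool = False,
) -> Iterator[U]:
    """Hypothetical sketch, not the sloths implementation."""
    if prefetch < 1:
        raise ValueError("prefetch must be 1 or higher")
    deadline = None if timeout_seconds is None else time.monotonic() + timeout_seconds
    source = iter(iterable)
    ordered = collections.deque()  # submitted futures, in source order
    unseen = set()  # greedy only: completions that have not triggered a refill yet

    def remaining() -> float | None:
        # Time left before the overall deadline (None means no limit).
        return None if deadline is None else max(0.0, deadline - time.monotonic())

    def submit_next() -> None:
        # Pull at most one item from the source and schedule it.
        for item in itertools.islice(source, 1):
            fut = executor.submit(fn, item)
            ordered.append(fut)
            if greedy:
                unseen.add(fut)

    for _ in range(prefetch):
        submit_next()

    try:
        while ordered:
            head = ordered[0]
            if greedy and not head.done():
                # Refill one slot for every task that finishes, not just the head.
                done = wait(unseen, timeout=remaining(), return_when=FIRST_COMPLETED).done
                if not done:
                    raise TimeoutError("timeout_seconds exceeded")
                unseen.difference_update(done)
                for _ in done:
                    submit_next()
                continue
            result = head.result(timeout=remaining())  # re-raises fn's exception
            ordered.popleft()
            if not greedy or head in unseen:
                # The head's completion has not triggered a refill yet.
                unseen.discard(head)
                submit_next()
            yield result
    finally:
        # Cancel tasks that have not started if the consumer stops or fails.
        for fut in ordered:
            fut.cancel()
```

Keeping futures in a deque is what preserves source order: results are only ever taken from the head, while greedy only changes when new items are pulled from the source.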

sloths.ext.concurrent.imap_with_executor_as_completed(iterable: collections.abc.Iterable[T], fn: collections.abc.Callable[[T], U], *, executor: concurrent.futures.Executor, timeout_seconds: float | None = None, prefetch: int = 128) → collections.abc.Iterable[U]

Lazy version of concurrent.futures.as_completed().

Execute an element-wise transform within an executor and return values as they complete.

concurrent.futures.as_completed() materializes its whole input up front, so every task must be submitted before the first result is available, which consumes the entire source iterator early. This is not desirable when working with lazy iterators. This version consumes the source iterator incrementally and yields results in the order they complete.

Exceptions will interrupt the iteration.

Parameters:
  • iterable – The source iterable

  • fn – The mapper function to apply element-wise.

  • executor – The concurrent.futures.Executor to use.

  • timeout_seconds – The overall timeout for the entire operation.

  • prefetch – Control the number of items to read from the source stream and feed into the executor on startup. Must be 1 or higher.
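A similarly hedged sketch of the completion-order variant, built on concurrent.futures.wait with FIRST_COMPLETED. The name lazy_as_completed_map is hypothetical and this is not the sloths source; it only illustrates keeping at most prefetch tasks in flight while yielding results as they finish.

```python
from __future__ import annotations

import itertools
import time
from concurrent.futures import FIRST_COMPLETED, Executor, ThreadPoolExecutor, wait
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def lazy_as_completed_map(
    iterable: Iterable[T],
    fn: Callable[[T], U],
    *,
    executor: Executor,
    timeout_seconds: float | None = None,
    prefetch: int = 128,
) -> Iterator[U]:
    """Hypothetical sketch, not the sloths implementation."""
    if prefetch < 1:
        raise ValueError("prefetch must be 1 or higher")
    deadline = None if timeout_seconds is None else time.monotonic() + timeout_seconds
    source = iter(iterable)
    # Start with at most `prefetch` tasks in flight.
    pending = {executor.submit(fn, item) for item in itertools.islice(source, prefetch)}
    try:
        while pending:
            remaining = None if deadline is None else max(0.0, deadline - time.monotonic())
            done, pending = wait(pending, timeout=remaining, return_when=FIRST_COMPLETED)
            if not done:
                raise TimeoutError("timeout_seconds exceeded")
            for fut in done:
                # One completed task frees one slot: pull one more source item.
                for item in itertools.islice(source, 1):
                    pending.add(executor.submit(fn, item))
                yield fut.result()  # re-raises fn's exception, ending iteration
    finally:
        # Cancel tasks that have not started if the consumer stops or fails.
        for fut in pending:
            fut.cancel()
```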

sloths.ext.concurrent.threaded_map(it: collections.abc.Iterable[T], fn: collections.abc.Callable[[T], U], max_workers: int, timeout_seconds: float | None = None, prefetch: int | None = None, mode: Literal['ordered', 'as_completed'] = 'ordered') → collections.abc.Iterable[U]

Threaded element-wise transform.

This instantiates a ThreadPoolExecutor behind the scenes.

Parameters:
  • it – The source iterable

  • fn – The mapper function to apply element-wise.

  • max_workers – The number of workers to use.

  • timeout_seconds – The overall timeout for the entire operation.

  • prefetch – Control the number of items to read from the source stream and feed into the executor on startup. Must be 1 or higher, defaults to max_workers.

  • mode

    If 'ordered' (default), the results are returned in the order of the source iterator regardless of the order in which the thread pool executes them. If 'as_completed', each result is returned as soon as its task completes.

    Warning

    In 'ordered' mode, the worst-case scenario (the first element resolves last) will buffer all results in memory, which may not be acceptable. If this is likely to happen, consumers can chunk or rate-limit their input to create an implicit ceiling on memory usage.
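The chunking mitigation mentioned in the warning can be sketched with the standard library. chunked_threaded_map and its chunk_size parameter are hypothetical, not part of sloths: each chunk is fully drained before the next one is read, so the helper never buffers more than chunk_size results, at the cost of idle workers at chunk boundaries.

```python
import itertools
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def chunked_threaded_map(
    iterable: Iterable[T],
    fn: Callable[[T], U],
    *,
    max_workers: int,
    chunk_size: int,
) -> Iterator[U]:
    """Hypothetical helper: ordered threaded map with bounded buffering."""
    source = iter(iterable)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        while True:
            chunk = list(itertools.islice(source, chunk_size))
            if not chunk:
                return
            # Executor.map is eager, but only over this chunk, so at most
            # `chunk_size` results are buffered by this helper at once.
            yield from executor.map(fn, chunk)
```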

Usage:

>>> from sloths import Stream
>>> from sloths.ext.concurrent import threaded_map
>>> def proc(n):
...     # Do something blocking
...     return n + 1
>>> Stream(range(10)).pipe(threaded_map, proc, max_workers=4).collect()
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]