sloths.ext.concurrent
Extensions to work with concurrent.futures executors.
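Functions
- imap_with_executor – Lazy version of concurrent.futures.Executor.map.
- imap_with_executor_as_completed – Lazy version of concurrent.futures.as_completed().
- threaded_map – Threaded element-wise transform.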
Module Contents
- sloths.ext.concurrent.imap_with_executor(iterable: collections.abc.Iterable[T], fn: collections.abc.Callable[[T], U], *, executor: concurrent.futures.Executor, timeout_seconds: float | None = None, prefetch: int = 128, greedy: bool = False) -> collections.abc.Iterable[U]
Lazy version of concurrent.futures.Executor.map.
Execute an element-wise transform within an executor and return values in order.
concurrent.futures.Executor.map consumes the source iterator early, which is not desirable when working with lazy iterators. This version does not consume the entire source iterator at once and maintains the order of the source iterator.
Exceptions will interrupt the iteration.
- Parameters:
iterable – The source iterable.
fn – The mapper function to apply element-wise.
executor – The concurrent.futures.Executor to use.
timeout_seconds – The overall timeout for the entire operation.
prefetch – Controls the number of items to read from the source stream and feed into the executor on startup. Must be 1 or higher.
greedy – Controls whether items are pulled from the source iterable when any task completes or only when the head task completes.
Pulling only when the head task completes means the source is consumed only up to the last completed task (or prefetch if it’s higher), at the risk of leaving the executor idle if the head task is slower than average. Pulling when any task completes keeps the executor as busy as possible, but may consume the source past the head task if iteration is short-circuited.
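The ordered, bounded-prefetch behaviour described above can be illustrated with a minimal standard-library sketch. This is not the sloths implementation: `lazy_ordered_map` is a hypothetical name, it ignores `timeout_seconds`, and it models only the non-greedy case, refilling the executor after the head task completes.

```python
from collections import deque
from collections.abc import Callable, Iterable, Iterator
from concurrent.futures import Executor, ThreadPoolExecutor
from itertools import islice
from typing import TypeVar

T = TypeVar("T")
U = TypeVar("U")


def lazy_ordered_map(
    iterable: Iterable[T],
    fn: Callable[[T], U],
    *,
    executor: Executor,
    prefetch: int = 4,
) -> Iterator[U]:
    """Hypothetical helper: ordered map with at most `prefetch` tasks in flight."""
    if prefetch < 1:
        raise ValueError("prefetch must be 1 or higher")
    source = iter(iterable)
    # Submit only the first `prefetch` items on startup.
    pending = deque(executor.submit(fn, item) for item in islice(source, prefetch))
    while pending:
        # Block on the head task so results keep the source order;
        # an exception raised by fn propagates here and stops the iteration.
        result = pending.popleft().result()
        # Non-greedy refill: pull one more item only after the head completed.
        for item in islice(source, 1):
            pending.append(executor.submit(fn, item))
        yield result


with ThreadPoolExecutor(max_workers=2) as pool:
    doubled = list(lazy_ordered_map(range(5), lambda n: n * 2, executor=pool, prefetch=2))
print(doubled)  # [0, 2, 4, 6, 8]
```

Because the refill happens one item at a time, the source is never read more than `prefetch` items ahead of the last yielded result.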
- sloths.ext.concurrent.imap_with_executor_as_completed(iterable: collections.abc.Iterable[T], fn: collections.abc.Callable[[T], U], *, executor: concurrent.futures.Executor, timeout_seconds: float | None = None, prefetch: int = 128) -> collections.abc.Iterable[U]
Lazy version of concurrent.futures.as_completed().
Execute an element-wise transform within an executor and return values as they complete.
concurrent.futures.as_completed() consumes the source iterator early, which is not desirable when working with lazy iterators. This version does not consume the entire source iterator at once and yields results in the order they complete.
Exceptions will interrupt the iteration.
- Parameters:
iterable – The source iterable.
fn – The mapper function to apply element-wise.
executor – The concurrent.futures.Executor to use.
timeout_seconds – The overall timeout for the entire operation.
prefetch – Controls the number of items to read from the source stream and feed into the executor on startup. Must be 1 or higher.
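A completion-ordered variant can be sketched with `concurrent.futures.wait` and `FIRST_COMPLETED`. Again, this is only an illustration under assumptions, not the library's code: `lazy_as_completed_map` is a hypothetical name and the overall timeout is left out.

```python
from collections.abc import Callable, Iterable, Iterator
from concurrent.futures import FIRST_COMPLETED, Executor, ThreadPoolExecutor, wait
from itertools import islice
from typing import TypeVar

T = TypeVar("T")
U = TypeVar("U")


def lazy_as_completed_map(
    iterable: Iterable[T],
    fn: Callable[[T], U],
    *,
    executor: Executor,
    prefetch: int = 4,
) -> Iterator[U]:
    """Hypothetical helper: yield results as tasks finish, `prefetch` in flight."""
    if prefetch < 1:
        raise ValueError("prefetch must be 1 or higher")
    source = iter(iterable)
    # Submit only the first `prefetch` items on startup.
    pending = {executor.submit(fn, item) for item in islice(source, prefetch)}
    while pending:
        # Wake up as soon as at least one task has finished.
        done, pending = wait(pending, return_when=FIRST_COMPLETED)
        for future in done:
            # An exception raised by fn propagates here and stops the iteration.
            result = future.result()
            # Replace the finished task with one new item from the source.
            for item in islice(source, 1):
                pending.add(executor.submit(fn, item))
            yield result


with ThreadPoolExecutor(max_workers=2) as pool:
    squares = sorted(lazy_as_completed_map(range(5), lambda n: n * n, executor=pool))
print(squares)  # [0, 1, 4, 9, 16]
```

The results are sorted in the usage example only because completion order is not deterministic.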
- sloths.ext.concurrent.threaded_map(it: collections.abc.Iterable[T], fn: collections.abc.Callable[[T], U], max_workers: int, timeout_seconds: float | None = None, prefetch: int | None = None, mode: Literal['ordered', 'as_completed'] = 'ordered') -> collections.abc.Iterable[U]
Threaded element-wise transform.
This instantiates a ThreadPoolExecutor behind the scenes.
- Parameters:
iterable – The source iterable.
fn – The mapper function to apply element-wise.
max_workers – The number of workers to use.
timeout_seconds – The overall timeout for the entire operation.
prefetch – Controls the number of items to read from the source stream and feed into the executor on startup. Must be 1 or higher, defaults to max_workers.
ordered – If True (default), the results will be returned in the order of the source iterator regardless of their order of execution by the thread pool.
Warning
If False, then the worst-case scenario (the first element resolves last) will buffer all results in memory, which may not be acceptable. If this is likely to happen, consumers can chunk or rate limit their input to create an implicit ceiling on memory usage.
Usage:
>>> from sloths import Stream
>>> from sloths.ext.concurrent import threaded_map
>>> def proc(n):
...     # Do something blocking
...     return n + 1
>>> Stream(range(10)).pipe(threaded_map, proc, max_workers=4).collect()
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
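The chunking approach suggested in the warning above can be sketched with the standard library alone. The helpers `chunked` and `map_in_chunks` are hypothetical names used for illustration and are not part of sloths. Each chunk is mapped in full before the next one is read, so at most `chunk_size` results are buffered at any time.

```python
from collections.abc import Callable, Iterable, Iterator
from concurrent.futures import ThreadPoolExecutor
from itertools import islice
from typing import TypeVar

T = TypeVar("T")
U = TypeVar("U")


def chunked(iterable: Iterable[T], size: int) -> Iterator[list[T]]:
    """Hypothetical helper: split a lazy iterable into lists of at most `size` items."""
    source = iter(iterable)
    while chunk := list(islice(source, size)):
        yield chunk


def map_in_chunks(
    iterable: Iterable[T],
    fn: Callable[[T], U],
    *,
    max_workers: int,
    chunk_size: int,
) -> Iterator[U]:
    """Hypothetical helper: ordered threaded map with a ceiling on buffered results."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for chunk in chunked(iterable, chunk_size):
            # Executor.map submits the whole chunk eagerly, but the chunk is
            # bounded, so no more than `chunk_size` results can pile up.
            yield from pool.map(fn, chunk)


incremented = list(map_in_chunks(range(10), lambda n: n + 1, max_workers=4, chunk_size=3))
print(incremented)  # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
```

The trade-off is that workers may sit idle at the end of each chunk while its slowest task finishes, so larger chunks give better throughput at the cost of a higher memory ceiling.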