sloths¶
Lazy generator pipelines in Python.
Submodules¶
Classes¶
Package Contents¶
- class sloths.Peekable(source: collections.abc.Iterator[T])[source]¶
Bases:
Stream[T]A
Streamwith apeek()method.Warning
This may have a memory impact as it will buffer elements up to the furthest index peeked at.
- peek(n: int) T[source]¶
- peek(n: int, *, default: T) T
Return the element n positions ahead without consuming the stream.
>>> s = Stream.range(10).peekable() >>> s.peek() 0 >>> next(s) 0
>>> s.peek(4) 4 >>> s.peek(2) 2 >>> next(s) 1
The
Peekableinstance is a regular stream so you can chain calls:>>> s.take(5).collect() [2, 3, 4, 5, 6]
Peeking past the stream raises
IndexError:>>> s.peek(20) Traceback (most recent call last): ... IndexError: 20
Which can be avoided with a default value:
>>> s.peek(20, default=None) is None True
- class sloths.Stream(source: collections.abc.Iterable[T])[source]¶
Bases:
Generic[T],collections.abc.Iterable[T]Typed interface to build lazy generator/coroutines pipelines.
This technically works with any iterable but is primarily built to compose lazy-generator pipelines into a single iterator. When used with generators this provides good memory and throughput controls.
None of this can’t be achieved either by colocating everything in a single loop or composing generators outside-in by hand. This is a fairly light abstraction with almost no runtime cost and is provided mostly for ergonomics. The core benefits are:
flat-definition of the pipeline
stages defined in reading order instead of reverse order
type erasure and safety
composability
The simplest stream just wraps and consumes an iterable:
>>> s = Stream.range(10) >>> list(s) # This will consume the iterator [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
But it becomes really useful when composing transformations. Taking a trivial example of outside-in composition:
>>> def add_2(gen): ... for x in gen: ... yield x + 2 ... >>> def drop_multiples_of_3(gen): ... for x in gen: ... if x % 3 > 0: ... yield x ... >>> gen = drop_multiples_of_3( ... add_2( ... range(10), ... ), ... ) >>> list(gen) [2, 4, 5, 7, 8, 10, 11]
The equivalent form with
Streamis:>>> stream = ( ... Stream.range(10) ... .pipe(add_2) ... .pipe(drop_multiples_of_3) ... ) >>> list(stream) [2, 4, 5, 7, 8, 10, 11]
Streams also provide a chainable API and convenience methods (largely inspired by Rust’s iterator trait) to make it easy to compose readable pipelines without nesting.
Streams are also lazy as long as the transforms are well implemented (i.e. they don’t consume the entire source iterable in memory) and the pipeline will run from the last transform, polling up the stack as needed.
For a simple example:
>>> source = iter(range(100_000_000_000)) # Problematically large >>> ( ... Stream(source) ... .pipe(add_2) ... .batch(10) ... .flatten() ... .pipe(drop_multiples_of_3) ... .inspect(print) ... .take(20) ... .fold(lambda x,y: x+y, 0) ... ) 2 4 5 7 8 10 11 13 14 16 17 19 20 22 23 25 26 28 29 31 330
We can see that we haven’t consumed too far into the source iterable:
>>> next(source) 30
The print calls in the last example also illustrate the laziness of the streams. The final iterators polls from the last step which essentially polls up the stack until any iterable yields data. So in the example above there’s only ever 10 integers passing through the pipeline at any given time. This is primarily useful with lazy generators in order to control peak memory usage.
Warning
Streams are just chained generators and don’t provide any concurrency primitives (threads or async). Everything is executing linearly and behind the GIL. However nothing prevents a transform from using threads, processes or asyncio behind the scene.
- chain(*others: collections.abc.Iterable[T]) Stream[T][source]¶
Chain one or more iterables to the current ones.
Works with other streams:
>>> Stream.range(10).chain( ... Stream.range(5).map(lambda x: x + 20) ... ).collect() [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 20, 21, 22, 23, 24]
And simple iterables:
>>> Stream.range(2).chain(range(3), range(2)).collect() [0, 1, 0, 1, 2, 0, 1]
- pipe(fn: Transform[T, P, U], *args: P, **kwargs: P) Pipe[T, P, U][source]¶
Chain a transform to a stream and return the resulting stream.
Transforms are the core composability primitive and are simply callables which take an iterable and return another iterable. Usually these are lazy generators.
>>> def to_str(iterable: Iterable[int]) -> Iterable[str]: ... for x in iterable: ... yield str(x) ... >>> list(Stream.range(10).pipe(to_str)) ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']
Note
Type information of the source stream is preserved, so in the example above the first layer (
Stream.range(10)) is aStream[int, int]while the final stream isStream[int, str]which is also anIterable[str].Transforms can also decide to short-circuit or selectively yield for control-flow:
>>> def to_str_if_odd(iterable: Iterable[int]) -> Iterable[str]: ... for x in iterable: ... if x % 2: ... yield str(x) ... >>> list(Stream.range(10).pipe(to_str_if_odd)) ['1', '3', '5', '7', '9']
As transforms are just generator-factories they can hold state:
>>> def track_bounds(gen: Iterable[int]) -> Iterable[int]: ... m, M = 0, 0 ... for x in gen: ... m, M = min(m, x), max(M, x) ... yield x ... print(f'Min {m}, Max {M}') >>> s = Stream.range(10).pipe(track_bounds) >>> list(s) Min 0, Max 9 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
The flip-side of this being that streams are generally not safe to reuse once iterated upon.
Warning
When writing transforms be careful not to accidentally consume the iterable as this would negate much of the benefit of chaining generators in the first place.
- inspect(cb: collections.abc.Callable[[T], Any]) Stream[T][source]¶
Execute a function on each element without modifying it.
This is mostly useful for debugging but could be used as the base for monitoring and metrics or any other side-effects.
>>> Stream.range(4).inspect(print).collect() 0 1 2 3 [0, 1, 2, 3]
- enumerate() Stream[tuple[int, T]][source]¶
Python’s
enumerateas a transform.>>> Stream.range(5, 11).enumerate().collect() [(0, 5), (1, 6), (2, 7), (3, 8), (4, 9), (5, 10)]
- map(fn: collections.abc.Callable[[T], U]) Stream[U][source]¶
Run an element-wise transform over the stream.
>>> Stream.range(10).map(lambda x: x * 2).collect() [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
- try_map(fn: collections.abc.Callable[[T], U], exc_cls: tuple[type[Exception], Ellipsis] = (Exception,), *, cb: collections.abc.Callable[[Exception, T], None] | None = None) Stream[U][source]¶
Run an element-wise transform over the stream and discard errors.
>>> def no_2(x): ... if x == 2: ... raise ValueError(2) ... return x
>>> list(Stream.range(10).map(no_2)) Traceback (most recent call last): ... ValueError: 2
>>> list(Stream.range(10).try_map(no_2, (ValueError,))) [0, 1, 3, 4, 5, 6, 7, 8, 9]
Optionally you can pass in a callback to handle errors out of band:
>>> list(Stream.range(10).try_map(no_2, (ValueError,), cb=print)) 2 2 [0, 1, 3, 4, 5, 6, 7, 8, 9]
- try_(exc_cls: tuple[type[Exception], Ellipsis] = (Exception,), *, cb: collections.abc.Callable[[Exception], None] | None = None) Stream[T][source]¶
Stop on the first exception and discard it.
This is more generic than
try_map()and will catch error that happened when callingnext()on the upstream transform but will stop iteration on the first exception.>>> def no_2(x): ... if x == 2: ... raise ValueError(2) ... return x
>>> Stream.range(10).map(no_2).collect() Traceback (most recent call last): ... ValueError: 2
>>> Stream.range(10).map(no_2).try_((ValueError,)).collect() [0, 1]
Optionally you can pass in a callback to handle errors out of band:
>>> list(Stream.range(10).map(no_2).try_((ValueError,), cb=print)) 2 [0, 1]
If there are no errors it flows to the end normally:
>>> Stream.range(10).map(lambda x: x + 2).try_((ValueError,), cb=print) .collect() [2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
- batch(by: int) Stream[collections.abc.Iterable[T]][source]¶
Buffer the stream and provide groups to downstream consumers.
Warning
This partially unwinds the stream and will increase memory usage. Only buffer to amounts you’re comfortable holding in memory at once.
>>> Stream.range(11).batch(by=2).collect() [(0, 1), (2, 3), (4, 5), (6, 7), (8, 9), (10,)]
To simply buffer without exposing groups simply chain this with
flatten()which will ensure at least by elements are ready before forwarding them downstream one by one:>>> list(Stream.range(11).batch(by=2).flatten()) [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Batches may not have the number of elements if the end of the stream doesn’t have enough to fill a batch:
>>> list(Stream.range(11).batch(by=3)) [(0, 1, 2), (3, 4, 5), (6, 7, 8), (9, 10)]
- flatten() Stream[U][source]¶
Flatten iterators into their elements.
This is usually most useful after a buffered operation.
>>> Stream.range(11).batch(by=2).flatten().collect() [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
See also
- flat_map(fn: collections.abc.Callable[[T], collections.abc.Iterable[U]]) Stream[U][source]¶
Run an element-wise transform over the stream and flatten results.
>>> Stream.range(10).flat_map(lambda x: [x] * 2).collect() [0, 0, 1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7, 8, 8, 9, 9]
- filter(predicate: collections.abc.Callable[[T], bool] | None = None) Stream[T][source]¶
Filter elements by running them through a predicate function.
This supports passing no predicate ion which case it checks for truthy values:
>>> Stream([1, 2, None, 0, 4]).filter().collect() [1, 2, 4]
>>> Stream.range(10).filter(lambda x: bool(x % 2)).collect() [1, 3, 5, 7, 9]
- take(count: int) Stream[T][source]¶
Take up to
countelement from the stream and interrupt.Upstream generators will not be polled once we’ve reached the requested number of elements so the source can be consumed to its end separately.
>>> it = iter(range(10)) >>> Stream(it).take(4).collect() [0, 1, 2, 3] >>> list(it) [4, 5, 6, 7, 8, 9]
Taking more than the size in the iterator has no effect:
>>> Stream.range(5).take(10).collect() [0, 1, 2, 3, 4]
- skip(count: int) Stream[T][source]¶
Skip over
countelement from the iterator.>>> list(Stream.range(10).skip(4)) [4, 5, 6, 7, 8, 9]
- take_while(predicate: collections.abc.Callable[[T], bool] | None = None) Stream[T][source]¶
Consume element from the stream until the predicate returns
False.>>> it = iter(range(10)) >>> list(Stream(it).take_while(lambda x: x == 0 or x % 3 != 0)) [0, 1, 2]
Note that the first failing element of the iterator is consumed:
>>> list(it) [4, 5, 6, 7, 8, 9]
Passing no predicate is also supported:
>>> list(Stream([1, 2, 0, 3]).take_while()) [1, 2]
- skip_while(predicate: collections.abc.Callable[[T], bool] | None = None) Stream[T][source]¶
Skip elements until the predicate returns
True.>>> Stream.range(10).skip_while(lambda x: x == 0 or x % 3 != 0) .collect() [3, 4, 5, 6, 7, 8, 9]
Passing no predicate is also supported:
>>> list(Stream([1, 2, 0, None, 1, 2, 0, 3]).skip_while()) [0, None, 1, 2, 0, 3]
- step_by(step: int) Stream[T][source]¶
Consume iterators by a given step size each iteration.
This consumes elements after their predecessor has been consumed.
>>> Stream.range(10).step_by(2).collect() [0, 2, 4, 6, 8]
- window(size: int) Stream[tuple[T, Ellipsis]][source]¶
Transform the stream into a stream of sliding windows.
Each window is a tuple containing
sizeconsecutive elements from the stream. The windows overlap, with each window shifted one element forward from the previous window.If the stream contains fewer elements than the window size, an empty stream is returned.
>>> Stream.range(5).window(3).collect() [(0, 1, 2), (1, 2, 3), (2, 3, 4)]
>>> Stream([1, 2]).window(3).collect() []
>>> Stream([]).window(2).collect() []
- peekable() Peekable[T][source]¶
Return a
Peekableversion of the current stream.>>> s = Stream.range(100).peekable() >>> s.peek() 0
- to_async() sloths.ext.asyncio.AsyncStream[T][source]¶
Return a
sloths.ext.asyncio.AsyncStreamversion.
- consume()[source]¶
Consume the stream but discard the results.
This is useful for infinite pipelines or processing pipelines where the results are not important.
- collect() list[T][source]¶
- collect(collector: collections.abc.Callable[[collections.abc.Iterable[T]], U]) U
Collect the iterator.
By default this collects into a list, so this:
>>> list(Stream.range(10)) [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Is equivalent to:
>>> Stream.range(10).collect() [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Custom collectors are also supported:
>>> Stream.range(10).map(lambda x: x // 2).collect(set) {0, 1, 2, 3, 4}
- count() int[source]¶
Return the length of the stream after consuming it.
__len__would implicitly consume the stream in various places so is unsafe to add.>>> Stream.range(100).count() 100
- nth(nth: int) T[source]¶
- nth(nth: int, *, default: T) T
- nth(nth: int, *, default: U) T | U
Return the
nthvalue.>>> Stream.range(10).nth(0) 0
>>> Stream.range(10).nth(6) 6
Raises
IndexErrorif the stream is too short:>>> Stream.range(10).nth(12) Traceback (most recent call last): ... IndexError: 12
A
defaultcan be provided as a fallback:>>> Stream.range(10).nth(12, default=42) 42
This short-cirtcuits so it won’t consume the source iterator past the target element:
>>> source = iter(range(10)) >>> Stream(source).nth(3) 3 >>> list(source) [4, 5, 6, 7, 8, 9]
- find(predicate: collections.abc.Callable[[T], bool] | None = None) T | None[source]¶
Find the first elements that satisfies a predicate.
>>> Stream.range(10).find(lambda x: x == 3) 3
This short-cirtcuits so it won’t consume the source iterator past the target element:
>>> source = iter(range(10)) >>> Stream(source).find(lambda x: x == 3) 3 >>> list(source) [4, 5, 6, 7, 8, 9]
Returns
Noneif the item is not found:>>> source = iter(range(10)) >>> Stream(source).find(lambda x: x == 102) >>> list(source) []