sloths.ext.asyncio

asyncio-native stream class.

Classes

AsyncStream

Async version of sloths.Stream that operates on async iterators.

AsyncPipe

A stream representing a source stream passed through a transform function.

Functions

make_async(it) → collections.abc.AsyncIterable[T]

Wrap a synchronous iterator in an asynchronous one.

Module Contents

class sloths.ext.asyncio.AsyncStream(source: collections.abc.AsyncIterable[T])

Bases: Generic[T], collections.abc.AsyncIterable[T]

Async version of sloths.Stream that operates on async iterators.

It works essentially the same way and exposes the same interface, but in an async/await-compatible manner.

Some methods that take callbacks, such as map(), also have an a-prefixed async equivalent, such as amap(), which takes an async callback instead.
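
The split between the plain and the a-prefixed variants can be illustrated with two standalone async generators. This is a minimal sketch assuming only that map() calls its callback directly while amap() awaits the callback's result; `map_sync`, `map_async` and the other helpers are hypothetical names, not part of sloths.

```python
import asyncio
from collections.abc import AsyncIterable, AsyncIterator, Awaitable, Callable
from typing import TypeVar

T = TypeVar("T")
U = TypeVar("U")


async def map_sync(fn: Callable[[T], U], source: AsyncIterable[T]) -> AsyncIterator[U]:
    # Plain callback: its return value is yielded as-is.
    async for item in source:
        yield fn(item)


async def map_async(
    fn: Callable[[T], Awaitable[U]], source: AsyncIterable[T]
) -> AsyncIterator[U]:
    # Async callback: the returned awaitable is awaited before yielding.
    async for item in source:
        yield await fn(item)


async def numbers() -> AsyncIterator[int]:
    for i in range(3):
        yield i


async def double_later(x: int) -> int:
    await asyncio.sleep(0)  # stand-in for real async work
    return x * 2


async def main() -> tuple[list[int], list[int]]:
    doubled = [x async for x in map_sync(lambda x: x * 2, numbers())]
    adoubled = [x async for x in map_async(double_later, numbers())]
    return doubled, adoubled


print(asyncio.run(main()))  # ([0, 2, 4], [0, 2, 4])
```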

classmethod range(*args: SupportsIndex) → AsyncStream[int]

Create a simple async stream over range().

chain(*others: collections.abc.AsyncIterable[T]) → AsyncStream[T]

Chain one or more async iterables onto the end of the current stream.

pipe(fn: AsyncTransform[T, P, U], *args: P, **kwargs: P) → AsyncPipe[T, P, U]

Chain a transform to a stream and return the resulting stream.

Transforms are the core composability primitive: they are simply callables which take an async iterable and return another async iterable. Usually these are lazy async generators.

>>> import asyncio
>>> from collections.abc import AsyncIterable
>>> async def to_str(iterable: AsyncIterable[int]) -> AsyncIterable[str]:
...     async for x in iterable:
...         yield str(x)
...
>>> asyncio.run(AsyncStream.range(10).pipe(to_str).collect())
['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']

Transforms can also decide to short-circuit or selectively yield for control-flow:

>>> async def to_str_if_odd(iterable: AsyncIterable[int]) -> AsyncIterable[str]:
...     async for x in iterable:
...         if x % 2:
...             yield str(x)
...
>>> asyncio.run(AsyncStream.range(10).pipe(to_str_if_odd).collect())
['1', '3', '5', '7', '9']

Otherwise, async transforms have all the same properties as in the synchronous version.

Warning

When writing transforms, be careful not to accidentally consume the whole iterable up front, as this would negate much of the benefit of chaining generators in the first place.
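
To see why this matters, the sketch below compares a transform that buffers the whole upstream before yielding with one that yields lazily, and counts how many source elements each pulls to produce a single output. It uses plain asyncio only; `eager_to_str`, `lazy_to_str` and `first` are hypothetical.

```python
import asyncio
from collections.abc import AsyncGenerator, AsyncIterable, Callable

pulled: list[int] = []


async def source() -> AsyncGenerator[int, None]:
    # Records every element it hands out.
    for i in range(5):
        pulled.append(i)
        yield i


async def eager_to_str(iterable: AsyncIterable[int]) -> AsyncGenerator[str, None]:
    # Anti-pattern: drains the whole upstream before yielding anything.
    items = [x async for x in iterable]
    for x in items:
        yield str(x)


async def lazy_to_str(iterable: AsyncIterable[int]) -> AsyncGenerator[str, None]:
    # Preferred: yields each element as soon as it arrives.
    async for x in iterable:
        yield str(x)


Transform = Callable[[AsyncIterable[int]], AsyncGenerator[str, None]]


async def first(transform: Transform) -> str:
    # Ask for a single output element, then close the transform.
    gen = transform(source())
    try:
        return await gen.__anext__()
    finally:
        await gen.aclose()


async def main() -> tuple[int, int]:
    pulled.clear()
    await first(eager_to_str)
    eager_pulls = len(pulled)
    pulled.clear()
    await first(lazy_to_str)
    lazy_pulls = len(pulled)
    return eager_pulls, lazy_pulls


print(asyncio.run(main()))  # (5, 1)
```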

inspect(cb: collections.abc.Callable[[T], Any]) → AsyncStream[T]

Execute a function on each element without modifying it.

This is mostly useful for debugging, but it can also serve as the basis for monitoring, metrics, or other side effects.
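
A rough standalone sketch of the idea, assuming inspect() simply calls cb on each element and passes it through unchanged; `inspect_each` is a hypothetical name. Here the side effect records every element that flows past.

```python
import asyncio
from collections.abc import AsyncIterable, AsyncIterator, Callable
from typing import Any, TypeVar

T = TypeVar("T")


async def inspect_each(cb: Callable[[T], Any], source: AsyncIterable[T]) -> AsyncIterator[T]:
    async for item in source:
        cb(item)  # side effect only; the return value is ignored
        yield item  # the element itself is passed through untouched


async def letters() -> AsyncIterator[str]:
    for ch in "abc":
        yield ch


async def main() -> tuple[list[str], list[str]]:
    seen: list[str] = []
    out = [x async for x in inspect_each(seen.append, letters())]
    return out, seen


print(asyncio.run(main()))  # (['a', 'b', 'c'], ['a', 'b', 'c'])
```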

enumerate() → AsyncStream[tuple[int, T]]

Python’s enumerate as a transform.

map(fn: collections.abc.Callable[[T], U]) → AsyncStream[U]

Run a synchronous element-wise transform over the stream.

try_map(fn: collections.abc.Callable[[T], U], exc_cls: tuple[type[Exception], ...] = (Exception,), *, cb: collections.abc.Callable[[Exception, T], None] | None = None) → AsyncStream[U]

Run a synchronous element-wise transform over the stream and discard errors.
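
A minimal sketch of skip-on-error mapping, assuming elements whose callback raises one of exc_cls are dropped and, when cb is given, reported to it together with the offending element (as the cb type suggests). `try_map_sketch` is hypothetical and takes the source as an explicit argument rather than being a method.

```python
import asyncio
from collections.abc import AsyncIterable, AsyncIterator, Callable
from typing import Optional, TypeVar

T = TypeVar("T")
U = TypeVar("U")


async def try_map_sketch(
    fn: Callable[[T], U],
    source: AsyncIterable[T],
    exc_cls: tuple[type[Exception], ...] = (Exception,),
    cb: Optional[Callable[[Exception, T], None]] = None,
) -> AsyncIterator[U]:
    async for item in source:
        try:
            result = fn(item)
        except exc_cls as exc:
            if cb is not None:
                cb(exc, item)  # report the failure together with its input
            continue  # drop this element and keep going
        yield result


async def raw_values() -> AsyncIterator[str]:
    for s in ["1", "two", "3"]:
        yield s


async def main() -> tuple[list[int], list[str]]:
    failed: list[str] = []

    def record(exc: Exception, item: str) -> None:
        failed.append(item)

    parsed = [x async for x in try_map_sketch(int, raw_values(), (ValueError,), record)]
    return parsed, failed


print(asyncio.run(main()))  # ([1, 3], ['two'])
```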

amap(fn: collections.abc.Callable[[T], collections.abc.Awaitable[U]]) → AsyncStream[U]

Run an asynchronous element-wise transform over the stream.

This is equivalent to AsyncStream(...).map(...).flatten().
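
The equivalence can be illustrated without sloths: mapping an async function over a stream produces un-awaited coroutines, and awaiting each of them afterwards gives the same result as awaiting inside the loop. All function names below are hypothetical.

```python
import asyncio
import inspect
from collections.abc import AsyncIterable, AsyncIterator, Coroutine
from typing import Any


async def numbers() -> AsyncIterator[int]:
    for i in range(4):
        yield i


async def add_one(x: int) -> int:
    await asyncio.sleep(0)
    return x + 1


async def map_to_coroutines() -> AsyncIterator[Coroutine[Any, Any, int]]:
    # Step 1 (like map()): calling an async function yields un-awaited coroutines.
    async for x in numbers():
        yield add_one(x)


async def flatten_awaitables(source: AsyncIterable[Any]) -> AsyncIterator[Any]:
    # Step 2 (like flatten() on awaitables): await each awaitable element.
    async for item in source:
        yield await item if inspect.isawaitable(item) else item


async def amap_direct() -> AsyncIterator[int]:
    # One-step equivalent: await inside the loop.
    async for x in numbers():
        yield await add_one(x)


async def main() -> tuple[list[int], list[int]]:
    two_step = [x async for x in flatten_awaitables(map_to_coroutines())]
    one_step = [x async for x in amap_direct()]
    return two_step, one_step


print(asyncio.run(main()))  # ([1, 2, 3, 4], [1, 2, 3, 4])
```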

atry_map(fn: collections.abc.Callable[[T], collections.abc.Awaitable[U]], exc_cls: tuple[type[Exception], ...] = (Exception,), *, cb: collections.abc.Callable[[Exception, T], None] | None = None) → AsyncStream[U]

Run an asynchronous element-wise transform over the stream and discard errors.

try_(exc_cls: tuple[type[Exception], ...] = (Exception,), *, cb: collections.abc.Callable[[Exception], None] | None = None) → AsyncStream[T]

Stop on the first exception and discard it.

This is more general than try_map(): it also catches errors raised when calling next() on the upstream transform, but it stops iteration at the first exception instead of continuing.
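
A sketch of the stop-on-first-error behaviour, assuming try_() drives the upstream iterator itself so that exceptions raised while producing an element can be caught. `try_sketch` and `flaky` are hypothetical.

```python
import asyncio
from collections.abc import AsyncIterable, AsyncIterator, Callable
from typing import Optional, TypeVar

T = TypeVar("T")


async def try_sketch(
    source: AsyncIterable[T],
    exc_cls: tuple[type[Exception], ...] = (Exception,),
    cb: Optional[Callable[[Exception], None]] = None,
) -> AsyncIterator[T]:
    iterator = source.__aiter__()
    while True:
        try:
            # Errors raised upstream while producing the next element land here.
            item = await iterator.__anext__()
        except StopAsyncIteration:
            return  # normal end of the stream
        except exc_cls as exc:
            if cb is not None:
                cb(exc)
            return  # swallow the error and stop iterating
        yield item


async def flaky() -> AsyncIterator[int]:
    yield 1
    yield 2
    raise RuntimeError("upstream failed")


async def main() -> tuple[list[int], list[str]]:
    errors: list[Exception] = []
    out = [x async for x in try_sketch(flaky(), (RuntimeError,), errors.append)]
    return out, [str(e) for e in errors]


print(asyncio.run(main()))  # ([1, 2], ['upstream failed'])
```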

batch(by: int) → AsyncStream[tuple[T, ...]]

Buffer the stream and provide groups to downstream consumers.

Warning

This partially unwinds the stream and will increase memory usage. Only buffer to amounts you’re comfortable holding in memory at once.
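
A minimal sketch of batching an async iterable, assuming a final, shorter group is emitted for leftover elements (consistent with the flatten() example, where range(11) batched by 2 still flattens back to all eleven values). `batch_sketch` is hypothetical; at most `by` elements are buffered at once.

```python
import asyncio
from collections.abc import AsyncIterable, AsyncIterator
from typing import TypeVar

T = TypeVar("T")


async def numbers(n: int) -> AsyncIterator[int]:
    for i in range(n):
        yield i


async def batch_sketch(source: AsyncIterable[T], by: int) -> AsyncIterator[tuple[T, ...]]:
    buffer: list[T] = []
    async for item in source:
        buffer.append(item)
        if len(buffer) == by:
            yield tuple(buffer)  # a full group is ready
            buffer = []
    if buffer:
        yield tuple(buffer)  # leftover, shorter group


async def main(n: int, by: int) -> list[tuple[int, ...]]:
    return [group async for group in batch_sketch(numbers(n), by)]


print(asyncio.run(main(5, 2)))  # [(0, 1), (2, 3), (4,)]
```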

flatten() → AsyncStream[U]

Flatten iterators into their elements.

This will flatten iterables, async iterables and awaitables, so it has the same utility as sloths.Stream.flatten():

>>> import asyncio
>>> asyncio.run(AsyncStream.range(11).batch(by=2).flatten().collect())
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
>>> asyncio.run(AsyncStream.range(0).batch(by=2).flatten().collect())
[]

It can also be used to flatten async results (this trivial case is equivalent to calling amap()):

>>> async def aadd_2(x):
...     await asyncio.sleep(0.001)
...     return x + 2
>>> asyncio.run(AsyncStream.range(11).map(aadd_2).flatten().collect())
[2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]

Or async iterables:

>>> async def apair(x):
...     await asyncio.sleep(0.001)
...     for _ in range(2):
...         yield x
>>> asyncio.run(AsyncStream.range(5).map(apair).flatten().collect())
[0, 0, 1, 1, 2, 2, 3, 3, 4, 4]

filter(predicate: collections.abc.Callable[[T], bool] | None = None) → AsyncStream[T]

Filter elements by running them through a predicate function.

afilter(predicate: collections.abc.Callable[[T], collections.abc.Awaitable[bool]]) → AsyncStream[T]

Filter elements by running them through an asynchronous predicate.
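
A sketch of filtering with an asynchronous predicate, where each verdict is awaited before deciding whether to yield the element. `afilter_sketch` and `is_even_later` are hypothetical.

```python
import asyncio
from collections.abc import AsyncIterable, AsyncIterator, Awaitable, Callable
from typing import TypeVar

T = TypeVar("T")


async def numbers(n: int) -> AsyncIterator[int]:
    for i in range(n):
        yield i


async def afilter_sketch(
    predicate: Callable[[T], Awaitable[bool]], source: AsyncIterable[T]
) -> AsyncIterator[T]:
    async for item in source:
        # The predicate returns an awaitable, so its verdict must be awaited.
        if await predicate(item):
            yield item


async def is_even_later(x: int) -> bool:
    await asyncio.sleep(0)  # stand-in for real async work
    return x % 2 == 0


async def main() -> list[int]:
    return [x async for x in afilter_sketch(is_even_later, numbers(6))]


print(asyncio.run(main()))  # [0, 2, 4]
```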

take(count: int) → AsyncStream[T]

Take up to count elements from the stream, then stop.

Upstream generators will not be polled once the requested number of elements has been reached, so the source can be consumed to its end separately.
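
A sketch of this guarantee, assuming take() stops as soon as it has yielded count elements instead of requesting one more from upstream. A recording source shows how many elements were produced, and the same source is then drained separately; `take_sketch` and `counting_source` are hypothetical.

```python
import asyncio
from collections.abc import AsyncIterable, AsyncIterator
from typing import TypeVar

T = TypeVar("T")
pulled: list[int] = []


async def counting_source(n: int) -> AsyncIterator[int]:
    # Records each element as it is produced.
    for i in range(n):
        pulled.append(i)
        yield i


async def take_sketch(source: AsyncIterable[T], count: int) -> AsyncIterator[T]:
    if count <= 0:
        return
    taken = 0
    async for item in source:
        yield item
        taken += 1
        if taken >= count:
            return  # stop before asking upstream for another element


async def main() -> tuple[list[int], int, list[int]]:
    pulled.clear()
    source = counting_source(6)
    head = [x async for x in take_sketch(source, 3)]
    polled = len(pulled)
    # The shared source was not closed, so it can still be drained.
    rest = [x async for x in source]
    return head, polled, rest


print(asyncio.run(main()))  # ([0, 1, 2], 3, [3, 4, 5])
```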

skip(count: int) → AsyncStream[T]

Skip over the first count elements of the stream.
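
A minimal standalone sketch of skipping leading elements; `skip_sketch` is a hypothetical name and not the library's implementation.

```python
import asyncio
from collections.abc import AsyncIterable, AsyncIterator
from typing import TypeVar

T = TypeVar("T")


async def numbers(n: int) -> AsyncIterator[int]:
    for i in range(n):
        yield i


async def skip_sketch(source: AsyncIterable[T], count: int) -> AsyncIterator[T]:
    skipped = 0
    async for item in source:
        if skipped < count:
            skipped += 1  # drop one of the leading elements
            continue
        yield item


async def main() -> list[int]:
    return [x async for x in skip_sketch(numbers(5), 2)]


print(asyncio.run(main()))  # [2, 3, 4]
```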

take_while(predicate: collections.abc.Callable[[T], bool] | None = None) → AsyncStream[T]

Take elements from the stream until the predicate returns False.

skip_while(predicate: collections.abc.Callable[[T], bool] | None = None) → AsyncStream[T]

Skip elements while the predicate returns True, then yield the rest.
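
A sketch of both methods, assuming they follow the semantics of itertools.takewhile and itertools.dropwhile; the helper names are hypothetical and the optional None predicate is not modelled.

```python
import asyncio
from collections.abc import AsyncIterable, AsyncIterator, Callable, Iterable
from typing import TypeVar

T = TypeVar("T")


async def aiter_of(items: Iterable[T]) -> AsyncIterator[T]:
    for item in items:
        yield item


async def take_while_sketch(
    predicate: Callable[[T], bool], source: AsyncIterable[T]
) -> AsyncIterator[T]:
    async for item in source:
        if not predicate(item):
            return  # the first failing element ends the stream
        yield item


async def skip_while_sketch(
    predicate: Callable[[T], bool], source: AsyncIterable[T]
) -> AsyncIterator[T]:
    skipping = True
    async for item in source:
        if skipping and predicate(item):
            continue  # still inside the leading run that satisfies the predicate
        skipping = False  # from here on, everything is yielded
        yield item


def below_three(x: int) -> bool:
    return x < 3


async def main() -> tuple[list[int], list[int]]:
    data = [1, 2, 5, 1, 3]
    taken = [x async for x in take_while_sketch(below_three, aiter_of(data))]
    rest = [x async for x in skip_while_sketch(below_three, aiter_of(data))]
    return taken, rest


print(asyncio.run(main()))  # ([1, 2], [5, 1, 3])
```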

async consume()

Consume the stream but discard the results.

This is useful for infinite pipelines or processing pipelines where the results are not important.
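
A sketch of draining a pipeline purely for its side effects; `consume_sketch` and `process` are hypothetical.

```python
import asyncio
from collections.abc import AsyncIterable, AsyncIterator

log: list[str] = []


async def process(n: int) -> AsyncIterator[int]:
    for i in range(n):
        log.append(f"processed {i}")  # the side effect we actually care about
        yield i


async def consume_sketch(source: AsyncIterable[object]) -> None:
    # Pull every element and throw it away.
    async for _ in source:
        pass


asyncio.run(consume_sketch(process(3)))
print(log)  # ['processed 0', 'processed 1', 'processed 2']
```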

async collect() → list[T]
async collect(collector: collections.abc.Callable[[collections.abc.AsyncIterable[T]], collections.abc.Awaitable[U]]) → U

Collect the elements of the stream.

By default this collects into a list but custom collectors are also supported as long as they accept async iterables as input.
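
Any coroutine function that accepts an async iterable and returns a result fits the collector signature above. The two collectors below are hypothetical examples, applied directly to an async generator so the sketch runs without sloths.

```python
import asyncio
from collections.abc import AsyncIterable, AsyncIterator


async def words() -> AsyncIterator[str]:
    for w in ["b", "a", "b", "c"]:
        yield w


async def to_set(items: AsyncIterable[str]) -> set[str]:
    # Hypothetical collector: gathers unique elements into a set.
    return {x async for x in items}


async def to_joined(items: AsyncIterable[str]) -> str:
    # Hypothetical collector: concatenates string elements in order.
    return "".join([x async for x in items])


async def main() -> tuple[set[str], str]:
    return await to_set(words()), await to_joined(words())


print(asyncio.run(main()))
```

Given that signature, a collector like `to_set` would presumably be passed as `stream.collect(to_set)`.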

async count() → int

Return the length of the stream after consuming it.

A __alen__ method would implicitly consume the stream in various places, so it is unsafe to add.
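
The sketch below shows why counting is destructive: computing the length pulls every element, after which the underlying generator is exhausted. `count_sketch` is a hypothetical stand-in for count().

```python
import asyncio
from collections.abc import AsyncIterable, AsyncIterator


async def numbers(n: int) -> AsyncIterator[int]:
    for i in range(n):
        yield i


async def count_sketch(source: AsyncIterable[object]) -> int:
    # Counting requires pulling (and discarding) every element.
    total = 0
    async for _ in source:
        total += 1
    return total


async def main() -> tuple[int, int]:
    gen = numbers(3)
    first = await count_sketch(gen)
    # The generator is now exhausted, so a second count sees nothing.
    second = await count_sketch(gen)
    return first, second


print(asyncio.run(main()))  # (3, 0)
```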

async nth(nth: int) → T
async nth(nth: int, *, default: T) → T
async nth(nth: int, *, default: U) → T | U

Return the nth value.

Raises IndexError if the stream isn’t long enough and no default value is provided.
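
A sketch of nth() with an optional default, assuming zero-based indexing (the source does not state the base) and using a private sentinel to distinguish "no default" from `default=None`. `nth_sketch` is hypothetical.

```python
import asyncio
from collections.abc import AsyncIterable, AsyncIterator
from typing import Any

_MISSING: Any = object()  # sentinel meaning "no default was given"


async def letters() -> AsyncIterator[str]:
    for ch in "abc":
        yield ch


async def nth_sketch(source: AsyncIterable[Any], nth: int, default: Any = _MISSING) -> Any:
    index = 0
    async for item in source:
        if index == nth:
            return item  # stop as soon as the target is reached
        index += 1
    if default is _MISSING:
        raise IndexError(f"stream has fewer than {nth + 1} elements")
    return default


print(asyncio.run(nth_sketch(letters(), 1)))  # b
print(asyncio.run(nth_sketch(letters(), 5, default=None)))  # None
```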

async find(predicate: collections.abc.Callable[[T], bool] | None = None) → T | None

Find the first element that satisfies a predicate.

This short-circuits, so it won’t consume the source iterator past the target element.

async afind(predicate: collections.abc.Callable[[T], collections.abc.Awaitable[bool]]) → T | None

Find the first element that satisfies an asynchronous predicate.

This short-circuits, so it won’t consume the source iterator past the target element.
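
A sketch of the short-circuiting behaviour with an asynchronous predicate, assuming None is returned when no element matches (as the T | None return type suggests). A recording source shows that nothing past the match is pulled; `afind_sketch` is hypothetical.

```python
import asyncio
from collections.abc import AsyncIterable, AsyncIterator, Awaitable, Callable
from typing import Optional, TypeVar

T = TypeVar("T")
pulled: list[int] = []


async def counting_source(n: int) -> AsyncIterator[int]:
    for i in range(n):
        pulled.append(i)
        yield i


async def afind_sketch(
    predicate: Callable[[T], Awaitable[bool]], source: AsyncIterable[T]
) -> Optional[T]:
    async for item in source:
        if await predicate(item):
            return item  # returning here means upstream is never polled again
    return None


async def greater_than_two(x: int) -> bool:
    await asyncio.sleep(0)
    return x > 2


async def main() -> tuple[Optional[int], list[int]]:
    pulled.clear()
    found = await afind_sketch(greater_than_two, counting_source(10))
    return found, list(pulled)


print(asyncio.run(main()))  # (3, [0, 1, 2, 3])
```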

async fold(fn: collections.abc.Callable[[U, T], U], acc: U) → U

Fold every element into an accumulator, starting from acc and combining with fn.

async afold(fn: collections.abc.Callable[[U, T], collections.abc.Awaitable[U]], acc: U) → U

Fold every element into an accumulator, starting from acc and combining with the asynchronous function fn.
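
A sketch of both folds, assuming fn receives the accumulator first and the element second, as the Callable[[U, T], U] type indicates. `fold_sketch` and `afold_sketch` are hypothetical.

```python
import asyncio
from collections.abc import AsyncIterable, AsyncIterator, Awaitable, Callable
from typing import TypeVar

T = TypeVar("T")
U = TypeVar("U")


async def numbers(n: int) -> AsyncIterator[int]:
    for i in range(n):
        yield i


async def fold_sketch(fn: Callable[[U, T], U], acc: U, source: AsyncIterable[T]) -> U:
    async for item in source:
        acc = fn(acc, item)  # accumulator first, element second
    return acc


async def afold_sketch(
    fn: Callable[[U, T], Awaitable[U]], acc: U, source: AsyncIterable[T]
) -> U:
    async for item in source:
        acc = await fn(acc, item)  # the combining step itself is async
    return acc


async def add_later(acc: int, x: int) -> int:
    await asyncio.sleep(0)
    return acc + x


async def main() -> tuple[int, list[int]]:
    total = await afold_sketch(add_later, 0, numbers(5))
    # Prepending each element into a list reverses the stream.
    reversed_items = await fold_sketch(lambda acc, x: [x] + acc, [], numbers(4))
    return total, reversed_items


print(asyncio.run(main()))  # (10, [3, 2, 1, 0])
```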

class sloths.ext.asyncio.AsyncPipe(source: collections.abc.AsyncIterable[T], fn: AsyncTransform[T, P, U], name: str | None = None, *args: P, **kwargs: P)

Bases: Generic[T, P, U], AsyncStream[U]

A stream representing a source stream passed through a transform function.

This class is not meant to be used directly.

See also

AsyncStream.pipe()

async sloths.ext.asyncio.make_async(it: collections.abc.Iterable[T]) → collections.abc.AsyncIterable[T]

Wrap a synchronous iterator in an asynchronous one.
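
A minimal sketch of such a wrapper, assuming it simply re-yields each element; `make_async_sketch` is hypothetical and the library's implementation may differ.

```python
import asyncio
from collections.abc import AsyncIterator, Iterable
from typing import TypeVar

T = TypeVar("T")


async def make_async_sketch(it: Iterable[T]) -> AsyncIterator[T]:
    # Re-yield each element of a synchronous iterable from an async generator.
    # Nothing here awaits, so other tasks do not get to run between elements.
    for item in it:
        yield item


async def main() -> list[int]:
    squares = (x * x for x in range(4))  # any synchronous iterable works
    return [x async for x in make_async_sketch(squares)]


print(asyncio.run(main()))  # [0, 1, 4, 9]
```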