sloths.ext.asyncio
asyncio native stream class.
Classes
AsyncStream | Async version of sloths.Stream, but for async iterators.
AsyncPipe | A stream representing a source stream passed through a transform function.
Functions
Wrap a synchronous iterator in an asynchronous one.
Module Contents
- class sloths.ext.asyncio.AsyncStream(source: collections.abc.AsyncIterable[T])
Bases: Generic[T], collections.abc.AsyncIterable[T]
Async version of sloths.Stream, but for async iterators.
It works essentially the same way and exposes the same interface, but in an async/await-compatible manner.
Some methods that take callbacks, such as map(), also have a prefixed async equivalent, such as amap(), which takes an async callback instead.
- classmethod range(*args: SupportsIndex) AsyncStream[int]
Create a simple async stream over range().
- chain(*others: collections.abc.AsyncIterable[T]) AsyncStream[T]
Chain one or more async iterables after the current one.
- pipe(fn: AsyncTransform[T, P, U], *args: P, **kwargs: P) AsyncPipe[T, P, U]
Chain a transform to a stream and return the resulting stream.
Transforms are the core composability primitive and are simply callables which take an iterable and return another iterable. Usually these are lazy generators.
>>> import asyncio
>>> from collections.abc import AsyncIterable
>>> from sloths.ext.asyncio import AsyncStream
>>> async def to_str(iterable: AsyncIterable[int]) -> AsyncIterable[str]:
...     async for x in iterable:
...         yield str(x)
...
>>> asyncio.run(AsyncStream.range(10).pipe(to_str).collect())
['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']
Transforms can also decide to short-circuit or selectively yield for control-flow:
>>> async def to_str_if_odd(iterable: AsyncIterable[int]) -> AsyncIterable[str]:
...     async for x in iterable:
...         if x % 2:
...             yield str(x)
...
>>> asyncio.run(AsyncStream.range(10).pipe(to_str_if_odd).collect())
['1', '3', '5', '7', '9']
All the properties of the sync version apply here as well.
Warning
When writing transforms be careful not to accidentally consume the iterable as this would negate much of the benefit of chaining generators in the first place.
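The warning above can be illustrated with plain async generators, independently of sloths. This is a minimal sketch: `numbers`, `lazy_double`, `eager_double` and `first` are hypothetical names used only here, and `pulled` simply records how many items the source was asked to produce.

```python
import asyncio
from collections.abc import AsyncIterable, AsyncIterator

pulled: list[int] = []  # every item the source actually produced


async def numbers(limit: int) -> AsyncIterator[int]:
    for i in range(limit):
        pulled.append(i)
        yield i


async def lazy_double(iterable: AsyncIterable[int]) -> AsyncIterator[int]:
    # Lazy: each upstream item is yielded as soon as it arrives.
    async for x in iterable:
        yield x * 2


async def eager_double(iterable: AsyncIterable[int]) -> AsyncIterator[int]:
    # Eager: the whole upstream is drained into a list before the first yield.
    items = [x async for x in iterable]
    for x in items:
        yield x * 2


async def first(iterable: AsyncIterable[int]) -> int:
    # Pull exactly one item, then stop iterating.
    async for x in iterable:
        return x
    raise IndexError("empty iterable")


async def main() -> tuple[int, int]:
    pulled.clear()
    await first(lazy_double(numbers(1000)))
    lazy_count = len(pulled)
    pulled.clear()
    await first(eager_double(numbers(1000)))
    eager_count = len(pulled)
    return lazy_count, eager_count


lazy_pulled, eager_pulled = asyncio.run(main())
print(lazy_pulled, eager_pulled)  # 1 1000
```

Both transforms produce the same values, but the eager one forces the source to produce all 1000 items even though only one is needed downstream.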
- inspect(cb: collections.abc.Callable[[T], Any]) AsyncStream[T]
Execute a function on each element without modifying it.
This is mostly useful for debugging but could be used as the base for monitoring and metrics or any other side-effects.
- enumerate() AsyncStream[tuple[int, T]]
Python’s enumerate as a transform.
- map(fn: collections.abc.Callable[[T], U]) AsyncStream[U]
Run a synchronous element-wise transform over the stream.
- try_map(fn: collections.abc.Callable[[T], U], exc_cls: tuple[type[Exception], Ellipsis] = (Exception,), *, cb: collections.abc.Callable[[Exception, T], None] | None = None) AsyncStream[U]
Run a synchronous element-wise transform over the stream and discard errors.
- amap(fn: collections.abc.Callable[[T], collections.abc.Awaitable[U]]) AsyncStream[U]
Run an asynchronous element-wise transform over the stream.
This is equivalent to AsyncStream(...).map(...).flatten().
- atry_map(fn: collections.abc.Callable[[T], collections.abc.Awaitable[U]], exc_cls: tuple[type[Exception], Ellipsis] = (Exception,), *, cb: collections.abc.Callable[[Exception, T], None] | None = None) AsyncStream[U]
Run an asynchronous element-wise transform over the stream and discard errors.
- try_(exc_cls: tuple[type[Exception], Ellipsis] = (Exception,), *, cb: collections.abc.Callable[[Exception], None] | None = None) AsyncStream[T]
Stop on the first exception and discard it.
This is more generic than try_map() and will catch errors raised when calling next() on the upstream transform, but it will stop iteration on the first exception.
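The "stop on the first exception and discard it" behaviour can be sketched with a plain async generator. This is an illustration of the idea, not the sloths implementation: `flaky_source` and `stop_on_error` are hypothetical names, and the `exc_cls` and `cb` parameters only mirror the shape of the signature above.

```python
import asyncio
from collections.abc import AsyncIterable, AsyncIterator, Callable
from typing import Optional


async def flaky_source() -> AsyncIterator[int]:
    yield 1
    yield 2
    raise ValueError("upstream failure")


async def stop_on_error(
    iterable: AsyncIterable[int],
    exc_cls: tuple[type[Exception], ...] = (Exception,),
    cb: Optional[Callable[[Exception], None]] = None,
) -> AsyncIterator[int]:
    iterator = iterable.__aiter__()
    while True:
        try:
            # Only the pull from upstream is guarded, not the downstream yield.
            x = await iterator.__anext__()
        except StopAsyncIteration:
            return  # normal end of the upstream
        except exc_cls as exc:
            if cb is not None:
                cb(exc)  # report the error, then end the stream quietly
            return
        yield x


async def main() -> tuple[list[int], list[Exception]]:
    seen: list[Exception] = []
    out = [x async for x in stop_on_error(flaky_source(), cb=seen.append)]
    return out, seen


result, errors = asyncio.run(main())
print(result)  # [1, 2]
```

Items produced before the failure still reach the consumer. Once the upstream raises, nothing further can be recovered from it, which is why iteration ends there.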
- batch(by: int) AsyncStream[tuple[T, Ellipsis]]
Buffer the stream and provide groups to downstream consumers.
Warning
This partially unwinds the stream and will increase memory usage. Only buffer to amounts you’re comfortable holding in memory at once.
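The memory cost mentioned in the warning comes from the buffer that must hold up to `by` elements at a time. The following is a minimal sketch using a plain async generator rather than sloths itself; `count_to` and `batched` are hypothetical names. Yielding a shorter final group is an assumption inferred from the flatten() example in this page, where `range(11).batch(by=2)` loses no element.

```python
import asyncio
from collections.abc import AsyncIterable, AsyncIterator


async def count_to(limit: int) -> AsyncIterator[int]:
    for i in range(limit):
        yield i


async def batched(
    iterable: AsyncIterable[int], by: int
) -> AsyncIterator[tuple[int, ...]]:
    buffer: list[int] = []  # never holds more than `by` elements
    async for x in iterable:
        buffer.append(x)
        if len(buffer) == by:
            yield tuple(buffer)
            buffer = []
    if buffer:
        yield tuple(buffer)  # final, possibly shorter, group


async def main() -> list[tuple[int, ...]]:
    return [group async for group in batched(count_to(5), by=2)]


groups = asyncio.run(main())
print(groups)  # [(0, 1), (2, 3), (4,)]
```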
- flatten() AsyncStream[U]
Flatten iterators into their elements.
This will flatten iterables, async iterables and awaitables, so it has the same utility as sloths.Stream.flatten():
>>> import asyncio
>>> asyncio.run(AsyncStream.range(11).batch(by=2).flatten().collect())
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
>>> asyncio.run(AsyncStream.range(0).batch(by=2).flatten().collect())
[]
But it can also be used to flatten async results (this trivial case is equivalent to calling amap()):
>>> async def aadd_2(x):
...     await asyncio.sleep(0.001)
...     return x + 2
...
>>> asyncio.run(AsyncStream.range(11).map(aadd_2).flatten().collect())
[2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
Or async iterables:
>>> async def apair(x):
...     await asyncio.sleep(0.001)
...     for _ in range(2):
...         yield x
...
>>> asyncio.run(AsyncStream.range(5).map(apair).flatten().collect())
[0, 0, 1, 1, 2, 2, 3, 3, 4, 4]
- filter(predicate: collections.abc.Callable[[T], bool] | None = None) AsyncStream[T]
Filter elements by running them through a predicate function.
- afilter(predicate: collections.abc.Callable[[T], collections.abc.Awaitable[bool]]) AsyncStream[T]
Filter elements by running them through an asynchronous predicate.
- take(count: int) AsyncStream[T]
Take up to count elements from the stream, then stop.
Upstream generators will not be polled once the requested number of elements has been reached, so the rest of the source can be consumed separately.
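The point about not polling upstream past the requested count can be sketched with plain async generators. This is an illustration of the behaviour described above, not the sloths implementation; `count_to` and `take` are hypothetical stand-ins.

```python
import asyncio
from collections.abc import AsyncIterator


async def count_to(limit: int) -> AsyncIterator[int]:
    for i in range(limit):
        yield i


async def take(iterator: AsyncIterator[int], count: int) -> AsyncIterator[int]:
    if count <= 0:
        return
    taken = 0
    async for x in iterator:
        yield x
        taken += 1
        if taken == count:
            return  # stop before asking upstream for another element


async def main() -> tuple[list[int], list[int]]:
    source = count_to(6)
    head = [x async for x in take(source, 3)]
    # The source was left untouched after the third element.
    rest = [x async for x in source]
    return head, rest


head, rest = asyncio.run(main())
print(head, rest)  # [0, 1, 2] [3, 4, 5]
```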
- skip(count: int) AsyncStream[T]
Skip over count elements from the iterator.
- take_while(predicate: collections.abc.Callable[[T], bool] | None = None) AsyncStream[T]
Consume elements from the stream until the predicate returns False.
- skip_while(predicate: collections.abc.Callable[[T], bool] | None = None) AsyncStream[T]
Skip elements while the predicate returns True.
- async consume()
Consume the stream but discard the results.
This is useful for infinite pipelines or processing pipelines where the results are not important.
- async collect() list[T]
- async collect(collector: collections.abc.Callable[[collections.abc.AsyncIterable[T]], collections.abc.Awaitable[U]]) U
Collect the iterator.
By default this collects into a list but custom collectors are also supported as long as they accept async iterables as input.
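A custom collector only needs to match the callable shape in the signature above: it accepts an async iterable and returns an awaitable. The sketch below defines one such collector and, to stay self-contained, calls it directly on a plain async generator instead of going through collect(). `words` and `to_frozenset` are hypothetical names.

```python
import asyncio
from collections.abc import AsyncIterable, AsyncIterator


async def words() -> AsyncIterator[str]:
    for word in ("pear", "apple", "pear", "fig"):
        yield word


async def to_frozenset(iterable: AsyncIterable[str]) -> frozenset[str]:
    # Consume the async iterable and deduplicate its elements.
    return frozenset({item async for item in iterable})


unique = asyncio.run(to_frozenset(words()))
print(sorted(unique))  # ['apple', 'fig', 'pear']
```

Given the `collect(collector)` overload, a function of this shape would presumably be passed as the `collector` argument.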
- async count() int
Return the length of the stream after consuming it.
__alen__ would implicitly consume the stream in various places, so it is unsafe to add.
- async nth(nth: int) T
- async nth(nth: int, *, default: T) T
- async nth(nth: int, *, default: U) T | U
Return the nth value.
Raises IndexError if the stream isn’t long enough and a default value is not provided.
- async find(predicate: collections.abc.Callable[[T], bool] | None = None) T | None
Find the first element that satisfies a predicate.
This short-circuits, so it won’t consume the source iterator past the target element.
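The short-circuiting behaviour can be sketched with a plain coroutine over an async generator. This illustrates the idea only and is not the sloths implementation; `count_to` and `find_first` are hypothetical names.

```python
import asyncio
from collections.abc import AsyncIterator, Callable
from typing import Optional


async def count_to(limit: int) -> AsyncIterator[int]:
    for i in range(limit):
        yield i


async def find_first(
    iterator: AsyncIterator[int], predicate: Callable[[int], bool]
) -> Optional[int]:
    async for x in iterator:
        if predicate(x):
            return x  # stop here; nothing further is pulled from the source
    return None  # no element matched


async def main() -> tuple[Optional[int], list[int]]:
    source = count_to(6)
    found = await find_first(source, lambda x: x >= 2)
    # Elements after the match are still available in the source.
    rest = [x async for x in source]
    return found, rest


found, rest_after_find = asyncio.run(main())
print(found, rest_after_find)  # 2 [3, 4, 5]
```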
- async afind(predicate: collections.abc.Callable[[T], collections.abc.Awaitable[bool]]) T | None
Find the first element that satisfies an asynchronous predicate.
This short-circuits, so it won’t consume the source iterator past the target element.
- class sloths.ext.asyncio.AsyncPipe(source: collections.abc.AsyncIterable[T], fn: AsyncTransform[T, P, U], name: str | None = None, *args: P, **kwargs: P)
Bases: Generic[T, P, U], AsyncStream[U]
A stream representing a source stream passed through a transform function.
This should not be interacted with directly.
See also
Stream.pipe()