Cookbook

Reading a file line by line

def compute_sequential(filename: str) -> int:
    """
    Given a file made of one integer per line read line by line and
    compute the sum of all lines.
    """
    from sloths import Stream

    with Path(filename).open() as f:
        return (
            Stream(f)
            .filter(lambda x: bool(x.strip()))
            .map(int)
            .fold(lambda x, y: x + y, 0)
        )

If the file is very large we can also do this using a process pool:

def compute_processpool(
    filename: str,
    batch_size: int = 16_000,
    workers: int = 4,
) -> int:
    """
    Given a file made of one integer per line read line by line and
    compute the sum of all lines across multiple processes by reading batches
    of lines and distributing the sum operation for the batches to individual
    processes.
    """
    from concurrent.futures import ProcessPoolExecutor

    from sloths import Stream
    from sloths.ext.concurrent import imap_with_executor_as_completed

    with (
        Path(filename).open() as f,
        ProcessPoolExecutor(max_workers=workers) as e,
    ):
        return (
            Stream(f)
            .batch(batch_size)
            .pipe(
                imap_with_executor_as_completed,
                part_sum,
                executor=e,
                prefetch=workers * 2,
            )
            .fold(lambda x, y: x + y, 0)
        )
def part_sum(batch: Iterable[str]) -> int:
    """Parse and sum a batch of lines."""
    return sum(int(x.strip()) for x in batch if x.strip())

Rate limiting

from __future__ import annotations

import sys
from collections.abc import Iterable
from time import perf_counter, sleep
from typing import TypeVar

from sloths import Stream

T = TypeVar("T")


def rate_limit(it: Iterable[T], *, interval_seconds: float) -> Iterable[T]:
    """
    A very basic rate limiting transform which allows at most one item per
    ``interval_seconds`` interval.
    """
    iterator = iter(it)

    try:
        last_yield = perf_counter()
        yield next(iterator)
    except StopIteration:
        return

    while True:
        since_last_yield = perf_counter() - last_yield
        if since_last_yield < interval_seconds:
            # This technically adds some tiny wall time over the rate
            # limit which is acceptable in most cases.
            #
            # Putting the sleep before the yield also means we'll poll upstream
            # at most once per interval, we could move it after the yield to
            # only make values available within the limit but poll as fast as we
            # can.
            sleep(interval_seconds - since_last_yield)
        try:
            last_yield = perf_counter()
            yield next(iterator)
        except StopIteration:
            return


if __name__ == "__main__":
    start = perf_counter()

    size = int(sys.argv[-2])
    limit = float(sys.argv[-1])

    Stream(range(size)).inspect(lambda x: print(">", x)).pipe(
        rate_limit,
        interval_seconds=limit,
    ).inspect(lambda x: print("<", x)).consume()

    assert perf_counter() - start >= limit * size

Adding progress bars

The inspect() function is useful to add progress monitoring hooks within the pipeline, for example using tqdm and working off the file reading example above:

def compute(
    filename: str,
    batch_size: int = 16_000,
    workers: int = 4,
) -> int:
    with (
        Path(filename).open() as f,
        ProcessPoolExecutor(max_workers=workers) as e,
        tqdm.tqdm(desc="Lines read") as read_pb,
        tqdm.tqdm(desc="Batches ready") as batches_pb,
        tqdm.tqdm(desc="Batches summed") as summed_pb,
    ):
        return (
            Stream(f)
            .inspect(lambda _: read_pb.update(1))
            .batch(batch_size)
            .inspect(lambda _: batches_pb.update(1))
            .pipe(
                imap_with_executor_as_completed,
                part_sum,
                executor=e,
                prefetch=workers * 2,
            )
            .inspect(lambda _: summed_pb.update(1))
            .fold(lambda x, y: x + y, 0)
        )

Will result in an output like so:

Lines read: 79100904it [00:25, 2868235.68it/s]
Batches ready: 4943it [00:25, 179.08it/s]
Batches summed: 4939it [00:25, 176.77it/s]