Cookbook
Reading a file line by line
def compute_sequential(filename: str) -> int:
    """
    Given a file made of one integer per line, read it line by line and
    compute the sum of all lines.

    Blank (whitespace-only) lines are skipped; the fold starts from ``0``.

    :param filename: Path to the input file.
    :returns: The sum of all integers found in the file.
    """
    from pathlib import Path

    from sloths import Stream

    with Path(filename).open() as f:
        return (
            Stream(f)
            # Drop blank lines (e.g. a trailing empty line) so int() never
            # receives an empty string.
            .filter(lambda line: bool(line.strip()))
            # int() tolerates surrounding whitespace, including the newline.
            .map(int)
            .fold(lambda acc, value: acc + value, 0)
        )
If the file is very large, we can instead spread the work across a process pool:
def compute_processpool(
    filename: str,
    batch_size: int = 16_000,
    workers: int = 4,
) -> int:
    """
    Given a file made of one integer per line, compute the sum of all lines
    across multiple processes.

    Lines are read in the parent process and grouped into batches of
    ``batch_size`` lines. Each batch is parsed and summed by ``part_sum`` in
    one of ``workers`` worker processes, and the partial sums are folded
    together in the parent.

    :param filename: Path to the input file.
    :param batch_size: Number of lines handed to a worker per task.
    :param workers: Number of worker processes in the pool.
    :returns: The sum of all integers found in the file.
    """
    from concurrent.futures import ProcessPoolExecutor
    from pathlib import Path

    from sloths import Stream
    from sloths.ext.concurrent import imap_with_executor_as_completed

    with (
        Path(filename).open() as f,
        ProcessPoolExecutor(max_workers=workers) as e,
    ):
        return (
            Stream(f)
            .batch(batch_size)
            .pipe(
                imap_with_executor_as_completed,
                # part_sum is sent to worker processes, so it must be a
                # module-level (picklable) function rather than a lambda.
                part_sum,
                executor=e,
                # NOTE(review): presumably bounds how many batches are
                # submitted ahead of consumption -- confirm in sloths docs.
                prefetch=workers * 2,
            )
            # Partial sums may arrive in completion order rather than file
            # order; addition is order-independent, so the total is unaffected.
            .fold(lambda x, y: x + y, 0)
        )
from collections.abc import Iterable


def part_sum(batch: Iterable[str]) -> int:
    """
    Parse and sum a batch of lines, skipping blank (whitespace-only) lines.

    This runs inside worker processes, so it must remain a module-level
    function that ``ProcessPoolExecutor`` can pickle by reference.

    :param batch: Lines of text, each holding one integer (newlines allowed).
    :returns: The sum of the integers in ``batch`` (``0`` if there are none).
    """
    total = 0
    for line in batch:
        # Strip once and reuse the result for both the blank check and parse.
        stripped = line.strip()
        if stripped:
            total += int(stripped)
    return total
Rate limiting
from __future__ import annotations
import sys
from collections.abc import Iterable
from time import perf_counter, sleep
from typing import TypeVar
from sloths import Stream
T = TypeVar("T")


def rate_limit(it: Iterable[T], *, interval_seconds: float) -> Iterable[T]:
    """
    A very basic rate limiting transform: let at most one item through per
    ``interval_seconds`` interval.

    The first item is pulled and yielded immediately. Before each later pull
    from upstream, the generator sleeps for whatever is left of the interval
    since the previous pull started. This also applies to the final pull
    that finds the source exhausted, so fully draining ``n`` items takes at
    least ``n * interval_seconds``.
    """
    source = iter(it)
    previous_pull = None  # timestamp of the last pull; None before the first

    while True:
        if previous_pull is not None:
            elapsed = perf_counter() - previous_pull
            if elapsed < interval_seconds:
                # Sleeping *before* pulling means upstream is polled at most
                # once per interval. Sleeping after the yield instead would
                # still throttle output but poll upstream as fast as possible.
                # The small overshoot past the limit is acceptable here.
                sleep(interval_seconds - elapsed)

        previous_pull = perf_counter()
        try:
            yield next(source)
        except StopIteration:
            return
if __name__ == "__main__":
    # Demo: pass the item count and the interval (seconds) as the last two
    # command-line arguments. Each item is printed as it enters (">") and
    # leaves ("<") the rate limiter.
    start = perf_counter()
    size, limit = int(sys.argv[-2]), float(sys.argv[-1])

    throttled = (
        Stream(range(size))
        .inspect(lambda item: print(">", item))
        .pipe(rate_limit, interval_seconds=limit)
        .inspect(lambda item: print("<", item))
    )
    throttled.consume()

    # rate_limit also waits before the final, exhausting pull from upstream,
    # so draining `size` items costs at least `size` full intervals.
    assert perf_counter() - start >= limit * size
Adding progress bars
The Stream.inspect() method is useful for adding progress-monitoring
hooks within a pipeline. For example, using tqdm and building on the
process-pool file-reading example above:
def compute(
    filename: str,
    batch_size: int = 16_000,
    workers: int = 4,
) -> int:
    """
    Sum a file of one integer per line across a process pool, reporting
    progress with tqdm.

    This is the same pipeline as ``compute_processpool``, with three
    progress bars: lines read from the file, batches formed, and batches
    whose partial sum has been received.

    :param filename: Path to the input file.
    :param batch_size: Number of lines handed to a worker per task.
    :param workers: Number of worker processes in the pool.
    :returns: The sum of all integers found in the file.
    """
    from concurrent.futures import ProcessPoolExecutor
    from pathlib import Path

    import tqdm
    from sloths import Stream
    from sloths.ext.concurrent import imap_with_executor_as_completed

    with (
        Path(filename).open() as f,
        ProcessPoolExecutor(max_workers=workers) as e,
        tqdm.tqdm(desc="Lines read") as read_pb,
        tqdm.tqdm(desc="Batches ready") as batches_pb,
        tqdm.tqdm(desc="Batches summed") as summed_pb,
    ):
        return (
            Stream(f)
            # Each inspect() hook only ticks a progress bar; the items
            # themselves flow through unchanged.
            .inspect(lambda _: read_pb.update(1))
            .batch(batch_size)
            .inspect(lambda _: batches_pb.update(1))
            .pipe(
                imap_with_executor_as_completed,
                # Must be a module-level (picklable) function: it runs in
                # worker processes.
                part_sum,
                executor=e,
                prefetch=workers * 2,
            )
            .inspect(lambda _: summed_pb.update(1))
            .fold(lambda x, y: x + y, 0)
        )
This produces output like the following:
Lines read: 79100904it [00:25, 2868235.68it/s]
Batches ready: 4943it [00:25, 179.08it/s]
Batches summed: 4939it [00:25, 176.77it/s]