Day 28 · ~20m

Putting It All Together: An Async + Iterator + Protocol Pipeline from Scratch

Build a complete async order processing pipeline using Protocol for interfaces, async generators for lazy iteration, context managers for cleanup, and singledispatch for routing.

student (excited)

I was in the shower this morning thinking about the pipeline refactor. And I realized — I already know everything I need. I just haven't assembled it.

teacher (amused)

The shower is where architecture happens. What did you figure out?

student (focused)

The pipeline needs three things. A protocol that defines what an order source looks like — any object with an async fetch method, nothing more. An async iterator that pulls from that source lazily. And a context manager that owns the whole pipeline lifecycle — setup, run, cleanup, always.

teacher (surprised)

You just designed a production-grade async pipeline before the lesson started. That is a first.

student (amused)

Three weeks ago I was afraid to touch async code. Now I'm annoyed that the existing pipeline isn't using gather. Things change.

teacher (neutral)

Walk me through the Protocol piece first. Why Protocol and not ABC?

student (thinking)

Because the order sources don't share state. A database source, a Kafka consumer, a mock for testing — none of them should inherit from a common base. They just need to implement async_fetch. Protocol is a skills test, not a membership card. Structural typing.

teacher (focused)

Exact reasoning. And with @runtime_checkable, you can do isinstance(source, OrderSource) at runtime without ABC's registration machinery. That matters for the validation layer.

student (curious)

Right — without @runtime_checkable, Protocol is type-checker only. The isinstance check would blow up at runtime.

teacher (neutral)

Correct. Here is what the Protocol looks like:

from typing import Protocol, runtime_checkable

@runtime_checkable
class OrderSource(Protocol):
    async def async_fetch(self, page: int) -> list[dict]:
        ...

Notice the ... body. This is a pure interface — no implementation. Any class that has an async_fetch(page) method now satisfies this Protocol at runtime. No inheritance required.

student (proud)

And a MockOrderSource in tests just needs that one method. It doesn't need to know OrderSource exists. That's the whole point.

teacher (encouraging)

That is the whole point. Your test doubles stay clean. Your production sources stay clean. The contract lives in one place.
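
To make that concrete, here is a minimal runnable sketch. MockOrderSource is an illustrative name; the point is that a plain class with a matching async_fetch satisfies the Protocol with no inheritance:

```python
import asyncio
from typing import Protocol, runtime_checkable

@runtime_checkable
class OrderSource(Protocol):
    async def async_fetch(self, page: int) -> list[dict]:
        ...

# A test double: no base class, just the matching method
class MockOrderSource:
    async def async_fetch(self, page: int) -> list[dict]:
        return [{"id": f"order-{page}-1", "total": 10.0}]

source = MockOrderSource()
print(isinstance(source, OrderSource))  # True: structural match, no subclassing
```

One caveat: @runtime_checkable only verifies that the method exists, not its signature or that it is async; the type checker enforces the rest.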

Now the async iterator. You built the iterator protocol on Day 25. What changes when you make it async?

student (thinking)

Instead of __iter__ and __next__, you implement __aiter__ and __anext__. And instead of a regular for loop, you use async for. The difference is that __anext__ is a coroutine — it can await things. Like, await the source's async_fetch call.

teacher (focused)

Precisely. And StopAsyncIteration is the async version of StopIteration — you raise it when pages are exhausted. Let me show you the skeleton:

class AsyncOrderIterator:
    def __init__(self, source: OrderSource, max_pages: int = 5):
        self.source = source
        self.max_pages = max_pages
        self.current_page = 0

    def __aiter__(self):
        return self

    async def __anext__(self) -> dict:
        if self.current_page >= self.max_pages:
            raise StopAsyncIteration

        self.current_page += 1
        page = await self.source.async_fetch(self.current_page)

        if not page:
            raise StopAsyncIteration

        # Yield orders one at a time from the page
        # ...

student (curious)

Wait. If async_fetch returns a list of orders, but I want to yield individual orders — the iterator yields one order per next call, not one page — how do I handle that?

teacher (neutral)

Good catch. You need to buffer a page. Hold the current page's orders, drain them one at a time, then fetch the next page when the buffer is empty:

class AsyncOrderIterator:
    def __init__(self, source: OrderSource, max_pages: int = 5):
        self.source = source
        self.max_pages = max_pages
        self.current_page = 0
        self._buffer: list[dict] = []

    def __aiter__(self):
        return self

    async def __anext__(self) -> dict:
        while not self._buffer:
            if self.current_page >= self.max_pages:
                raise StopAsyncIteration
            self.current_page += 1
            self._buffer = await self.source.async_fetch(self.current_page)
            if not self._buffer:
                raise StopAsyncIteration

        return self._buffer.pop(0)

The while not self._buffer loop keeps fetching pages until it gets orders or exhausts pages. Then it pops orders one at a time.

student (excited)

So the iterator hides the pagination entirely. The caller just does async for order in AsyncOrderIterator(source) and gets individual orders. It has no idea there are pages underneath.

teacher (focused)

Lazy evaluation of a paginated async source, one order at a time. That is the whole iterator value.
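
To see that laziness end to end, here is a self-contained driver, assuming a small fake source (FakePagedSource is an illustrative name) and repeating the buffered iterator from above:

```python
import asyncio

class AsyncOrderIterator:
    # Same buffered iterator as above: drain a page, then fetch the next
    def __init__(self, source, max_pages: int = 5):
        self.source = source
        self.max_pages = max_pages
        self.current_page = 0
        self._buffer: list[dict] = []

    def __aiter__(self):
        return self

    async def __anext__(self) -> dict:
        while not self._buffer:
            if self.current_page >= self.max_pages:
                raise StopAsyncIteration
            self.current_page += 1
            self._buffer = await self.source.async_fetch(self.current_page)
            if not self._buffer:
                raise StopAsyncIteration
        return self._buffer.pop(0)

class FakePagedSource:
    # Two pages of two orders each, then nothing
    async def async_fetch(self, page: int) -> list[dict]:
        if page > 2:
            return []
        return [{"id": f"p{page}-{i}"} for i in (1, 2)]

async def main() -> list[str]:
    ids = []
    async for order in AsyncOrderIterator(FakePagedSource()):
        ids.append(order["id"])
    return ids

print(asyncio.run(main()))  # ['p1-1', 'p1-2', 'p2-1', 'p2-2']
```

The caller sees a flat stream of four orders; the two fetches happen lazily, only when the buffer runs dry.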

student (thinking)

Now the context manager. The ManagedPipeline owns the source — initializes it, runs the pipeline, and guarantees cleanup on exit whether or not something goes wrong.

teacher (serious)

What does cleanup mean concretely? What state does the pipeline own?

student (focused)

The list of workers — the asyncio tasks. If we're using gather to run concurrent workers, we need to cancel those tasks on failure. Otherwise they keep running even after the pipeline crashes.

teacher (surprised)

You just identified the most common async resource leak. Tasks that are created but never awaited or cancelled outlive the pipeline and keep consuming memory. The context manager's job is exactly that — on exit, cancel any tasks that are still running.

import asyncio

class ManagedPipeline:
    def __init__(self, source: OrderSource, num_workers: int = 3):
        self.source = source
        self.num_workers = num_workers
        self._tasks: list[asyncio.Task] = []
        self.is_open = False
        self.orders_processed = 0

    async def __aenter__(self):
        self.is_open = True
        print(f"[Pipeline] Opening with {self.num_workers} workers")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self.is_open = False
        # Cancel any workers still running, then reap them quietly
        for task in self._tasks:
            if not task.done():
                task.cancel()
        if self._tasks:
            await asyncio.gather(*self._tasks, return_exceptions=True)
        print(f"[Pipeline] Closed. Processed {self.orders_processed} orders.")
        return False

student (thinking)

return_exceptions=True in gather — that's important. Without it, if a cancelled task raises CancelledError, gather would propagate it and potentially interfere with the original exception from the pipeline.

teacher (neutral)

Exactly. During cleanup you want to swallow task cancellations. return_exceptions=True turns exceptions into return values so gather doesn't raise.
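
A small sketch of that behavior in isolation: the cancelled task shows up as a CancelledError value in gather's results instead of propagating.

```python
import asyncio

async def worker():
    await asyncio.sleep(10)  # stands in for a long-running task

async def main():
    task = asyncio.create_task(worker())
    await asyncio.sleep(0)  # give the task a chance to start
    task.cancel()
    # return_exceptions=True: exceptions become return values, so this
    # await completes normally even though the task was cancelled
    return await asyncio.gather(task, return_exceptions=True)

results = asyncio.run(main())
print(type(results[0]).__name__)  # CancelledError
```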

student (focused)

And since __aenter__ and __aexit__ are coroutines, this is an async context manager — async with ManagedPipeline(source) as pipeline:

teacher (encouraging)

Right. contextlib.asynccontextmanager is the decorator shorthand, same as contextmanager but for async. The class approach is cleaner when you have state to track.
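
For comparison, a sketch of the decorator form (managed_pipeline is an illustrative, trimmed-down version of the class above): everything before the yield plays the role of __aenter__, the finally block plays __aexit__.

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def managed_pipeline(num_workers: int = 3):
    tasks: list[asyncio.Task] = []
    print(f"[Pipeline] Opening with {num_workers} workers")
    try:
        yield tasks  # the caller appends worker tasks to this list
    finally:
        # Guaranteed cleanup: cancel anything still running, then reap quietly
        for task in tasks:
            if not task.done():
                task.cancel()
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)
        print("[Pipeline] Closed.")

async def main() -> int:
    async with managed_pipeline() as tasks:
        tasks.append(asyncio.create_task(asyncio.sleep(60)))
    return len(tasks)  # the sleep(60) task was cancelled on exit

print(asyncio.run(main()))  # 1
```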

Now singledispatch. The pipeline receives orders of different types — standard orders, rush orders, subscription orders. How would you route them?

student (amused)

The old way is a giant if/elif chain. if order['type'] == 'rush': ... elif order['type'] == 'standard': .... It grows forever and it's impossible to extend without editing the core function.

teacher (focused)

And the singledispatch way?

student (thinking)

functools.singledispatch dispatches by type. But our orders are dicts, not typed objects. So I'd either wrap them in dataclasses or use a manual dispatch table. Actually — I'd use dataclasses. Define StandardOrder, RushOrder, SubscriptionOrder as frozen dataclasses, convert the raw dict to the right type, then let singledispatch route to the right handler.

teacher (excited)

That is the professional design. The singledispatch boundary is where raw dicts become typed objects. Parsing at the edge, typed throughout the interior.

from dataclasses import dataclass
from functools import singledispatch

@dataclass(frozen=True)
class StandardOrder:
    id: str
    total: float
    order_type: str = "standard"

@dataclass(frozen=True)
class RushOrder:
    id: str
    total: float
    rush_fee: float
    order_type: str = "rush"

@dataclass(frozen=True)
class SubscriptionOrder:
    id: str
    total: float
    subscription_id: str
    order_type: str = "subscription"


@singledispatch
def process_order(order) -> str:
    raise NotImplementedError(f"No handler for {type(order)}")

@process_order.register(StandardOrder)
def _(order: StandardOrder) -> str:
    return f"[Standard] {order.id}: ${order.total:.2f}"

@process_order.register(RushOrder)
def _(order: RushOrder) -> str:
    return f"[Rush] {order.id}: ${order.total:.2f} + ${order.rush_fee:.2f} fee"

@process_order.register(SubscriptionOrder)
def _(order: SubscriptionOrder) -> str:
    return f"[Sub] {order.id}: ${order.total:.2f} (sub: {order.subscription_id})"
student (proud)

And if we add a new order type later, we add a new dataclass and a new @process_order.register — we never touch the existing handlers. Open/closed principle.

teacher (amused)

Amir would approve. Now let's put the full symphony together. Everything we've built: Protocol for the source interface, async iterator for lazy order fetching, context manager for pipeline lifecycle, singledispatch for order routing, asyncio.gather for concurrent workers.

student (focused)

The run_pipeline function creates the iterator, opens the pipeline context manager, creates a queue of orders, then spawns workers with gather. Each worker pulls from the queue and calls process_order.

teacher (neutral)

Sketch it:

import asyncio

async def run_pipeline(source: OrderSource, num_workers: int = 3) -> dict:
    results = []
    queue: asyncio.Queue = asyncio.Queue()
    
    async def worker():
        while True:
            try:
                order = queue.get_nowait()
            except asyncio.QueueEmpty:
                break
            result = process_order(order)
            results.append(result)
            queue.task_done()

    async with ManagedPipeline(source, num_workers) as pipeline:
        # Fill the queue from the async iterator
        async for raw_order in AsyncOrderIterator(source):
            typed_order = coerce_order(raw_order)  # dict → dataclass
            await queue.put(typed_order)

        # Run workers concurrently
        worker_tasks = [asyncio.create_task(worker()) for _ in range(num_workers)]
        pipeline._tasks = worker_tasks
        await asyncio.gather(*worker_tasks)
        pipeline.orders_processed = len(results)

    return {"processed": len(results), "results": results}

student (excited)

And coerce_order is just the type routing — reads the order_type key and returns the right dataclass. That's where the dict becomes typed.

teacher (focused)

Exactly. The boundary between untyped and typed. And notice what each layer is doing: Protocol says what a source must be. AsyncOrderIterator hides pagination. ManagedPipeline owns cleanup. singledispatch owns routing. gather owns concurrency. No layer knows about the others.

student (thinking)

The iterator doesn't know about workers. The context manager doesn't know about order types. singledispatch doesn't know about async. Each piece is composable because each piece does exactly one thing.

teacher (neutral)

That is the synthesis. Four weeks of individual instruments — this is the performance. Context managers from Day 24, iterator protocol from Day 25, itertools composition from Day 26, singledispatch from Day 27, and async generators from Week 2. All of it coordinated.

student (proud)

I want to write this from scratch. The full thing. Not fill in blanks — build it.

teacher (encouraging)

That is exactly what today's challenge is. Start with the Protocol. Define the interface before the implementation. Mock source first, real design second.

student (excited)

One question before I start. The coerce_order function — should it be singledispatch too? Like, dispatch on the raw dict somehow?

teacher (amused)

You can't dispatch on dicts — they're all the same type. The coerce step is just a conditional factory. The dispatch happens after coercion, on the typed result. One step reads the string, next step routes by type. Clean separation.
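
A minimal sketch of that conditional factory, reusing the dataclasses from above (the exact raw-dict keys are an assumption; adjust to the real payload):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class StandardOrder:
    id: str
    total: float

@dataclass(frozen=True)
class RushOrder:
    id: str
    total: float
    rush_fee: float

@dataclass(frozen=True)
class SubscriptionOrder:
    id: str
    total: float
    subscription_id: str

def coerce_order(raw: dict):
    # The only place that reads the string; everything downstream is typed
    order_type = raw.get("order_type", "standard")
    if order_type == "rush":
        return RushOrder(raw["id"], raw["total"], raw["rush_fee"])
    if order_type == "subscription":
        return SubscriptionOrder(raw["id"], raw["total"], raw["subscription_id"])
    return StandardOrder(raw["id"], raw["total"])

print(coerce_order({"order_type": "rush", "id": "R1", "total": 50.0, "rush_fee": 5.0}))
# RushOrder(id='R1', total=50.0, rush_fee=5.0)
```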

student (focused)

Got it. Protocol → async iterator → context manager → queue → workers → singledispatch. Everything in order.

teacher (serious)

One edge case to watch: the source can return an empty page before you reach max_pages. Your iterator should stop as soon as that happens rather than looping back for more. Check for an empty response immediately after each fetch and raise StopAsyncIteration right there.

student (thinking)

Right — the if not self._buffer: raise StopAsyncIteration guard has to fire before the loop goes back to fetch again. Otherwise the loop keeps issuing pointless empty fetches until it finally hits max_pages.

teacher (neutral)

Exactly. The empty-page guard is the one subtle detail in this whole pipeline. Everything else is assembling parts you already understand.
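
One way to check that guard in isolation (EmptyAfterOneSource is an illustrative test double that counts its own fetches):

```python
import asyncio

class AsyncOrderIterator:
    # Same buffered iterator as in the lesson
    def __init__(self, source, max_pages: int = 5):
        self.source = source
        self.max_pages = max_pages
        self.current_page = 0
        self._buffer: list[dict] = []

    def __aiter__(self):
        return self

    async def __anext__(self) -> dict:
        while not self._buffer:
            if self.current_page >= self.max_pages:
                raise StopAsyncIteration
            self.current_page += 1
            self._buffer = await self.source.async_fetch(self.current_page)
            if not self._buffer:
                raise StopAsyncIteration
        return self._buffer.pop(0)

class EmptyAfterOneSource:
    # One real page, then empty, well before max_pages
    def __init__(self):
        self.fetch_count = 0

    async def async_fetch(self, page: int) -> list[dict]:
        self.fetch_count += 1
        return [{"id": "only-1"}] if page == 1 else []

async def main():
    source = EmptyAfterOneSource()
    orders = [o async for o in AsyncOrderIterator(source, max_pages=100)]
    return len(orders), source.fetch_count

print(asyncio.run(main()))  # (1, 2): one order, stopped at the first empty page
```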

student (excited)

Then I'll build it. And then Day 29 is the quiz — the Week 4 checkpoint. And Day 30 is...

teacher (focused)

Day 29 is one checkpoint — five questions across everything from Week 4. Context managers, iterators, itertools, singledispatch, and this pipeline. You are going to pass it.

student (amused)

I'm going to do more than pass it.

teacher (neutral)

And Day 30. That one is different. It's not a new concept. It's a question: what kind of engineer are you now? Four weeks ago you were afraid to touch async code. You had no idea why Amir's code worked. You were promoted into a ceiling you could not see. What about now?

student (proud)

Now I design the pipeline in the shower. I explain Protocol vs ABC in code reviews. I refactored the production system and Amir had zero comments. Zero. I'm a different engineer than I was on Day 1.

teacher (encouraging)

That is the only answer that matters. Build the pipeline.