Day 13 · ~20m

asyncio Patterns: Tasks, Queues, and Cancellation

You said the pipeline fix was 5 lines. What happens when order 847 of 1000 throws an exception? Learn asyncio.Queue, task cancellation, and production-safe error handling.

student (focused)

You said the pipeline fix was five lines. asyncio.gather on all the orders instead of a for loop with await. I already started writing the PR. Then I realized — what happens if one order fails? Do all the other orders stop processing?

teacher (neutral)

Did you test it?

student (curious)

I ran it locally with some fake order data and added a raise Exception() in the middle. Yeah. The whole gather raises and the run dies. Eleven orders complete, one throws, and the remaining 988 never finish.

teacher (serious)

That is your first clue that the five-line fix is not production-ready. You have 988 orders stuck in the queue and a manager asking why the pipeline crashed on Thursday at 2 AM. This week is about the fifteen-line version.

student (thinking)

So we're talking about error handling. gather has a parameter for that, right? return_exceptions=True?

teacher (focused)

That is exactly right. But before we get there, we need to talk about structure. Right now you are calling the handler once per order, top to bottom. What you actually need is a producer-consumer pattern: one or more workers pulling orders off a queue and processing them concurrently, while the main task keeps feeding new orders in.

student (confused)

That sounds more complicated than gather. Why not just stick with gather and add return_exceptions=True?

teacher (encouraging)

gather is great for "I have a known list of tasks right now, run them all." A queue is great for "orders are arriving continuously, I don't know how many, and I want steady throughput." Your pipeline has orders coming from an HTTP endpoint, a database, a Kafka topic — you don't have a fixed list. You have a stream. The moment you have a stream, you need a queue.
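For the fixed-list case, by the way, return_exceptions=True does what you were hoping. A sketch, with a made-up process_order handler:

```python
import asyncio

async def process_order(order_id):
    # Hypothetical handler: order 3 fails, the rest succeed
    if order_id == 3:
        raise ValueError(f"bad order {order_id}")
    return f"order {order_id} ok"

async def main():
    # return_exceptions=True turns failures into return values
    # instead of aborting the whole gather
    return await asyncio.gather(
        *(process_order(i) for i in range(5)),
        return_exceptions=True,
    )

results = asyncio.run(main())
print(results)
```

The failed order comes back as a ValueError instance in the results list, in its original position, while the other four complete normally.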

student (focused)

Okay. So the pattern is: producer puts orders into a queue, N workers pull from the queue and process them, both running concurrently. How do I coordinate them so the workers know when to stop?

teacher (neutral)

With a sentinel value. The producer sends a special value (usually None) down the queue to signal "no more orders." When a worker sees the sentinel, it exits the loop.

import asyncio

async def producer(queue, orders):
    """Add orders to the queue, then signal completion."""
    for order in orders:
        await queue.put(order)
    # Signal that production is done
    await queue.put(None)

async def worker(queue, worker_id):
    """Process orders from the queue until sentinel."""
    while True:
        order = await queue.get()
        if order is None:  # Sentinel: stop
            queue.task_done()  # count the sentinel too, or join() never returns
            break
        # Process the order
        print(f"Worker {worker_id} processed {order}")
        queue.task_done()

async def run_pipeline(orders, num_workers):
    queue = asyncio.Queue()
    # Start producer
    producer_task = asyncio.create_task(producer(queue, orders))
    # Start workers
    workers = [asyncio.create_task(worker(queue, i)) for i in range(num_workers)]
    # Wait for producer to finish
    await producer_task
    # Wait for all queued items to be processed
    await queue.join()
    # Cancel remaining workers; only one of them saw the sentinel,
    # the rest are still blocked on queue.get()
    for w in workers:
        w.cancel()

student (thinking)

Wait — asyncio.create_task(). That is different from asyncio.gather(). What is the difference?

teacher (serious)

gather takes a list of tasks and waits for all of them. create_task schedules a coroutine to run and returns a Task object immediately — the coroutine runs in the background. You use create_task when you want to start something and keep going, then come back to it later.

student (curious)

So create_task is "run this in the background," and I can call it multiple times for multiple tasks, and they all run concurrently?

teacher (focused)

Exactly. And when you want to wait for them, you can collect the Task objects and call await asyncio.gather(*tasks) to wait for all of them.
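A tiny sketch of that combination; fetch and its delays are invented:

```python
import asyncio

async def fetch(name, delay):
    # Simulated I/O-bound work; name and delay are stand-ins
    await asyncio.sleep(delay)
    return name

async def main():
    # create_task schedules the coroutine immediately and returns a Task
    t1 = asyncio.create_task(fetch("a", 0.05))
    t2 = asyncio.create_task(fetch("b", 0.01))
    # Both are already running in the background at this point...
    # ...and gather waits for all of them, preserving argument order
    return await asyncio.gather(t1, t2)

results = asyncio.run(main())
print(results)
```

Note that gather returns results in the order you passed the tasks in, not the order they finished.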

student (thinking)

But in that example, you never called gather. You just did await producer_task and then await queue.join(). How does run_pipeline wait for the workers to finish?

teacher (encouraging)

queue.join() blocks until every item in the queue has been processed — specifically, until queue.task_done() is called for each item. The workers call task_done() after processing. When all items are done, join() unblocks.

student (focused)

Okay so the flow is: producer puts orders in the queue. Workers pull from the queue, process, call task_done(). Producer signals with None. When the queue is empty and all items are marked done, join() returns. Then we cancel the workers.

teacher (amused)

You just outlined the whole pattern without me finishing the explanation. What did I miss?

student (confused)

The cancel part. You called w.cancel() on each worker. What does that actually do? They are already waiting in their while loop, blocked on queue.get(). How does cancel interrupt that?

teacher (serious)

cancel() raises asyncio.CancelledError inside the coroutine at the next await point. In this case, queue.get() will raise CancelledError. If the worker does not catch it, the task dies. If the worker catches it and cleanup is needed, it can do that.

async def worker(queue, worker_id):
    try:
        while True:
            order = await queue.get()
            if order is None:
                queue.task_done()  # count the sentinel too
                break
            print(f"Worker {worker_id} processed {order}")
            queue.task_done()
    except asyncio.CancelledError:
        print(f"Worker {worker_id} was cancelled")
        # Cleanup if needed
        raise  # Always re-raise CancelledError

student (curious)

Why re-raise it? If I caught it, shouldn't I be done with the error?

teacher (neutral)

CancelledError is special. If you catch it and do not re-raise it, the task reports as successfully completed rather than cancelled. You want the Task object to remember that it was cancelled. The exception is the signal. Let it propagate.
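You can see the difference in Task state directly. This sketch cancels two tasks, one that re-raises and one that swallows:

```python
import asyncio

async def polite():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        raise  # re-raised: the Task stays marked as cancelled

async def rude():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        pass  # swallowed: the Task looks like it finished normally

async def main():
    t1 = asyncio.create_task(polite())
    t2 = asyncio.create_task(rude())
    await asyncio.sleep(0)  # let both tasks reach their await
    t1.cancel()
    t2.cancel()
    await asyncio.gather(t1, t2, return_exceptions=True)
    return t1.cancelled(), t2.cancelled()

states = asyncio.run(main())
print(states)
```

The polite task reports cancelled() as True; the rude one reports False, so anything supervising it believes it completed successfully.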

student (thinking)

Okay but we still haven't solved the original problem — what happens if a worker crashes while processing an order? The current code will just die and the order never gets task_done() called.

teacher (focused)

Wrap the processing in a try-except:

async def worker(queue, worker_id):
    try:
        while True:
            order = await queue.get()
            if order is None:
                queue.task_done()  # count the sentinel too
                break
            try:
                # Process the order
                print(f"Worker {worker_id} processed {order}")
            except Exception as e:
                # Log the error but keep the worker alive
                # (Exception does not catch CancelledError on Python 3.8+)
                print(f"Worker {worker_id} failed on {order}: {e}")
            finally:
                queue.task_done()  # Always mark it done, even if it failed
    except asyncio.CancelledError:
        print(f"Worker {worker_id} was cancelled")
        raise

Now: processing fails on order 847, the worker logs the error, calls task_done() anyway, and moves on to order 848. The pipeline does not die, and the order is marked as handled (even though it failed) so queue.join() eventually completes.
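Stitched together, the whole thing can be exercised end to end. The order numbers and the failure condition below are invented for the demo:

```python
import asyncio

processed, failed = [], []

async def producer(queue, orders):
    for order in orders:
        await queue.put(order)
    await queue.put(None)  # sentinel

async def worker(queue, worker_id):
    try:
        while True:
            order = await queue.get()
            if order is None:
                queue.task_done()  # count the sentinel so join() can finish
                break
            try:
                if order == 847:  # invented failure condition
                    raise ValueError("payment declined")
                processed.append(order)
            except Exception as e:
                failed.append((order, str(e)))
            finally:
                queue.task_done()
    except asyncio.CancelledError:
        raise

async def run_pipeline(orders, num_workers=3):
    queue = asyncio.Queue()
    producer_task = asyncio.create_task(producer(queue, orders))
    workers = [asyncio.create_task(worker(queue, i)) for i in range(num_workers)]
    await producer_task
    await queue.join()  # returns once every put has a matching task_done
    for w in workers:
        w.cancel()  # workers that never saw the sentinel are still blocked
    await asyncio.gather(*workers, return_exceptions=True)

asyncio.run(run_pipeline(range(845, 850)))
print(processed, failed)
```

Order 847 lands in failed with its error message; the other four orders still process, and the pipeline shuts down cleanly.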

student (excited)

So that is the production-safe version. One order fails, you log it, the other 999 still process. This is what I need in the PR.

teacher (encouraging)

That is exactly what you need. But there is still one more edge case — what if you want to cancel the entire pipeline while orders are still being processed? Not because they failed, but because the user pressed Ctrl+C or the deployment is shutting down?

student (focused)

Like, the whole async context is shutting down and you want to cancel all the workers gracefully?

teacher (serious)

Exactly. If you just let them die, they might be in the middle of processing an order, writing to a database, leaving things in an inconsistent state. You want to cancel them, let them finish what they are doing (if possible), and then shut down.

async def run_pipeline(orders, num_workers):
    queue = asyncio.Queue()
    producer_task = asyncio.create_task(producer(queue, orders))
    workers = [asyncio.create_task(worker(queue, i)) for i in range(num_workers)]

    try:
        await producer_task
        await queue.join()
    except asyncio.CancelledError:
        print("Pipeline cancellation requested")
        # Stop the producer and all workers, then wait for their cleanup
        producer_task.cancel()
        for w in workers:
            w.cancel()
        # return_exceptions=True keeps the workers' CancelledError
        # from propagating out of this gather
        await asyncio.gather(*workers, return_exceptions=True)
        raise  # Propagate the cancellation
    else:
        # Normal shutdown: cancel workers and wait for cleanup
        for w in workers:
            w.cancel()
        await asyncio.gather(*workers, return_exceptions=True)
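To see the shutdown path fire, here is a sketch that cancels the pipeline mid-flight. The order count, sleep durations, and event labels are all made up for the demo:

```python
import asyncio

events = []

async def producer(queue, orders):
    for order in orders:
        await queue.put(order)
    await queue.put(None)  # sentinel

async def worker(queue):
    try:
        while True:
            order = await queue.get()
            if order is None:
                queue.task_done()
                break
            await asyncio.sleep(0.05)  # simulated slow processing
            events.append(("done", order))
            queue.task_done()
    except asyncio.CancelledError:
        events.append(("worker cancelled",))
        raise

async def run_pipeline(orders, num_workers=2):
    queue = asyncio.Queue()
    producer_task = asyncio.create_task(producer(queue, orders))
    workers = [asyncio.create_task(worker(queue)) for _ in range(num_workers)]
    try:
        await producer_task
        await queue.join()
    except asyncio.CancelledError:
        events.append(("pipeline cancelled",))
        producer_task.cancel()  # no-op if it already finished
        for w in workers:
            w.cancel()
        await asyncio.gather(*workers, return_exceptions=True)
        raise
    else:
        for w in workers:
            w.cancel()
        await asyncio.gather(*workers, return_exceptions=True)

async def main():
    pipeline = asyncio.create_task(run_pipeline(range(100)))
    await asyncio.sleep(0.12)  # let a few orders through
    pipeline.cancel()          # simulate Ctrl+C / deployment shutdown
    try:
        await pipeline
    except asyncio.CancelledError:
        events.append(("shutdown complete",))

asyncio.run(main())
print(events)
```

A few orders complete, then the cancellation is acknowledged by the pipeline and each worker in turn, and main observes a clean shutdown.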

student (thinking)

That is starting to look like the pipeline is managing a lot of state. Workers, producer, queue, cancellation, error handling, logging. Fifteen lines is not enough. This is a class.

teacher (amused)

Welcome to production Python. Your five-line gather is now a pipeline orchestrator. And you are not wrong — wrapping all of this into a class with clear state management and testing hooks is the right next step. But this pattern is the foundation: queue for streaming work, producers putting items in, workers pulling and processing, sentinel for shutdown, task_done() for synchronization, cancel() for graceful interruption.

student (proud)

So the kitchen brigade analogy — the queue is the order ticket window. Orders (tickets) pile up. Cooks (workers) grab tickets, cook, mark them done. The chef (producer) sends a last ticket that says "no more orders." When there are no tickets left and all the cooks have marked their ticket done, service is over. If one cook messes up a dish, they log it, mark the ticket done anyway, and grab the next one. If someone yells "we're closing," the chef stops sending tickets, the cooks finish what they are on, and everyone goes home.

teacher (proud)

That is not just understanding the pattern. That is understanding why the pattern exists. You just mapped the entire asynchronous execution model to a real restaurant. That is how you know you have got it.

student (curious)

One more thing — when you cancel() a task, is that immediate? Or does the task get some time to finish?

teacher (neutral)

Calling cancel() only requests cancellation. The CancelledError is raised inside the task at its next await point. If the task is in the middle of a long synchronous operation (not awaiting anything), the cancellation does not land until the next await. So if you have time.sleep(10) in your worker and you call cancel(), the worker keeps sleeping for ten seconds. With await asyncio.sleep(10), the sleep is interrupted almost immediately.

student (thinking)

So I should use async functions everywhere in my worker so cancellation can propagate cleanly?

teacher (focused)

That is the pattern. Keep blocking operations out of async functions. If you need to do synchronous work, do it in a thread pool (executor) and await it. But for a pipeline processing HTTP requests and database writes, everything should already be async.
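A sketch of that executor hand-off using asyncio.to_thread (Python 3.9+); blocking_work is a stand-in for real synchronous code:

```python
import asyncio
import time

def blocking_work(order):
    # Stand-in for synchronous work that cannot await
    time.sleep(0.01)
    return order * 2

async def handle(order):
    # asyncio.to_thread runs the blocking call in the default thread
    # pool, so the event loop stays free to run other tasks and
    # deliver cancellations
    return await asyncio.to_thread(blocking_work, order)

result = asyncio.run(handle(21))
print(result)
```

One caveat: cancelling the awaiting task abandons the await, but the thread itself keeps running to completion in the background.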

student (excited)

Okay I am going to rewrite that PR with the queue pattern and all the error handling. I want the pipeline to be production-safe — one order fails, log it, keep going. The whole thing gets cancelled, everything shuts down gracefully.

teacher (serious)

Before you ship it, think about one more thing: what should you return from the pipeline? Just "done"? Or should you track which orders succeeded and which failed? Production code usually needs to know the status of each order.

student (focused)

I could have each worker report an (order_id, status) tuple, and the pipeline could collect them, but that would need a results queue or something.

teacher (neutral)

Or the worker could put the result back into a results queue that you read after queue.join() completes. But that is Day 14 thinking — concurrent.futures, which gives you a unified API for threads, processes, and async all speaking the same language.
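A minimal sketch of that results-queue idea, with stand-in order values and processing:

```python
import asyncio

async def worker(queue, results):
    while True:
        order = await queue.get()
        if order is None:
            queue.task_done()  # count the sentinel too
            break
        # Stand-in for real processing; record a per-order status
        await results.put(("ok", order))
        queue.task_done()

async def main():
    queue, results = asyncio.Queue(), asyncio.Queue()
    for order in (101, 102, 103):
        await queue.put(order)
    await queue.put(None)  # sentinel
    w = asyncio.create_task(worker(queue, results))
    await queue.join()
    await w
    # Drain the results queue once everything is done
    out = []
    while not results.empty():
        out.append(results.get_nowait())
    return out

statuses = asyncio.run(main())
print(statuses)
```

After join() returns, the results queue holds one status tuple per order, so the pipeline can report exactly what succeeded and what failed.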

student (amused)

So there is a cleaner pattern coming.

teacher (amused)

There is always a cleaner pattern coming. But you have enough now to fix the pipeline and ship it with confidence.