A pipeline is replay-safe when running it twice over the same input produces the same final state as running it once. The whole pipeline is idempotent end-to-end.
```python
state = {}

def process(events, state):
    """Apply each event to state. Idempotent on event id."""
    for event in events:
        eid = event["id"]
        if eid in state:
            continue  # already applied — skip
        state[eid] = event["value"]

events = [
    {"id": "e1", "value": 10},
    {"id": "e2", "value": 20},
    {"id": "e3", "value": 30},
]

process(events, state)
state_after_run_1 = dict(state)

process(events, state)  # replay
state_after_run_2 = dict(state)

print(state_after_run_1 == state_after_run_2)
```

Expected: `True`. The second run is a no-op because every event ID is already in `state`.
This is just dedup with state instead of a set?
Same observation, slightly higher level. Dedup keeps a set of seen IDs. Replay safety means the final state itself encodes what's been done. They overlap: most replay-safe pipelines use dedup as one of the mechanisms.
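To make the contrast concrete, here is a minimal sketch (reusing the `events` list from the example above): dedup tracks seen IDs in a separate set, while the replay-safe version consults the state itself.

```python
# Dedup alone: a separate `seen` set remembers processed IDs.
seen = set()
totals = []
for event in events:
    if event["id"] in seen:
        continue
    seen.add(event["id"])
    totals.append(event["value"])  # breaks if `seen` is ever lost or out of sync

# Replay safety: the state itself records what has been done.
state = {}
for event in events:
    state.setdefault(event["id"], event["value"])  # first write wins
```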
Why is replay safety important if you have dedup?
Because dedup only catches the obvious duplicate (same `event.id`). Replay safety also covers non-idempotent state updates (counters that double, lists that grow on append) and external side effects (emails that send twice), none of which a seen-ID check alone prevents.
A pipeline that's replay-safe survives every recovery scenario. A pipeline that isn't has a hidden trap waiting.
A pipeline `P` is replay-safe when:

```
P(input)       == final_state
P(P(input))    == final_state   # running twice  = same
P(P(P(input))) == final_state   # running thrice = same
```

In database terms: the operation is idempotent.
Real-world scenarios that re-run a pipeline:

- a worker crashes mid-batch and restarts from its last checkpoint
- a message queue with at-least-once delivery redelivers events
- a backfill replays historical data over existing state
- an operator manually re-runs a job after a bug fix

Non-replay-safe pipelines turn each of these into a debugging nightmare. Replay-safe pipelines turn them into a quiet re-run.
Three common mechanisms (often combined):

1. Dedup by event ID. First-write-wins, recorded by `event_id`. The simplest replay-safe shape.

```python
if event_id in state:
    return  # already applied
state[event_id] = ...
```
2. Key by natural identity. The state itself is keyed by something derivable from the input. Re-running just overwrites the same row with the same value.

```python
# upsert by (date, user_id) — same input always lands in same row
db.upsert("daily_summary", key=(date, user_id), value=count)
```
3. Recompute from input. State is a function of the entire input set. No matter how many times you run, the result is the same.

```python
# state is fully derived from input — no incremental updates
state = {event["id"]: event["value"] for event in events}
```
state["count"] += 1 doubles on replaylist.append(...) duplicatessend_email(...) sends twiceThe fix is always one of: dedup by event ID, key by natural identity, recompute from input.
The trickiest layer. `send_email` is not idempotent — re-running sends a second email. Mitigations:

- pass an idempotency key so the provider dedupes the call (e.g., an Idempotency-Key header)
- record fired effects and guard each one: only send if `(event_id, action) not in sent`
- separate deciding from doing: compute the effects, persist them, fire them elsewhere

The last is what production systems do for high-stakes effects. The pipeline computes what would happen, persists that, and a separate process actually fires the side effects.
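A minimal sketch of the second mitigation, the `(event_id, action)` guard; `send_email` here stands in for any non-idempotent call, and the in-memory `sent` set would be a persisted table in production:

```python
sent = set()  # in production: a persisted record of fired effects

def send_email(user, body):
    print(f"emailing {user}: {body}")  # stand-in for the real side effect

def fire_once(event_id, action, user, body):
    # Guard each effect by (event_id, action): a replay sees the record
    # of the first run and skips the duplicate send.
    if (event_id, action) in sent:
        return
    send_email(user, body)
    sent.add((event_id, action))

fire_once("e1", "welcome_email", "u1", "hello")
fire_once("e1", "welcome_email", "u1", "hello")  # replay: no second email
```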
The canonical test is the one in this lesson: run twice over the same input, compare final state, assert equal. Add this test to any pipeline that processes events — it catches non-idempotent updates immediately.
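As a sketch, that test written against the `process` function from the example above:

```python
def test_replay_safe():
    state = {}
    events = [{"id": "e1", "value": 10}, {"id": "e2", "value": 20}]

    process(events, state)
    first = dict(state)

    process(events, state)  # replay the exact same input
    assert state == first   # fails immediately on += / append-style updates
```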