Yesterday's audit found that hourly re-runs over a 1000-row dataset add up to 24,000 calls a day. The fix: don't reprocess what the last run already saw. Each run handles only items newer than the previous run's checkpoint.
The pattern:
```python
def process(it):
    ...  # stand-in for the real per-item work

# Phase 1: read checkpoint (in real code, from a Sheet — today, in-memory)
last_seen_ts = 1000  # the timestamp from the last run

# Phase 2: filter new items
all_items = [
    {"id": "a", "ts": 800},   # before checkpoint
    {"id": "b", "ts": 1500},  # after — new
    {"id": "c", "ts": 1200},  # after — new
    {"id": "d", "ts": 950},   # before checkpoint
]
new_items = [it for it in all_items if it["ts"] > last_seen_ts]

# Phase 3: process new items, update checkpoint to max ts seen
for it in new_items:
    process(it)
new_checkpoint = max((it["ts"] for it in new_items), default=last_seen_ts)

# Phase 4: write checkpoint back (in real code, to a Sheet)
print(f"processed {len(new_items)} items; checkpoint now {new_checkpoint}")
```

Four phases: read checkpoint, filter new, process, write checkpoint.
Why max() and not last? They could be different.
max is safer. If items aren't strictly ordered (and APIs rarely guarantee order), the last item's timestamp might be earlier than another's. Taking max ensures the next run skips everything we just saw.
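A two-item sketch makes the difference concrete (the values are made up):

```python
# API returned items out of timestamp order — common in practice
items = [{"id": "b", "ts": 1500}, {"id": "c", "ts": 1200}]

last_item_ts = items[-1]["ts"]           # 1200 (misses b's 1500)
max_ts = max(it["ts"] for it in items)   # 1500 (covers everything seen)

# With the "last item" checkpoint, the next run's filter ts > 1200
# matches b (ts=1500) again and re-processes it.
reprocessed = [it for it in items if it["ts"] > last_item_ts]
print(last_item_ts, max_ts, [it["id"] for it in reprocessed])  # 1200 1500 ['b']
```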
What if no new items? max([]) would crash.
That's what the default=last_seen_ts argument is for — if the iterable is empty, max returns the existing checkpoint (no progress, no rollback). Defensive and necessary; an empty input is the normal "nothing happened" case.
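Both cases in one sketch:

```python
last_seen_ts = 1000

# Quiet hour: no new items — default keeps the checkpoint where it was
quiet = []
print(max((it["ts"] for it in quiet), default=last_seen_ts))  # 1000

# Busy hour: new items exist — default is ignored, max wins
busy = [{"id": "b", "ts": 1500}, {"id": "c", "ts": 1200}]
print(max((it["ts"] for it in busy), default=last_seen_ts))   # 1500
```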
The naive version, for contrast:

```python
# every run, every time
all_items = fetch_everything()
for item in all_items:
    process(item)
```

At N=1000 items × 24 runs/day = 24,000 process calls. Most do redundant work.
The incremental version:

```python
last_ts = read_checkpoint()      # e.g. 1000
all_items = fetch_everything()   # still N — but we only process new
new = [i for i in all_items if i["ts"] > last_ts]
for item in new:
    process(item)
write_checkpoint(max((i["ts"] for i in new), default=last_ts))
```

First run: process all. Subsequent runs: process only new items. The processing cost matches the input rate, not 24× it.
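A self-contained simulation of the loop above, with in-memory stand-ins for fetch_everything, process, and the checkpoint store (all names here are placeholders):

```python
checkpoint_store = {"ts": 0}   # stand-in for the Sheet cell
processed = []                 # record of every "process" call

def fetch_everything():
    # the same full dataset every run, as a re-polled API would return
    return [{"id": "a", "ts": 800}, {"id": "b", "ts": 1500}, {"id": "c", "ts": 1200}]

def run_once():
    last_ts = checkpoint_store["ts"]
    new = [i for i in fetch_everything() if i["ts"] > last_ts]
    for item in new:
        processed.append(item["id"])  # "process" it
    checkpoint_store["ts"] = max((i["ts"] for i in new), default=last_ts)
    return len(new)

print(run_once())  # first run: 3, everything is new
print(run_once())  # second run: 0, nothing newer than checkpoint 1500
```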
| Source | Checkpoint |
|---|---|
| Gmail | latest message's internalDate |
| Calendar | start_datetime of the latest event |
| Sheet rows | row index of the last-processed row |
| Generic | id of the latest item, if IDs are monotonic |
Use a value the API gives you, not datetime.now() at run time. The run's clock and the API's clock drift; the API's value is authoritative.
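A toy model of the hazard (the two-second skew is invented for illustration):

```python
# In this toy model the run's clock is 2 "seconds" ahead of the API's
api_now = 1000
run_now = api_now + 2

items = [{"id": "a", "ts": 999}, {"id": "b", "ts": 1000}]

# right: checkpoint from the data the API actually returned
good_checkpoint = max(i["ts"] for i in items)   # 1000

# wrong: run clock claims we've seen up to 1002, but the API
# may still deliver items stamped 1001..1002 on the next poll
bad_checkpoint = run_now                        # 1002

late_item = {"id": "c", "ts": 1001}             # arrives next run
print(late_item["ts"] > good_checkpoint)  # True: processed
print(late_item["ts"] > bad_checkpoint)   # False: silently dropped
```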
Incremental processing is self-healing. Skip a day → next run sees a wider gap → processes everything since the last successful checkpoint. No special "catch up" code needed.
Day 1, 9am: process up to ts=1000, checkpoint=1000
Day 2, 9am: SKIPPED (machine down)
Day 3, 9am: process ts=1001 .. ts=2500, checkpoint=2500
(a 48-hour gap, but no data lost)

```python
# right
new = [i for i in items if i["ts"] > last_ts]

# wrong — re-processes the boundary item
new = [i for i in items if i["ts"] >= last_ts]
```

Use > not >=. Otherwise the item that was the checkpoint gets re-processed every run.
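The off-by-one, executed on toy data:

```python
last_ts = 1000  # item "a" with ts=1000 was processed last run
items = [{"id": "a", "ts": 1000}, {"id": "b", "ts": 1500}]

strict = [i["id"] for i in items if i["ts"] > last_ts]   # ['b']
loose = [i["id"] for i in items if i["ts"] >= last_ts]   # ['a', 'b']: "a" again
print(strict, loose)
```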
```python
new_checkpoint = max((i["ts"] for i in new), default=last_ts)
```

If no new items, max would crash on an empty iterable. The default=last_ts argument returns the existing checkpoint — no progress, no rollback. The next run starts where this one left off.