Yesterday: collect items across pages, then process. Two failure modes at scale: (1) memory grows linearly with total items, (2) a crash on page 17 of 20 loses everything you collected.
The streaming alternative — process each page as it arrives. Memory stays at one-page-worth. A crash on page 17 means pages 1-16 are already done.
```python
processed = 0
page_token = None
for page_num in range(5):  # MAX_PAGES
    args = {"max_results": 5}
    if page_token:
        args["page_token"] = page_token
    result = toolset.execute_action(Action.GMAIL_FETCH_EMAILS, args)
    page = result.get("messages", [])
    # process this page immediately — don't collect
    for item in page:
        # ... do work for `item` here ...
        processed += 1
    page_token = result.get("nextPageToken")
    if not page_token:
        break
print(f"streamed {processed} items")
```

No `all_items` list. Each page is processed and discarded before the next fetch.
When does this matter?
Three signals. (1) Memory — at 100K items × 5KB each, the collect-all list is 500MB; streaming stays small. (2) Recovery — combine streaming with state-across-runs (day 12) and a crash mid-pagination doesn't lose the work that already happened. (3) Latency — first item processed in seconds vs minutes (you don't wait for all pages to download before starting).
Are there downsides?
Yes. Aggregation is harder — if you need a total at the end, you have to count as you go. Sorting across pages is harder — items from different pages are interleaved. For "do something to each item" tasks, streaming wins. For "compute a property of the whole set" tasks, collect-all is simpler.
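That said, some whole-set questions can still be answered without collecting. A bounded top-N is one: keep a small heap instead of the full list. A sketch, assuming each item has a numeric "size" field (hypothetical) and `pages` yields lists of items:

```python
import heapq

top10 = []  # min-heap holding the 10 largest sizes seen so far
for page in pages:
    for item in page:
        heapq.heappush(top10, item["size"])
        if len(top10) > 10:
            heapq.heappop(top10)  # evict the smallest; memory stays at 10
print(sorted(top10, reverse=True))
```

A full sort across pages, by contrast, genuinely needs the whole set in hand.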
Collect, then process (yesterday):
```python
all_items = []
for page in pages:
    all_items.extend(page)
for item in all_items:
    process(item)
```

Memory: O(N). Crash during collection: everything fetched so far is lost before a single item gets processed. Aggregations: easy.
Stream:
```python
for page in pages:
    for item in page:
        process(item)
```

Memory: O(page_size). Crash mid-process: prior pages already done. Aggregations: count as you go.
| Use streaming when | Use collect when |
|---|---|
| N is large (thousands+) | N is small (~100) |
| Each item processed independently | Need to sort/aggregate the whole set |
| Failure-recovery matters | Script runs once, no recovery |
| First-item latency matters | Latency doesn't matter |
| Combining with state-across-runs | Single-shot script |
The killer combination:
```python
checkpoint = read_state()  # id of the last processed item, or None on first run
page_token = None
for _ in range(MAX_PAGES):
    args = {"page_token": page_token} if page_token else {}
    result = fetch(args)
    for item in result["items"]:
        # assumes ids are assigned in increasing order
        if checkpoint is not None and item["id"] <= checkpoint:
            continue  # already done in a previous run
        process(item)
        write_state(item["id"])  # advance checkpoint
        checkpoint = item["id"]
    page_token = result.get("nextPageToken")
    if not page_token:
        break
```

A crash on item 5,432 of 100K means item 5,432 is unfinished but items 1-5,431 are done and recorded. The re-run reads checkpoint=5431, skips past everything already recorded, and picks up at 5432. Self-healing.
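The `read_state`/`write_state` helpers are whatever the day-12 state mechanism gives you. A minimal sketch, assuming a local JSON file (the file name here is made up):

```python
import json
from pathlib import Path

STATE_FILE = Path("checkpoint.json")  # hypothetical location

def read_state():
    """Return the last processed id, or None on the first run."""
    if STATE_FILE.exists():
        return json.loads(STATE_FILE.read_text()).get("last_id")
    return None

def write_state(last_id):
    """Record the last processed id; called after each item."""
    STATE_FILE.write_text(json.dumps({"last_id": last_id}))
```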
For totals when streaming:
results = {"ok": 0, "fail": 0}
for page in pages:
for item in page:
try:
process(item)
results["ok"] += 1
except Exception:
results["fail"] += 1One counter dict; updated per item. Final values reflect every item ever processed in this run.
For reusable streaming logic:
```python
def iter_emails(max_pages=5):
    page_token = None
    for _ in range(max_pages):
        args = {"max_results": 5}
        if page_token:
            args["page_token"] = page_token
        result = toolset.execute_action(Action.GMAIL_FETCH_EMAILS, args)
        for item in result["messages"]:
            yield item
        if not result.get("nextPageToken"):
            return
        page_token = result["nextPageToken"]

for msg in iter_emails():
    process(msg)
```

Generators. Python Patterns covers them properly. For the Patterns track, the inline shape is enough.
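One payoff of the generator shape: callers can bound the work without touching `iter_emails`. A sketch using the standard library's `itertools.islice`:

```python
from itertools import islice

# Take only the first 20 emails. Because the generator fetches lazily,
# pages beyond those 20 items are never requested.
for msg in islice(iter_emails(max_pages=10), 20):
    process(msg)
```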