A chain reads from two tools and combines what it gets. Three messages from Gmail, two events from Calendar: print the combined count.
```python
emails = toolset.execute_action(Action.GMAIL_FETCH_EMAILS, {"max_results": 3})
messages = emails.get("messages", [])

cals = toolset.execute_action(Action.GOOGLECALENDAR_LIST_CALENDARS, {})
# Tolerate both response shapes: items at the top level or nested under response_data.
calendars = cals.get("items", []) or cals.get("response_data", {}).get("items", [])

total = len(messages) + len(calendars)
print(f"emails={len(messages)} calendars={len(calendars)} total={total}")
```

Aggregation = combining. The simplest case: len(a) + len(b). More complex: pair up by key, merge dicts, take a max.
Why is this its own lesson?
Because the alternative is a lot of print(len(emails)) calls scattered through the script. Aggregation gives you one number to log, one number to compare against last run, one line of output. That's the difference between a script you can monitor and a script you can't.
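To make "compare against last run" concrete, a minimal sketch that persists the total between runs; the last_run.json path is an assumption, any writable location works:

```python
import json
from pathlib import Path

state_file = Path("last_run.json")  # hypothetical state file

previous = json.loads(state_file.read_text()).get("total") if state_file.exists() else None
if previous is not None and total != previous:
    print(f"total changed: {previous} -> {total}")
state_file.write_text(json.dumps({"total": total}))
```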
What if the two tools return different shapes that can't be combined directly?
Reshape first (yesterday's lesson), aggregate second. Typical pipeline: read A, read B, reshape both into a common form ({"id": ..., "source": "A"}), then combined = list_a + list_b. Day 6 covers more shapes.
```python
total = len(emails) + len(events) + len(tasks)
```

Readable, no surprises. Use it when you only care about "how much across all sources".
```python
all_items = []
for msg in messages:
    all_items.append({"source": "gmail", "id": msg["id"]})
for evt in events:
    all_items.append({"source": "calendar", "id": evt["id"]})
```

Reshape on the way in (uniform {source, id} dicts) so downstream code doesn't have to know which API a given item came from.
```python
total_size = sum(m.get("size_bytes", 0) for m in messages)
```

sum(...) over a generator. .get(field, 0) for missing fields: a missing size shouldn't crash the whole aggregation.
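One caveat: .get(field, 0) only covers a missing key. If the API can return the field present but null, add or 0:

```python
# "or 0" also covers size_bytes present but set to None.
total_size = sum(m.get("size_bytes") or 0 for m in messages)
```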
When you need a counter per category:

```python
from collections import defaultdict

by_source = defaultdict(int)
for m in messages:
    by_source[m.get("from", "unknown")] += 1
```

defaultdict(int) auto-initializes missing keys to zero. Standard library, no import gymnastics.
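collections.Counter does the same job in one line and throws in .most_common() for free:

```python
from collections import Counter

by_source = Counter(m.get("from", "unknown") for m in messages)
print(by_source.most_common(3))  # three most frequent senders
```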
After aggregating, one line of output. Not one per source.
print(f"emails={len(messages)} calendars={len(calendars)} total={total}")Log-grep friendly. Future-you, debugging at 3am, reads one line not five.
Aggregation throws away detail. If your downstream needs to know which items came from which source, keep the per-item shape. Aggregate only at the report boundary.