Day 28 · ~13m

Building Validation Pipelines

Chaining validators, building middleware patterns, and composing multi-step validation flows.

🧑‍💻

I'm starting to see how individual models work, but in a real system, data goes through multiple stages — parsing, cleaning, validating, enriching. How do I chain these together?

👩‍🏫

With a validation pipeline — a sequence of models where each stage transforms data for the next. The output of one model becomes the input of the next:

from pydantic import BaseModel, Field, field_validator

# Stage 1: Parse raw input
class RawInput(BaseModel):
    name: str
    email: str
    score: str  # comes as string from CSV

# Stage 2: Clean and type
class CleanedRecord(BaseModel):
    name: str
    email: str
    score: float

    @field_validator("name")
    @classmethod
    def clean_name(cls, v):
        return v.strip().title()

    @field_validator("email")
    @classmethod
    def clean_email(cls, v):
        return v.strip().lower()

# Stage 3: Validate business rules
class ValidatedRecord(BaseModel):
    name: str = Field(min_length=1)
    email: str
    score: float = Field(ge=0, le=100)
    grade: str = ""

    @field_validator("email")
    @classmethod
    def must_have_domain(cls, v):
        if "." not in v.split("@")[-1]:
            raise ValueError("email must have a domain")
        return v

Then chain them:

def pipeline(raw: dict) -> dict:
    parsed = RawInput(**raw)
    cleaned = CleanedRecord(**parsed.model_dump())
    validated = ValidatedRecord(**cleaned.model_dump())
    return validated.model_dump()
🧑‍💻

Why separate models instead of one big model with all the rules?

👩‍🏫

Separation of concerns. Each stage has one job:

  1. Parse — get the data into Python types
  2. Clean — normalize and transform
  3. Validate — enforce business rules
  4. Enrich — add computed or derived fields

If cleaning fails, you know it's a data quality issue. If validation fails, the data is clean but doesn't meet business rules. Clear error attribution.

🧑‍💻

Can I make this pattern reusable? Like a generic pipeline runner?

👩‍🏫

Yes. Write a function that takes a list of model classes and runs data through each one:

def run_pipeline(data: dict, stages: list[type]) -> dict:
    current = data
    for stage in stages:
        model = stage(**current)
        current = model.model_dump()
    return current

result = run_pipeline(
    {"name": "  alice  ", "email": "ALICE@TEST.COM", "score": "85"},
    [RawInput, CleanedRecord, ValidatedRecord]
)

Each stage validates and transforms, passing clean data to the next. If any stage fails, you get a clear error pointing to exactly which stage and field broke.

🧑‍💻

What about adding metadata at each stage? Like tracking which stages the data has passed through?

👩‍🏫

Add a stages_passed field that accumulates:

class StageTracker(BaseModel):
    data: dict
    stages_passed: list[str] = []

    def advance(self, stage_name: str, model_class: type) -> "StageTracker":
        validated = model_class(**self.data)
        return StageTracker(
            data=validated.model_dump(),
            stages_passed=self.stages_passed + [stage_name]
        )

This gives you an audit trail: you know exactly which stages processed the data and where a failure occurred.

🧑‍💻

This is like middleware in web frameworks.

👩‍🏫

Exactly the same concept. Each stage is middleware that processes data on its way through the system. Pydantic models make each stage self-documenting and type-safe.

Practice your skills

Sign up to write and run code in this lesson.

Already have an account? Sign in