Building Validation Pipelines
Chaining validators, building middleware patterns, and composing multi-step validation flows.
I'm starting to see how individual models work, but in a real system, data goes through multiple stages — parsing, cleaning, validating, enriching. How do I chain these together?
With a validation pipeline — a sequence of models where each stage transforms data for the next. The output of one model becomes the input of the next:
from pydantic import BaseModel, Field, field_validator

# Stage 1: Parse raw input
class RawInput(BaseModel):
    name: str
    email: str
    score: str  # comes as string from CSV

# Stage 2: Clean and type
class CleanedRecord(BaseModel):
    name: str
    email: str
    score: float

    @field_validator("name")
    @classmethod
    def clean_name(cls, v):
        return v.strip().title()

    @field_validator("email")
    @classmethod
    def clean_email(cls, v):
        return v.strip().lower()

# Stage 3: Validate business rules
class ValidatedRecord(BaseModel):
    name: str = Field(min_length=1)
    email: str
    score: float = Field(ge=0, le=100)
    grade: str = ""

    @field_validator("email")
    @classmethod
    def must_have_domain(cls, v):
        if "." not in v.split("@")[-1]:
            raise ValueError("email must have a domain")
        return v
Then chain them:
def pipeline(raw: dict) -> dict:
    parsed = RawInput(**raw)
    cleaned = CleanedRecord(**parsed.model_dump())
    validated = ValidatedRecord(**cleaned.model_dump())
    return validated.model_dump()
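A trimmed-down, self-contained run of the same chaining idea. The toy Raw and Cleaned models here are illustrative stand-ins for the three stages, not the lesson's actual models:

```python
from pydantic import BaseModel, field_validator

# Toy two-stage chain (Raw/Cleaned are hypothetical stand-ins).
class Raw(BaseModel):
    email: str
    score: str  # still a string after parsing

class Cleaned(BaseModel):
    email: str
    score: float  # coerced to float at this stage

    @field_validator("email")
    @classmethod
    def lower_email(cls, v):
        return v.strip().lower()

def pipeline(raw: dict) -> dict:
    parsed = Raw(**raw)
    cleaned = Cleaned(**parsed.model_dump())
    return cleaned.model_dump()

result = pipeline({"email": " ALICE@TEST.COM ", "score": "85"})
print(result)  # {'email': 'alice@test.com', 'score': 85.0}
```

Each stage's output dict feeds the next stage's constructor, so coercion and normalization happen exactly once, at the stage responsible for them.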
Why separate models instead of one big model with all the rules?
Separation of concerns. Each stage has one job:
- Parse — get the data into Python types
- Clean — normalize and transform
- Validate — enforce business rules
- Enrich — add computed or derived fields
If cleaning fails, you know it's a data quality issue. If validation fails, the data is clean but doesn't meet business rules. Clear error attribution.
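The enrich stage isn't shown above. A minimal sketch, assuming pydantic v2's model_validator; the letter-grade cutoffs are made up for illustration:

```python
from pydantic import BaseModel, Field, model_validator

# Stage 4 sketch: enrich with a derived field.
# The grade cutoffs below are illustrative assumptions.
class EnrichedRecord(BaseModel):
    name: str
    email: str
    score: float = Field(ge=0, le=100)
    grade: str = ""

    @model_validator(mode="after")
    def derive_grade(self):
        if not self.grade:
            cutoffs = [(90, "A"), (80, "B"), (70, "C"), (60, "D")]
            self.grade = next((g for cut, g in cutoffs if self.score >= cut), "F")
        return self

record = EnrichedRecord(name="Alice", email="alice@test.com", score=85.0)
print(record.grade)  # B
```

Because enrichment runs in an after-validator, it only ever sees data that already passed the field-level checks.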
Can I make this pattern reusable? Like a generic pipeline runner?
Yes. Write a function that takes a list of model classes and runs data through each one:
def run_pipeline(data: dict, stages: list[type]) -> dict:
    current = data
    for stage in stages:
        model = stage(**current)
        current = model.model_dump()
    return current

result = run_pipeline(
    {"name": " alice ", "email": "ALICE@TEST.COM", "score": "85"},
    [RawInput, CleanedRecord, ValidatedRecord],
)
Each stage validates and transforms, passing clean data to the next. If any stage fails, you get a clear error pointing to exactly which stage and field broke.
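One way to surface the failing stage by name is to catch ValidationError per stage and re-raise with context. The two toy models and the error format here are illustrative, not part of the lesson's code:

```python
from pydantic import BaseModel, ValidationError

# Toy stand-ins for two pipeline stages (illustrative only).
class RawInput(BaseModel):
    name: str
    score: str

class TypedRecord(BaseModel):
    name: str
    score: float

def run_pipeline(data: dict, stages: list[type]) -> dict:
    current = data
    for stage in stages:
        try:
            current = stage(**current).model_dump()
        except ValidationError as exc:
            # Attach the stage name and the failing field locations
            fields = [err["loc"] for err in exc.errors()]
            raise ValueError(f"stage {stage.__name__} failed on {fields}") from exc
    return current

failure = ""
try:
    run_pipeline({"name": "Alice", "score": "not-a-number"}, [RawInput, TypedRecord])
except ValueError as exc:
    failure = str(exc)
print(failure)  # stage TypedRecord failed on [('score',)]
```

Note that the raw input passes RawInput (where score is still a string) and only breaks at TypedRecord, which is exactly the attribution you want.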
What about adding metadata at each stage? Like tracking which stages the data has passed through?
Add a stages_passed field that accumulates:
class StageTracker(BaseModel):
    data: dict
    stages_passed: list[str] = []

    def advance(self, stage_name: str, model_class: type) -> "StageTracker":
        validated = model_class(**self.data)
        return StageTracker(
            data=validated.model_dump(),
            stages_passed=self.stages_passed + [stage_name],
        )
This gives you an audit trail: you know exactly which stages processed the data and where a failure occurred.
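A quick self-contained run of the tracker. Raw and Typed are hypothetical stand-in stages:

```python
from pydantic import BaseModel

class StageTracker(BaseModel):
    data: dict
    stages_passed: list[str] = []

    def advance(self, stage_name: str, model_class: type) -> "StageTracker":
        validated = model_class(**self.data)
        return StageTracker(
            data=validated.model_dump(),
            stages_passed=self.stages_passed + [stage_name],
        )

# Two toy stages (illustrative stand-ins for real pipeline models)
class Raw(BaseModel):
    score: str

class Typed(BaseModel):
    score: float

tracker = StageTracker(data={"score": "85"})
tracker = tracker.advance("parse", Raw).advance("clean", Typed)
print(tracker.stages_passed)  # ['parse', 'clean']
print(tracker.data)  # {'score': 85.0}
```

Because advance returns a new tracker rather than mutating in place, each intermediate tracker is a snapshot you can log or inspect later.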
This is like middleware in web frameworks.
Exactly the same concept. Each stage is middleware that processes data on its way through the system. Pydantic models make each stage self-documenting and type-safe.