In most data workflows, the ETL process - Extract, Transform, Load - is where everything begins.
It’s the backbone of analytics, reporting, and data-driven applications. In practice, ETL means:
- Extract: pulling data from one or more sources (like APIs, CSV files, or databases).
- Transform: cleaning, validating, and reshaping that data into a consistent structure.
- Load: storing the clean data into a target system such as a database, data warehouse, or analytics tool.
While the steps sound simple, the “T” (Transform) stage often becomes the most painful. Real-world data is rarely clean: you’ll encounter missing fields, inconsistent types, malformed values, and human errors. Maybe someone typed `"N/A"` in a numeric column or used `"bob[at]example.com"` as an email. Without strong validation, these small inconsistencies can silently break pipelines or cause downstream bugs that are hard to trace.
That’s where Pydantic comes in. Built on top of Python’s type hints, Pydantic lets you define clear data models that automatically validate and parse incoming data. It can convert types (like strings to dates), enforce constraints, and produce detailed error messages when something doesn’t match your expectations. It helps ensure your ETL pipeline only loads trusted, structured, and predictable data.
In this tutorial, we’ll build a mini ETL pipeline using Pydantic. You’ll learn how to:
- Define data schemas with validation rules.
- Read raw data from a CSV file.
- Automatically detect and fix invalid records.
- Separate clean data from bad data for safe loading.
By the end, you’ll see how easily Pydantic can turn messy real-world data into a well-structured dataset you can rely on.
Source Code available at: https://github.com/nunombispo/PydanticETL-Article
Why Pydantic Fits Perfectly in ETL
When building ETL pipelines, data reliability is everything. You need to be confident that every record entering your system meets your expectations. Traditionally, this means writing dozens of conditional checks and try–except blocks to catch type errors, missing fields, or invalid values. That approach works, but it’s fragile, repetitive, and hard to maintain as your data model evolves.
Pydantic changes that completely. It lets you define your expected data shape using Python’s type hints, and it automatically handles validation, parsing, and error reporting for you. Instead of writing validation logic by hand, you describe what your data should look like, and Pydantic enforces it.
Here’s why it’s such a good fit for ETL work:
- Declarative schemas: Define your data structure once, and let Pydantic handle enforcement. A simple model with type hints (`int`, `str`, `float`, `datetime`, etc.) becomes both a validator and a documentation source for your ETL process.
- Automatic type conversion: Pydantic can coerce strings like `"42"` into integers, `"2025-10-06"` into `datetime.date`, and more. All without extra parsing code. This is invaluable when you’re dealing with CSVs or APIs that don’t always follow strict typing rules.
- Detailed error handling: When something goes wrong, Pydantic provides structured, human-readable error messages showing exactly which fields failed and why. This makes debugging and error reporting in ETL pipelines much simpler.
- Clear data contracts: Pydantic models act as a contract between stages of your pipeline. Whether data comes from a file, an API, or a database, you can ensure that only records matching your schema pass through to the next stage.
Without Pydantic, a typical validation workflow might look like this:
from datetime import datetime

def validate_record(record):
if not record.get("email") or "@" not in record["email"]:
raise ValueError("Invalid email")
try:
record["signup_date"] = datetime.strptime(record["signup_date"], "%Y-%m-%d")
except ValueError:
raise ValueError("Invalid date format")
record["spend"] = float(record["spend"])
return record
With Pydantic, all that logic becomes declarative and reusable:
from datetime import date
from pydantic import BaseModel, EmailStr
class Customer(BaseModel):
id: int
name: str
email: EmailStr
signup_date: date
spend: float
Now, Pydantic automatically enforces types, validates formats, and raises precise errors when something doesn’t fit.
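To see what that looks like in practice, here is a small illustrative check against the `Customer` model above (the values are made up):
from pydantic import ValidationError

# Strings are coerced into the declared types automatically
ok = Customer(
    id="1",
    name="Alice",
    email="alice@example.com",
    signup_date="2025-01-10",
    spend="100.50",
)
print(ok.spend)        # 100.5 as a float; signup_date is a datetime.date

# Invalid values raise a ValidationError with per-field details
try:
    Customer(
        id="2",
        name="Bob",
        email="bob[at]example.com",
        signup_date="2025-02-05",
        spend="not_available",
    )
except ValidationError as e:
    print(e.errors())  # lists the failing fields (email and spend) and why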
For ETL developers, this means less code, fewer bugs, and cleaner data, all while keeping your pipeline easy to maintain and extend.

Example Scenario - Customer Data from CSV
Let’s use a simple, realistic example. Suppose your team receives a daily export of customer data from a marketing platform in CSV format. This dataset will eventually feed into your analytics database, but before you can trust it, you need to make sure it’s clean and valid.
Here’s a small snippet of the raw file, `customers.csv`:
id,name,email,signup_date,spend
1, Alice ,alice@example.com,2025-01-10,100.50
2,Bob,bob[at]example.com,2025-02-05,not_available
3,Charlie,charlie@example.com,invalid_date,200
4,,diana@example.com,2025-03-12,150
At first glance the file looks fine, but a few problems quickly appear once you start checking each row carefully:
- Invalid email format – Row 2 uses `bob[at]example.com` instead of a proper email address.
- Non-numeric value – Row 2’s `spend` column says `"not_available"`, which can’t be converted to a float.
- Bad date – Row 3’s `signup_date` is `"invalid_date"`, which isn’t a valid ISO date.
- Missing name – Row 4 has an empty `name` field, which should probably be required.
If you were to load this data directly into a database or data warehouse, you’d risk corrupting your records or breaking downstream transformations.
Step 1 - Define a Pydantic Model
The first step in building a Pydantic-powered ETL pipeline is to define a data model that represents what a valid record looks like. This model acts as a contract for the data flowing through your system: if something doesn’t match, it’s rejected before it can cause problems downstream.
In terms of requirements, we will need to install the following libraries:
pip install "pydantic[email]" sqlalchemy
Besides `pydantic` itself, we will also use the `pydantic[email]` extra for email fields and `sqlalchemy` to write to a SQLite database later on.
Let’s start by defining a `Customer` model using Pydantic’s `BaseModel`.
from datetime import date
from pydantic import BaseModel, EmailStr, Field
class Customer(BaseModel):
id: int
name: str = Field(..., min_length=1) # Must be a non-empty string
email: EmailStr # Must be a valid email address
signup_date: date # Must follow ISO date format
spend: float = Field(..., ge=0) # Must be a float >= 0
Let’s break down what’s happening here:
- `BaseModel` - the foundation of every Pydantic model. It automatically validates and converts types for all fields.
- `id: int` - ensures each customer’s ID is an integer. If a string like `"1"` is provided, Pydantic will convert it automatically.
- `name: str = Field(..., min_length=1)` - marks the name field as required (`...`) and enforces that it cannot be empty.
- `email: EmailStr` - a built-in Pydantic type that validates standard email formats.
- `signup_date: date` - automatically parses strings like `"2025-01-10"` into Python `date` objects.
- `spend: float = Field(..., ge=0)` - ensures the spend amount is a non-negative number. If `"100.50"` is passed as a string, Pydantic converts it to a float.
Optional vs. Required Fields
By default, all fields in a Pydantic model are required unless you make them optional using `Optional` or provide a default value.
For example, if you wanted to make `spend` optional (in case some customers haven’t purchased yet), you could write:
from typing import Optional
class Customer(BaseModel):
id: int
name: str
email: EmailStr
signup_date: date
spend: Optional[float] = Field(default=0, ge=0)
If the `spend` field is missing from the input, it will default to `0` rather than raising a validation error.
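A quick illustrative check of that behavior (the values are made up):
# spend is omitted entirely, so it falls back to the default of 0
c = Customer(id=5, name="Eve", email="eve@example.com", signup_date="2025-04-01")
print(c.spend)   # 0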
Defining a clear `BaseModel` subclass like this sets the foundation for reliable ETL: every record you process now has a consistent, validated structure.
If you’d like to dive deeper into building robust data models, check out my book Practical Pydantic - a hands-on guide to mastering Pydantic for real-world use cases like APIs, configurations, and data pipelines.
Step 2 - Extract and Validate Data
With your `Customer` model defined, it’s time to start the ETL process, beginning with the Extract and Transform steps. We’ll read the raw data from the CSV file, use Pydantic to validate each row, and separate the clean records from the bad ones.
Reading the CSV
Python’s built-in `csv` module is a great lightweight option for parsing structured text data. Using `csv.DictReader`, each line in the file is converted into a dictionary where the keys are the column headers.
import csv
with open("customers.csv", newline="") as f:
reader = csv.DictReader(f)
for row in reader:
print(row)
Sample output:
{'id': '1', 'name': ' Alice ', 'email': 'alice@example.com', 'signup_date': '2025-01-10', 'spend': '100.50'}
{'id': '2', 'name': 'Bob', 'email': 'bob[at]example.com', 'signup_date': '2025-02-05', 'spend': 'not_available'}
{'id': '3', 'name': 'Charlie', 'email': 'charlie@example.com', 'signup_date': 'invalid_date', 'spend': '200'}
{'id': '4', 'name': '', 'email': 'diana@example.com', 'signup_date': '2025-03-12', 'spend': '150'}
Notice that all values are strings, even the numbers and dates. That’s where Pydantic comes in.
Validating Each Record
We’ll loop through each row and try to create a `Customer` instance. If the data passes validation, it becomes a proper Python object with correctly typed fields. If not, Pydantic raises a `ValidationError`, which we can catch and log.
from pydantic import ValidationError
valid_customers = []
invalid_rows = []
with open("customers.csv", newline="") as f:
reader = csv.DictReader(f)
for row in reader:
try:
customer = Customer(**row)
valid_customers.append(customer)
except ValidationError as e:
invalid_rows.append({"row": row, "errors": e.errors()})
Here’s what this code does:
- `Customer(**row)` attempts to unpack the CSV row into the Pydantic model.
- Pydantic automatically:
  - Converts types (e.g., strings → floats or dates).
  - Validates formats (e.g., proper email addresses).
  - Enforces constraints (e.g., non-empty name, non-negative spend).
- If validation fails, a `ValidationError` is raised with detailed error messages.
Outputting the data with:
print("✅ Valid Records:")
for c in valid_customers:
    print(c.model_dump())
print("\n❌ Invalid Records:")
for item in invalid_rows:
print("Row:", item["row"])
print("Errors:", item["errors"])
print()
Produces this output:
✅ Valid Records:
{'id': 1, 'name': ' Alice ', 'email': 'alice@example.com', 'signup_date': datetime.date(2025, 1, 10), 'spend': 100.5}
❌ Invalid Records:
Row: {'id': '2', 'name': 'Bob', 'email': 'bob[at]example.com', 'signup_date': '2025-02-05', 'spend': 'not_available'}
Errors: [{'type': 'value_error', 'loc': ('email',), 'msg': 'value is not a valid email address: An email address must have an @-sign.', 'input': 'bob[at]example.com', 'ctx': {'reason': 'An email address must have an @-sign.'}}, {'type': 'float_parsing', 'loc': ('spend',), 'msg': 'Input should be a valid number, unable to parse string as a number', 'input': 'not_available', 'url': 'https://errors.pydantic.dev/2.11/v/float_parsing'}]
Row: {'id': '3', 'name': 'Charlie', 'email': 'charlie@example.com', 'signup_date': 'invalid_date', 'spend': '200'}
Errors: [{'type': 'date_from_datetime_parsing', 'loc': ('signup_date',), 'msg': 'Input should be a valid date or datetime, invalid character in year', 'input': 'invalid_date', 'ctx': {'error': 'invalid character in year'}, 'url': 'https://errors.pydantic.dev/2.11/v/date_from_datetime_parsing'}]
Row: {'id': '4', 'name': '', 'email': 'diana@example.com', 'signup_date': '2025-03-12', 'spend': '150'}
Errors: [{'type': 'string_too_short', 'loc': ('name',), 'msg': 'String should have at least 1 character', 'input': '', 'ctx': {'min_length': 1}, 'url': 'https://errors.pydantic.dev/2.11/v/string_too_short'}]
By the end of this step, you’ll have two clean lists:
- `valid_customers`: ready to be loaded into your destination system.
- `invalid_rows`: to be reviewed, logged, or fixed before reprocessing.
This simple separation gives your ETL pipeline a clear boundary between trusted and untrusted data, and it happens automatically with Pydantic’s validation layer.
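If you want to reuse this logic across files, the same loop can be wrapped in a small helper. This is just a sketch; the function name `extract_and_validate` is illustrative, not part of Pydantic:
import csv

from pydantic import ValidationError


def extract_and_validate(path: str):
    """Read a CSV file and split its rows into validated Customers and rejects."""
    valid, invalid = [], []
    with open(path, newline="") as f:
        for row in csv.DictReader(f):
            try:
                valid.append(Customer(**row))
            except ValidationError as e:
                invalid.append({"row": row, "errors": e.errors()})
    return valid, invalid


valid_customers, invalid_rows = extract_and_validate("customers.csv")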
Step 3 - Transform and Clean Data
Now that you can separate valid from invalid records, it’s time to make your data a bit smarter. The Transform step in ETL isn’t just about validation; it’s also where you enrich, standardize, or correct your data before loading it into a destination system.
Even with Pydantic handling most of the heavy lifting, you’ll often want to customize how certain fields are parsed or cleaned. This can be done either by:
- Using Pydantic validators to handle field-level transformations.
- Applying post-processing functions after validation to modify or enrich data.
Let’s look at both approaches.
Using Pydantic Validators
Pydantic allows you to define custom validation logic directly inside your model using the `@field_validator` decorator (in v2) or `@validator` (in v1).
For example, suppose you want to:
- Strip extra whitespace from names.
- Replace `"not_available"` with `0` for spend.
- Correct the `[at]` in the email field.
- Ensure names are title-cased for consistency.
You can do all of this inside your model:
from datetime import date
from pydantic import BaseModel, EmailStr, Field, field_validator
class Customer(BaseModel):
id: int
name: str = Field(..., min_length=1)
email: EmailStr
signup_date: date
spend: float = Field(..., ge=0)
# Normalize names (strip whitespace and title case)
@field_validator("name")
def normalize_name(cls, v):
return v.strip().title()
# Handle non-numeric spend values
@field_validator("spend", mode="before")
def clean_spend(cls, v):
if v in ("not_available", "", None):
return 0.0
return float(v)
# Fix malformed emails like "bob[at]example.com"
@field_validator("email", mode="before")
def fix_email_format(cls, v):
if "[at]" in v:
v = v.replace("[at]", "@")
return v
Here’s what happens:
- `normalize_name()` runs after Pydantic parses the value, ensuring that `" Alice "` becomes `"Alice"`.
- `clean_spend()` runs before type coercion, catching invalid or placeholder strings like `"not_available"` and converting them into a default numeric value.
- `fix_email_format()` runs before email validation and replaces `"[at]"` with `"@"`, allowing the `EmailStr` validator to pass successfully.
These small corrections let you automatically repair certain data quality issues without rejecting otherwise good records.
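As a quick illustrative check, the row for Bob that was rejected earlier should now pass with the updated model (expected results shown in the comments):
row = {"id": "2", "name": "Bob", "email": "bob[at]example.com",
       "signup_date": "2025-02-05", "spend": "not_available"}

bob = Customer(**row)
print(bob.email)   # bob@example.com, "[at]" repaired before validation
print(bob.spend)   # 0.0, "not_available" replaced by the default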
Post-Processing Transformations
Sometimes, transformations depend on multiple fields or require logic that’s better handled outside the model. For example, adding a computed field or normalizing based on a rule.
You can apply these transformations right after validation:
cleaned_customers = []
for customer in valid_customers:
record = customer.model_dump()
record["email_domain"] = record["email"].split("@")[-1]
cleaned_customers.append(record)
print("✅ Valid Records:")
for c in cleaned_customers:
print(c)
print("\n❌ Invalid Records:")
for item in invalid_rows:
print("Row:", item["row"])
print("Errors:", item["errors"])
print()
This adds a new field (`email_domain`) to each record, useful for analytics or grouping later.
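For instance, a small sketch of how that field might be used downstream, counting customers per domain:
from collections import Counter

# Tally how many cleaned records belong to each email domain
domain_counts = Counter(record["email_domain"] for record in cleaned_customers)
print(domain_counts)   # e.g. Counter({'example.com': 2})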
Putting It All Together
After applying both validation and transformation logic, your ETL pipeline now does four key things automatically:
- Rejects invalid data with detailed error messages.
- Cleans borderline data (like `"not_available"`) into standardized formats.
- Repairs minor data issues automatically (e.g., `[at]` → `@`).
- Transforms validated data into structured, analytics-friendly objects.
Here’s an example of the output after transformation:
✅ Valid Records:
{'id': 1, 'name': 'Alice', 'email': 'alice@example.com', 'signup_date': datetime.date(2025, 1, 10), 'spend': 100.5, 'email_domain': 'example.com'}
{'id': 2, 'name': 'Bob', 'email': 'bob@example.com', 'signup_date': datetime.date(2025, 2, 5), 'spend': 0.0, 'email_domain': 'example.com'}
...
By using validators and transformation logic together, you can make your ETL pipeline resilient against bad inputs while ensuring the output is consistent, typed, and ready for loading.
Step 4 - Load Valid Data
Once your data has been validated and transformed, the final step in the ETL pipeline is to load it into the destination system.
This could be a database, a CSV file, or even an analytics tool. At this stage, you can also handle invalid rows by logging or exporting them for review.
Saving or Printing Valid Records
For small datasets or testing purposes, you can simply print the validated records:
print("✅ Valid Records:")
for c in valid_customers:
print(c.model_dump())
Or save them to a new CSV file for downstream use:
import csv
with open("clean_customers.csv", "w", newline="") as f:
writer = csv.DictWriter(f, fieldnames=["id", "name", "email", "signup_date", "spend", "email_domain"])
writer.writeheader()
for customer in cleaned_customers:
writer.writerow(customer)
This ensures that only validated, cleaned, and transformed data is persisted. The resulting `clean_customers.csv` looks like this:
id,name,email,signup_date,spend,email_domain
1,Alice,alice@example.com,2025-01-10,100.5,example.com
2,Bob,bob@example.com,2025-02-05,0.0,example.com
Logging or Exporting Invalid Records
It’s equally important to keep track of rows that failed validation. You can save them for manual inspection or automated correction:
import json
with open("invalid_customers.json", "w") as f:
json.dump(invalid_rows, f, indent=2)
This produces a structured log of problematic rows along with detailed error messages from Pydantic, making it easier to debug or fix issues in the source data.
[
{
"row": {
"id": "3",
"name": "Charlie",
"email": "charlie@example.com",
"signup_date": "invalid_date",
"spend": "200"
},
"errors": [
{
"type": "date_from_datetime_parsing",
"loc": [
"signup_date"
],
"msg": "Input should be a valid date or datetime, invalid character in year",
"input": "invalid_date",
"ctx": {
"error": "invalid character in year"
},
"url": "https://errors.pydantic.dev/2.11/v/date_from_datetime_parsing"
}
]
},
{
"row": {
"id": "4",
"name": "",
"email": "diana@example.com",
"signup_date": "2025-03-12",
"spend": "150"
},
"errors": [
{
"type": "string_too_short",
"loc": [
"name"
],
"msg": "String should have at least 1 character",
"input": "",
"ctx": {
"min_length": 1
},
"url": "https://errors.pydantic.dev/2.11/v/string_too_short"
}
]
}
]
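Because the log keeps the original row next to Pydantic’s error details, it can also be read back later, by a person or a scheduled job, to decide which rows are worth repairing. A minimal sketch:
import json

# Summarize which fields failed for each rejected row
with open("invalid_customers.json") as f:
    for item in json.load(f):
        failed_fields = [err["loc"][0] for err in item["errors"]]
        print(f"Row id={item['row'].get('id')}: failed fields -> {failed_fields}")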
Inserting Validated Data into a Database
If you’re loading data into a relational database (e.g., PostgreSQL or SQLite), you can use a library like `psycopg2` or `SQLAlchemy`. Here’s a simplified example using SQLAlchemy with SQLite:
from sqlalchemy import create_engine, Table, Column, Integer, String, Float, Date, MetaData
engine = create_engine("sqlite:///customers.db")
metadata = MetaData()
customers_table = Table(
"customers",
metadata,
Column("id", Integer, primary_key=True),
Column("name", String),
Column("email", String),
Column("signup_date", Date),
Column("spend", Float),
)
metadata.create_all(engine)
with engine.connect() as conn:
for customer in valid_customers:
conn.execute(customers_table.insert(), customer.model_dump())
conn.commit()
Here:
- Only validated and cleaned records are inserted.
- Invalid records remain in your logs for later inspection.
By following this approach, your ETL pipeline ensures that:
- Clean, reliable data reaches your database or downstream systems.
- Problematic rows are never silently ignored; they’re logged for review.
- You can automate the full ETL process while maintaining transparency and traceability.
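To tie the steps together, here is a rough sketch of a single entry point for the whole pipeline. It reuses the `Customer` model, the SQLAlchemy `engine` and `customers_table` defined above, and the hypothetical `extract_and_validate` helper sketched in Step 2; the function name `run_pipeline` is just illustrative:
import csv
import json


def run_pipeline(source_csv: str = "customers.csv") -> None:
    # Extract + validate: split rows into trusted and rejected records
    valid_customers, invalid_rows = extract_and_validate(source_csv)

    # Transform: add derived fields for analytics
    cleaned_customers = []
    for customer in valid_customers:
        record = customer.model_dump()
        record["email_domain"] = record["email"].split("@")[-1]
        cleaned_customers.append(record)

    # Load: write cleaned records to CSV and SQLite, log rejected rows for review
    with open("clean_customers.csv", "w", newline="") as f:
        writer = csv.DictWriter(
            f, fieldnames=["id", "name", "email", "signup_date", "spend", "email_domain"]
        )
        writer.writeheader()
        writer.writerows(cleaned_customers)

    with engine.connect() as conn:
        for customer in valid_customers:
            conn.execute(customers_table.insert(), customer.model_dump())
        conn.commit()

    with open("invalid_customers.json", "w") as f:
        json.dump(invalid_rows, f, indent=2)


run_pipeline()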
Want to go further with Pydantic?
Check out my recent book Practical Pydantic - a hands-on guide that takes you beyond the basics. You’ll learn how to design robust data models, handle complex validations, and apply Pydantic in real-world projects like APIs and ETL pipelines. A great next step if you want to level up your data engineering skills with Python.
Conclusion
Using Pydantic in ETL pipelines provides a powerful, Pythonic way to validate, clean, and transform data before it reaches your destination systems. By defining clear schemas and leveraging built-in validation, you can catch errors early, enforce consistent data formats, and reduce the risk of downstream bugs. This makes your pipelines more reproducible, reliable, and maintainable, as every record is processed according to the same rules and invalid data is automatically flagged.
Beyond basic validation, Pydantic allows you to implement custom transformations and cleaning logic, making it easy to standardize data and enrich records for downstream use. Declarative models act as a single source of truth, simplifying maintenance as your data requirements evolve. Combined with logging of invalid rows, this approach gives you transparency and confidence in your ETL process.
Once you’re comfortable with this workflow, you can integrate it with production tools like Airflow, dbt, or Snowflake to build robust, automated pipelines. Start by applying these patterns to your own ETL scripts, defining Pydantic models for your raw data, validating and transforming records, and only loading clean data. With this approach, you can turn messy, unpredictable datasets into a reliable foundation for analytics and business intelligence.
Follow me on Twitter: https://twitter.com/DevAsService
Follow me on Instagram: https://www.instagram.com/devasservice/
Follow me on TikTok: https://www.tiktok.com/@devasservice
Follow me on YouTube: https://www.youtube.com/@DevAsService