Data engineering is the discipline of designing, building, and maintaining systems that collect, store, and transform data for analysis and machine learning. Python has become the dominant language in this space thanks to its rich ecosystem and readability. This lesson covers the Python ecosystem for data engineering, key libraries, project setup, and how to use type hints effectively in data code.
Python dominates data engineering because a single ecosystem covers every stage of the pipeline. The diagram below maps the core libraries onto those stages:
┌────────────────────────────────────────────────────────┐
│                      DATA SOURCES                      │
│     APIs │ Databases │ Files │ Streams │ Webhooks      │
└────────┬───────────────────────────────────┬───────────┘
         │                                   │
         ▼                                   ▼
┌─────────────────┐               ┌───────────────────────┐
│     EXTRACT     │               │     ORCHESTRATION     │
│    requests     │               │   Airflow / Prefect   │
│   sqlalchemy    │               │  scheduling, retries  │
│      boto3      │               └───────────────────────┘
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│    TRANSFORM    │
│ pandas / polars │
│    pydantic     │
│  great_expect.  │
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│      LOAD       │
│   sqlalchemy    │
│    psycopg2     │
│     pyarrow     │
└─────────────────┘
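To make the flow concrete, here is a minimal single-pass sketch of those stages. The URL, column names, and table name are placeholders, not a real service:

import pandas as pd
import requests
from sqlalchemy import create_engine

def run_pipeline() -> None:
    # Extract: pull JSON records from an API (placeholder URL)
    response = requests.get("https://api.example.com/orders", timeout=30)
    response.raise_for_status()
    df = pd.DataFrame(response.json())

    # Transform: drop incomplete rows, derive a revenue column
    df = df.dropna(subset=["price", "quantity"])
    df["revenue"] = df["price"] * df["quantity"]

    # Load: write to a database via SQLAlchemy
    engine = create_engine("sqlite:///local.db")
    df.to_sql("orders", engine, if_exists="replace", index=False)

if __name__ == "__main__":
    run_pipeline()

Real pipelines split these stages into separate modules (we set up that structure below), but the shape stays the same.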
The workhorse libraries, stage by stage:

| Library | Purpose | Install Command |
|---|---|---|
| pandas | Data manipulation and analysis | pip install pandas |
| polars | Fast DataFrame library (Rust-backed) | pip install polars |
| sqlalchemy | Database toolkit and ORM | pip install sqlalchemy |
| pydantic | Data validation and settings | pip install pydantic |
| pyarrow | Parquet and Arrow file support | pip install pyarrow |
| great_expectations | Data quality validation | pip install great_expectations |
| requests | HTTP client for API extraction | pip install requests |
| aiohttp | Async HTTP client | pip install aiohttp |
| prefect | Workflow orchestration | pip install prefect |
| pytest | Testing framework | pip install pytest |
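pandas and polars overlap heavily; the trade-off is ecosystem maturity versus speed. The same filter-and-aggregate in both, with made-up sample data (note that recent polars versions spell it group_by, not groupby):

import pandas as pd
import polars as pl

# pandas: boolean mask, then groupby
pdf = pd.DataFrame({"region": ["eu", "us", "eu"], "sales": [10, 20, 30]})
print(pdf[pdf["sales"] > 15].groupby("region")["sales"].sum())

# polars: expression-based filter and group_by
pldf = pl.DataFrame({"region": ["eu", "us", "eu"], "sales": [10, 20, 30]})
print(pldf.filter(pl.col("sales") > 15).group_by("region").agg(pl.col("sales").sum()))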
A project layout that keeps extract, transform, and load in separate packages scales well:

data-pipeline/
├── .env # secrets (git-ignored)
├── .gitignore
├── pyproject.toml # dependencies and project metadata
├── src/
│ ├── __init__.py
│ ├── extract/
│ │ ├── __init__.py
│ │ ├── api_client.py # API extraction logic
│ │ └── db_reader.py # Database extraction
│ ├── transform/
│ │ ├── __init__.py
│ │ ├── clean.py # Data cleaning functions
│ │ └── enrich.py # Data enrichment
│ ├── load/
│ │ ├── __init__.py
│ │ └── warehouse.py # Load to destination
│ ├── models/
│ │ ├── __init__.py
│ │ └── schemas.py # Pydantic models
│ └── pipeline.py # Main pipeline orchestration
├── tests/
│ ├── __init__.py
│ ├── test_extract.py
│ ├── test_transform.py
│ └── fixtures/
│ └── sample_data.csv
├── dags/ # Airflow DAGs (if using)
│ └── daily_pipeline.py
└── README.md
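To see how the pieces fit, here is a sketch of src/pipeline.py. The imported function names are illustrative, not a fixed API; only the module paths come from the tree above:

# src/pipeline.py -- wires the stages together
from src.extract.api_client import fetch_orders
from src.transform.clean import clean_orders
from src.load.warehouse import write_orders

def run() -> None:
    raw = fetch_orders()         # extract: pull raw records
    cleaned = clean_orders(raw)  # transform: validate and clean
    write_orders(cleaned)        # load: persist to the warehouse

if __name__ == "__main__":
    run()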
A minimal pyproject.toml to match:

[project]
name = "data-pipeline"
version = "0.1.0"
requires-python = ">=3.11"
dependencies = [
"pandas>=2.0",
"sqlalchemy>=2.0",
"pydantic>=2.0",
"pyarrow>=14.0",
"python-dotenv>=1.0",
"requests>=2.31",
]
[project.optional-dependencies]
dev = [
"pytest>=7.0",
"ruff>=0.1.0",
"mypy>=1.5",
]
# Create and activate a virtual environment
python3 -m venv .venv
source .venv/bin/activate
# Install the project in editable mode
pip install -e ".[dev]"
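A quick sanity check that the dependencies resolved:

python -c "import pandas, sqlalchemy, pydantic; print('environment ok')"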
Type hints make data pipelines far more maintainable. They catch bugs early, improve IDE support, and serve as living documentation.
# Python 3.10+ union syntax (float | None) replaces typing.Optional
def calculate_revenue(
    price: float,
    quantity: int,
    discount: float | None = None,
) -> float:
    """Calculate revenue with an optional fractional discount (0.1 = 10% off)."""
    total = price * quantity
    if discount is not None:
        total *= (1 - discount)
    return total
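The checker now guards every call site. These hypothetical calls show the kind of bug mypy catches before the pipeline runs:

revenue = calculate_revenue(9.99, 3, discount=0.1)  # ok: 26.973
bad = calculate_revenue("9.99", 3)  # mypy error: "str" is not "float"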
import pandas as pd
def clean_customer_data(df: pd.DataFrame) -> pd.DataFrame:
"""Clean and standardise customer data."""
return (
df
.dropna(subset=["email"])
.assign(
email=lambda x: x["email"].str.lower().str.strip(),
name=lambda x: x["name"].str.title(),
)
)
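With made-up rows, the behaviour is easy to see:

raw = pd.DataFrame({
    "name": ["alice smith", "BOB JONES", "carol"],
    "email": [" Alice@Example.COM ", None, "carol@example.com"],
})
print(clean_customer_data(raw))
# BOB JONES is dropped (missing email); emails are lowercased and
# stripped, names title-cased: "alice smith" -> "Alice Smith"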
Even trivial helpers benefit from explicit types:

def load_csv(path: str) -> pd.DataFrame:
"""Load a CSV file into a DataFrame."""
return pd.read_csv(path)
For row-shaped dicts, TypedDict documents the expected keys without any runtime cost:

from typing import TypedDict
class CustomerRow(TypedDict):
id: int
name: str
email: str
signup_date: str
total_orders: int
def process_customer(row: CustomerRow) -> dict[str, str | int]:
"""Process a single customer row."""
return {
"id": row["id"],
"display_name": row["name"].upper(),
"order_count": row["total_orders"],
}
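Because TypedDict is erased at runtime, the value below is a plain dict; mypy checks the keys and value types statically. A sketch:

row: CustomerRow = {
    "id": 7,
    "name": "alice smith",
    "email": "alice@example.com",
    "signup_date": "2024-01-15",
    "total_orders": 3,
}
print(process_customer(row))
# {'id': 7, 'display_name': 'ALICE SMITH', 'order_count': 3}
# A misspelled key like "total_order" would be flagged by mypy, not at runtime.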
When static hints aren't enough and you need runtime validation, Pydantic models are the standard tool:

from datetime import date

from pydantic import BaseModel, EmailStr  # EmailStr requires: pip install "pydantic[email]"
class Customer(BaseModel):
id: int
name: str
email: EmailStr
signup_date: date
total_orders: int = 0
# Validates at runtime
customer = Customer(
id=1,
name="Alice Smith",
email="alice@example.com",
signup_date="2024-01-15",
total_orders=42,
)
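The payoff is that bad records fail loudly instead of slipping into downstream tables. A sketch of catching one:

from pydantic import ValidationError

try:
    Customer(id="not-a-number", name="Bob", email="not-an-email", signup_date="2024-01-15")
except ValidationError as exc:
    print(exc)  # reports both the unparseable id and the invalid email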
Configuration belongs in the environment, not in code. In development, python-dotenv loads it from a local .env file:

import os

from dotenv import load_dotenv
load_dotenv()
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///local.db")
API_KEY = os.getenv("API_KEY")
BATCH_SIZE = int(os.getenv("BATCH_SIZE", "1000"))
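The matching .env might look like this (placeholder values; never commit this file):

DATABASE_URL=postgresql://user:pass@localhost:5432/analytics
API_KEY=dummy-key-for-local-dev
BATCH_SIZE=500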
For validated, typed configuration, pydantic-settings (a separate package: pip install pydantic-settings) does the same job with type coercion and defaults built in. In Pydantic v2 the inner Config class is deprecated in favour of model_config:

from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env")

    database_url: str = "sqlite:///local.db"
    api_key: str
    batch_size: int = 1000
    debug: bool = False
settings = Settings()
print(settings.database_url)
Type hints only pay off if something checks them; mypy is the standard checker:

# Install mypy
pip install mypy
# Run type checking
mypy src/ --ignore-missing-imports
# For stricter checking
mypy src/ --strict --ignore-missing-imports
When a check fails, the output points at the exact call site:

src/transform/clean.py:15: error: Argument "price" to "calculate_revenue"
    has incompatible type "str"; expected "float"
Found 1 error in 1 file (checked 5 source files)
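Rather than repeating flags on every run, you can pin the configuration in pyproject.toml; a minimal example:

[tool.mypy]
python_version = "3.11"
ignore_missing_imports = true
strict = true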