You are viewing a free preview of this lesson.
Subscribe to unlock all 10 lessons in this course and every other course on LearningBro.
ETL (Extract-Transform-Load) is the backbone of data engineering. An ETL pipeline extracts data from one or more sources, transforms it into a usable format, and loads it into a destination. This lesson covers pipeline architecture, idempotency, incremental loads, and error handling.
┌──────────────┐ ┌────────────────┐ ┌──────────────┐
│ EXTRACT │────▶│ TRANSFORM │────▶│ LOAD │
│ │ │ │ │ │
│ - APIs │ │ - Clean │ │ - Database │
│ - Databases │ │ - Validate │ │ - Data Lake │
│ - Files │ │ - Enrich │ │ - Warehouse │
│ - Streams │ │ - Aggregate │ │ - Files │
└──────────────┘ └────────────────┘ └──────────────┘
import os
from datetime import datetime, timezone

import pandas as pd
from sqlalchemy import create_engine
def extract(api_url: str) -> pd.DataFrame:
    """Fetch JSON records from *api_url* and return them as a DataFrame.

    Expects the endpoint to respond with a JSON object containing a
    ``"data"`` key holding a list of records. Raises
    ``requests.HTTPError`` on non-2xx responses.
    """
    import requests

    resp = requests.get(api_url, timeout=30)
    resp.raise_for_status()
    payload = resp.json()
    return pd.DataFrame(payload["data"])
def transform(df: pd.DataFrame) -> pd.DataFrame:
    """Clean and normalize extracted customer records.

    Drops rows missing an email or name, lower-cases and strips emails,
    title-cases names, stamps every row with the extraction time, and
    removes duplicate emails (first occurrence wins).

    Args:
        df: Raw records with at least ``email`` and ``name`` columns.

    Returns:
        A new, cleaned DataFrame; the input is not modified.
    """
    return (
        df
        .dropna(subset=["email", "name"])
        .assign(
            email=lambda x: x["email"].str.lower().str.strip(),
            name=lambda x: x["name"].str.title(),
            # Timezone-aware timestamp: datetime.utcnow() is deprecated
            # (Python 3.12) and returns a naive datetime that is easy to
            # misinterpret downstream.
            extracted_at=datetime.now(timezone.utc),
        )
        .drop_duplicates(subset=["email"])
    )
def load(df: pd.DataFrame, table_name: str, engine) -> int:
    """Append *df* to *table_name* and report how many rows were loaded.

    ``DataFrame.to_sql`` returns ``None`` on some pandas versions and
    backends, so fall back to the frame's own length in that case.
    """
    written = df.to_sql(table_name, engine, if_exists="append", index=False)
    if written:
        return written
    return len(df)
def run_pipeline():
    """Run the full ETL pipeline: extract from the API, transform, load.

    Progress is reported on stdout; any stage failure propagates to the
    caller.
    """
    # Read the DSN from the environment so credentials are not hard-coded
    # in source; the fallback preserves the original development default.
    dsn = os.environ.get(
        "WAREHOUSE_DSN", "postgresql://user:pass@localhost/warehouse"
    )
    engine = create_engine(dsn)
    print("Extracting...")
    raw = extract("https://api.example.com/customers")
    print(f" Extracted {len(raw)} rows")
    print("Transforming...")
    clean = transform(raw)
    print(f" Transformed to {len(clean)} rows")
    print("Loading...")
    loaded = load(clean, "customers", engine)
    print(f" Loaded {loaded} rows")
# Allow this module to be executed directly as a script.
if __name__ == "__main__":
    run_pipeline()
from dataclasses import dataclass, field
from typing import Any
import logging
logger = logging.getLogger(__name__)
@dataclass
class PipelineResult:
    """Summary of one pipeline run: per-stage row counts, errors, timing."""

    # Rows produced by each stage; stays 0 for stages that never ran.
    extracted: int = 0
    transformed: int = 0
    loaded: int = 0
    # Error messages captured from a failed run (empty on success).
    errors: list[str] = field(default_factory=list)
    # Wall-clock runtime of the run, rounded to two decimal places.
    duration_seconds: float = 0.0
class ETLPipeline:
    """Template for an ETL pipeline.

    Subclasses override :meth:`extract`, :meth:`transform`, and
    :meth:`load`; :meth:`run` orchestrates the stages, records per-stage
    row counts, and captures any failure in the returned
    ``PipelineResult`` instead of letting it propagate.
    """

    def __init__(self, name: str, source_url: str, dest_engine):
        self.name = name
        self.source_url = source_url
        self.dest_engine = dest_engine

    def extract(self) -> pd.DataFrame:
        """Return the raw source data. Subclasses must override."""
        raise NotImplementedError

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a cleaned version of *df*. Subclasses must override."""
        raise NotImplementedError

    def load(self, df: pd.DataFrame) -> int:
        """Write *df* to the destination, return rows written. Override."""
        raise NotImplementedError

    def run(self) -> PipelineResult:
        """Execute extract -> transform -> load without raising.

        Any exception is logged (with traceback) and recorded in
        ``result.errors``; the counts reflect only the stages that
        completed before the failure.
        """
        import time

        # perf_counter is monotonic, so the duration is immune to
        # wall-clock adjustments (NTP, DST) mid-run.
        start = time.perf_counter()
        result = PipelineResult()
        try:
            raw = self.extract()
            result.extracted = len(raw)
            clean = self.transform(raw)
            result.transformed = len(clean)
            result.loaded = self.load(clean)
        except Exception as e:
            # Top-level boundary: record and log, do not propagate.
            result.errors.append(str(e))
            # Lazy %-style args + logger.exception to include the
            # traceback instead of an eagerly-formatted f-string.
            logger.exception("Pipeline %s failed: %s", self.name, e)
        result.duration_seconds = round(time.perf_counter() - start, 2)
        return result
An idempotent pipeline produces the same result whether it runs once or multiple times. This is critical for reliability.
| Strategy | Description | Trade-off |
|---|---|---|
| Delete + Insert | Delete existing data, then insert fresh | Simple, but queries between the delete and the insert see missing data |
| Upsert (MERGE) | Insert or update based on a key | No data gap |
| Partition overwrite | Replace an entire partition (e.g. by date) | Clean, no duplicates |
from sqlalchemy.dialects.postgresql import insert
Subscribe to continue reading
Get full access to this lesson and all 10 lessons in this course.