Database Patterns - Project Reference Pattern

Component: PostgreSQL Database & SQLAlchemy ORM
Status: 🟢 Stable
Created: 2025-12-29
Last Updated: 2025-12-29


Overview

Purpose

This PRP defines implementation patterns for database operations on the Chaverim ALPR Platform: SQLAlchemy model definitions, Alembic migrations, query patterns, and table partitioning.

Scope

Responsibilities:
- SQLAlchemy model definitions
- Alembic migration patterns
- Query patterns and optimizations
- Table partitioning strategies

Out of Scope:
- Business logic (see component-specific PRPs)
- API endpoint implementation
- Connection pooling configuration (see deployment docs)

Dependencies

Requires:
- PostgreSQL 17
- SQLAlchemy 2.0
- Alembic

Used By:
- All central server components
- API endpoints
- Background workers


Quick Reference

When to Use This PRP

Use when:
- Creating new database models
- Writing migrations
- Adding table partitioning

Don't use when:
- Implementing business logic (use component PRPs)
- Configuring database connections (see deployment docs)


Patterns

Pattern: Table Partitioning

Problem: The events table grows without bound; queries slow down and row-by-row cleanup is expensive.

Solution: Partition the events table by month (range partitioning on captured_at) for fast cleanup and partition pruning.

Implementation:

-- Create partitioned table
CREATE TABLE events (
    id BIGSERIAL,
    collector_id UUID NOT NULL,
    plate_number VARCHAR(20) NOT NULL,
    captured_at TIMESTAMPTZ NOT NULL,
    PRIMARY KEY (id, captured_at)
) PARTITION BY RANGE (captured_at);

-- Create partitions (automated via cron or migration)
CREATE TABLE events_2025_01 PARTITION OF events
    FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');

CREATE TABLE events_2025_02 PARTITION OF events
    FOR VALUES FROM ('2025-02-01') TO ('2025-03-01');
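
A DEFAULT partition (PostgreSQL 11+) can optionally act as a safety net, catching rows whose captured_at falls outside every defined range so inserts never fail when partition creation lags behind:

-- Optional catch-all; rows landing here must be moved out before a later
-- CREATE TABLE ... PARTITION OF can claim their range
CREATE TABLE events_default PARTITION OF events DEFAULT;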

Partition Management Script:

from datetime import datetime, timedelta, timezone
from sqlalchemy import text

async def create_future_partitions(session, months_ahead: int = 3):
    """Create partitions for upcoming months."""
    today = datetime.now(timezone.utc)

    for i in range(months_ahead):
        # Advance i calendar months; day-based arithmetic (e.g. 30 * i days)
        # can skip or repeat months around month boundaries
        month_index = today.month - 1 + i
        year = today.year + month_index // 12
        month = month_index % 12 + 1

        partition_name = f"events_{year}_{month:02d}"
        start_date = f"{year}-{month:02d}-01"

        # First day of the following month
        if month == 12:
            end_date = f"{year + 1}-01-01"
        else:
            end_date = f"{year}-{month + 1:02d}-01"

        # Identifiers cannot be bound parameters; these values are
        # generated internally, never taken from user input
        sql = text(f"""
            CREATE TABLE IF NOT EXISTS {partition_name}
            PARTITION OF events
            FOR VALUES FROM ('{start_date}') TO ('{end_date}')
        """)

        await session.execute(sql)


async def drop_old_partitions(session, retention_years: int = 7):
    """Drop partitions older than the retention period."""
    cutoff = datetime.now(timezone.utc) - timedelta(days=365 * retention_years)

    # List candidate partitions by naming convention (events_YYYY_MM)
    sql = text("""
        SELECT tablename FROM pg_tables
        WHERE tablename LIKE 'events_%'
        AND schemaname = 'public'
    """)

    result = await session.execute(sql)
    for row in result:
        table_name = row[0]
        # Parse year and month from the partition name
        parts = table_name.split('_')
        if len(parts) == 3 and parts[1].isdigit() and parts[2].isdigit():
            year, month = int(parts[1]), int(parts[2])
            partition_date = datetime(year, month, 1, tzinfo=timezone.utc)

            if partition_date < cutoff:
                await session.execute(text(f"DROP TABLE {table_name}"))

When to Use:
- Tables expected to exceed 10M rows
- Time-series data with retention policies
- events, audit_log, alert_queue

Trade-offs:
- Pros: fast cleanup (DROP vs DELETE), partition pruning on queries (see the sketch below), manageable backups
- Cons: partition management overhead; unique constraints must include the partition key
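
To confirm pruning, EXPLAIN a query constrained on the partition key; only the matching partitions should appear in the plan. A minimal sketch against the DDL above:

-- Only events_2025_01 should be scanned; other partitions are pruned
EXPLAIN (COSTS OFF)
SELECT plate_number, captured_at
FROM events
WHERE captured_at >= '2025-01-01' AND captured_at < '2025-02-01';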


Pattern: Soft Delete vs Hard Delete

Problem: Data must be preserved for audit and investigation while still appearing deleted from the user's perspective.

Solution: Use soft delete for user-facing data, hard delete only for truly ephemeral data.

Implementation:

import uuid
from datetime import datetime, timezone

from sqlalchemy import Boolean, Column, DateTime, String
from sqlalchemy.dialects.postgresql import UUID

class SoftDeleteMixin:
    """Mixin for soft-deletable models."""
    is_deleted = Column(Boolean, default=False, nullable=False, index=True)
    deleted_at = Column(DateTime(timezone=True), nullable=True)
    deleted_by = Column(UUID(as_uuid=True), nullable=True)  # FK to users

    def soft_delete(self, user_id: uuid.UUID):
        self.is_deleted = True
        self.deleted_at = datetime.now(timezone.utc)
        self.deleted_by = user_id


class Watchlist(Base, SoftDeleteMixin):  # Base: the project's declarative base
    __tablename__ = "watchlists"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    name = Column(String(100), nullable=False)
    # ... other fields


# Query helper - exclude soft-deleted rows by default
def active_watchlists(session):
    return session.query(Watchlist).filter(Watchlist.is_deleted.is_(False))
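
Since the stack targets SQLAlchemy 2.0, the same filter in 2.0-style select() form (a sketch; the statement works with both sync and async sessions):

from sqlalchemy import select

def active_watchlists_stmt():
    # Pass the statement to session.execute() / await session.execute()
    return select(Watchlist).where(Watchlist.is_deleted.is_(False))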

When to Use:
- Watchlists, watchlist entries
- User-created data that may need recovery

When NOT to Use:
- detections table (WORM - never delete; see data-integrity docs)
- Session tokens, temporary data
- Alert queue (processed entries)
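
For the hard-delete side, a minimal cleanup sketch for ephemeral rows, assuming a hypothetical sessions table with an expires_at column:

from datetime import datetime, timezone
from sqlalchemy import text

async def purge_expired_sessions(session):
    # Hard delete: nothing here needs recovery or an audit trail
    await session.execute(
        text("DELETE FROM sessions WHERE expires_at < :now"),
        {"now": datetime.now(timezone.utc)},
    )
    await session.commit()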


Pattern: Optimistic Locking

Problem: Concurrent updates may overwrite each other without detection.

Solution: Use version column for optimistic locking on frequently updated records.

Implementation:

import uuid

from sqlalchemy import Column, Integer, String
from sqlalchemy.dialects.postgresql import JSONB, UUID
from sqlalchemy.orm.exc import StaleDataError

class Collector(Base):
    __tablename__ = "collectors"

    id = Column(UUID(as_uuid=True), primary_key=True)
    site_name = Column(String(100), nullable=False)
    config = Column(JSONB, nullable=False, default=dict)
    config_version = Column(Integer, default=1, nullable=False)

    __mapper_args__ = {
        "version_id_col": config_version
    }


# Usage - SQLAlchemy handles the version check automatically
async def update_collector_config(session, collector_id: uuid.UUID, new_config: dict):
    collector = await session.get(Collector, collector_id)

    collector.config = new_config
    # config_version auto-increments on flush;
    # StaleDataError is raised if another transaction bumped it first

    try:
        await session.commit()
    except StaleDataError:
        await session.rollback()
        # ConflictError: application-defined, typically mapped to HTTP 409
        raise ConflictError("Collector was modified by another request")

When to Use:
- Collector configuration
- Watchlist entries
- User preferences
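
At the call site, surfacing the conflict as an HTTP 409 is usually enough. Where an automatic retry makes sense, a bounded loop is a reasonable sketch (the attempt count and ConflictError are assumptions, not part of the pattern above):

async def update_with_retry(session, collector_id, new_config, attempts: int = 3):
    # Bounded retry: each attempt re-reads the row and re-applies the change
    for attempt in range(attempts):
        try:
            await update_collector_config(session, collector_id, new_config)
            return
        except ConflictError:
            if attempt == attempts - 1:
                raise  # give up; caller returns the conflict to the client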


Anti-Patterns

❌ Anti-Pattern: N+1 Queries

Problem: Loading related objects one at a time in a loop.

Example of BAD code:

# BAD: N+1 queries
watchlists = session.query(Watchlist).all()
for watchlist in watchlists:
    # This executes a query for EACH watchlist
    print(f"{watchlist.name}: {len(watchlist.entries)} entries")

Why It's Wrong:
- 1 query for watchlists + N queries for entries
- Performance degrades linearly with data size

Correct Approach:

# GOOD: Eager loading with joinedload
from sqlalchemy.orm import joinedload, selectinload

watchlists = (
    session.query(Watchlist)
    .options(joinedload(Watchlist.entries))
    .all()
)

# Or use selectinload for large collections
watchlists = (
    session.query(Watchlist)
    .options(selectinload(Watchlist.entries))
    .all()
)
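
Rule of thumb: joinedload emits a single JOINed query but duplicates parent rows for one-to-many relationships, while selectinload issues a second SELECT ... WHERE ... IN (...) query and avoids that duplication, which usually wins for large collections.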

❌ Anti-Pattern: Missing Indexes

Problem: Queries on unindexed columns cause full table scans.

Example of BAD code:

# Table without proper indexes
class Detection(Base):
    __tablename__ = "detections"

    id = Column(BigInteger, primary_key=True)
    plate_number = Column(String(20))  # No index!
    captured_at = Column(DateTime)      # No index!

Correct Approach:

from sqlalchemy import BigInteger, Column, DateTime, Index, String
from sqlalchemy.dialects.postgresql import UUID

class Detection(Base):
    __tablename__ = "detections"
    __table_args__ = (
        Index("ix_detections_plate_number", "plate_number"),
        Index("ix_detections_captured_at", "captured_at"),
        Index("ix_detections_collector_captured", "collector_id", "captured_at"),
    )

    id = Column(BigInteger, primary_key=True)
    collector_id = Column(UUID(as_uuid=True), nullable=False)
    # index=True is omitted on the columns below; combined with the explicit
    # Index objects above it would create duplicate indexes
    plate_number = Column(String(20), nullable=False)
    captured_at = Column(DateTime(timezone=True), nullable=False)
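
To verify the index is actually used, EXPLAIN the query shape in question; an index scan should replace the sequential scan. A sketch:

EXPLAIN ANALYZE
SELECT plate_number, captured_at FROM detections
WHERE plate_number = 'ABC1234';
-- Expect an Index Scan (or Bitmap Index Scan) using
-- ix_detections_plate_number rather than a Seq Scan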

Testing Strategies

Unit Testing Models

import uuid
from datetime import datetime, timezone

import pytest

def test_detection_model_creation(db_session):
    """Test basic Detection model creation."""
    detection = Detection(
        collector_id=uuid.uuid4(),
        plate_number="ABC1234",
        captured_at=datetime.now(timezone.utc)
    )
    db_session.add(detection)
    db_session.commit()

    assert detection.id is not None
    assert detection.plate_number == "ABC1234"


def test_soft_delete(db_session, admin_user):
    """Test soft delete mixin."""
    watchlist = Watchlist(name="Test", created_by=admin_user.id)
    db_session.add(watchlist)
    db_session.commit()

    watchlist.soft_delete(admin_user.id)
    db_session.commit()

    assert watchlist.is_deleted is True
    assert watchlist.deleted_at is not None

Migration Testing
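
The alembic_runner fixture below is assumed to come from the pytest-alembic plugin, which provides migrate_up_to, migrate_down_one, and migrate_up_one.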

def test_migration_upgrade_downgrade(alembic_runner):
    """Test migration is reversible."""
    # Upgrade to latest
    alembic_runner.migrate_up_to("head")

    # Downgrade one revision
    alembic_runner.migrate_down_one()

    # Upgrade again - should succeed
    alembic_runner.migrate_up_one()

Configuration

Environment Variables

Variable         Required  Default  Description
DATABASE_URL     Yes       -        PostgreSQL connection string
DB_POOL_SIZE     No        5        Connection pool size
DB_MAX_OVERFLOW  No        10       Max overflow connections
DB_ECHO          No        false    Log all SQL queries
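
A sketch of how these variables typically feed engine creation; the authoritative wiring lives in the deployment docs, and the names here simply mirror the table above:

import os

from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine(
    os.environ["DATABASE_URL"],
    pool_size=int(os.getenv("DB_POOL_SIZE", "5")),
    max_overflow=int(os.getenv("DB_MAX_OVERFLOW", "10")),
    echo=os.getenv("DB_ECHO", "false").lower() == "true",
)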


Maintainer: Development Team
Review Cycle: Quarterly