Database Patterns - Project Reference Pattern¶
Component: PostgreSQL Database & SQLAlchemy ORM
Status: 🟢 Stable
Created: 2025-12-29
Last Updated: 2025-12-29
Overview¶
Purpose¶
This PRP defines implementation patterns for database operations including SQLAlchemy model definitions, Alembic migrations, and query patterns for the Chaverim ALPR Platform.
Scope¶
Responsibilities:

- SQLAlchemy model definitions
- Alembic migration patterns
- Query patterns and optimizations
- Table partitioning strategies

Out of Scope:

- Business logic (see component-specific PRPs)
- API endpoint implementation
- Connection pooling configuration (see deployment docs)
Dependencies¶
Requires:

- PostgreSQL 17
- SQLAlchemy 2.0
- Alembic

Used By:

- All central server components
- API endpoints
- Background workers
Quick Reference¶
When to Use This PRP¶
✅ Use when:

- Creating new database models
- Writing migrations
- Adding table partitioning

❌ Don't use when:

- Implementing business logic (use component PRPs)
- Configuring database connections (see deployment)
Patterns¶
Pattern: Table Partitioning¶
Problem: The events table grows without bound; queries slow down and cleanup becomes expensive.

Solution: Partition the events table by month (range partitioning on captured_at) so old data can be dropped cheaply and queries can prune irrelevant partitions.
Implementation:
-- Create partitioned table
CREATE TABLE events (
    id BIGSERIAL,
    collector_id UUID NOT NULL,
    plate_number VARCHAR(20) NOT NULL,
    captured_at TIMESTAMPTZ NOT NULL,
    PRIMARY KEY (id, captured_at)
) PARTITION BY RANGE (captured_at);

-- Create partitions (automated via cron or migration)
CREATE TABLE events_2025_01 PARTITION OF events
    FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');

CREATE TABLE events_2025_02 PARTITION OF events
    FOR VALUES FROM ('2025-02-01') TO ('2025-03-01');
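Since the initial partitioned table would normally be created in an Alembic migration rather than raw psql, here is a minimal sketch of what such a revision could look like; the revision identifiers are placeholders, not the project's actual migration history.

```python
"""create partitioned events table (sketch)"""
from alembic import op

# Placeholder revision identifiers
revision = "xxxx_create_events"
down_revision = None


def upgrade() -> None:
    # Partitioned parent table plus the first monthly partition
    op.execute("""
        CREATE TABLE events (
            id BIGSERIAL,
            collector_id UUID NOT NULL,
            plate_number VARCHAR(20) NOT NULL,
            captured_at TIMESTAMPTZ NOT NULL,
            PRIMARY KEY (id, captured_at)
        ) PARTITION BY RANGE (captured_at)
    """)
    op.execute("""
        CREATE TABLE events_2025_01 PARTITION OF events
            FOR VALUES FROM ('2025-01-01') TO ('2025-02-01')
    """)


def downgrade() -> None:
    # Dropping the parent drops its partitions as well
    op.execute("DROP TABLE events")
```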
Partition Management Script:
from datetime import datetime, timedelta
from sqlalchemy import text


async def create_future_partitions(session, months_ahead: int = 3):
    """Create partitions for upcoming months."""
    today = datetime.utcnow()
    year, month = today.year, today.month
    for _ in range(months_ahead):
        partition_name = f"events_{year}_{month:02d}"
        start_date = f"{year}-{month:02d}-01"
        # First day of the following month (exact month arithmetic,
        # so no month is skipped or duplicated)
        if month == 12:
            next_year, next_month = year + 1, 1
        else:
            next_year, next_month = year, month + 1
        end_date = f"{next_year}-{next_month:02d}-01"
        sql = text(f"""
            CREATE TABLE IF NOT EXISTS {partition_name}
            PARTITION OF events
            FOR VALUES FROM ('{start_date}') TO ('{end_date}')
        """)
        await session.execute(sql)
        year, month = next_year, next_month


async def drop_old_partitions(session, retention_years: int = 7):
    """Drop partitions older than the retention period."""
    cutoff = datetime.utcnow() - timedelta(days=365 * retention_years)
    # List all monthly partitions of the events table
    sql = text("""
        SELECT tablename FROM pg_tables
        WHERE tablename LIKE 'events_%'
        AND schemaname = 'public'
    """)
    result = await session.execute(sql)
    for row in result:
        table_name = row[0]
        # Parse year and month from the partition name (events_YYYY_MM)
        parts = table_name.split('_')
        if len(parts) == 3:
            year, month = int(parts[1]), int(parts[2])
            partition_date = datetime(year, month, 1)
            if partition_date < cutoff:
                await session.execute(text(f"DROP TABLE {table_name}"))
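A minimal usage sketch for wiring these helpers into a scheduled job; `async_session_factory` is a placeholder for whatever async_sessionmaker the project actually provides.

```python
# Run periodically, e.g. from a cron-triggered management command.
async def run_partition_maintenance(async_session_factory):
    async with async_session_factory() as session:
        await create_future_partitions(session, months_ahead=3)
        await drop_old_partitions(session, retention_years=7)
        await session.commit()
```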
When to Use:
- Tables expected to exceed 10M rows
- Time-series data with retention policies
- events, audit_log, alert_queue
Trade-offs:

- Pros: Fast cleanup (DROP vs DELETE), query pruning, manageable backups
- Cons: Partition management overhead; unique constraints must include the partition key
Pattern: Soft Delete vs Hard Delete¶
Problem: Need to preserve data for audit/investigation while allowing "deletion" from user perspective.
Solution: Use soft delete for user-facing data, hard delete only for truly ephemeral data.
Implementation:
import uuid
from datetime import datetime

from sqlalchemy import Boolean, Column, DateTime, String
from sqlalchemy.dialects.postgresql import UUID


class SoftDeleteMixin:
    """Mixin for soft-deletable models."""

    is_deleted = Column(Boolean, default=False, nullable=False, index=True)
    deleted_at = Column(DateTime(timezone=True), nullable=True)
    deleted_by = Column(UUID, nullable=True)  # FK to users

    def soft_delete(self, user_id: uuid.UUID):
        self.is_deleted = True
        self.deleted_at = datetime.utcnow()
        self.deleted_by = user_id


class Watchlist(Base, SoftDeleteMixin):
    __tablename__ = "watchlists"

    id = Column(UUID, primary_key=True)
    name = Column(String(100), nullable=False)
    # ... other fields


# Query helper - exclude soft-deleted by default
def active_watchlists(session):
    return session.query(Watchlist).filter(Watchlist.is_deleted == False)
When to Use:

- Watchlists, watchlist entries
- User-created data that may need recovery

When NOT to Use:

- detections table (WORM - never delete, see data-integrity docs)
- Session tokens, temporary data
- Alert queue (processed entries)
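Beyond the explicit query helper above, soft-deleted rows can also be hidden session-wide by default. This is a sketch based on SQLAlchemy's documented `with_loader_criteria` recipe; the `include_deleted` execution option is a name chosen here for illustration, not an existing project convention.

```python
from sqlalchemy import event
from sqlalchemy.orm import Session, with_loader_criteria


@event.listens_for(Session, "do_orm_execute")
def _filter_soft_deleted(execute_state):
    # Apply only to top-level SELECTs; callers can opt out with
    # session.execute(stmt, execution_options={"include_deleted": True})
    if (
        execute_state.is_select
        and not execute_state.is_column_load
        and not execute_state.is_relationship_load
        and not execute_state.execution_options.get("include_deleted", False)
    ):
        execute_state.statement = execute_state.statement.options(
            with_loader_criteria(
                SoftDeleteMixin,
                lambda cls: cls.is_deleted == False,
                include_aliases=True,
            )
        )
```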
Pattern: Optimistic Locking¶
Problem: Concurrent updates may overwrite each other without detection.
Solution: Use version column for optimistic locking on frequently updated records.
Implementation:
import uuid

from sqlalchemy import Column, Integer, String
from sqlalchemy.dialects.postgresql import JSONB, UUID
from sqlalchemy.orm.exc import StaleDataError


class Collector(Base):
    __tablename__ = "collectors"

    id = Column(UUID, primary_key=True)
    site_name = Column(String(100), nullable=False)
    config = Column(JSONB, nullable=False, default=dict)  # configuration payload
    config_version = Column(Integer, default=1, nullable=False)

    __mapper_args__ = {
        "version_id_col": config_version
    }


# Usage - SQLAlchemy handles the version check automatically
async def update_collector_config(session, collector_id: uuid.UUID, new_config: dict):
    collector = await session.get(Collector, collector_id)
    collector.config = new_config
    # config_version auto-increments on commit;
    # StaleDataError is raised if another transaction changed it first
    try:
        await session.commit()
    except StaleDataError:
        await session.rollback()
        raise ConflictError("Collector was modified by another request")
When to Use:

- Collector configuration
- Watchlist entries
- User preferences
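At the API layer the resulting ConflictError is typically surfaced as an HTTP 409, but background workers may prefer to retry. A sketch of a retry wrapper, with the attempt count chosen arbitrarily and `session_factory` as a placeholder name:

```python
async def update_config_with_retry(session_factory, collector_id, new_config, attempts: int = 3):
    """Retry an optimistic-lock conflict a few times before giving up."""
    for attempt in range(attempts):
        async with session_factory() as session:
            try:
                await update_collector_config(session, collector_id, new_config)
                return
            except ConflictError:
                if attempt == attempts - 1:
                    raise
                # Another writer won the race; start over with fresh state
```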
Anti-Patterns¶
❌ Anti-Pattern: N+1 Queries¶
Problem: Loading related objects one at a time in a loop.
Example of BAD code:
# BAD: N+1 queries
watchlists = session.query(Watchlist).all()
for watchlist in watchlists:
    # This executes a query for EACH watchlist
    print(f"{watchlist.name}: {len(watchlist.entries)} entries")
Why It's Wrong:

- 1 query for watchlists + N queries for entries
- Performance degrades linearly with data size
Correct Approach:
# GOOD: Eager loading with joinedload
from sqlalchemy.orm import joinedload, selectinload

watchlists = (
    session.query(Watchlist)
    .options(joinedload(Watchlist.entries))
    .all()
)

# Or use selectinload for large collections
watchlists = (
    session.query(Watchlist)
    .options(selectinload(Watchlist.entries))
    .all()
)
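Since the project targets SQLAlchemy 2.0, the same eager loading can also be written in 2.0-style select() form; a brief sketch:

```python
from sqlalchemy import select
from sqlalchemy.orm import selectinload

stmt = select(Watchlist).options(selectinload(Watchlist.entries))
watchlists = session.scalars(stmt).all()
```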
❌ Anti-Pattern: Missing Indexes¶
Problem: Queries on unindexed columns cause full table scans.
Example of BAD code:
# Table without proper indexes
class Detection(Base):
    __tablename__ = "detections"

    id = Column(BigInteger, primary_key=True)
    plate_number = Column(String(20))  # No index!
    captured_at = Column(DateTime)     # No index!
Correct Approach:
from sqlalchemy import BigInteger, Column, DateTime, Index, String
from sqlalchemy.dialects.postgresql import UUID


class Detection(Base):
    __tablename__ = "detections"
    __table_args__ = (
        Index("ix_detections_plate_number", "plate_number"),
        Index("ix_detections_captured_at", "captured_at"),
        Index("ix_detections_collector_captured", "collector_id", "captured_at"),
    )

    id = Column(BigInteger, primary_key=True)
    collector_id = Column(UUID, nullable=False)
    plate_number = Column(String(20), nullable=False)
    captured_at = Column(DateTime(timezone=True), nullable=False)
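Indexes added after the fact go through an Alembic migration rather than a model change alone. A hedged sketch of the corresponding operations (revision boilerplate omitted):

```python
from alembic import op


def upgrade() -> None:
    op.create_index("ix_detections_plate_number", "detections", ["plate_number"])
    op.create_index("ix_detections_captured_at", "detections", ["captured_at"])
    op.create_index(
        "ix_detections_collector_captured", "detections", ["collector_id", "captured_at"]
    )


def downgrade() -> None:
    op.drop_index("ix_detections_collector_captured", table_name="detections")
    op.drop_index("ix_detections_captured_at", table_name="detections")
    op.drop_index("ix_detections_plate_number", table_name="detections")
```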
Testing Strategies¶
Unit Testing Models¶
import uuid
from datetime import datetime

import pytest


def test_detection_model_creation(db_session):
    """Test basic Detection model creation."""
    detection = Detection(
        collector_id=uuid.uuid4(),
        plate_number="ABC1234",
        captured_at=datetime.utcnow()
    )
    db_session.add(detection)
    db_session.commit()

    assert detection.id is not None
    assert detection.plate_number == "ABC1234"


def test_soft_delete(db_session, admin_user):
    """Test soft delete mixin."""
    watchlist = Watchlist(name="Test", created_by=admin_user.id)
    db_session.add(watchlist)
    db_session.commit()

    watchlist.soft_delete(admin_user.id)
    db_session.commit()

    assert watchlist.is_deleted is True
    assert watchlist.deleted_at is not None
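The `db_session` and `admin_user` fixtures are assumed to come from the project's conftest (see TEST_CONFIG.md). A minimal sketch of a transactional `db_session` fixture, with the test database URL as a placeholder:

```python
# conftest.py (sketch)
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

TEST_DATABASE_URL = "postgresql+psycopg://localhost/chaverim_test"  # placeholder


@pytest.fixture
def db_session():
    engine = create_engine(TEST_DATABASE_URL)
    with engine.connect() as connection:
        transaction = connection.begin()
        # Commits inside the test become savepoints, so the outer
        # transaction can be rolled back cleanly afterwards.
        session = Session(bind=connection, join_transaction_mode="create_savepoint")
        try:
            yield session
        finally:
            session.close()
            transaction.rollback()
```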
Migration Testing¶
def test_migration_upgrade_downgrade(alembic_runner):
    """Test migration is reversible."""
    # Upgrade to latest
    alembic_runner.migrate_up_to("head")

    # Downgrade one revision
    alembic_runner.migrate_down_one()

    # Upgrade again - should succeed
    alembic_runner.migrate_up_one()
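The `alembic_runner` fixture matches the interface provided by the pytest-alembic plugin. Assuming that plugin is in use, the conftest wiring could look roughly like this, with the database URL again a placeholder:

```python
# conftest.py (sketch, assuming the pytest-alembic plugin is installed)
import pytest
from sqlalchemy import create_engine


@pytest.fixture
def alembic_engine():
    # Engine pointed at a disposable database used only for migration tests
    return create_engine("postgresql+psycopg://localhost/chaverim_migration_test")
```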
Configuration¶
Environment Variables¶
| Variable | Required | Default | Description |
|---|---|---|---|
| `DATABASE_URL` | Yes | - | PostgreSQL connection string |
| `DB_POOL_SIZE` | No | `5` | Connection pool size |
| `DB_MAX_OVERFLOW` | No | `10` | Max overflow connections |
| `DB_ECHO` | No | `false` | Log all SQL queries |
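A sketch of how these variables might map onto engine creation; the actual settings module lives in the deployment docs, so this only illustrates the mapping using plain os.environ access.

```python
import os

from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine(
    os.environ["DATABASE_URL"],
    pool_size=int(os.getenv("DB_POOL_SIZE", "5")),
    max_overflow=int(os.getenv("DB_MAX_OVERFLOW", "10")),
    echo=os.getenv("DB_ECHO", "false").lower() == "true",
)
```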
Related Documentation¶
- Data Integrity: data-integrity.md - WORM protection
- Data Retention: data-retention.md - Partition cleanup
- Testing: TEST_CONFIG.md - Database fixtures
Maintainer: Development Team
Review Cycle: Quarterly