API Key Patterns - Project Reference Pattern

Component: API Key Authentication
Status: 🟡 In Development
Created: 2025-12-29
Last Updated: 2025-12-29


Overview

Purpose

This PRP defines implementation patterns for API key authentication used in edge-to-central communication, including generation, hashing, validation, rotation, and revocation.

Scope

Responsibilities:
- API key generation with secure randomness
- Key hashing for storage (SHA-256)
- Key validation on requests
- Key rotation with grace period
- Key revocation (immediate effect)
- Usage tracking and monitoring

Out of Scope:
- User authentication (uses JWT, see auth docs)
- Edge provisioning flow (see edge-provisioning-prp.md)
- Camera credentials (see edge-provisioning-prp.md)

Dependencies

Requires:
- PostgreSQL (api_keys table)
- secrets module (Python standard library)
- hashlib module (Python standard library)

Used By:
- Central server (validation middleware)
- Edge collector (HTTP client)
- Admin UI (key management)


Quick Reference

When to Use This PRP

Use when:
- Generating new API keys for collectors
- Implementing API key validation middleware
- Building key rotation functionality
- Implementing key revocation

Don't use when:
- Authenticating web users (use JWT)
- Storing camera credentials (use device-bound encryption)

Key Format

alpr_<32-byte-random-base64url>

Example: alpr_7Kx9mPqR2vNwYzA1bCdEfGhIjKlMnOpQ3rStUvWx

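| Property | Value |
|---|---|
| Prefix | alpr_ (identifies key type) |
| Random portion | 32 bytes (256 bits) |
| Encoding | Base64URL (URL-safe, no padding) |
| Total length | 48 characters (5-character prefix + 43 encoded characters) |
| Entropy | 256 bits |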
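
The length of a generated key follows directly from the encoding: 32 random bytes become 43 unpadded base64url characters, and the prefix adds 5 more. A minimal check using only the standard library (the variable names are illustrative):

```python
import secrets

PREFIX = "alpr_"

# token_urlsafe(32) base64url-encodes 32 random bytes and strips '=' padding.
random_part = secrets.token_urlsafe(32)
key = f"{PREFIX}{random_part}"

print(len(random_part))  # 43 characters: ceil(32 * 8 / 6)
print(len(key))          # 48 characters: 5-character prefix + 43
```

Note that the example key shown above is illustrative and shorter than a key produced this way.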

Patterns

Pattern: Key Generation

Problem: Need to generate cryptographically secure API keys that are identifiable and URL-safe.

Solution: Use Python's secrets module with base64url encoding and a prefix for identification.

Implementation:

import secrets


def generate_api_key() -> str:
    """
    Generate a secure API key for a collector.

    Format: alpr_<32-byte-random-base64url>

    Properties:
    - 256 bits of entropy (cryptographically secure)
    - URL-safe encoding (no special characters)
    - Prefix for identification and validation
    - 48 characters total (5-char prefix + 43 base64url chars)

    Returns:
        API key string like "alpr_7Kx9mPqR2vNwYzA1bCdEfGhIjKlMnOpQ3rStUvWx"
    """
    # token_urlsafe(32) returns a base64url string built from 32 random bytes
    random_part = secrets.token_urlsafe(32)
    return f"alpr_{random_part}"


def validate_key_format(api_key: str) -> bool:
    """
    Validate API key format (not authenticity).

    Checks:
    - Starts with 'alpr_'
    - Plausible length (40-55 chars; generated keys are 48)
    - Contains only valid base64url characters

    Args:
        api_key: Key to validate

    Returns:
        True if format is valid
    """
    if not api_key.startswith("alpr_"):
        return False

    if len(api_key) < 40 or len(api_key) > 55:
        return False

    # Check base64url characters only
    valid_chars = set(
        "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_"
    )
    random_part = api_key[5:]  # After "alpr_"
    return all(c in valid_chars for c in random_part)

When to Use:
- Creating new collectors
- Key rotation (generating replacement key)
- Provisioning flow

Trade-offs:
- Pros: High entropy, URL-safe, identifiable prefix
- Cons: Longer than some key formats (48 chars)
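
If every key is known to come from `secrets.token_urlsafe(32)`, the format check could be tightened to an exact pattern. The sketch below is one possible variant, not the validator this PRP prescribes; `KEY_PATTERN` and `is_well_formed` are hypothetical names.

```python
import re
import secrets

# Assumption: keys are always "alpr_" followed by exactly 43 base64url characters.
KEY_PATTERN = re.compile(r"alpr_[A-Za-z0-9_-]{43}")


def is_well_formed(api_key: str) -> bool:
    """Return True only if the whole string matches the expected key shape."""
    return KEY_PATTERN.fullmatch(api_key) is not None


fresh_key = "alpr_" + secrets.token_urlsafe(32)
print(is_well_formed(fresh_key))     # True
print(is_well_formed("alpr_short"))  # False
```

The looser 40-55 character range in `validate_key_format` tolerates keys of slightly different lengths, which may matter if the random portion size ever changes.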


Pattern: Key Hashing

Problem: API keys must be stored securely; plaintext storage exposes all keys if database is compromised.

Solution: Store SHA-256 hash of key; verify by hashing incoming key and comparing.

Implementation:

import hashlib


def hash_api_key(api_key: str) -> str:
    """
    Hash API key for secure storage.

    Uses SHA-256 (fast lookup, collision-resistant).
    Note: We use SHA-256 not bcrypt/argon2 because:
    - Keys have 256 bits of entropy (immune to brute force)
    - We need fast validation on every request
    - No need for salt (key is random)

    Args:
        api_key: Plaintext API key

    Returns:
        64-character hexadecimal hash
    """
    return hashlib.sha256(api_key.encode()).hexdigest()


def get_key_prefix(api_key: str) -> str:
    """
    Extract prefix for UI identification.

    Shows first 10 chars which includes 'alpr_' prefix
    and first 5 chars of random portion.

    Args:
        api_key: Plaintext API key

    Returns:
        Key prefix like "alpr_7Kx9m"
    """
    return api_key[:10] if len(api_key) >= 10 else api_key

Database Storage:

| Column | Contains | Example |
|---|---|---|
| key_hash | SHA-256 hash (64 chars) | a1b2c3d4... |
| key_prefix | First 10 chars | alpr_7Kx9m |

When to Use:
- Storing new keys in database
- Validating keys on requests

Trade-offs:
- Pros: Fast validation, no brute force risk (high entropy), deterministic
- Cons: Key cannot be recovered (by design)
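
To make the storage rule concrete, the sketch below shows which values would be persisted when a key is created and how a presented key could later be checked against the stored hash. `new_key_fields` and `matches` are hypothetical helpers; the constant-time comparison via `hmac.compare_digest` is an optional hardening step, not something the pattern above requires.

```python
import hashlib
import hmac
import secrets


def hash_api_key(api_key: str) -> str:
    """SHA-256 hex digest of the plaintext key (64 characters)."""
    return hashlib.sha256(api_key.encode()).hexdigest()


def new_key_fields() -> tuple[str, dict]:
    """Return (plaintext_key, fields_to_store); the plaintext is never stored."""
    plaintext = f"alpr_{secrets.token_urlsafe(32)}"
    fields = {
        "key_hash": hash_api_key(plaintext),
        "key_prefix": plaintext[:10],
    }
    return plaintext, fields


def matches(presented_key: str, stored_hash: str) -> bool:
    """Compare hashes in constant time to avoid leaking match position."""
    return hmac.compare_digest(hash_api_key(presented_key), stored_hash)


plaintext, fields = new_key_fields()
print(fields["key_prefix"])                     # e.g. "alpr_" plus 5 random characters
print(matches(plaintext, fields["key_hash"]))   # True
```

Only `fields` would be written to the database; the plaintext is shown to the operator once and then discarded.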


Pattern: Key Validation Middleware

Problem: Every edge-to-central request must be authenticated; validation must be fast and provide useful context.

Solution: FastAPI dependency that validates key, tracks usage, and returns collector context.

Implementation:

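from datetime import datetime

from fastapi import Depends, Header, HTTPException, Request
from sqlalchemy import func, or_, select
from sqlalchemy.ext.asyncio import AsyncSession

# ApiKey, Collector, get_db, and logger are project-level names defined
# elsewhere; logger is assumed to accept keyword context (structlog-style).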


async def validate_api_key(
    request: Request,
    api_key: str = Header(alias="X-API-Key"),
    db: AsyncSession = Depends(get_db)
) -> "Collector":
    """
    Validate API key and return associated collector.

    Performs checks:
    1. Key format is valid
    2. Key hash exists in database
    3. Key is active (not revoked)
    4. Key is not expired
    5. Collector is active

    Updates:
    - last_used_at timestamp
    - last_used_ip address

    Args:
        request: FastAPI request for IP tracking
        api_key: Key from X-API-Key header
        db: Database session

    Returns:
        Collector associated with the key

    Raises:
        HTTPException: 401 if key invalid, inactive, or expired
    """
    # Quick format check
    if not validate_key_format(api_key):
        logger.warning(
            "Malformed API key",
            key_prefix=api_key[:10] if len(api_key) >= 10 else "short",
            src_ip=request.client.host
        )
        raise HTTPException(401, "Invalid API key format")

    key_hash = hash_api_key(api_key)

    # Query for valid key
    result = await db.execute(
        select(ApiKey)
        .join(Collector)
        .where(
            ApiKey.key_hash == key_hash,
            ApiKey.is_active == True,
            Collector.status == "active",
            or_(
                ApiKey.expires_at.is_(None),
                ApiKey.expires_at > func.now()
            )
        )
    )
    key_record = result.scalar_one_or_none()

    if not key_record:
        # Check if key exists but is revoked (for alerting)
        await check_revoked_key(api_key, db, request)

        logger.warning(
            "Invalid API key attempt",
            key_prefix=get_key_prefix(api_key),
            src_ip=request.client.host
        )
        raise HTTPException(401, "Invalid or inactive API key")

    # Update usage tracking
    key_record.last_used_at = datetime.utcnow()
    key_record.last_used_ip = request.client.host
    await db.commit()

    return key_record.collector


async def check_revoked_key(
    api_key: str,
    db: AsyncSession,
    request: Request
) -> None:
    """Check if key was revoked and alert if so."""
    key_hash = hash_api_key(api_key)

    result = await db.execute(
        select(ApiKey).where(
            ApiKey.key_hash == key_hash,
            ApiKey.is_active == False,
            ApiKey.revoked_at.isnot(None)
        )
    )
    revoked_key = result.scalar_one_or_none()

    if revoked_key:
        logger.warning(
            "Revoked API key usage attempt",
            key_prefix=get_key_prefix(api_key),
            collector_id=str(revoked_key.collector_id),
            revoked_at=revoked_key.revoked_at.isoformat(),
            revoke_reason=revoked_key.revoke_reason,
            src_ip=request.client.host
        )
        # Optionally trigger alert
        await alert_revoked_key_usage(revoked_key, request)

Usage in Endpoints:

from fastapi import APIRouter, Depends

router = APIRouter()


@router.post("/api/v1/events")
async def receive_events(
    events: list[EventCreate],
    collector: Collector = Depends(validate_api_key),
    db: AsyncSession = Depends(get_db)
):
    """
    Receive detections from edge collector.

    Collector is automatically authenticated via API key.
    """
    for event in events:
        event.collector_id = collector.id
        # ... process events

    return {"received": len(events)}


@router.post("/api/v1/heartbeat")
async def heartbeat(
    status: HeartbeatRequest,
    collector: Collector = Depends(validate_api_key),
    db: AsyncSession = Depends(get_db)
):
    """
    Receive heartbeat and return pending commands.
    """
    collector.last_heartbeat_at = datetime.utcnow()
    await db.commit()

    commands = await get_pending_commands(collector.id, db)
    return {"status": "ok", "pending_commands": commands}

When to Use:
- All edge-to-central API endpoints
- Any endpoint requiring collector authentication

Trade-offs:
- Pros: Centralized validation, usage tracking, revoked key detection
- Cons: Database query on every request (mitigated by indexed lookup)
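
On the edge side, the collector only needs to attach the key in the `X-API-Key` header. The following is a minimal sketch of building such a request with the standard library; `build_heartbeat_request` and the base URL are hypothetical, and the payload mirrors the `uptime_sec` field used in the integration tests.

```python
import json
import urllib.request


def build_heartbeat_request(
    base_url: str, api_key: str, uptime_sec: int
) -> urllib.request.Request:
    """Build (but do not send) an authenticated heartbeat request."""
    body = json.dumps({"uptime_sec": uptime_sec}).encode()
    return urllib.request.Request(
        url=f"{base_url}/api/v1/heartbeat",
        data=body,
        method="POST",
        headers={"X-API-Key": api_key, "Content-Type": "application/json"},
    )


# Placeholder URL and key for illustration only.
req = build_heartbeat_request("https://central.example.com", "alpr_example", 100)

# Sending is left out of the sketch; it would look like:
#   with urllib.request.urlopen(req, timeout=10) as resp:
#       payload = json.load(resp)
```

The key should be read from the collector's secure configuration rather than hard-coded as it is in this placeholder.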


Pattern: Key Rotation

Problem: Keys should be rotated periodically; rotation must not cause service interruption.

Solution: Generate new key while keeping old key active during grace period; deactivate old key after confirmation.

Implementation:

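from datetime import datetime, timedelta
from typing import Optional

from sqlalchemy import update
from sqlalchemy.ext.asyncio import AsyncSession


async def rotate_api_key(
    collector_id: str,
    grace_period_hours: int = 24,
    created_by: Optional[str] = None,
    db: Optional[AsyncSession] = None
) -> dict: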
    """
    Rotate API key for a collector with grace period.

    Process:
    1. Generate new key
    2. Store new key (active)
    3. Mark old key to expire after grace period
    4. Return new key (only time it's shown)

    Args:
        collector_id: Target collector
        grace_period_hours: Time to allow both keys (default 24h)
        created_by: Admin user ID performing rotation
        db: Database session

    Returns:
        Dict with new key and expiration of old key

    Raises:
        ValueError: If collector not found
    """
    # Find collector
    collector = await db.get(Collector, collector_id)
    if not collector:
        raise ValueError(f"Collector {collector_id} not found")

    # Generate new key
    new_key = generate_api_key()
    new_key_hash = hash_api_key(new_key)

    # Create new key record
    new_key_record = ApiKey(
        collector_id=collector_id,
        key_hash=new_key_hash,
        key_prefix=get_key_prefix(new_key),
        name="Rotated key",
        is_active=True,
        created_at=datetime.utcnow(),
        created_by=created_by
    )
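    db.add(new_key_record)
    # Flush so new_key_record.id is populated before it is used in the
    # UPDATE filter below; otherwise the filter compares against None and
    # the new key would also receive an expiration.
    await db.flush()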

    # Set expiration on old keys
    old_key_expires = datetime.utcnow() + timedelta(hours=grace_period_hours)

    await db.execute(
        update(ApiKey)
        .where(
            ApiKey.collector_id == collector_id,
            ApiKey.is_active == True,
            ApiKey.id != new_key_record.id
        )
        .values(expires_at=old_key_expires)
    )

    await db.commit()

    logger.info(
        "API key rotated",
        collector_id=collector_id,
        new_key_prefix=get_key_prefix(new_key),
        grace_period_hours=grace_period_hours,
        old_key_expires_at=old_key_expires.isoformat(),
        rotated_by=created_by
    )

    return {
        "new_key": new_key,  # Only time key is revealed
        "old_key_expires_at": old_key_expires.isoformat(),
        "message": f"Update edge collector within {grace_period_hours} hours"
    }

Rotation Timeline:

T+0h          T+24h (grace period)
  │               │
  ▼               ▼
[Generate new] ──► [Old key expires]
     │               │
     │  Both keys    │  Only new key
     │  valid        │  valid
     ▼               ▼

When to Use:
- Scheduled key rotation (annual/quarterly)
- Personnel changes
- After suspected exposure

Trade-offs:
- Pros: Zero downtime, configurable grace period
- Cons: Brief window with two valid keys
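
The grace-period behaviour can be expressed as a small pure function that mirrors the conditions in the validation query (`is_active` and `expires_at`). This is a sketch for reasoning about the timeline; `is_key_usable` is a hypothetical name and the timestamps are arbitrary.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_key_usable(
    is_active: bool, expires_at: Optional[datetime], now: datetime
) -> bool:
    """A key passes if it is active and either never expires or has not expired yet."""
    if not is_active:
        return False
    return expires_at is None or expires_at > now


rotated_at = datetime(2025, 12, 29, 12, 0, tzinfo=timezone.utc)
old_key_expires = rotated_at + timedelta(hours=24)  # default grace period

during_grace = rotated_at + timedelta(hours=1)
after_grace = rotated_at + timedelta(hours=25)

# Old key: valid during the grace period, rejected afterwards.
print(is_key_usable(True, old_key_expires, during_grace))  # True
print(is_key_usable(True, old_key_expires, after_grace))   # False
# New key has no expiration, so it stays valid throughout.
print(is_key_usable(True, None, after_grace))              # True
```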


Pattern: Key Revocation

Problem: Compromised or decommissioned keys must be immediately invalidated.

Solution: Mark key as inactive with timestamp and reason; validation rejects immediately.

Implementation:

async def revoke_api_key(
    key_id: str,
    revoked_by: str,
    reason: str,
    db: AsyncSession
) -> dict:
    """
    Immediately revoke an API key.

    Effects:
    - Key becomes invalid immediately
    - All requests with this key will fail
    - Revocation is logged with reason
    - Usage attempts are tracked and alerted

    Args:
        key_id: API key record ID
        revoked_by: Admin user ID
        reason: Reason for revocation
        db: Database session

    Returns:
        Revocation confirmation

    Raises:
        ValueError: If key not found
    """
    key_record = await db.get(ApiKey, key_id)
    if not key_record:
        raise ValueError(f"API key {key_id} not found")

    key_record.is_active = False
    key_record.revoked_at = datetime.utcnow()
    key_record.revoked_by = revoked_by
    key_record.revoke_reason = reason

    await db.commit()

    logger.warning(
        "API key revoked",
        key_id=key_id,
        key_prefix=key_record.key_prefix,
        collector_id=str(key_record.collector_id),
        revoked_by=revoked_by,
        revoke_reason=reason
    )

    return {
        "revoked_at": key_record.revoked_at.isoformat(),
        "collector_id": str(key_record.collector_id),
        "message": "Key revoked. Collector will be unable to authenticate."
    }


async def revoke_all_collector_keys(
    collector_id: str,
    revoked_by: str,
    reason: str,
    db: AsyncSession
) -> int:
    """
    Revoke all API keys for a collector.

    Used when decommissioning collector or full compromise.

    Returns:
        Number of keys revoked
    """
    result = await db.execute(
        update(ApiKey)
        .where(
            ApiKey.collector_id == collector_id,
            ApiKey.is_active == True
        )
        .values(
            is_active=False,
            revoked_at=datetime.utcnow(),
            revoked_by=revoked_by,
            revoke_reason=reason
        )
    )

    await db.commit()

    count = result.rowcount
    logger.warning(
        "All collector keys revoked",
        collector_id=collector_id,
        keys_revoked=count,
        revoked_by=revoked_by,
        revoke_reason=reason
    )

    return count

When to Use:
- Suspected compromise
- Collector decommissioning
- Lost/stolen device

Trade-offs:
- Pros: Immediate effect, full audit trail
- Cons: Edge collector goes offline until new key provisioned
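
Keeping the revoked row is what allows a later request to be recognised as a revoked-key attempt rather than an unknown key. The in-memory sketch below illustrates that distinction under simplified assumptions: `KeyRecord`, `store`, `revoke`, and `classify` are hypothetical stand-ins for the database table and the functions above.

```python
import hashlib
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class KeyRecord:
    key_hash: str
    is_active: bool = True
    revoked_at: Optional[datetime] = None
    revoke_reason: Optional[str] = None


def sha256_hex(api_key: str) -> str:
    return hashlib.sha256(api_key.encode()).hexdigest()


# Stand-in for the api_keys table, indexed by key hash.
store: dict[str, KeyRecord] = {}


def revoke(record: KeyRecord, reason: str) -> None:
    """Soft revoke: mark the record inactive but keep it for auditing."""
    record.is_active = False
    record.revoked_at = datetime.now(timezone.utc)
    record.revoke_reason = reason


def classify(api_key: str) -> str:
    """Return 'active', 'revoked', or 'unknown' for a presented key."""
    record = store.get(sha256_hex(api_key))
    if record is None:
        return "unknown"
    if not record.is_active and record.revoked_at is not None:
        return "revoked"
    return "active"


demo_key = "alpr_demo-key"
store[sha256_hex(demo_key)] = KeyRecord(key_hash=sha256_hex(demo_key))
before = classify(demo_key)                       # "active"
revoke(store[sha256_hex(demo_key)], "lost device")
after = classify(demo_key)                        # "revoked"
other = classify("alpr_never-issued")             # "unknown"
```

Had the row been deleted instead, the second lookup would return "unknown" and the reuse attempt could not be attributed to a collector.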


Anti-Patterns

Anti-Pattern: Storing Plaintext Keys

Problem: Storing API keys in plaintext in the database.

Example of BAD code:

# BAD: Plaintext storage
class ApiKey(Base):
    api_key = Column(String(100))  # Plaintext!

Why It's Wrong:
- Database breach exposes all keys
- Backups contain live credentials
- Logs may accidentally include keys

Correct Approach:

# GOOD: Hash only
class ApiKey(Base):
    key_hash = Column(String(64), nullable=False)  # SHA-256
    key_prefix = Column(String(10))  # For identification only

Anti-Pattern: Using Slow Hashing for API Keys

Problem: Using bcrypt/argon2 for API key validation.

Example of BAD code:

# BAD: Slow hash for high-entropy keys
from argon2 import PasswordHasher

def verify_key(api_key: str, stored_hash: str) -> bool:
    return PasswordHasher().verify(stored_hash, api_key)  # ~100ms

Why It's Wrong:
- Adds 100ms+ to every request
- Unnecessary for high-entropy keys
- Password hashing is for low-entropy secrets

Correct Approach:

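# GOOD: Fast SHA-256 for high-entropy keys
import hashlib
import hmac

def verify_key(api_key: str, stored_hash: str) -> bool:
    computed = hashlib.sha256(api_key.encode()).hexdigest()  # <1ms
    # compare_digest runs in constant time regardless of where strings differ
    return hmac.compare_digest(computed, stored_hash)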
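
The cost difference can be measured rather than assumed. The sketch below times a single SHA-256 pass against a deliberately slow key-derivation function; PBKDF2 from the standard library stands in for bcrypt/argon2 here, and the iteration count is an illustrative assumption, not a tuned recommendation. Absolute timings depend on the machine.

```python
import hashlib
import time

api_key = "alpr_" + "A" * 43  # placeholder with the same shape as a real key

# Fast path: one SHA-256 pass, as in hash_api_key().
start = time.perf_counter()
fast_digest = hashlib.sha256(api_key.encode()).hexdigest()
fast_s = time.perf_counter() - start

# Slow path: PBKDF2 as a stand-in for a password-hashing function.
start = time.perf_counter()
slow_digest = hashlib.pbkdf2_hmac(
    "sha256", api_key.encode(), b"demo-salt", 200_000
).hex()
slow_s = time.perf_counter() - start

print(f"SHA-256: {fast_s * 1000:.3f} ms")
print(f"PBKDF2:  {slow_s * 1000:.3f} ms")
```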

Anti-Pattern: No Revocation Tracking

Problem: Simply deleting revoked keys instead of marking them inactive.

Example of BAD code:

# BAD: Delete revoked keys
async def revoke_key(key_id: str):
    await db.execute(delete(ApiKey).where(ApiKey.id == key_id))

Why It's Wrong:
- Cannot detect attempts to use revoked keys
- No audit trail
- Cannot investigate compromises

Correct Approach:

# GOOD: Soft revoke with audit
async def revoke_key(
    key_id: str, revoked_by: str, reason: str, db: AsyncSession
):
    await db.execute(
        update(ApiKey)
        .where(ApiKey.id == key_id)
        .values(
            is_active=False,
            revoked_at=datetime.utcnow(),
            revoked_by=revoked_by,
            revoke_reason=reason
        )
    )
    await db.commit()  # Persist the revocation

Testing Strategies

Unit Testing

import pytest


def test_generate_api_key_format():
    """Test generated key has correct format."""
    key = generate_api_key()
    assert key.startswith("alpr_")
    assert len(key) >= 40
    assert len(key) <= 55


def test_generate_api_key_unique():
    """Test keys are unique."""
    keys = {generate_api_key() for _ in range(100)}
    assert len(keys) == 100


def test_hash_api_key_deterministic():
    """Test same key produces same hash."""
    key = "alpr_test123"
    hash1 = hash_api_key(key)
    hash2 = hash_api_key(key)
    assert hash1 == hash2


def test_hash_api_key_length():
    """Test hash is SHA-256 length."""
    key = generate_api_key()
    key_hash = hash_api_key(key)
    assert len(key_hash) == 64  # SHA-256 hex


def test_validate_key_format_valid():
    """Test valid key format passes."""
    key = generate_api_key()
    assert validate_key_format(key) is True


def test_validate_key_format_no_prefix():
    """Test missing prefix fails."""
    assert validate_key_format("abcd1234") is False


def test_validate_key_format_wrong_prefix():
    """Test wrong prefix fails."""
    assert validate_key_format("api_abcd1234") is False

Integration Testing

@pytest.fixture
def active_api_key(db_session):
    """Create active API key for testing."""
    key = generate_api_key()
    key_record = ApiKey(
        collector_id=test_collector_id,
        key_hash=hash_api_key(key),
        key_prefix=get_key_prefix(key),
        is_active=True
    )
    db_session.add(key_record)
    db_session.commit()
    return key


async def test_validate_api_key_success(client, active_api_key):
    """Test valid key authenticates successfully."""
    response = await client.post(
        "/api/v1/heartbeat",
        headers={"X-API-Key": active_api_key},
        json={"uptime_sec": 100}
    )
    assert response.status_code == 200


async def test_validate_api_key_invalid(client):
    """Test invalid key is rejected."""
    response = await client.post(
        "/api/v1/heartbeat",
        headers={"X-API-Key": "alpr_invalid_key_here"},
        json={"uptime_sec": 100}
    )
    assert response.status_code == 401


async def test_validate_api_key_missing(client):
    """Test missing key is rejected."""
    response = await client.post(
        "/api/v1/heartbeat",
        json={"uptime_sec": 100}
    )
    assert response.status_code == 422  # Missing header

Configuration

Environment Variables

| Variable | Required | Default | Description |
|---|---|---|---|
| API_KEY_PREFIX | No | alpr_ | Prefix for generated keys |
| API_KEY_ROTATION_GRACE_HOURS | No | 24 | Default rotation grace period |
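
One way these variables might be read at startup is sketched below. `load_api_key_settings` is a hypothetical helper; the variable names and defaults are the ones listed above, and the non-negative check is an added assumption.

```python
import os
from typing import Mapping, Optional


def load_api_key_settings(env: Optional[Mapping[str, str]] = None) -> dict:
    """Read API key settings from the environment, applying documented defaults."""
    env = os.environ if env is None else env
    prefix = env.get("API_KEY_PREFIX", "alpr_")
    grace_hours = int(env.get("API_KEY_ROTATION_GRACE_HOURS", "24"))
    if grace_hours < 0:
        raise ValueError("API_KEY_ROTATION_GRACE_HOURS must be non-negative")
    return {"prefix": prefix, "rotation_grace_hours": grace_hours}


# Passing an explicit mapping keeps the example independent of the real environment.
defaults = load_api_key_settings({})
overridden = load_api_key_settings({"API_KEY_ROTATION_GRACE_HOURS": "48"})
print(defaults)    # {'prefix': 'alpr_', 'rotation_grace_hours': 24}
print(overridden)  # {'prefix': 'alpr_', 'rotation_grace_hours': 48}
```

Note that `generate_api_key` and `validate_key_format` as written above hard-code the `alpr_` prefix, so a custom `API_KEY_PREFIX` would need to be threaded through both.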

Database Schema

CREATE TABLE api_keys (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    collector_id UUID NOT NULL REFERENCES edge.collectors(id),

    -- Key storage
    key_hash CHAR(64) NOT NULL,          -- SHA-256 hash
    key_prefix VARCHAR(10) NOT NULL,      -- First 10 chars for UI

    -- Metadata
    name VARCHAR(100),                    -- Optional label

    -- Lifecycle
    is_active BOOLEAN DEFAULT true,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    created_by UUID NOT NULL,
    expires_at TIMESTAMPTZ,               -- NULL = no expiration

    -- Usage tracking
    last_used_at TIMESTAMPTZ,
    last_used_ip VARCHAR(45),

    -- Revocation
    revoked_at TIMESTAMPTZ,
    revoked_by UUID,
    revoke_reason TEXT
);

-- Fast lookup by hash (for validation)
CREATE INDEX idx_api_keys_hash ON api_keys (key_hash) WHERE is_active = true;

-- Lookup by collector
CREATE INDEX idx_api_keys_collector ON api_keys (collector_id);

-- Find expiring keys
CREATE INDEX idx_api_keys_expires ON api_keys (expires_at)
    WHERE expires_at IS NOT NULL AND is_active = true;


Maintainer: Development Team
Review Cycle: Quarterly