API Key Patterns - Project Reference Pattern¶
Component: API Key Authentication Status: 🟡 In Development Created: 2025-12-29 Last Updated: 2025-12-29
Overview¶
Purpose¶
This PRP defines implementation patterns for API key authentication used in edge-to-central communication, including generation, hashing, validation, rotation, and revocation.
Scope¶
Responsibilities: - API key generation with secure randomness - Key hashing for storage (SHA-256) - Key validation on requests - Key rotation with grace period - Key revocation (immediate effect) - Usage tracking and monitoring
Out of Scope: - User authentication (uses JWT, see auth docs) - Edge provisioning flow (see edge-provisioning-prp.md) - Camera credentials (see edge-provisioning-prp.md)
Dependencies¶
Requires: - PostgreSQL (api_keys table) - secrets module (Python standard library) - hashlib module (Python standard library)
Used By: - Central server (validation middleware) - Edge collector (HTTP client) - Admin UI (key management)
Quick Reference¶
When to Use This PRP¶
Use when: - Generating new API keys for collectors - Implementing API key validation middleware - Building key rotation functionality - Implementing key revocation
Don't use when: - Authenticating web users (use JWT) - Storing camera credentials (use device-bound encryption)
Key Format¶
Example: alpr_7Kx9mPqR2vNwYzA1bCdEfGhIjKlMnOpQ3rStUvWx
| Property | Value |
|---|---|
| Prefix | alpr_ (identifies key type) |
| Random portion | 32 bytes (256 bits) |
| Encoding | Base64URL (URL-safe) |
| Total length | ~49 characters |
| Entropy | 256 bits |
Patterns¶
Pattern: Key Generation¶
Problem: Need to generate cryptographically secure API keys that are identifiable and URL-safe.
Solution:
Use Python's secrets module with base64url encoding and a prefix for identification.
Implementation:
import secrets
def generate_api_key() -> str:
"""
Generate a secure API key for a collector.
Format: alpr_<32-byte-random-base64url>
Properties:
- 256 bits of entropy (cryptographically secure)
- URL-safe encoding (no special characters)
- Prefix for identification and validation
- ~49 characters total length
Returns:
API key string like "alpr_7Kx9mPqR2vNwYzA1bCdEfGhIjKlMnOpQ3rStUvWx"
"""
random_bytes = secrets.token_urlsafe(32)
return f"alpr_{random_bytes}"
def validate_key_format(api_key: str) -> bool:
"""
Validate API key format (not authenticity).
Checks:
- Starts with 'alpr_'
- Correct length (~49 chars)
- Contains only valid base64url characters
Args:
api_key: Key to validate
Returns:
True if format is valid
"""
if not api_key.startswith("alpr_"):
return False
if len(api_key) < 40 or len(api_key) > 55:
return False
# Check base64url characters only
valid_chars = set(
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_"
)
random_part = api_key[5:] # After "alpr_"
return all(c in valid_chars for c in random_part)
When to Use: - Creating new collectors - Key rotation (generating replacement key) - Provisioning flow
Trade-offs: - Pros: High entropy, URL-safe, identifiable prefix - Cons: Longer than some key formats (~49 chars)
Pattern: Key Hashing¶
Problem: API keys must be stored securely; plaintext storage exposes all keys if database is compromised.
Solution: Store SHA-256 hash of key; verify by hashing incoming key and comparing.
Implementation:
import hashlib
def hash_api_key(api_key: str) -> str:
"""
Hash API key for secure storage.
Uses SHA-256 (fast lookup, collision-resistant).
Note: We use SHA-256 not bcrypt/argon2 because:
- Keys have 256 bits of entropy (immune to brute force)
- We need fast validation on every request
- No need for salt (key is random)
Args:
api_key: Plaintext API key
Returns:
64-character hexadecimal hash
"""
return hashlib.sha256(api_key.encode()).hexdigest()
def get_key_prefix(api_key: str) -> str:
"""
Extract prefix for UI identification.
Shows first 10 chars which includes 'alpr_' prefix
and first 5 chars of random portion.
Args:
api_key: Plaintext API key
Returns:
Key prefix like "alpr_7Kx9m"
"""
return api_key[:10] if len(api_key) >= 10 else api_key
Database Storage:
| Column | Contains | Example |
|---|---|---|
key_hash |
SHA-256 hash (64 chars) | a1b2c3d4... |
key_prefix |
First 10 chars | alpr_7Kx9m |
When to Use: - Storing new keys in database - Validating keys on requests
Trade-offs: - Pros: Fast validation, no brute force risk (high entropy), deterministic - Cons: Key cannot be recovered (by design)
Pattern: Key Validation Middleware¶
Problem: Every edge-to-central request must be authenticated; validation must be fast and provide useful context.
Solution: FastAPI dependency that validates key, tracks usage, and returns collector context.
Implementation:
from datetime import datetime
from typing import Optional
from fastapi import Header, HTTPException, Depends, Request
from sqlalchemy import or_, func
from sqlalchemy.ext.asyncio import AsyncSession
async def validate_api_key(
request: Request,
api_key: str = Header(alias="X-API-Key"),
db: AsyncSession = Depends(get_db)
) -> "Collector":
"""
Validate API key and return associated collector.
Performs checks:
1. Key format is valid
2. Key hash exists in database
3. Key is active (not revoked)
4. Key is not expired
5. Collector is active
Updates:
- last_used_at timestamp
- last_used_ip address
Args:
request: FastAPI request for IP tracking
api_key: Key from X-API-Key header
db: Database session
Returns:
Collector associated with the key
Raises:
HTTPException: 401 if key invalid, inactive, or expired
"""
# Quick format check
if not validate_key_format(api_key):
logger.warning(
"Malformed API key",
key_prefix=api_key[:10] if len(api_key) >= 10 else "short",
src_ip=request.client.host
)
raise HTTPException(401, "Invalid API key format")
key_hash = hash_api_key(api_key)
# Query for valid key
result = await db.execute(
select(ApiKey)
.join(Collector)
.where(
ApiKey.key_hash == key_hash,
ApiKey.is_active == True,
Collector.status == "active",
or_(
ApiKey.expires_at.is_(None),
ApiKey.expires_at > func.now()
)
)
)
key_record = result.scalar_one_or_none()
if not key_record:
# Check if key exists but is revoked (for alerting)
await check_revoked_key(api_key, db, request)
logger.warning(
"Invalid API key attempt",
key_prefix=get_key_prefix(api_key),
src_ip=request.client.host
)
raise HTTPException(401, "Invalid or inactive API key")
# Update usage tracking
key_record.last_used_at = datetime.utcnow()
key_record.last_used_ip = request.client.host
await db.commit()
return key_record.collector
async def check_revoked_key(
api_key: str,
db: AsyncSession,
request: Request
) -> None:
"""Check if key was revoked and alert if so."""
key_hash = hash_api_key(api_key)
result = await db.execute(
select(ApiKey).where(
ApiKey.key_hash == key_hash,
ApiKey.is_active == False,
ApiKey.revoked_at.isnot(None)
)
)
revoked_key = result.scalar_one_or_none()
if revoked_key:
logger.warning(
"Revoked API key usage attempt",
key_prefix=get_key_prefix(api_key),
collector_id=str(revoked_key.collector_id),
revoked_at=revoked_key.revoked_at.isoformat(),
revoke_reason=revoked_key.revoke_reason,
src_ip=request.client.host
)
# Optionally trigger alert
await alert_revoked_key_usage(revoked_key, request)
Usage in Endpoints:
from fastapi import APIRouter, Depends
router = APIRouter()
@router.post("/api/v1/events")
async def receive_events(
events: list[EventCreate],
collector: Collector = Depends(validate_api_key),
db: AsyncSession = Depends(get_db)
):
"""
Receive detections from edge collector.
Collector is automatically authenticated via API key.
"""
for detection in detections:
detection.collector_id = collector.id
# ... process events
return {"received": len(events)}
@router.post("/api/v1/heartbeat")
async def heartbeat(
status: HeartbeatRequest,
collector: Collector = Depends(validate_api_key),
db: AsyncSession = Depends(get_db)
):
"""
Receive heartbeat and return pending commands.
"""
collector.last_heartbeat_at = datetime.utcnow()
await db.commit()
commands = await get_pending_commands(collector.id, db)
return {"status": "ok", "pending_commands": commands}
When to Use: - All edge-to-central API endpoints - Any endpoint requiring collector authentication
Trade-offs: - Pros: Centralized validation, usage tracking, revoked key detection - Cons: Database query on every request (mitigated by indexed lookup)
Pattern: Key Rotation¶
Problem: Keys should be rotated periodically; rotation must not cause service interruption.
Solution: Generate new key while keeping old key active during grace period; deactivate old key after confirmation.
Implementation:
from datetime import datetime, timedelta
async def rotate_api_key(
collector_id: str,
grace_period_hours: int = 24,
created_by: str = None,
db: AsyncSession = None
) -> dict:
"""
Rotate API key for a collector with grace period.
Process:
1. Generate new key
2. Store new key (active)
3. Mark old key to expire after grace period
4. Return new key (only time it's shown)
Args:
collector_id: Target collector
grace_period_hours: Time to allow both keys (default 24h)
created_by: Admin user ID performing rotation
db: Database session
Returns:
Dict with new key and expiration of old key
Raises:
ValueError: If collector not found
"""
# Find collector
collector = await db.get(Collector, collector_id)
if not collector:
raise ValueError(f"Collector {collector_id} not found")
# Generate new key
new_key = generate_api_key()
new_key_hash = hash_api_key(new_key)
# Create new key record
new_key_record = ApiKey(
collector_id=collector_id,
key_hash=new_key_hash,
key_prefix=get_key_prefix(new_key),
name="Rotated key",
is_active=True,
created_at=datetime.utcnow(),
created_by=created_by
)
db.add(new_key_record)
# Set expiration on old keys
old_key_expires = datetime.utcnow() + timedelta(hours=grace_period_hours)
await db.execute(
update(ApiKey)
.where(
ApiKey.collector_id == collector_id,
ApiKey.is_active == True,
ApiKey.id != new_key_record.id
)
.values(expires_at=old_key_expires)
)
await db.commit()
logger.info(
"API key rotated",
collector_id=collector_id,
new_key_prefix=get_key_prefix(new_key),
grace_period_hours=grace_period_hours,
old_key_expires_at=old_key_expires.isoformat(),
rotated_by=created_by
)
return {
"new_key": new_key, # Only time key is revealed
"old_key_expires_at": old_key_expires.isoformat(),
"message": f"Update edge collector within {grace_period_hours} hours"
}
Rotation Timeline:
T+0h T+24h (grace period)
│ │
â–¼ â–¼
[Generate new] ──► [Old key expires]
│ │
│ Both keys │ Only new key
│ valid │ valid
â–¼ â–¼
When to Use: - Scheduled key rotation (annual/quarterly) - Personnel changes - After suspected exposure
Trade-offs: - Pros: Zero downtime, configurable grace period - Cons: Brief window with two valid keys
Pattern: Key Revocation¶
Problem: Compromised or decommissioned keys must be immediately invalidated.
Solution: Mark key as inactive with timestamp and reason; validation rejects immediately.
Implementation:
async def revoke_api_key(
key_id: str,
revoked_by: str,
reason: str,
db: AsyncSession
) -> dict:
"""
Immediately revoke an API key.
Effects:
- Key becomes invalid immediately
- All requests with this key will fail
- Revocation is logged with reason
- Usage attempts are tracked and alerted
Args:
key_id: API key record ID
revoked_by: Admin user ID
reason: Reason for revocation
db: Database session
Returns:
Revocation confirmation
Raises:
ValueError: If key not found
"""
key_record = await db.get(ApiKey, key_id)
if not key_record:
raise ValueError(f"API key {key_id} not found")
key_record.is_active = False
key_record.revoked_at = datetime.utcnow()
key_record.revoked_by = revoked_by
key_record.revoke_reason = reason
await db.commit()
logger.warning(
"API key revoked",
key_id=key_id,
key_prefix=key_record.key_prefix,
collector_id=str(key_record.collector_id),
revoked_by=revoked_by,
revoke_reason=reason
)
return {
"revoked_at": key_record.revoked_at.isoformat(),
"collector_id": str(key_record.collector_id),
"message": "Key revoked. Collector will be unable to authenticate."
}
async def revoke_all_collector_keys(
collector_id: str,
revoked_by: str,
reason: str,
db: AsyncSession
) -> int:
"""
Revoke all API keys for a collector.
Used when decommissioning collector or full compromise.
Returns:
Number of keys revoked
"""
result = await db.execute(
update(ApiKey)
.where(
ApiKey.collector_id == collector_id,
ApiKey.is_active == True
)
.values(
is_active=False,
revoked_at=datetime.utcnow(),
revoked_by=revoked_by,
revoke_reason=reason
)
)
await db.commit()
count = result.rowcount
logger.warning(
"All collector keys revoked",
collector_id=collector_id,
keys_revoked=count,
revoked_by=revoked_by,
revoke_reason=reason
)
return count
When to Use: - Suspected compromise - Collector decommissioning - Lost/stolen device
Trade-offs: - Pros: Immediate effect, full audit trail - Cons: Edge collector goes offline until new key provisioned
Anti-Patterns¶
Anti-Pattern: Storing Plaintext Keys¶
Problem: Storing API keys in plaintext in the database.
Example of BAD code:
Why It's Wrong: - Database breach exposes all keys - Backups contain live credentials - Logs may accidentally include keys
Correct Approach:
# GOOD: Hash only
class ApiKey(Base):
key_hash = Column(String(64), nullable=False) # SHA-256
key_prefix = Column(String(10)) # For identification only
Anti-Pattern: Using Slow Hashing for API Keys¶
Problem: Using bcrypt/argon2 for API key validation.
Example of BAD code:
# BAD: Slow hash for high-entropy keys
from argon2 import PasswordHasher
def verify_key(api_key: str, stored_hash: str) -> bool:
return PasswordHasher().verify(stored_hash, api_key) # ~100ms
Why It's Wrong: - Adds 100ms+ to every request - Unnecessary for high-entropy keys - Password hashing is for low-entropy secrets
Correct Approach:
# GOOD: Fast SHA-256 for high-entropy keys
import hashlib
def verify_key(api_key: str, stored_hash: str) -> bool:
return hashlib.sha256(api_key.encode()).hexdigest() == stored_hash # <1ms
Anti-Pattern: No Revocation Tracking¶
Problem: Simply deleting revoked keys instead of marking them inactive.
Example of BAD code:
# BAD: Delete revoked keys
async def revoke_key(key_id: str):
await db.execute(delete(ApiKey).where(ApiKey.id == key_id))
Why It's Wrong: - Cannot detect attempts to use revoked keys - No audit trail - Cannot investigate compromises
Correct Approach:
# GOOD: Soft revoke with audit
async def revoke_key(key_id: str, revoked_by: str, reason: str):
await db.execute(
update(ApiKey)
.where(ApiKey.id == key_id)
.values(
is_active=False,
revoked_at=datetime.utcnow(),
revoked_by=revoked_by,
revoke_reason=reason
)
)
Testing Strategies¶
Unit Testing¶
import pytest
def test_generate_api_key_format():
"""Test generated key has correct format."""
key = generate_api_key()
assert key.startswith("alpr_")
assert len(key) >= 40
assert len(key) <= 55
def test_generate_api_key_unique():
"""Test keys are unique."""
keys = {generate_api_key() for _ in range(100)}
assert len(keys) == 100
def test_hash_api_key_deterministic():
"""Test same key produces same hash."""
key = "alpr_test123"
hash1 = hash_api_key(key)
hash2 = hash_api_key(key)
assert hash1 == hash2
def test_hash_api_key_length():
"""Test hash is SHA-256 length."""
key = generate_api_key()
key_hash = hash_api_key(key)
assert len(key_hash) == 64 # SHA-256 hex
def test_validate_key_format_valid():
"""Test valid key format passes."""
key = generate_api_key()
assert validate_key_format(key) is True
def test_validate_key_format_no_prefix():
"""Test missing prefix fails."""
assert validate_key_format("abcd1234") is False
def test_validate_key_format_wrong_prefix():
"""Test wrong prefix fails."""
assert validate_key_format("api_abcd1234") is False
Integration Testing¶
@pytest.fixture
def active_api_key(db_session):
"""Create active API key for testing."""
key = generate_api_key()
key_record = ApiKey(
collector_id=test_collector_id,
key_hash=hash_api_key(key),
key_prefix=get_key_prefix(key),
is_active=True
)
db_session.add(key_record)
db_session.commit()
return key
async def test_validate_api_key_success(client, active_api_key):
"""Test valid key authenticates successfully."""
response = await client.post(
"/api/v1/heartbeat",
headers={"X-API-Key": active_api_key},
json={"uptime_sec": 100}
)
assert response.status_code == 200
async def test_validate_api_key_invalid(client):
"""Test invalid key is rejected."""
response = await client.post(
"/api/v1/heartbeat",
headers={"X-API-Key": "alpr_invalid_key_here"},
json={"uptime_sec": 100}
)
assert response.status_code == 401
async def test_validate_api_key_missing(client):
"""Test missing key is rejected."""
response = await client.post(
"/api/v1/heartbeat",
json={"uptime_sec": 100}
)
assert response.status_code == 422 # Missing header
Configuration¶
Environment Variables¶
| Variable | Required | Default | Description |
|---|---|---|---|
API_KEY_PREFIX |
No | alpr_ |
Prefix for generated keys |
API_KEY_ROTATION_GRACE_HOURS |
No | 24 |
Default rotation grace period |
Database Schema¶
CREATE TABLE api_keys (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
collector_id UUID NOT NULL REFERENCES edge.collectors(id),
-- Key storage
key_hash CHAR(64) NOT NULL, -- SHA-256 hash
key_prefix VARCHAR(10) NOT NULL, -- First 10 chars for UI
-- Metadata
name VARCHAR(100), -- Optional label
-- Lifecycle
is_active BOOLEAN DEFAULT true,
created_at TIMESTAMPTZ DEFAULT NOW(),
created_by UUID NOT NULL,
expires_at TIMESTAMPTZ, -- NULL = no expiration
-- Usage tracking
last_used_at TIMESTAMPTZ,
last_used_ip VARCHAR(45),
-- Revocation
revoked_at TIMESTAMPTZ,
revoked_by UUID,
revoke_reason TEXT
);
-- Fast lookup by hash (for validation)
CREATE INDEX idx_api_keys_hash ON api_keys (key_hash) WHERE is_active = true;
-- Lookup by collector
CREATE INDEX idx_api_keys_collector ON api_keys (collector_id);
-- Find expiring keys
CREATE INDEX idx_api_keys_expires ON api_keys (expires_at)
WHERE expires_at IS NOT NULL AND is_active = true;
Related Documentation¶
- Architecture: api-key-management.md - Lifecycle decisions
- PRP: edge-provisioning-prp.md - Key provisioning flow
- PRP: global.md - Authentication patterns
Maintainer: Development Team Review Cycle: Quarterly