Edge Provisioning Patterns - Project Reference Pattern
Component: Edge Collector Provisioning
Status: 🟡 In Development
Created: 2025-12-29
Last Updated: 2025-12-29
Overview
Purpose
This PRP defines implementation patterns for edge collector provisioning, including claim code generation, device-bound credential encryption, secure credential storage, and remote wipe procedures.
Scope
Responsibilities:
- Claim code generation and validation
- Device-bound encryption for credentials
- Secure credential storage on edge devices
- Remote wipe/decommission procedures
- Hardware ID binding

Out of Scope:
- Camera adapter implementations (see camera-adapters-prp.md)
- API key management (see api-key-prp.md)
- Network configuration (see edge-provisioning.md)
Dependencies
Requires:
- PostgreSQL (edge schema for collectors table)
- cryptography library (Fernet encryption)
- Raspberry Pi hardware (serial number for device binding)

Used By:
- Central server (claim API)
- Edge collector (credential storage)
- Admin UI (collector management)
Quick Reference
When to Use This PRP
Use when:
- Creating new collector records with claim codes
- Implementing claim validation endpoint
- Storing camera credentials on edge devices
- Implementing remote wipe functionality
- Binding credentials to specific hardware

Don't use when:
- Managing API keys (see api-key-prp.md)
- Implementing camera communication (see camera-adapters-prp.md)
Patterns
Pattern: Claim Code Generation
Problem: Edge devices need a secure, human-readable way to prove they are authorized to join the system without pre-burning credentials into images.
Solution: Generate short-lived, human-friendly claim codes that an admin pre-provisions for each collector.
Implementation:
import secrets
from datetime import datetime, timedelta, timezone

# Alphabet excludes ambiguous characters: 0, O, I, L, 1 (31 symbols remain)
CLAIM_ALPHABET = "23456789ABCDEFGHJKMNPQRSTUVWXYZ"
CLAIM_CODE_EXPIRY_HOURS = 48


def generate_claim_code() -> str:
    """
    Generate human-readable claim code.

    Format: ALPR-XXXX-XXXX
    - Excludes ambiguous chars: 0, O, I, L, 1
    - ~40 bits of entropy (8 characters from a 31-symbol alphabet)
    - Collision probability < 0.0001% with 1000 pending claims

    Returns:
        Claim code string like "ALPR-K7X9-M2P4"
    """
    segment1 = "".join(secrets.choice(CLAIM_ALPHABET) for _ in range(4))
    segment2 = "".join(secrets.choice(CLAIM_ALPHABET) for _ in range(4))
    return f"ALPR-{segment1}-{segment2}"


def generate_claim_expiry() -> datetime:
    """Generate timezone-aware (UTC) expiration timestamp for claim code."""
    return datetime.now(timezone.utc) + timedelta(hours=CLAIM_CODE_EXPIRY_HOURS)


# Usage
claim_code = generate_claim_code()    # e.g. "ALPR-K7X9-M2P4"
expires_at = generate_claim_expiry()  # 48 hours from now
Claim Code Properties:
| Property | Value | Rationale |
|---|---|---|
| Format | ALPR-XXXX-XXXX | Easy to read aloud, type manually |
| Expiry | 48 hours | Long enough for scheduling, short enough for security |
| Entropy | ~40 bits (8 characters from a 31-symbol alphabet) | Low collision probability |
| Single-use | Yes | Cleared after successful claim |
When to Use:
- Creating new pending collectors in admin UI
- Generating replacement codes for device re-provisioning

Trade-offs:
- Pros: Human-friendly, time-limited, auditable
- Cons: Requires admin action to create, not self-service
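The entropy and collision figures for this format follow from the alphabet size and the number of random characters. The sketch below works through that arithmetic using the birthday-bound approximation; the helper names `claim_code_entropy_bits` and `collision_probability` are illustrative and not part of the project.

```python
import math

CLAIM_ALPHABET = "23456789ABCDEFGHJKMNPQRSTUVWXYZ"
RANDOM_CHARS = 8  # two segments of four random characters


def claim_code_entropy_bits(alphabet: str = CLAIM_ALPHABET,
                            length: int = RANDOM_CHARS) -> float:
    """Bits of entropy for `length` characters drawn uniformly from `alphabet`."""
    return length * math.log2(len(alphabet))


def collision_probability(pending: int,
                          alphabet: str = CLAIM_ALPHABET,
                          length: int = RANDOM_CHARS) -> float:
    """Birthday-bound approximation n(n-1) / (2N) for `pending` live codes."""
    keyspace = len(alphabet) ** length
    return pending * (pending - 1) / (2 * keyspace)


bits = claim_code_entropy_bits()
p = collision_probability(1000)
print(f"{len(CLAIM_ALPHABET)} symbols, {bits:.1f} bits, collision probability {p:.2e}")
```

With 31 symbols and 8 random characters this gives roughly 39.6 bits, and a collision probability below one in a million for 1000 pending claims, which is consistent with the "< 0.0001%" figure.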
Pattern: Claim Validation
Problem: Edge devices present claim codes to join the system; validation must be secure and prevent abuse.
Solution: Validate claim code against pending collectors with expiration check, rate limiting, and hardware binding.
Implementation:
from datetime import datetime, timezone
from uuid import UUID
import secrets

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from fastapi import HTTPException, Depends
from pydantic import BaseModel

# Collector (ORM model), hash_api_key, and build_collector_config are
# defined elsewhere in the project and are not shown here.


class ClaimRequest(BaseModel):
    claim_code: str
    hardware_id: str  # Pi serial number
    os_version: str
    edge_version: str


class ClaimResponse(BaseModel):
    collector_id: UUID
    api_key: str  # Only returned once, in plaintext
    site_name: str
    config: dict


async def validate_claim(
    request: ClaimRequest,
    client_ip: str,
    db: AsyncSession
) -> ClaimResponse:
    """
    Validate claim code and activate collector.

    Security checks:
    1. Claim code exists and is pending
    2. Claim code not expired
    3. Hardware ID not already registered
    4. Single-use: code cleared after claim

    Args:
        request: Claim request with code and device info
        client_ip: Request source IP for audit
        db: Database session

    Returns:
        ClaimResponse with API key (only time it's revealed)

    Raises:
        HTTPException: 404 if invalid/expired, 409 if hardware already registered
    """
    # Timezone-aware UTC, matching the TIMESTAMPTZ columns
    now = datetime.now(timezone.utc)

    # 1. Find pending collector by claim code
    result = await db.execute(
        select(Collector)
        .where(Collector.claim_code == request.claim_code.upper())
        .where(Collector.status == "pending")
        .where(Collector.claim_code_expires_at > now)
    )
    collector = result.scalar_one_or_none()
    if not collector:
        # Don't distinguish between invalid and expired (security)
        raise HTTPException(404, "Invalid or expired claim code")

    # 2. Check hardware_id not already registered
    existing = await db.execute(
        select(Collector)
        .where(Collector.hardware_id == request.hardware_id)
        .where(Collector.status.in_(["active", "disabled"]))
    )
    if existing.scalar_one_or_none():
        raise HTTPException(409, "Device already registered")

    # 3. Generate API key (see api-key-prp.md for details)
    api_key = secrets.token_urlsafe(32)
    api_key_hash = hash_api_key(api_key)  # SHA-256

    # 4. Update collector
    collector.status = "active"
    collector.api_key_hash = api_key_hash
    collector.api_key_prefix = api_key[:8]
    collector.hardware_id = request.hardware_id
    collector.os_version = request.os_version
    collector.edge_version = request.edge_version
    collector.claimed_at = now
    collector.claimed_from_ip = client_ip
    collector.claim_code = None  # Clear after use (single-use)
    collector.claim_code_expires_at = None
    await db.commit()

    # 5. Return API key (only time it's returned in plaintext)
    return ClaimResponse(
        collector_id=collector.id,
        api_key=api_key,
        site_name=collector.site_name,
        config=await build_collector_config(collector.id, db)
    )
Validation Checks:
| Check | Failure Response | Purpose |
|---|---|---|
| Code exists + pending | 404 | Prevent enumeration |
| Code not expired | 404 | Time-limited validity |
| Hardware not registered | 409 | Prevent re-use |
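Because claim codes are typed by hand, it can help to normalize and format-check the input before querying the database. The sketch below is one way to do that; `normalize_claim_code` and `CLAIM_CODE_RE` are hypothetical names, and a malformed code would presumably receive the same 404 as an unknown one so that the response does not reveal which check failed.

```python
import re
from typing import Optional

CLAIM_ALPHABET = "23456789ABCDEFGHJKMNPQRSTUVWXYZ"

# ALPR-XXXX-XXXX, where X comes from the unambiguous alphabet
CLAIM_CODE_RE = re.compile(
    rf"ALPR-[{CLAIM_ALPHABET}]{{4}}-[{CLAIM_ALPHABET}]{{4}}"
)


def normalize_claim_code(raw: str) -> Optional[str]:
    """Return the canonical upper-case code, or None if it is malformed."""
    candidate = raw.strip().upper()
    return candidate if CLAIM_CODE_RE.fullmatch(candidate) else None


print(normalize_claim_code("  alpr-k7x9-m2p4 "))
print(normalize_claim_code("ALPR-K0X9-M2P4"))
```

The second call returns None because `0` is not in the alphabet, so the database is never queried for codes that could not have been issued.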
When to Use:
- POST /api/v1/collectors/claim endpoint
- Device first-boot sequence

Trade-offs:
- Pros: Secure, auditable, prevents unauthorized devices
- Cons: Requires admin to pre-create collectors
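The solution for this pattern lists rate limiting, which the implementation does not show. The following is a minimal sketch of one option: a sliding-window limiter kept in memory and keyed by client IP. The class name `ClaimRateLimiter`, the defaults of 5 attempts per 60 seconds, and the idea of answering rejected requests with HTTP 429 are all assumptions for illustration; a multi-process deployment would need shared state instead.

```python
import time
from collections import defaultdict, deque
from typing import Optional


class ClaimRateLimiter:
    """Sliding-window limiter keyed by client IP (in-memory, single process)."""

    def __init__(self, max_attempts: int = 5, window_seconds: float = 60.0):
        self.max_attempts = max_attempts
        self.window_seconds = window_seconds
        self._attempts = defaultdict(deque)  # client_ip -> attempt timestamps

    def allow(self, client_ip: str, now: Optional[float] = None) -> bool:
        """Record an attempt and report whether it is within the limit."""
        now = time.monotonic() if now is None else now
        attempts = self._attempts[client_ip]
        # Drop attempts that have aged out of the window
        while attempts and now - attempts[0] >= self.window_seconds:
            attempts.popleft()
        if len(attempts) >= self.max_attempts:
            return False
        attempts.append(now)
        return True


limiter = ClaimRateLimiter(max_attempts=3, window_seconds=10)
print([limiter.allow("203.0.113.7", now=t) for t in (0, 1, 2, 3)])
```

A claim endpoint could call `allow(client_ip)` before `validate_claim` and reject the request when it returns False, which bounds how quickly one source can guess codes.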
Pattern: Device-Bound Encryption
Problem: Camera credentials stored on edge device must be protected if SD card is stolen.
Solution: Derive encryption key from device-specific constants (not stored on disk) using PBKDF2.
Implementation:
import base64
import hashlib
import json
import os
from pathlib import Path

from cryptography.fernet import Fernet
from pydantic import BaseModel


class CameraCredential(BaseModel):
    camera_id: str
    vendor: str  # "hikvision", "unifi"
    host: str
    port: int
    username: str
    password: str
    extra_config: dict = {}


def derive_device_key(
    collector_id: str,
    hardware_id: str,
    install_timestamp: int
) -> bytes:
    """
    Derive encryption key from device-specific values.

    Key is not stored - regenerated on each boot from constants.
    Stolen SD card cannot decrypt without knowing all three values.

    Args:
        collector_id: UUID assigned during claim
        hardware_id: Raspberry Pi serial number
        install_timestamp: Unix timestamp of first claim

    Returns:
        Fernet-compatible key (32 bytes, base64 encoded)
    """
    key_material = f"{collector_id}:{hardware_id}:{install_timestamp}".encode()
    key = hashlib.pbkdf2_hmac(
        "sha256",
        key_material,
        salt=b"chaverim-edge-v1",  # Static salt (key material is unique)
        iterations=100_000
    )
    return base64.urlsafe_b64encode(key)


class CredentialStore:
    """Encrypted storage for camera credentials."""

    def __init__(self, config_dir: Path):
        self.creds_file = config_dir / ".credentials" / "cameras.enc"
        self._fernet: Fernet | None = None

    def initialize(
        self,
        collector_id: str,
        hardware_id: str,
        install_timestamp: int
    ) -> None:
        """Initialize encryption with device-specific key."""
        key = derive_device_key(collector_id, hardware_id, install_timestamp)
        self._fernet = Fernet(key)

    def store_credentials(self, cameras: list[CameraCredential]) -> None:
        """
        Encrypt and store camera credentials.

        Creates credential directory with restrictive permissions.
        """
        if self._fernet is None:
            raise RuntimeError("CredentialStore not initialized")
        plaintext = json.dumps(
            [c.model_dump() for c in cameras]
        ).encode()
        encrypted = self._fernet.encrypt(plaintext)
        # Create directory with restrictive permissions
        self.creds_file.parent.mkdir(mode=0o700, exist_ok=True)
        self.creds_file.write_bytes(encrypted)
        self.creds_file.chmod(0o600)

    def load_credentials(self) -> list[CameraCredential]:
        """
        Decrypt and return camera credentials.

        Returns empty list if no credentials stored.
        """
        if self._fernet is None:
            raise RuntimeError("CredentialStore not initialized")
        if not self.creds_file.exists():
            return []
        encrypted = self.creds_file.read_bytes()
        plaintext = self._fernet.decrypt(encrypted)
        data = json.loads(plaintext)
        return [CameraCredential(**c) for c in data]

    def clear_credentials(self) -> None:
        """Overwrite the credential file with random data, then delete it."""
        if self.creds_file.exists():
            size = self.creds_file.stat().st_size
            # Overwrite in place ("r+b" does not truncate first) and flush
            # to disk before deletion
            with open(self.creds_file, "r+b") as f:
                f.write(os.urandom(size))
                f.flush()
                os.fsync(f.fileno())
            self.creds_file.unlink()
Key Derivation Components:
| Component | Source | Purpose |
|---|---|---|
| collector_id | Central server (via claim) | Links to specific collector record |
| hardware_id | Pi serial number | Binds to physical device |
| install_timestamp | First boot time | Adds temporal uniqueness |
Security Properties:
- Key never stored on disk
- Requires all three values to derive
- PBKDF2 with 100k iterations (slow brute force)
- Fernet (AES-128-CBC + HMAC) for encryption
When to Use:
- Storing camera credentials on edge device
- Any sensitive config that needs at-rest encryption

Trade-offs:
- Pros: SD card useless without hardware, no key management
- Cons: Credentials lost if any component changes (intentional)
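`store_credentials` writes the file and then tightens its permissions. A possible refinement, sketched below with only the standard library, creates the file with owner-only permissions from the start and swaps it into place atomically, so a crash mid-write cannot leave a truncated credential file. `write_private_file` is a hypothetical helper, not part of the project.

```python
import os
import tempfile
from pathlib import Path


def write_private_file(path: Path, data: bytes) -> None:
    """Write `data` to `path` with mode 0o600, replacing any old file atomically."""
    path.parent.mkdir(mode=0o700, parents=True, exist_ok=True)
    # mkstemp creates the temp file readable and writable only by the owner
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, prefix=".tmp-")
    try:
        with os.fdopen(fd, "wb") as tmp:
            tmp.write(data)
            tmp.flush()
            os.fsync(tmp.fileno())
        # Atomic on POSIX when source and target share a filesystem
        os.replace(tmp_name, path)
    except BaseException:
        # Remove the temp file if anything failed before the swap
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise


with tempfile.TemporaryDirectory() as demo_dir:
    target = Path(demo_dir) / ".credentials" / "cameras.enc"
    write_private_file(target, b"encrypted-bytes")
    print(oct(target.stat().st_mode & 0o777))
```

In `store_credentials`, the `write_bytes` and `chmod` pair could be replaced by a single call such as `write_private_file(self.creds_file, encrypted)`.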
Pattern: Remote Wipe
Problem: Compromised or decommissioned devices need secure credential and data removal.
Solution: Implement remote wipe command that securely overwrites sensitive files before deletion.
Implementation:
import os
import shutil
import subprocess
from datetime import datetime, timezone
from pathlib import Path


async def execute_wipe_command(
    config_dir: Path = Path("/opt/chaverim-edge/config"),
    data_dir: Path = Path("/opt/chaverim-edge/data"),
    logs_dir: Path = Path("/opt/chaverim-edge/logs")
) -> dict:
    """
    Securely wipe all sensitive data from edge device.

    Sequence:
    1. Stop collector service
    2. Overwrite and delete credential files
    3. Overwrite and delete SQLite buffer
    4. Clear logs
    5. Overwrite and delete central config
    6. Restart in setup mode (awaiting new claim)

    Returns:
        Status dict with wipe results; an "error" key is present if any
        step raised
    """
    results = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "credentials_wiped": False,
        "database_wiped": False,
        "logs_wiped": False,
        "services_reset": False
    }
    try:
        # 1. Stop collector service
        subprocess.run(
            ["systemctl", "stop", "chaverim-edge"],
            check=True,
            timeout=30
        )

        # 2. Securely delete credentials
        creds_dir = config_dir / ".credentials"
        if creds_dir.exists():
            for creds_file in creds_dir.iterdir():
                secure_delete(creds_file)
            creds_dir.rmdir()
        results["credentials_wiped"] = True

        # 3. Securely delete SQLite buffer
        db_path = data_dir / "events.db"
        if db_path.exists():
            secure_delete(db_path)
        # Also delete WAL and SHM files (events.db-wal, events.db-shm)
        for suffix in ["-wal", "-shm"]:
            wal_path = db_path.with_suffix(db_path.suffix + suffix)
            if wal_path.exists():
                secure_delete(wal_path)
        results["database_wiped"] = True

        # 4. Clear logs
        if logs_dir.exists():
            shutil.rmtree(logs_dir, ignore_errors=True)
        # exist_ok guards against a partially failed rmtree
        logs_dir.mkdir(mode=0o755, exist_ok=True)
        results["logs_wiped"] = True

        # 5. Remove central config (keep defaults)
        central_config = config_dir / "central.yaml"
        if central_config.exists():
            secure_delete(central_config)

        # 6. Reset to setup mode
        subprocess.run(
            ["systemctl", "start", "chaverim-setup"],
            check=True,
            timeout=30
        )
        results["services_reset"] = True
    except Exception as e:
        results["error"] = str(e)
    return results


def secure_delete(file_path: Path) -> None:
    """
    Overwrite file with random data before deletion.

    Note: On flash storage (SD cards, SSDs) this may not be 100% effective
    due to wear leveling. Full disk encryption is the ultimate protection.
    """
    if not file_path.exists():
        return
    size = file_path.stat().st_size
    if size > 0:
        # Overwrite in place ("r+b" does not truncate first), then force
        # the random data to disk before the file is unlinked
        with open(file_path, "r+b") as f:
            f.write(os.urandom(size))
            f.flush()
            os.fsync(f.fileno())
    file_path.unlink()
Wipe Sequence:
1. Stop collector service
↓
2. Overwrite + delete credentials
↓
3. Overwrite + delete SQLite buffer
↓
4. Delete logs
↓
5. Delete central config
↓
6. Start setup service (awaiting new claim)
What Gets Wiped:
| Path | Contents | Wipe Method |
|---|---|---|
| /opt/chaverim-edge/config/.credentials/ | Camera passwords | Overwrite + delete |
| /opt/chaverim-edge/data/events.db | Detection buffer | Overwrite + delete |
| /opt/chaverim-edge/logs/ | Application logs | Delete |
| /opt/chaverim-edge/config/central.yaml | Config with collector ID | Overwrite + delete |
When to Use:
- Device decommissioning
- Suspected compromise
- Device relocation to different site

Trade-offs:
- Pros: Removes all sensitive data, returns to clean state
- Cons: Wear leveling on flash storage (SD cards, SSDs) may retain fragments (use FDE for critical deployments)
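`execute_wipe_command` catches exceptions and returns a status dict, so a failure partway through leaves some flags False rather than raising. Whoever receives that dict has to decide whether the wipe counts as complete. A minimal sketch of that check follows; `wipe_completed` and `failed_steps` are hypothetical helpers, and the flag names are the ones used in the results dict.

```python
REQUIRED_WIPE_FLAGS = (
    "credentials_wiped",
    "database_wiped",
    "logs_wiped",
    "services_reset",
)


def failed_steps(results: dict) -> list:
    """Return the wipe flags that were not set to True."""
    return [flag for flag in REQUIRED_WIPE_FLAGS if results.get(flag) is not True]


def wipe_completed(results: dict) -> bool:
    """A wipe counts as complete only if every flag is True and no error was recorded."""
    return "error" not in results and not failed_steps(results)


partial = {
    "credentials_wiped": True,
    "database_wiped": True,
    "logs_wiped": False,
    "services_reset": False,
    "error": "Command timed out",
}
print(wipe_completed(partial), failed_steps(partial))
```

Treating anything short of a full set of True flags as a failed wipe keeps a device from being marked decommissioned while sensitive files may still be on it.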
Pattern: Hardware ID Collection
Problem: Need to uniquely identify Raspberry Pi devices for binding and deduplication.
Solution: Read the Pi serial number from /proc/cpuinfo, falling back to vcgencmd and then to the device tree.
Implementation:
import subprocess
from pathlib import Path


def get_hardware_id() -> str:
    """
    Get Raspberry Pi hardware ID (serial number).

    Tries multiple methods for compatibility:
    1. /proc/cpuinfo (works on Pi 3, 4, 5)
    2. vcgencmd (fallback)
    3. /sys/firmware/devicetree/base/serial-number

    Returns:
        Lowercase hex serial number (16 characters from /proc/cpuinfo);
        check the result with validate_hardware_id() before use

    Raises:
        RuntimeError: If no serial number found
    """
    # Method 1: Parse /proc/cpuinfo
    cpuinfo_path = Path("/proc/cpuinfo")
    if cpuinfo_path.exists():
        for line in cpuinfo_path.read_text().splitlines():
            if line.startswith("Serial"):
                parts = line.split(":")
                if len(parts) == 2:
                    return parts[1].strip().lower()

    # Method 2: vcgencmd (OTP row 28 holds the serial number).
    # Note: an OTP row is a single 32-bit word, so this value may be
    # shorter than the 16-character serial reported by /proc/cpuinfo.
    try:
        result = subprocess.run(
            ["vcgencmd", "otp_dump"],
            capture_output=True,
            text=True,
            timeout=5
        )
        for line in result.stdout.splitlines():
            if line.startswith("28:"):
                return line.split(":")[1].strip().lower()
    except (subprocess.TimeoutExpired, FileNotFoundError):
        pass

    # Method 3: Device tree
    dt_serial = Path("/sys/firmware/devicetree/base/serial-number")
    if dt_serial.exists():
        serial = dt_serial.read_bytes().rstrip(b"\x00").decode()
        return serial.lower()

    raise RuntimeError("Could not determine hardware ID")


def validate_hardware_id(hardware_id: str) -> bool:
    """
    Validate hardware ID format.

    Valid format: 16 hex characters (Pi serial number)
    """
    if len(hardware_id) != 16:
        return False
    try:
        int(hardware_id, 16)
        return True
    except ValueError:
        return False
Hardware ID Properties:
| Property | Value |
|---|---|
| Format | 16-character hexadecimal |
| Source | CPU serial number |
| Persistence | Survives OS reinstall |
| Uniqueness | Factory-assigned, globally unique |
When to Use:
- During claim process (hardware binding)
- Key derivation for credential encryption
- Device identification in logs
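The /proc/cpuinfo parsing step is easier to test off-device when it is a pure function over text. The sketch below is one way to factor it out; `parse_cpuinfo_serial` is a hypothetical helper, and the sample text is made up for illustration rather than captured from real hardware.

```python
import re
from typing import Optional

SERIAL_RE = re.compile(r"[0-9a-f]{16}")


def parse_cpuinfo_serial(cpuinfo_text: str) -> Optional[str]:
    """Return the 16-hex-character serial from cpuinfo text, or None."""
    for line in cpuinfo_text.splitlines():
        key, sep, value = line.partition(":")
        if sep and key.strip() == "Serial":
            candidate = value.strip().lower()
            if SERIAL_RE.fullmatch(candidate):
                return candidate
    return None


# Made-up sample in the general shape of /proc/cpuinfo output
SAMPLE_CPUINFO = (
    "Hardware\t: BCM2835\n"
    "Revision\t: c03111\n"
    "Serial\t\t: 10000000ABCDEF12\n"
)
print(parse_cpuinfo_serial(SAMPLE_CPUINFO))
```

Validating the length and character set at parse time means a malformed line falls through to the next lookup method instead of producing an ID that later fails `validate_hardware_id`.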
Anti-Patterns
Anti-Pattern: Plaintext Credentials
Problem: Storing camera passwords in plaintext config files.
Example of BAD code:
# BAD: /opt/chaverim-edge/config/cameras.yaml
cameras:
  - host: 192.168.1.100
    username: admin
    password: secretpassword123  # Plaintext!
Why It's Wrong:
- SD card theft exposes all camera passwords
- No protection at rest
- Passwords may end up in backups, logs
Correct Approach:
# GOOD: Use CredentialStore for encrypted storage
store = CredentialStore(config_dir)
store.initialize(collector_id, hardware_id, install_timestamp)
store.store_credentials(cameras) # Encrypted with device-bound key
Anti-Pattern: Long-Lived Claim Codes
Problem: Claim codes that never expire or have very long validity periods.
Example of BAD code:
# BAD: No expiration
def generate_claim_code() -> str:
    return f"ALPR-{random_segment()}-{random_segment()}"

# No expiry set!
Why It's Wrong:
- Lost/shared codes remain valid indefinitely
- No pressure to complete provisioning
- Security window stays open
Correct Approach:
# GOOD: Short-lived codes with expiration
collector.claim_code = generate_claim_code()
collector.claim_code_expires_at = datetime.utcnow() + timedelta(hours=48)
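The expiry rule can also be expressed as a small, testable helper. The sketch below assumes timezone-aware UTC datetimes, which line up with a TIMESTAMPTZ column; `claim_expiry` and `is_claim_expired` are hypothetical names, and the optional `now` parameter exists only to make the behavior deterministic in tests.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

CLAIM_CODE_EXPIRY_HOURS = 48


def claim_expiry(now: Optional[datetime] = None) -> datetime:
    """Expiry timestamp for a code issued at `now` (defaults to current UTC time)."""
    now = now or datetime.now(timezone.utc)
    return now + timedelta(hours=CLAIM_CODE_EXPIRY_HOURS)


def is_claim_expired(expires_at: datetime, now: Optional[datetime] = None) -> bool:
    """A code is valid only while expires_at is strictly in the future."""
    now = now or datetime.now(timezone.utc)
    return now >= expires_at


issued = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
expires = claim_expiry(issued)
print(expires.isoformat(), is_claim_expired(expires, issued + timedelta(hours=47)))
```

The strict comparison mirrors a validation query that only accepts rows whose expiry timestamp is greater than the current time.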
Anti-Pattern: Storing Encryption Key
Problem: Saving the device encryption key to a file on the SD card.
Example of BAD code:
# BAD: Key stored on disk
key = derive_device_key(collector_id, hardware_id, timestamp)
Path("/opt/chaverim-edge/.key").write_bytes(key) # Don't do this!
Why It's Wrong:
- Key and encrypted data on same storage
- SD card theft = full compromise
- Defeats purpose of device binding
Correct Approach:
# GOOD: Derive key at runtime, never store
key = derive_device_key(collector_id, hardware_id, timestamp)
# Use immediately, never persist
fernet = Fernet(key)
Testing Strategies
Unit Testing
import pytest
from datetime import datetime, timedelta

# generate_claim_code, derive_device_key, CredentialStore, and
# CameraCredential are the functions and classes from the patterns above.


def test_claim_code_format():
    """Test claim code follows expected format."""
    code = generate_claim_code()
    assert code.startswith("ALPR-")
    assert len(code) == 14  # ALPR-XXXX-XXXX
    parts = code.split("-")
    assert len(parts) == 3


def test_claim_code_no_ambiguous_chars():
    """Test random segments exclude ambiguous characters."""
    for _ in range(100):
        code = generate_claim_code()
        # Skip the fixed "ALPR-" prefix, which itself contains an "L"
        random_part = code[len("ALPR-"):]
        for char in "0OIL1":
            assert char not in random_part


def test_derive_device_key_deterministic():
    """Test key derivation is deterministic."""
    key1 = derive_device_key("coll-123", "abc123", 1704067200)
    key2 = derive_device_key("coll-123", "abc123", 1704067200)
    assert key1 == key2


def test_derive_device_key_different_inputs():
    """Test different inputs produce different keys."""
    key1 = derive_device_key("coll-123", "abc123", 1704067200)
    key2 = derive_device_key("coll-456", "abc123", 1704067200)
    assert key1 != key2


def test_credential_store_roundtrip(tmp_path):
    """Test credentials can be stored and retrieved."""
    store = CredentialStore(tmp_path)
    store.initialize("coll-123", "abc123", 1704067200)
    creds = [
        CameraCredential(
            camera_id="cam-1",
            vendor="hikvision",
            host="192.168.1.100",
            port=80,
            username="admin",
            password="secret"
        )
    ]
    store.store_credentials(creds)
    loaded = store.load_credentials()
    assert len(loaded) == 1
    assert loaded[0].password == "secret"
Integration Testing
import pytest
from datetime import datetime, timedelta, timezone


@pytest.fixture
def pending_collector(db_session):
    """Create pending collector with claim code."""
    collector = Collector(
        site_name="Test Site",
        claim_code=generate_claim_code(),
        claim_code_expires_at=generate_claim_expiry(),
        status="pending"
    )
    db_session.add(collector)
    db_session.commit()
    return collector


async def test_claim_success(client, pending_collector):
    """Test successful claim activates collector."""
    response = await client.post("/api/v1/collectors/claim", json={
        "claim_code": pending_collector.claim_code,
        "hardware_id": "0000000012345678",
        "os_version": "Raspbian 12",
        "edge_version": "1.0.0"
    })
    assert response.status_code == 200
    data = response.json()
    assert "api_key" in data
    assert data["site_name"] == "Test Site"


async def test_claim_expired_code(client, db_session):
    """Test expired claim code is rejected."""
    collector = Collector(
        site_name="Expired Site",
        claim_code="ALPR-XXXX-YYYY",
        claim_code_expires_at=datetime.now(timezone.utc) - timedelta(hours=1),
        status="pending"
    )
    db_session.add(collector)
    db_session.commit()
    response = await client.post("/api/v1/collectors/claim", json={
        "claim_code": "ALPR-XXXX-YYYY",
        "hardware_id": "0000000012345678",
        "os_version": "Raspbian 12",
        "edge_version": "1.0.0"
    })
    assert response.status_code == 404
Configuration
Environment Variables
| Variable | Required | Default | Description |
|---|---|---|---|
| CLAIM_CODE_EXPIRY_HOURS | No | 48 | Claim code validity period |
| PBKDF2_ITERATIONS | No | 100000 | Key derivation iterations |
| CREDENTIALS_DIR | No | /opt/chaverim-edge/config/.credentials | Credential storage path |
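One way to read these variables with their documented defaults is a small settings object, sketched below. `ProvisioningSettings` and `load_settings` are hypothetical names; the variable names and defaults come from the table. Note that the derived device key depends on the iteration count, so changing PBKDF2_ITERATIONS after credentials have been stored would presumably make them undecryptable.

```python
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Mapping


@dataclass(frozen=True)
class ProvisioningSettings:
    claim_code_expiry_hours: int = 48
    pbkdf2_iterations: int = 100_000
    credentials_dir: Path = Path("/opt/chaverim-edge/config/.credentials")


def load_settings(env: Mapping[str, str] = os.environ) -> ProvisioningSettings:
    """Build settings from environment variables, falling back to defaults."""
    defaults = ProvisioningSettings()
    return ProvisioningSettings(
        claim_code_expiry_hours=int(
            env.get("CLAIM_CODE_EXPIRY_HOURS", defaults.claim_code_expiry_hours)
        ),
        pbkdf2_iterations=int(
            env.get("PBKDF2_ITERATIONS", defaults.pbkdf2_iterations)
        ),
        credentials_dir=Path(
            env.get("CREDENTIALS_DIR", defaults.credentials_dir)
        ),
    )


print(load_settings({"CLAIM_CODE_EXPIRY_HOURS": "24"}))
```

Passing the environment as a mapping keeps the loader easy to test without mutating `os.environ`.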
Collector Database Table
CREATE TABLE edge.collectors (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),

    -- Identity
    site_name VARCHAR(100) NOT NULL,
    site_address VARCHAR(255),
    gps_latitude DECIMAL(10, 8),
    gps_longitude DECIMAL(11, 8),

    -- Claim lifecycle
    claim_code VARCHAR(20) UNIQUE,
    claim_code_expires_at TIMESTAMPTZ,
    claimed_at TIMESTAMPTZ,
    claimed_from_ip INET,

    -- Authentication
    api_key_hash VARCHAR(255),    -- SHA-256 hash
    api_key_prefix VARCHAR(8),    -- First 8 chars for identification

    -- Device info (populated after claim)
    hardware_id VARCHAR(100),     -- Pi serial number
    os_version VARCHAR(50),
    edge_version VARCHAR(20),

    -- Status
    status VARCHAR(20) DEFAULT 'pending',
    last_heartbeat_at TIMESTAMPTZ,
    config_version INTEGER DEFAULT 1,

    -- Audit
    created_at TIMESTAMPTZ DEFAULT NOW(),
    created_by UUID REFERENCES auth.users(id),
    updated_at TIMESTAMPTZ DEFAULT NOW(),

    CONSTRAINT valid_status CHECK (
        status IN ('pending', 'active', 'disabled', 'decommissioned')
    )
);

CREATE INDEX idx_collectors_claim_code ON edge.collectors(claim_code)
    WHERE claim_code IS NOT NULL;
CREATE INDEX idx_collectors_api_key_prefix ON edge.collectors(api_key_prefix);
CREATE INDEX idx_collectors_status ON edge.collectors(status);
Related Documentation
- Architecture: edge-provisioning.md - Provisioning workflow decisions
- PRP: api-key-prp.md - API key generation patterns
- PRP: camera-adapters-prp.md - Camera connection patterns
- PRP: database-prp.md - Schema namespacing
Maintainer: Development Team
Review Cycle: Quarterly