Skip to content

Edge Provisioning Patterns - Project Reference Pattern

Component: Edge Collector Provisioning Status: 🟡 In Development Created: 2025-12-29 Last Updated: 2025-12-29


Overview

Purpose

This PRP defines implementation patterns for edge collector provisioning, including claim code generation, device-bound credential encryption, secure credential storage, and remote wipe procedures.

Scope

Responsibilities: - Claim code generation and validation - Device-bound encryption for credentials - Secure credential storage on edge devices - Remote wipe/decommission procedures - Hardware ID binding

Out of Scope: - Camera adapter implementations (see camera-adapters-prp.md) - API key management (see api-key-prp.md) - Network configuration (see edge-provisioning.md)

Dependencies

Requires: - PostgreSQL (edge schema for collectors table) - cryptography library (Fernet encryption) - Raspberry Pi hardware (serial number for device binding)

Used By: - Central server (claim API) - Edge collector (credential storage) - Admin UI (collector management)


Quick Reference

When to Use This PRP

Use when: - Creating new collector records with claim codes - Implementing claim validation endpoint - Storing camera credentials on edge devices - Implementing remote wipe functionality - Binding credentials to specific hardware

Don't use when: - Managing API keys (see api-key-prp.md) - Implementing camera communication (see camera-adapters-prp.md)


Patterns

Pattern: Claim Code Generation

Problem: Edge devices need a secure, human-readable way to prove they are authorized to join the system without pre-burning credentials into images.

Solution: Generate short-lived, human-friendly claim codes that admin pre-provisions for each collector.

Implementation:

import secrets
from datetime import datetime, timedelta

# Alphabet excludes ambiguous characters: 0, O, I, L, 1
CLAIM_ALPHABET = "23456789ABCDEFGHJKMNPQRSTUVWXYZ"
CLAIM_CODE_EXPIRY_HOURS = 48


def generate_claim_code() -> str:
    """
    Generate human-readable claim code.

    Format: ALPR-XXXX-XXXX
    - Excludes ambiguous chars: 0, O, I, L, 1
    - ~30 bits of entropy
    - Collision probability < 0.0001% with 1000 pending claims

    Returns:
        Claim code string like "ALPR-K7X9-M2P4"
    """
    segment1 = "".join(secrets.choice(CLAIM_ALPHABET) for _ in range(4))
    segment2 = "".join(secrets.choice(CLAIM_ALPHABET) for _ in range(4))
    return f"ALPR-{segment1}-{segment2}"


def generate_claim_expiry() -> datetime:
    """Generate expiration timestamp for claim code."""
    return datetime.utcnow() + timedelta(hours=CLAIM_CODE_EXPIRY_HOURS)


# Usage
claim_code = generate_claim_code()  # "ALPR-K7X9-M2P4"
expires_at = generate_claim_expiry()  # 48 hours from now

Claim Code Properties:

Property Value Rationale
Format ALPR-XXXX-XXXX Easy to read aloud, type manually
Expiry 48 hours Long enough for scheduling, short enough for security
Entropy ~30 bits Low collision probability
Single-use Yes Cleared after successful claim

When to Use: - Creating new pending collectors in admin UI - Generating replacement codes for device re-provisioning

Trade-offs: - Pros: Human-friendly, time-limited, auditable - Cons: Requires admin action to create, not self-service


Pattern: Claim Validation

Problem: Edge devices present claim codes to join the system; validation must be secure and prevent abuse.

Solution: Validate claim code against pending collectors with expiration check, rate limiting, and hardware binding.

Implementation:

from datetime import datetime
from uuid import UUID
import secrets

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from fastapi import HTTPException, Depends
from pydantic import BaseModel


class ClaimRequest(BaseModel):
    claim_code: str
    hardware_id: str  # Pi serial number
    os_version: str
    edge_version: str


class ClaimResponse(BaseModel):
    collector_id: UUID
    api_key: str  # Only returned once, in plaintext
    site_name: str
    config: dict


async def validate_claim(
    request: ClaimRequest,
    client_ip: str,
    db: AsyncSession
) -> ClaimResponse:
    """
    Validate claim code and activate collector.

    Security checks:
    1. Claim code exists and is pending
    2. Claim code not expired
    3. Hardware ID not already registered
    4. Single-use: code cleared after claim

    Args:
        request: Claim request with code and device info
        client_ip: Request source IP for audit
        db: Database session

    Returns:
        ClaimResponse with API key (only time it's revealed)

    Raises:
        HTTPException: 404 if invalid/expired, 409 if hardware already registered
    """
    # 1. Find pending collector by claim code
    result = await db.execute(
        select(Collector)
        .where(Collector.claim_code == request.claim_code.upper())
        .where(Collector.status == "pending")
        .where(Collector.claim_code_expires_at > datetime.utcnow())
    )
    collector = result.scalar_one_or_none()

    if not collector:
        # Don't distinguish between invalid and expired (security)
        raise HTTPException(404, "Invalid or expired claim code")

    # 2. Check hardware_id not already registered
    existing = await db.execute(
        select(Collector)
        .where(Collector.hardware_id == request.hardware_id)
        .where(Collector.status.in_(["active", "disabled"]))
    )
    if existing.scalar_one_or_none():
        raise HTTPException(409, "Device already registered")

    # 3. Generate API key (see api-key-prp.md for details)
    api_key = secrets.token_urlsafe(32)
    api_key_hash = hash_api_key(api_key)  # SHA-256

    # 4. Update collector
    collector.status = "active"
    collector.api_key_hash = api_key_hash
    collector.api_key_prefix = api_key[:8]
    collector.hardware_id = request.hardware_id
    collector.os_version = request.os_version
    collector.edge_version = request.edge_version
    collector.claimed_at = datetime.utcnow()
    collector.claimed_from_ip = client_ip
    collector.claim_code = None  # Clear after use (single-use)
    collector.claim_code_expires_at = None

    await db.commit()

    # 5. Return API key (only time it's returned in plaintext)
    return ClaimResponse(
        collector_id=collector.id,
        api_key=api_key,
        site_name=collector.site_name,
        config=await build_collector_config(collector.id, db)
    )

Validation Checks:

Check Failure Response Purpose
Code exists + pending 404 Prevent enumeration
Code not expired 404 Time-limited validity
Hardware not registered 409 Prevent re-use

When to Use: - POST /api/v1/collectors/claim endpoint - Device first-boot sequence

Trade-offs: - Pros: Secure, auditable, prevents unauthorized devices - Cons: Requires admin to pre-create collectors


Pattern: Device-Bound Encryption

Problem: Camera credentials stored on edge device must be protected if SD card is stolen.

Solution: Derive encryption key from device-specific constants (not stored on disk) using PBKDF2.

Implementation:

import hashlib
import base64
import json
from pathlib import Path
from cryptography.fernet import Fernet
from pydantic import BaseModel


class CameraCredential(BaseModel):
    camera_id: str
    vendor: str  # "hikvision", "unifi"
    host: str
    port: int
    username: str
    password: str
    extra_config: dict = {}


def derive_device_key(
    collector_id: str,
    hardware_id: str,
    install_timestamp: int
) -> bytes:
    """
    Derive encryption key from device-specific values.

    Key is not stored - regenerated on each boot from constants.
    Stolen SD card cannot decrypt without knowing all three values.

    Args:
        collector_id: UUID assigned during claim
        hardware_id: Raspberry Pi serial number
        install_timestamp: Unix timestamp of first claim

    Returns:
        Fernet-compatible key (32 bytes, base64 encoded)
    """
    key_material = f"{collector_id}:{hardware_id}:{install_timestamp}".encode()

    key = hashlib.pbkdf2_hmac(
        "sha256",
        key_material,
        salt=b"chaverim-edge-v1",  # Static salt (key material is unique)
        iterations=100_000
    )

    return base64.urlsafe_b64encode(key)


class CredentialStore:
    """Encrypted storage for camera credentials."""

    def __init__(self, config_dir: Path):
        self.creds_file = config_dir / ".credentials" / "cameras.enc"
        self._fernet: Fernet | None = None

    def initialize(
        self,
        collector_id: str,
        hardware_id: str,
        install_timestamp: int
    ) -> None:
        """Initialize encryption with device-specific key."""
        key = derive_device_key(collector_id, hardware_id, install_timestamp)
        self._fernet = Fernet(key)

    def store_credentials(self, cameras: list[CameraCredential]) -> None:
        """
        Encrypt and store camera credentials.

        Creates credential directory with restrictive permissions.
        """
        if self._fernet is None:
            raise RuntimeError("CredentialStore not initialized")

        plaintext = json.dumps(
            [c.model_dump() for c in cameras]
        ).encode()
        encrypted = self._fernet.encrypt(plaintext)

        # Create directory with restrictive permissions
        self.creds_file.parent.mkdir(mode=0o700, exist_ok=True)
        self.creds_file.write_bytes(encrypted)
        self.creds_file.chmod(0o600)

    def load_credentials(self) -> list[CameraCredential]:
        """
        Decrypt and return camera credentials.

        Returns empty list if no credentials stored.
        """
        if self._fernet is None:
            raise RuntimeError("CredentialStore not initialized")

        if not self.creds_file.exists():
            return []

        encrypted = self.creds_file.read_bytes()
        plaintext = self._fernet.decrypt(encrypted)
        data = json.loads(plaintext)
        return [CameraCredential(**c) for c in data]

    def clear_credentials(self) -> None:
        """Securely delete credential file."""
        if self.creds_file.exists():
            # Overwrite with random data before deletion
            size = self.creds_file.stat().st_size
            self.creds_file.write_bytes(os.urandom(size))
            self.creds_file.unlink()

Key Derivation Components:

Component Source Purpose
collector_id Central server (via claim) Links to specific collector record
hardware_id Pi serial number Binds to physical device
install_timestamp First boot time Adds temporal uniqueness

Security Properties:

  • Key never stored on disk
  • Requires all three values to derive
  • PBKDF2 with 100k iterations (slow brute force)
  • Fernet (AES-128-CBC + HMAC) for encryption

When to Use: - Storing camera credentials on edge device - Any sensitive config that needs at-rest encryption

Trade-offs: - Pros: SD card useless without hardware, no key management - Cons: Credentials lost if any component changes (intentional)


Pattern: Remote Wipe

Problem: Compromised or decommissioned devices need secure credential and data removal.

Solution: Implement remote wipe command that securely overwrites sensitive files before deletion.

Implementation:

import os
import shutil
import subprocess
from pathlib import Path
from datetime import datetime


async def execute_wipe_command(
    config_dir: Path = Path("/opt/chaverim-edge/config"),
    data_dir: Path = Path("/opt/chaverim-edge/data"),
    logs_dir: Path = Path("/opt/chaverim-edge/logs")
) -> dict:
    """
    Securely wipe all sensitive data from edge device.

    Sequence:
    1. Stop collector service
    2. Overwrite and delete credential files
    3. Overwrite and delete SQLite buffer
    4. Clear logs
    5. Restart in setup mode (awaiting new claim)

    Returns:
        Status dict with wipe results
    """
    results = {
        "timestamp": datetime.utcnow().isoformat(),
        "credentials_wiped": False,
        "database_wiped": False,
        "logs_wiped": False,
        "services_reset": False
    }

    try:
        # 1. Stop collector service
        subprocess.run(
            ["systemctl", "stop", "chaverim-edge"],
            check=True,
            timeout=30
        )

        # 2. Securely delete credentials
        creds_dir = config_dir / ".credentials"
        if creds_dir.exists():
            for creds_file in creds_dir.iterdir():
                secure_delete(creds_file)
            creds_dir.rmdir()
            results["credentials_wiped"] = True

        # 3. Securely delete SQLite buffer
        db_path = data_dir / "events.db"
        if db_path.exists():
            secure_delete(db_path)
            # Also delete WAL and SHM files
            for suffix in ["-wal", "-shm"]:
                wal_path = db_path.with_suffix(db_path.suffix + suffix)
                if wal_path.exists():
                    secure_delete(wal_path)
            results["database_wiped"] = True

        # 4. Clear logs
        if logs_dir.exists():
            shutil.rmtree(logs_dir, ignore_errors=True)
            logs_dir.mkdir(mode=0o755)
            results["logs_wiped"] = True

        # 5. Remove central config (keep defaults)
        central_config = config_dir / "central.yaml"
        if central_config.exists():
            secure_delete(central_config)

        # 6. Reset to setup mode
        subprocess.run(
            ["systemctl", "start", "chaverim-setup"],
            check=True,
            timeout=30
        )
        results["services_reset"] = True

    except Exception as e:
        results["error"] = str(e)

    return results


def secure_delete(file_path: Path) -> None:
    """
    Overwrite file with random data before deletion.

    Note: For SSDs, this may not be 100% effective due to wear leveling.
    Full disk encryption is the ultimate protection.
    """
    if not file_path.exists():
        return

    size = file_path.stat().st_size
    if size > 0:
        # Overwrite with random data
        file_path.write_bytes(os.urandom(size))

    file_path.unlink()

Wipe Sequence:

1. Stop collector service
       ↓
2. Overwrite + delete credentials
       ↓
3. Overwrite + delete SQLite buffer
       ↓
4. Delete logs
       ↓
5. Delete central config
       ↓
6. Start setup service (awaiting new claim)

What Gets Wiped:

Path Contents Wipe Method
/opt/chaverim-edge/config/.credentials/ Camera passwords Overwrite + delete
/opt/chaverim-edge/data/events.db Detection buffer Overwrite + delete
/opt/chaverim-edge/logs/ Application logs Delete
/opt/chaverim-edge/config/central.yaml Config with collector ID Overwrite + delete

When to Use: - Device decommissioning - Suspected compromise - Device relocation to different site

Trade-offs: - Pros: Removes all sensitive data, returns to clean state - Cons: SSD wear leveling may retain fragments (use FDE for critical deployments)


Pattern: Hardware ID Collection

Problem: Need to uniquely identify Raspberry Pi devices for binding and deduplication.

Solution: Read Pi serial number from /proc/cpuinfo or use vcgencmd.

Implementation:

import subprocess
from pathlib import Path


def get_hardware_id() -> str:
    """
    Get Raspberry Pi hardware ID (serial number).

    Tries multiple methods for compatibility:
    1. /proc/cpuinfo (works on Pi 3, 4, 5)
    2. vcgencmd (fallback)
    3. /sys/firmware/devicetree/base/serial-number

    Returns:
        16-character hex serial number

    Raises:
        RuntimeError: If no serial number found
    """
    # Method 1: Parse /proc/cpuinfo
    cpuinfo_path = Path("/proc/cpuinfo")
    if cpuinfo_path.exists():
        for line in cpuinfo_path.read_text().splitlines():
            if line.startswith("Serial"):
                parts = line.split(":")
                if len(parts) == 2:
                    return parts[1].strip().lower()

    # Method 2: vcgencmd
    try:
        result = subprocess.run(
            ["vcgencmd", "otp_dump"],
            capture_output=True,
            text=True,
            timeout=5
        )
        for line in result.stdout.splitlines():
            if line.startswith("28:"):
                return line.split(":")[1].strip().lower()
    except (subprocess.TimeoutExpired, FileNotFoundError):
        pass

    # Method 3: Device tree
    dt_serial = Path("/sys/firmware/devicetree/base/serial-number")
    if dt_serial.exists():
        serial = dt_serial.read_bytes().rstrip(b"\x00").decode()
        return serial.lower()

    raise RuntimeError("Could not determine hardware ID")


def validate_hardware_id(hardware_id: str) -> bool:
    """
    Validate hardware ID format.

    Valid format: 16 hex characters (Pi serial number)
    """
    if len(hardware_id) != 16:
        return False
    try:
        int(hardware_id, 16)
        return True
    except ValueError:
        return False

Hardware ID Properties:

Property Value
Format 16-character hexadecimal
Source CPU serial number
Persistence Survives OS reinstall
Uniqueness Factory-assigned, globally unique

When to Use: - During claim process (hardware binding) - Key derivation for credential encryption - Device identification in logs


Anti-Patterns

Anti-Pattern: Plaintext Credentials

Problem: Storing camera passwords in plaintext config files.

Example of BAD code:

# BAD: /opt/chaverim-edge/config/cameras.yaml
cameras:
  - host: 192.168.1.100
    username: admin
    password: secretpassword123  # Plaintext!

Why It's Wrong: - SD card theft exposes all camera passwords - No protection at rest - Passwords may end up in backups, logs

Correct Approach:

# GOOD: Use CredentialStore for encrypted storage
store = CredentialStore(config_dir)
store.initialize(collector_id, hardware_id, install_timestamp)
store.store_credentials(cameras)  # Encrypted with device-bound key

Anti-Pattern: Long-Lived Claim Codes

Problem: Claim codes that never expire or have very long validity periods.

Example of BAD code:

# BAD: No expiration
def generate_claim_code() -> str:
    return f"ALPR-{random_segment()}-{random_segment()}"
    # No expiry set!

Why It's Wrong: - Lost/shared codes remain valid indefinitely - No pressure to complete provisioning - Security window stays open

Correct Approach:

# GOOD: Short-lived codes with expiration
collector.claim_code = generate_claim_code()
collector.claim_code_expires_at = datetime.utcnow() + timedelta(hours=48)

Anti-Pattern: Storing Encryption Key

Problem: Saving the device encryption key to a file on the SD card.

Example of BAD code:

# BAD: Key stored on disk
key = derive_device_key(collector_id, hardware_id, timestamp)
Path("/opt/chaverim-edge/.key").write_bytes(key)  # Don't do this!

Why It's Wrong: - Key and encrypted data on same storage - SD card theft = full compromise - Defeats purpose of device binding

Correct Approach:

# GOOD: Derive key at runtime, never store
key = derive_device_key(collector_id, hardware_id, timestamp)
# Use immediately, never persist
fernet = Fernet(key)

Testing Strategies

Unit Testing

import pytest
from datetime import datetime, timedelta


def test_claim_code_format():
    """Test claim code follows expected format."""
    code = generate_claim_code()
    assert code.startswith("ALPR-")
    assert len(code) == 14  # ALPR-XXXX-XXXX
    parts = code.split("-")
    assert len(parts) == 3


def test_claim_code_no_ambiguous_chars():
    """Test claim code excludes ambiguous characters."""
    for _ in range(100):
        code = generate_claim_code()
        for char in "0OIL1":
            assert char not in code


def test_derive_device_key_deterministic():
    """Test key derivation is deterministic."""
    key1 = derive_device_key("coll-123", "abc123", 1704067200)
    key2 = derive_device_key("coll-123", "abc123", 1704067200)
    assert key1 == key2


def test_derive_device_key_different_inputs():
    """Test different inputs produce different keys."""
    key1 = derive_device_key("coll-123", "abc123", 1704067200)
    key2 = derive_device_key("coll-456", "abc123", 1704067200)
    assert key1 != key2


def test_credential_store_roundtrip(tmp_path):
    """Test credentials can be stored and retrieved."""
    store = CredentialStore(tmp_path)
    store.initialize("coll-123", "abc123", 1704067200)

    creds = [
        CameraCredential(
            camera_id="cam-1",
            vendor="hikvision",
            host="192.168.1.100",
            port=80,
            username="admin",
            password="secret"
        )
    ]

    store.store_credentials(creds)
    loaded = store.load_credentials()

    assert len(loaded) == 1
    assert loaded[0].password == "secret"

Integration Testing

@pytest.fixture
def pending_collector(db_session):
    """Create pending collector with claim code."""
    collector = Collector(
        site_name="Test Site",
        claim_code=generate_claim_code(),
        claim_code_expires_at=generate_claim_expiry(),
        status="pending"
    )
    db_session.add(collector)
    db_session.commit()
    return collector


async def test_claim_success(client, pending_collector):
    """Test successful claim activates collector."""
    response = await client.post("/api/v1/collectors/claim", json={
        "claim_code": pending_collector.claim_code,
        "hardware_id": "0000000012345678",
        "os_version": "Raspbian 12",
        "edge_version": "1.0.0"
    })

    assert response.status_code == 200
    data = response.json()
    assert "api_key" in data
    assert data["site_name"] == "Test Site"


async def test_claim_expired_code(client, db_session):
    """Test expired claim code is rejected."""
    collector = Collector(
        site_name="Expired Site",
        claim_code="ALPR-XXXX-YYYY",
        claim_code_expires_at=datetime.utcnow() - timedelta(hours=1),
        status="pending"
    )
    db_session.add(collector)
    db_session.commit()

    response = await client.post("/api/v1/collectors/claim", json={
        "claim_code": "ALPR-XXXX-YYYY",
        "hardware_id": "0000000012345678",
        "os_version": "Raspbian 12",
        "edge_version": "1.0.0"
    })

    assert response.status_code == 404

Configuration

Environment Variables

Variable Required Default Description
CLAIM_CODE_EXPIRY_HOURS No 48 Claim code validity period
PBKDF2_ITERATIONS No 100000 Key derivation iterations
CREDENTIALS_DIR No /opt/chaverim-edge/config/.credentials Credential storage path

Collector Database Table

CREATE TABLE edge.collectors (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),

    -- Identity
    site_name VARCHAR(100) NOT NULL,
    site_address VARCHAR(255),
    gps_latitude DECIMAL(10, 8),
    gps_longitude DECIMAL(11, 8),

    -- Claim lifecycle
    claim_code VARCHAR(20) UNIQUE,
    claim_code_expires_at TIMESTAMPTZ,
    claimed_at TIMESTAMPTZ,
    claimed_from_ip INET,

    -- Authentication
    api_key_hash VARCHAR(255),              -- SHA-256 hash
    api_key_prefix VARCHAR(8),              -- First 8 chars for identification

    -- Device info (populated after claim)
    hardware_id VARCHAR(100),               -- Pi serial number
    os_version VARCHAR(50),
    edge_version VARCHAR(20),

    -- Status
    status VARCHAR(20) DEFAULT 'pending',
    last_heartbeat_at TIMESTAMPTZ,
    config_version INTEGER DEFAULT 1,

    -- Audit
    created_at TIMESTAMPTZ DEFAULT NOW(),
    created_by UUID REFERENCES auth.users(id),
    updated_at TIMESTAMPTZ DEFAULT NOW(),

    CONSTRAINT valid_status CHECK (
        status IN ('pending', 'active', 'disabled', 'decommissioned')
    )
);

CREATE INDEX idx_collectors_claim_code ON edge.collectors(claim_code)
    WHERE claim_code IS NOT NULL;
CREATE INDEX idx_collectors_api_key_prefix ON edge.collectors(api_key_prefix);
CREATE INDEX idx_collectors_status ON edge.collectors(status);


Maintainer: Development Team Review Cycle: Quarterly