Ingest Service - Project Reference Pattern

Component: Unified Ingest Service
Status: 🟡 In Development
Created: 2025-12-30
Last Updated: 2025-12-30


Overview

Purpose

The ingest service handles detection ingestion from cameras for the ALPR platform. It is a single application that runs in two modes:

  • Edge mode: Deployed on Raspberry Pi at each site, connects to local cameras
  • Central mode: Deployed on central infrastructure as fallback for sites without edge devices

This unified design ensures consistent behavior and shared code between deployment scenarios.

Scope

Responsibilities:

  • Connect to cameras via vendor adapters (pull and/or push modes)
  • Normalize vendor-specific detections to Detection format
  • Buffer detections locally (edge mode)
  • Upload detections to central server

Out of Scope:

  • Detection processing and storage (central main app responsibility)
  • Alert matching and notifications (central main app responsibility)
  • Camera discovery and provisioning (see edge-provisioning-prp)

Dependencies

Requires:

  • Camera vendor APIs (Hikvision ISAPI, Unifi Protect)
  • Central server API (/api/v1/detections/batch)
  • SQLite (edge mode buffer)

Used By:

  • Central main application (receives uploaded detections)
  • Site administrators (configure cameras)


Quick Reference

When to Use This Component

Use when:

  • Deploying camera ingestion at a new site (edge mode)
  • Setting up fallback ingestion for edgeless sites (central mode)
  • Adding support for a new camera vendor

Don't use when:

  • Processing detections after ingestion (use central app)
  • Managing user authentication (use central app)
  • Configuring alerting rules (use central app)

Key Patterns

  1. Layered Config: Mode defaults → ingest.yaml → cameras.yaml (edge only)
  2. Dual-Mode Operation: Single codebase, behavior controlled by --mode flag
  3. Adapter Pattern: Vendor-specific adapters with common interface
  4. Vendor Routing: Webhook URL path determines adapter for push mode

Implementation Task List

Setup & Configuration

  • [ ] Create config loader with mode defaults
  • [ ] Implement YAML config parsing with env var expansion
  • [ ] Add config validation with Pydantic (see the sketch after this list)
  • [ ] Support layered config loading
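
The validation task above can lean on Pydantic to fail fast on bad configs. A minimal sketch of what the models might look like (field names mirror ingest.yaml below; BufferConfig, UploadConfig, IngestConfig, and validate_config are illustrative names, not part of the design):

from typing import Literal

from pydantic import BaseModel, Field, ValidationError

class BufferConfig(BaseModel):
    enabled: bool = True
    retention_days: int = Field(7, ge=1)
    path: str = "/var/lib/alpr/buffer.db"

class UploadConfig(BaseModel):
    url: str
    api_key: str
    batch_size: int = Field(50, ge=1)
    interval_seconds: int = 10
    timeout_seconds: int = 30

class IngestConfig(BaseModel):
    mode: Literal["edge", "central"]
    buffer: BufferConfig = BufferConfig()
    upload: UploadConfig

def validate_config(raw: dict) -> IngestConfig:
    """Fail fast with a readable error when the merged config is invalid."""
    try:
        return IngestConfig(**raw)
    except ValidationError as e:
        raise SystemExit(f"Invalid configuration:\n{e}") from e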

Core Implementation

  • [ ] Implement abstract CameraAdapter base class
  • [ ] Create Hikvision adapter (pull + push)
  • [ ] Create Unifi adapter (pull + push)
  • [ ] Implement adapter factory with vendor routing
  • [ ] Create webhook receiver (FastAPI)
  • [ ] Implement SQLite buffer with retention policy
  • [ ] Create HTTPS uploader with retry logic

Error Handling

  • [ ] Adapter connection failures with exponential backoff
  • [ ] Malformed detection parsing (log and continue)
  • [ ] Upload failures with local buffering
  • [ ] Config validation errors with clear messages

Security

  • [ ] API key authentication for uploads
  • [ ] Webhook authentication (central mode)
  • [ ] Credential storage via environment variables
  • [ ] TLS for all external connections

Testing

  • [ ] Unit tests for config loading
  • [ ] Unit tests for each adapter
  • [ ] Unit tests for normalizer
  • [ ] Integration tests with mock cameras
  • [ ] Integration tests for buffer/uploader

Documentation

  • [ ] Update this PRP with implementation details
  • [ ] Document camera configuration examples
  • [ ] Create deployment runbooks

Patterns

Pattern: Layered Configuration

Problem: Different deployment modes require different default settings, but operators should be able to override any setting.

Solution: Apply configuration in layers: mode defaults → config file → environment variables.

Implementation:

from pathlib import Path
from typing import Any
import os
import yaml

MODE_DEFAULTS = {
    "edge": {
        "buffer": {"enabled": True, "retention_days": 7},
        "pull_adapters": {"enabled": True},
    },
    "central": {
        "buffer": {"enabled": False},
        "pull_adapters": {"enabled": False},
    },
}

def deep_merge(base: dict, override: dict) -> dict:
    """Merge override into base, recursively for nested dicts."""
    result = base.copy()
    for key, value in override.items():
        if key in result and isinstance(result[key], dict) and isinstance(value, dict):
            result[key] = deep_merge(result[key], value)
        else:
            result[key] = value
    return result

def expand_env_vars(config: Any) -> Any:
    """Recursively expand ${VAR} references in string values.

    Recurses into lists as well as dicts so that camera entries loaded
    from cameras.yaml (a list of dicts) are expanded too.
    """
    if isinstance(config, dict):
        return {key: expand_env_vars(value) for key, value in config.items()}
    if isinstance(config, list):
        return [expand_env_vars(item) for item in config]
    if isinstance(config, str) and config.startswith("${") and config.endswith("}"):
        return os.environ.get(config[2:-1], "")
    return config

def load_config(config_path: Path, mode_override: str | None = None) -> dict:
    """Load configuration with layered defaults."""
    # 1. Load base config
    with open(config_path / "ingest.yaml") as f:
        config = yaml.safe_load(f)

    # 2. Apply mode defaults (config values override defaults).
    #    An explicit mode_override (e.g., from the --mode flag) wins over the file.
    mode = mode_override or config.get("mode", "edge")
    if mode not in MODE_DEFAULTS:
        raise ValueError(f"Invalid mode: {mode}. Must be 'edge' or 'central'")
    config = deep_merge(MODE_DEFAULTS[mode], config)
    config["mode"] = mode

    # 3. Load cameras.yaml if edge mode
    if mode == "edge":
        cameras_file = config_path / "cameras.yaml"
        if cameras_file.exists():
            with open(cameras_file) as f:
                config["cameras"] = yaml.safe_load(f).get("cameras", [])

    # 4. Expand environment variables
    config = expand_env_vars(config)

    return config

When to Use:

  • Service initialization
  • Config reload on SIGHUP (see the sketch below)

Trade-offs:

  • Pros: Flexible, operator-friendly, mode-appropriate defaults
  • Cons: Slightly complex config loading logic
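
The SIGHUP reload case can be wired through asyncio's signal handling. A minimal sketch, assuming the load_config above (install_reload_handler and apply_config are illustrative names; add_signal_handler is Unix-only):

import asyncio
import logging
import signal
from pathlib import Path

logger = logging.getLogger(__name__)

def install_reload_handler(config_path: Path, apply_config) -> None:
    """Re-run load_config on SIGHUP and hand the result to apply_config.

    Must be called from within the running event loop.
    """
    loop = asyncio.get_running_loop()

    def _reload() -> None:
        try:
            apply_config(load_config(config_path))
            logger.info("Configuration reloaded")
        except Exception:
            logger.exception("Config reload failed; keeping previous config")

    loop.add_signal_handler(signal.SIGHUP, _reload)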


Pattern: Dual-Mode Service

Problem: Edge and central deployments need different behavior but should share code.

Solution: Single entry point with --mode flag that controls which components are enabled.

Implementation:

import argparse
import asyncio
from pathlib import Path
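# WebhookReceiver, DetectionBuffer, and DetectionUploader are defined in the
# patterns below; create_adapter is the adapter factory from the task list.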

async def run_edge_mode(config: dict):
    """Run ingest service in edge mode."""
    # Buffer and uploader: detections are persisted first, then batch-uploaded
    buffer = DetectionBuffer(config["buffer"])
    uploader = DetectionUploader(config["upload"], buffer)

    async def handle_detection(detection: Detection) -> None:
        buffer.add(detection)

    # Camera adapters (pull mode), each routing detections into the buffer
    adapters = []
    for camera_config in config.get("cameras", []):
        adapter = create_adapter(camera_config)
        adapter.on_detection = handle_detection
        adapters.append(adapter)

    # Webhook receiver (push mode) feeds the same buffer
    receiver = WebhookReceiver(config["receiver"])
    receiver.on_detection = handle_detection

    # Run all components concurrently
    await asyncio.gather(
        *[adapter.run() for adapter in adapters],
        receiver.run(),
        uploader.run(),
    )

async def run_central_mode(config: dict):
    """Run ingest service in central mode."""
    # No camera adapters - only webhook receiver
    receiver = WebhookReceiver(config["receiver"])

    # No buffer - upload immediately
    uploader = DirectUploader(config["upload"])

    # Connect receiver directly to uploader
    receiver.on_detection = uploader.upload

    await receiver.run()

def main():
    parser = argparse.ArgumentParser(description="ALPR Ingest Service")
    parser.add_argument("--mode", choices=["edge", "central"], required=True)
    parser.add_argument("--config", type=Path, default=Path("/etc/alpr"))
    args = parser.parse_args()

    config = load_config(args.config, mode_override=args.mode)

    if args.mode == "edge":
        asyncio.run(run_edge_mode(config))
    else:
        asyncio.run(run_central_mode(config))

if __name__ == "__main__":
    main()

When to Use:

  • Always; this is the service entry point

Trade-offs:

  • Pros: Single codebase, consistent behavior, shared testing
  • Cons: Must ensure mode-specific code paths are well tested
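
One cheap guard against the Cons above is parametrizing tests across both modes. A sketch using the MODE_DEFAULTS and deep_merge from the layered-configuration pattern:

import pytest

# MODE_DEFAULTS and deep_merge come from the layered-configuration pattern
@pytest.mark.parametrize("mode,buffer_enabled", [("edge", True), ("central", False)])
def test_mode_defaults_control_buffer(mode, buffer_enabled):
    # Mode defaults should win unless the config file overrides them
    config = deep_merge(MODE_DEFAULTS[mode], {"mode": mode})
    assert config["buffer"]["enabled"] is buffer_enabled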


Pattern: Vendor Adapter Interface

Problem: Different camera vendors have different protocols, but the rest of the system needs a consistent interface.

Solution: Abstract base class with pull and push methods that each vendor implements.

Implementation:

from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from typing import AsyncIterator

@dataclass
class Detection:
    """Normalized detection."""
    detection_id: str
    timestamp: datetime
    camera_id: str
    camera_mac: str
    plate_text: str
    plate_normalized: str
    confidence: float
    direction: str | None = None
    vehicle_type: str | None = None
    vehicle_color: str | None = None
    vehicle_brand: str | None = None
    vehicle_model: str | None = None
    image_full: bytes | None = None
    image_plate: bytes | None = None

class CameraAdapter(ABC):
    """Abstract base class for camera vendor adapters."""

    def __init__(self, config: dict):
        self.config = config
        self.camera_id = config["id"]

    @abstractmethod
    async def connect(self) -> None:
        """Establish connection to camera/NVR."""
        pass

    @abstractmethod
    async def disconnect(self) -> None:
        """Close connection to camera/NVR."""
        pass

    @abstractmethod
    async def pull_detections(self) -> AsyncIterator[Detection]:
        """Poll camera for new detections. Yields normalized detections."""
        pass

    @abstractmethod
    def parse_push_detection(self, payload: bytes, headers: dict) -> Detection:
        """Parse incoming webhook payload. Returns normalized detection."""
        pass

    async def run(self) -> None:
        """Main loop for pull mode."""
        await self.connect()
        try:
            async for detection in self.pull_detections():
                await self.on_detection(detection)
        finally:
            await self.disconnect()

    async def on_detection(self, detection: Detection) -> None:
        """Callback when detection received. Set by service."""
        pass

When to Use:

  • Implementing any camera vendor integration
  • Adding new vendor support

Trade-offs:

  • Pros: Consistent interface, easy to add vendors, testable with mocks (see the sketch below)
  • Cons: Some vendors may not fit the model perfectly

Related Patterns: See Camera Adapters PRP for vendor-specific implementations.
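
To make "testable with mocks" concrete, a minimal in-memory adapter might look like this (MockAdapter is illustrative and assumes the CameraAdapter and Detection definitions above):

import asyncio
import uuid
from datetime import datetime, timezone
from typing import AsyncIterator

class MockAdapter(CameraAdapter):
    """Emits one synthetic detection per second; handy in unit tests."""

    async def connect(self) -> None:
        pass

    async def disconnect(self) -> None:
        pass

    async def pull_detections(self) -> AsyncIterator[Detection]:
        while True:
            await asyncio.sleep(1)
            yield Detection(
                detection_id=str(uuid.uuid4()),
                timestamp=datetime.now(timezone.utc),
                camera_id=self.camera_id,
                camera_mac="00:00:00:00:00:00",
                plate_text="ABC 123",
                plate_normalized="ABC123",
                confidence=0.99,
            )

    def parse_push_detection(self, payload: bytes, headers: dict) -> Detection:
        raise NotImplementedError("mock adapter is pull-only")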


Pattern: Webhook Vendor Routing

Problem: When receiving pushed events, the system must identify the camera vendor to parse the payload correctly.

Solution: Include vendor in the webhook URL path. Route to appropriate adapter.

Implementation:

import logging

from fastapi import FastAPI, Request, HTTPException
from typing import Awaitable, Callable

logger = logging.getLogger(__name__)

app = FastAPI()

# Adapter factory
ADAPTERS = {
    "hikvision": HikvisionAdapter,
    "unifi": UnifiAdapter,
}

def get_adapter(vendor: str) -> CameraAdapter:
    """Get adapter instance for vendor."""
    if vendor not in ADAPTERS:
        raise HTTPException(400, f"Unknown vendor: {vendor}")
    # Push parsing needs no camera config; "id" is required by the base class
    return ADAPTERS[vendor]({"id": ""})

# Detection callback - set by service
on_detection: Callable[[Detection], Awaitable[None]] | None = None

@app.post("/api/v1/ingest/{vendor}")
async def receive_webhook(vendor: str, request: Request, camera_id: str | None = None):
    """Receive camera webhook and route to adapter."""
    adapter = get_adapter(vendor)

    payload = await request.body()
    headers = dict(request.headers)

    try:
        detection = adapter.parse_push_detection(payload, headers)
        if camera_id:
            detection.camera_id = camera_id

        if on_detection:
            await on_detection(detection)

        return {"status": "accepted"}
    except Exception as e:
        # Log error but don't expose details
        logger.warning(f"Failed to parse {vendor} webhook: {e}")
        raise HTTPException(400, "Invalid payload")

@app.post("/api/v1/ingest/auto")
async def receive_webhook_auto(request: Request):
    """Auto-detect vendor from payload/headers."""
    payload = await request.body()
    headers = dict(request.headers)

    # Try to detect vendor
    content_type = headers.get("content-type", "")
    if "xml" in content_type:
        vendor = "hikvision"
    elif "json" in content_type and "x-unifi" in str(headers).lower():
        vendor = "unifi"
    else:
        raise HTTPException(400, "Could not detect vendor. Use /api/v1/ingest/{vendor}")

    return await receive_webhook(vendor, request)

When to Use:

  • Receiving camera webhooks (push mode)
  • Central ingest service

Trade-offs:

  • Pros: Explicit routing, easy debugging, clear logs
  • Cons: Camera must be configured with correct URL
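
For manual verification, a captured payload can be replayed against the explicit route. A sketch (host, port, and fixture path are assumptions):

import httpx

# Replay a captured Hikvision event against the explicit vendor route
with open("tests/fixtures/hikvision_event.xml", "rb") as f:
    response = httpx.post(
        "http://edge:8080/api/v1/ingest/hikvision",
        params={"camera_id": "cam_parking"},
        content=f.read(),
        headers={"Content-Type": "application/xml"},
    )
print(response.json())  # {"status": "accepted"}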


Pattern: Detection Buffer

Problem: Network outages shouldn't cause detection loss. Edge devices need offline resilience.

Solution: SQLite buffer with configurable retention. Uploader reads from buffer.

Implementation:

import sqlite3
from datetime import datetime, timedelta, timezone
from pathlib import Path
import json

class DetectionBuffer:
    """SQLite buffer for detections awaiting upload."""

    def __init__(self, config: dict):
        self.enabled = config.get("enabled", True)
        self.retention_days = config.get("retention_days", 7)
        self.db_path = Path(config.get("path", "/var/lib/alpr/buffer.db"))

        if self.enabled:
            self._init_db()

    def _init_db(self):
        """Initialize database schema."""
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS detections (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    detection_id TEXT UNIQUE,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    uploaded_at TIMESTAMP,
                    data JSON NOT NULL
                )
            """)
            conn.execute("CREATE INDEX IF NOT EXISTS idx_uploaded ON detections(uploaded_at)")
            # Enable WAL mode for better concurrency
            conn.execute("PRAGMA journal_mode=WAL")

    def add(self, detection: Detection) -> None:
        """Add detection to buffer."""
        if not self.enabled:
            return

        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                "INSERT OR IGNORE INTO detections (detection_id, data) VALUES (?, ?)",
                (detection.detection_id, json.dumps(detection.__dict__, default=str))
            )

    def get_pending(self, limit: int = 50) -> list[dict]:
        """Get detections not yet uploaded."""
        if not self.enabled:
            return []

        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            rows = conn.execute(
                "SELECT id, data FROM detections WHERE uploaded_at IS NULL ORDER BY id LIMIT ?",
                (limit,)
            ).fetchall()
            return [{"id": r["id"], "data": json.loads(r["data"])} for r in rows]

    def mark_uploaded(self, ids: list[int]) -> None:
        """Mark detections as uploaded."""
        if not self.enabled or not ids:
            return

        with sqlite3.connect(self.db_path) as conn:
            placeholders = ",".join("?" * len(ids))
            conn.execute(
                f"UPDATE detections SET uploaded_at = CURRENT_TIMESTAMP WHERE id IN ({placeholders})",
                ids
            )

    def cleanup(self) -> int:
        """Remove old uploaded detections. Returns count deleted."""
        if not self.enabled:
            return 0

        # CURRENT_TIMESTAMP stores UTC as "YYYY-MM-DD HH:MM:SS"; compare in that format
        cutoff = (datetime.now(timezone.utc) - timedelta(days=self.retention_days)).strftime("%Y-%m-%d %H:%M:%S")
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute(
                "DELETE FROM detections WHERE uploaded_at IS NOT NULL AND uploaded_at < ?",
                (cutoff,)
            )
            return cursor.rowcount

    @property
    def depth(self) -> int:
        """Get count of pending detections."""
        if not self.enabled:
            return 0

        with sqlite3.connect(self.db_path) as conn:
            return conn.execute(
                "SELECT COUNT(*) FROM detections WHERE uploaded_at IS NULL"
            ).fetchone()[0]

When to Use:

  • Edge mode (always)
  • Central mode with unreliable connection to main app

Trade-offs:

  • Pros: Offline resilience, no data loss, bounded storage
  • Cons: Additional I/O, disk space usage
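
Note that nothing in the sketch above calls cleanup(); a periodic task has to drive it. One possible shape (run_buffer_cleanup is an illustrative name, e.g. added to the edge-mode asyncio.gather):

import asyncio
import logging

logger = logging.getLogger(__name__)

async def run_buffer_cleanup(buffer: DetectionBuffer, interval_seconds: int = 3600) -> None:
    """Periodically purge uploaded detections past the retention window."""
    while True:
        deleted = buffer.cleanup()
        if deleted:
            logger.info(f"Buffer cleanup removed {deleted} detections")
        await asyncio.sleep(interval_seconds)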


Pattern: Batch Uploader with Retry

Problem: Detections should be uploaded efficiently with retry on failure.

Solution: Batch upload with exponential backoff retry. Read from buffer.

Implementation:

import asyncio
import logging

import httpx

logger = logging.getLogger(__name__)

class DetectionUploader:
    """Upload detections to central server."""

    def __init__(self, config: dict, buffer: DetectionBuffer):
        self.url = config["url"]
        self.api_key = config["api_key"]
        self.batch_size = config.get("batch_size", 50)
        self.interval = config.get("interval_seconds", 10)
        self.timeout = config.get("timeout_seconds", 30)
        self.buffer = buffer

        self._backoff = 1  # Current backoff in seconds
        self._max_backoff = 300  # Max 5 minutes

    async def run(self):
        """Main upload loop."""
        while True:
            try:
                uploaded = await self._upload_batch()
                if uploaded > 0:
                    self._backoff = 1  # Reset on success
                    continue  # Try another batch immediately

                # No pending detections, wait for interval
                await asyncio.sleep(self.interval)

            except Exception as e:
                logger.error(f"Upload failed: {e}")
                await asyncio.sleep(self._backoff)
                self._backoff = min(self._backoff * 2, self._max_backoff)

    async def _upload_batch(self) -> int:
        """Upload a batch of detections. Returns count uploaded."""
        pending = self.buffer.get_pending(self.batch_size)
        if not pending:
            return 0

        detections = [p["data"] for p in pending]
        ids = [p["id"] for p in pending]

        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.post(
                self.url,
                json={"detections": detections},
                headers={"X-API-Key": self.api_key}
            )
            response.raise_for_status()

        self.buffer.mark_uploaded(ids)
        logger.info(f"Uploaded {len(ids)} detections")
        return len(ids)

When to Use:

  • Edge mode upload to central
  • Central ingest forwarding to main app

Trade-offs:

  • Pros: Efficient batching, retry on failure, backpressure via buffer
  • Cons: Latency from batching interval


Anti-Patterns

❌ Anti-Pattern: Hardcoded Vendor Detection

Problem: Trying to auto-detect vendor from every webhook leads to fragile, unmaintainable code.

Example of BAD code:

@app.post("/webhook")
async def receive_any_webhook(request: Request):
    payload = await request.body()

    # Fragile vendor detection
    if b"<EventNotificationAlert>" in payload:
        vendor = "hikvision"
    elif b'"type": "motion"' in payload:
        vendor = "unifi"
    elif b"licensePlate" in payload:
        vendor = "unknown_lpr"  # Guess!
    else:
        raise HTTPException(400, "Unknown vendor")

    # Parse based on guess...

Why It's Wrong:

  • Payload formats change between firmware versions
  • Multiple vendors may use similar formats
  • Hard to debug which vendor was detected
  • No clear contract with camera configuration

Correct Approach:

@app.post("/api/v1/ingest/{vendor}")
async def receive_webhook(vendor: str, request: Request):
    # Vendor is explicit in URL
    adapter = get_adapter(vendor)
    # ...

How to Fix:

  1. Use URL path routing for vendor identification
  2. Document webhook URL format for each camera
  3. Keep auto-detection only as fallback with logging


❌ Anti-Pattern: Unbounded In-Memory Queue

Problem: Buffering detections in memory leads to OOM under load or network outage.

Example of BAD code:

class IngestService:
    def __init__(self):
        self.pending_detections = []  # Unbounded list!

    async def on_detection(self, detection: Detection):
        self.pending_detections.append(detection)

    async def upload_loop(self):
        while True:
            if self.pending_detections:
                batch = self.pending_detections[:50]
                await self.upload(batch)
                self.pending_detections = self.pending_detections[50:]
            await asyncio.sleep(10)

Why It's Wrong:

  • Network outage: queue grows unbounded
  • Process restart: all pending detections lost
  • High-volume camera: memory exhaustion

Correct Approach:

# Use SQLite buffer with retention policy
buffer = DetectionBuffer({"enabled": True, "retention_days": 7})

async def on_detection(detection: Detection):
    buffer.add(detection)  # Persisted to disk

How to Fix:

  1. Use SQLite buffer for persistence
  2. Set retention policy for bounded storage
  3. Monitor buffer depth for alerting


Configuration

Environment Variables

Variable         Required        Default  Description
INGEST_API_KEY   Yes             -        API key for central server authentication
CAM_*_PASSWORD   Per camera      -        Camera credentials (e.g., CAM_FRONT_GATE_PASSWORD)
UNIFI_API_KEY    If using Unifi  -        Unifi Protect API key

Configuration Files

ingest.yaml:

mode: edge  # or "central"

buffer:
  # enabled: true       # Default for edge
  # retention_days: 7   # Default for edge
  path: /var/lib/alpr/buffer.db

upload:
  url: https://central.example.com/api/v1/detections/batch
  api_key: ${INGEST_API_KEY}
  batch_size: 50
  interval_seconds: 10
  timeout_seconds: 30

receiver:
  enabled: true
  host: 0.0.0.0
  port: 8080

logging:
  level: INFO
  format: json

cameras.yaml (edge mode only):

cameras:
  - id: cam_front_gate
    vendor: hikvision
    mode: pull
    host: 192.168.1.100
    port: 80
    username: admin
    password: ${CAM_FRONT_GATE_PASSWORD}
    channel: 1
    poll_interval_seconds: 2
    timezone: America/New_York

  - id: cam_parking
    vendor: hikvision
    mode: push
    # Camera pushes to: http://edge:8080/api/v1/ingest/hikvision?camera_id=cam_parking


Testing Strategies

Unit Testing

Focus:

  • Config loading and validation
  • Adapter parsing (with fixture payloads)
  • Buffer operations
  • Normalizer functions

Example:

def test_layered_config_applies_mode_defaults():
    config = load_config(Path("tests/fixtures/edge_config"))

    assert config["mode"] == "edge"
    assert config["buffer"]["enabled"] is True
    assert config["buffer"]["retention_days"] == 7

def test_central_mode_disables_buffer():
    config = load_config(Path("tests/fixtures/central_config"))

    assert config["mode"] == "central"
    assert config["buffer"]["enabled"] is False

Integration Testing

Focus:

  • Adapter connection to mock camera server
  • Buffer + uploader integration
  • Webhook receiver end-to-end

Setup:

@pytest.fixture
async def mock_camera_server():
    """Start mock Hikvision camera server."""
    app = create_mock_hikvision_app()
    server = await start_test_server(app)
    yield server
    await server.close()

async def test_hikvision_pull_receives_events(mock_camera_server):
    adapter = HikvisionAdapter({
        "id": "test_cam",
        "host": "localhost",
        "port": mock_camera_server.port,
        "username": "admin",
        "password": "test",
    })

    detections = []
    async for detection in adapter.pull_detections():
        detections.append(detection)
        if len(detections) >= 3:
            break

    assert len(detections) == 3
    assert all(d.plate_normalized for d in detections)

Edge Cases

Test these scenarios:

  1. Camera offline during pull (should retry with backoff)
  2. Malformed webhook payload (should log and return 400)
  3. Central server offline during upload (should buffer and retry)
  4. Buffer at retention limit (should cleanup old entries)
  5. Config with missing required fields (should fail fast with clear error)
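
Scenario 2 can be pinned with a request-level test. A sketch assuming the FastAPI app from the vendor-routing pattern and a recent httpx-based Starlette TestClient:

from fastapi.testclient import TestClient

def test_malformed_webhook_returns_400():
    # Assumes the FastAPI app from the vendor-routing pattern is importable
    client = TestClient(app)
    response = client.post(
        "/api/v1/ingest/hikvision",
        content=b"definitely not xml",
        headers={"Content-Type": "application/xml"},
    )
    assert response.status_code == 400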


Performance Considerations

Optimization Strategies

Do:

  • Use async I/O throughout
  • Batch uploads (50 detections default)
  • SQLite WAL mode for concurrent access
  • Connection pooling for HTTP clients (see the sketch below)

Avoid:

  • Synchronous I/O in async context
  • Unbounded in-memory queues
  • Per-detection uploads
  • Blocking the detection loop
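
For the connection-pooling item, the uploader sketch above could hold one client for its lifetime instead of opening a new one per batch. A sketch (the limit values are illustrative):

import httpx

# One long-lived AsyncClient reuses TCP/TLS connections across upload
# batches; close it with `await client.aclose()` on shutdown.
client = httpx.AsyncClient(
    timeout=30,
    limits=httpx.Limits(max_connections=10, max_keepalive_connections=5),
)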

Benchmarks

Expected Performance:

  • Detection parsing: < 10 ms per detection
  • Buffer write: < 5 ms per detection
  • Upload batch (50 detections): < 500 ms
  • Max throughput: 100 detections/second sustained

Monitoring:

  • Buffer depth (alert if > 1000)
  • Upload latency (alert if p99 > 5 s)
  • Parse error rate (alert if > 1%)
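
A minimal in-process check for the buffer-depth alert, built on the depth property from the buffer pattern (monitor_buffer_depth is an illustrative name; production would more likely export a metric):

import asyncio
import logging

logger = logging.getLogger(__name__)

async def monitor_buffer_depth(buffer: DetectionBuffer, threshold: int = 1000) -> None:
    """Warn once a minute while pending detections exceed the alert threshold."""
    while True:
        depth = buffer.depth
        if depth > threshold:
            logger.warning(f"Buffer depth {depth} exceeds alert threshold {threshold}")
        await asyncio.sleep(60)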



Maintenance Notes

Review Frequency: After each phase milestone
Last Reviewed: 2025-12-30
Next Review: After Phase 5 implementation

Active Issues:

  • Ingest service not yet implemented (Phase 5)
  • Vendor adapters to be ported from design

Maintainer: Architecture Team


Quick Checklist

When implementing this component:

  • [ ] Read this entire PRP
  • [ ] Check global.md for cross-cutting concerns
  • [ ] Review adapter pattern in camera-adapters-prp.md
  • [ ] Implement config loader with layered defaults
  • [ ] Implement at least one vendor adapter
  • [ ] Implement buffer and uploader
  • [ ] Implement webhook receiver
  • [ ] Write tests per testing strategies
  • [ ] Test both edge and central modes
  • [ ] Document deployment steps

Remember: PRPs are living documents. Update as patterns evolve!