Ingest Service - Project Reference Pattern¶
Component: Unified Ingest Service
Status: 🟡 In Development
Created: 2025-12-30
Last Updated: 2025-12-30
Overview¶
Purpose¶
The ingest service handles camera detection ingestion for the ALPR platform. It is a single application that runs in two modes:
- Edge mode: Deployed on Raspberry Pi at each site, connects to local cameras
- Central mode: Deployed on central infrastructure as fallback for sites without edge devices
This unified design ensures consistent behavior and shared code between deployment scenarios.
Scope¶
Responsibilities:
- Connect to cameras via vendor adapters (pull and/or push modes)
- Normalize vendor-specific detections to Detection format
- Buffer detections locally (edge mode)
- Upload detections to central server
Out of Scope:
- Detection processing and storage (central main app responsibility)
- Alert matching and notifications (central main app responsibility)
- Camera discovery and provisioning (see edge-provisioning-prp)
Dependencies¶
Requires:
- Camera vendor APIs (Hikvision ISAPI, Unifi Protect)
- Central server API (/api/v1/detections/batch)
- SQLite (edge mode buffer)
Used By:
- Central main application (receives uploaded detections)
- Site administrators (configure cameras)
Quick Reference¶
When to Use This Component¶
✅ Use when:
- Deploying camera ingestion at a new site (edge mode)
- Setting up fallback ingestion for edgeless sites (central mode)
- Adding support for a new camera vendor
❌ Don't use when:
- Processing detections after ingestion (use central app)
- Managing user authentication (use central app)
- Configuring alerting rules (use central app)
Key Patterns¶
- Layered Config: Mode defaults → ingest.yaml → cameras.yaml (edge only)
- Dual-Mode Operation: Single codebase, behavior controlled by --mode flag
- Adapter Pattern: Vendor-specific adapters with common interface
- Vendor Routing: Webhook URL path determines adapter for push mode
Implementation Task List¶
Setup & Configuration¶
- [ ] Create config loader with mode defaults
- [ ] Implement YAML config parsing with env var expansion
- [ ] Add config validation with Pydantic
- [ ] Support layered config loading
Core Implementation¶
- [ ] Implement abstract CameraAdapter base class
- [ ] Create Hikvision adapter (pull + push)
- [ ] Create Unifi adapter (pull + push)
- [ ] Implement adapter factory with vendor routing
- [ ] Create webhook receiver (FastAPI)
- [ ] Implement SQLite buffer with retention policy
- [ ] Create HTTPS uploader with retry logic
Error Handling¶
- [ ] Adapter connection failures with exponential backoff
- [ ] Malformed detection parsing (log and continue)
- [ ] Upload failures with local buffering
- [ ] Config validation errors with clear messages
Security¶
- [ ] API key authentication for uploads
- [ ] Webhook authentication (central mode; see the sketch after this list)
- [ ] Credential storage via environment variables
- [ ] TLS for all external connections
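A minimal sketch of the webhook authentication task, assuming the shared key lives in the `INGEST_API_KEY` environment variable and is sent in an `X-API-Key` header (header name and key source are illustrative choices, not yet fixed by this PRP):

```python
import os
import secrets

from fastapi import Depends, FastAPI, Header, HTTPException

app = FastAPI()
EXPECTED_KEY = os.environ.get("INGEST_API_KEY", "")  # assumed env var; see Configuration

async def require_api_key(x_api_key: str = Header(default="")) -> None:
    """Reject requests whose X-API-Key header doesn't match (constant-time compare)."""
    if not EXPECTED_KEY or not secrets.compare_digest(x_api_key, EXPECTED_KEY):
        raise HTTPException(status_code=401, detail="Invalid API key")

@app.post("/api/v1/ingest/{vendor}", dependencies=[Depends(require_api_key)])
async def receive_webhook(vendor: str) -> dict:
    # Parsing elided; see the webhook receiver pattern below
    return {"status": "accepted"}
```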
Testing¶
- [ ] Unit tests for config loading
- [ ] Unit tests for each adapter
- [ ] Unit tests for normalizer
- [ ] Integration tests with mock cameras
- [ ] Integration tests for buffer/uploader
Documentation¶
- [ ] Update this PRP with implementation details
- [ ] Document camera configuration examples
- [ ] Create deployment runbooks
Patterns¶
Pattern: Layered Configuration¶
Problem: Different deployment modes require different default settings, but operators should be able to override any setting.
Solution: Apply configuration in layers: mode defaults → config file → environment variables.
Implementation:
```python
import os
from pathlib import Path

import yaml

MODE_DEFAULTS = {
    "edge": {
        "buffer": {"enabled": True, "retention_days": 7},
        "pull_adapters": {"enabled": True},
    },
    "central": {
        "buffer": {"enabled": False},
        "pull_adapters": {"enabled": False},
    },
}

def deep_merge(base: dict, override: dict) -> dict:
    """Merge override into base, recursively for nested dicts."""
    result = base.copy()
    for key, value in override.items():
        if key in result and isinstance(result[key], dict) and isinstance(value, dict):
            result[key] = deep_merge(result[key], value)
        else:
            result[key] = value
    return result

def expand_env_vars(config: dict) -> dict:
    """Recursively expand ${VAR} references in string values."""
    result = {}
    for key, value in config.items():
        if isinstance(value, dict):
            result[key] = expand_env_vars(value)
        elif isinstance(value, str) and value.startswith("${") and value.endswith("}"):
            env_var = value[2:-1]
            result[key] = os.environ.get(env_var, "")
        else:
            result[key] = value
    return result

def load_config(config_path: Path) -> dict:
    """Load configuration with layered defaults."""
    # 1. Load base config
    with open(config_path / "ingest.yaml") as f:
        config = yaml.safe_load(f)

    # 2. Apply mode defaults (config values override defaults)
    mode = config.get("mode", "edge")
    if mode not in MODE_DEFAULTS:
        raise ValueError(f"Invalid mode: {mode}. Must be 'edge' or 'central'")
    config = deep_merge(MODE_DEFAULTS[mode], config)

    # 3. Load cameras.yaml if edge mode
    if mode == "edge":
        cameras_file = config_path / "cameras.yaml"
        if cameras_file.exists():
            with open(cameras_file) as f:
                config["cameras"] = yaml.safe_load(f).get("cameras", [])

    # 4. Expand environment variables
    config = expand_env_vars(config)
    return config
```
When to Use:
- Service initialization
- Config reload on SIGHUP (see the sketch below)
Trade-offs:
- Pros: Flexible, operator-friendly, mode-appropriate defaults
- Cons: Slightly complex config loading logic
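The SIGHUP reload use case is not specified in detail. One plausible wiring, assuming the service keeps its active config in a mutable holder and reuses `load_config` from above (`install_config_reload` and `state` are illustrative names):

```python
import asyncio
import logging
import signal
from pathlib import Path

logger = logging.getLogger(__name__)

def install_config_reload(state: dict, config_path: Path) -> None:
    """Re-run load_config on SIGHUP; keep the old config if reload fails. Unix-only."""
    loop = asyncio.get_running_loop()

    def _reload() -> None:
        try:
            state["config"] = load_config(config_path)
            logger.info("Configuration reloaded")
        except Exception:
            logger.exception("Config reload failed; keeping previous config")

    loop.add_signal_handler(signal.SIGHUP, _reload)
```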
Pattern: Dual-Mode Service¶
Problem: Edge and central deployments need different behavior but should share code.
Solution: Single entry point with a --mode flag that controls which components are enabled.
Implementation:
```python
import argparse
import asyncio
from pathlib import Path

# create_adapter (the adapter factory), WebhookReceiver, DetectionBuffer,
# DetectionUploader, DirectUploader, and load_config are defined in or
# implied by the other patterns in this PRP.

async def run_edge_mode(config: dict):
    """Run ingest service in edge mode."""
    # Start camera adapters (pull mode)
    adapters = []
    for camera_config in config.get("cameras", []):
        adapter = create_adapter(camera_config)
        adapters.append(adapter)

    # Start webhook receiver (push mode)
    receiver = WebhookReceiver(config["receiver"])

    # Start buffer and uploader
    buffer = DetectionBuffer(config["buffer"])
    uploader = DetectionUploader(config["upload"], buffer)

    # Run all components
    await asyncio.gather(
        *[adapter.run() for adapter in adapters],
        receiver.run(),
        uploader.run(),
    )

async def run_central_mode(config: dict):
    """Run ingest service in central mode."""
    # No camera adapters - only webhook receiver
    receiver = WebhookReceiver(config["receiver"])

    # No buffer - upload immediately
    uploader = DirectUploader(config["upload"])

    # Connect receiver directly to uploader
    receiver.on_detection = uploader.upload

    await receiver.run()

def main():
    parser = argparse.ArgumentParser(description="ALPR Ingest Service")
    parser.add_argument("--mode", choices=["edge", "central"], required=True)
    parser.add_argument("--config", type=Path, default=Path("/etc/alpr"))
    args = parser.parse_args()

    config = load_config(args.config)
    if args.mode == "edge":
        asyncio.run(run_edge_mode(config))
    else:
        asyncio.run(run_central_mode(config))

if __name__ == "__main__":
    main()
```
When to Use:
- Always; this is the service entry point
Trade-offs:
- Pros: Single codebase, consistent behavior, shared testing
- Cons: Must ensure mode-specific code paths are well tested
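For example, an edge site starts the same entry point with `--mode edge --config /etc/alpr`, while a central fallback deployment starts it with `--mode central`; the config directory default (`/etc/alpr`) comes from the argparse definition above.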
Pattern: Vendor Adapter Interface¶
Problem: Different camera vendors have different protocols, but the rest of the system needs a consistent interface.
Solution: Abstract base class with pull and push methods that each vendor implements.
Implementation:
```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from typing import AsyncIterator

@dataclass
class Detection:
    """Normalized detection."""
    detection_id: str
    timestamp: datetime
    camera_id: str
    camera_mac: str
    plate_text: str
    plate_normalized: str
    confidence: float
    direction: str | None = None
    vehicle_type: str | None = None
    vehicle_color: str | None = None
    vehicle_brand: str | None = None
    vehicle_model: str | None = None
    image_full: bytes | None = None
    image_plate: bytes | None = None

class CameraAdapter(ABC):
    """Abstract base class for camera vendor adapters."""

    def __init__(self, config: dict):
        self.config = config
        self.camera_id = config["id"]

    @abstractmethod
    async def connect(self) -> None:
        """Establish connection to camera/NVR."""

    @abstractmethod
    async def disconnect(self) -> None:
        """Close connection to camera/NVR."""

    @abstractmethod
    async def pull_detections(self) -> AsyncIterator[Detection]:
        """Poll camera for new detections. Yields normalized detections."""

    @abstractmethod
    def parse_push_detection(self, payload: bytes, headers: dict) -> Detection:
        """Parse incoming webhook payload. Returns normalized detection."""

    async def run(self) -> None:
        """Main loop for pull mode."""
        await self.connect()
        try:
            async for detection in self.pull_detections():
                await self.on_detection(detection)
        finally:
            await self.disconnect()

    async def on_detection(self, detection: Detection) -> None:
        """Callback when detection received. Set by service."""
```
When to Use:
- Implementing any camera vendor integration
- Adding new vendor support
Trade-offs:
- Pros: Consistent interface, easy to add vendors, testable with mocks
- Cons: Some vendors may not fit the model perfectly
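To make the contract concrete, here is a hypothetical in-memory adapter (not one of the planned vendors) that satisfies the interface and is handy for tests; all emitted values are fakes:

```python
import asyncio
import uuid
from datetime import datetime, timezone
from typing import AsyncIterator

class MockAdapter(CameraAdapter):
    """Hypothetical pull-only adapter emitting a fake detection every second."""

    async def connect(self) -> None:
        pass  # nothing to connect to

    async def disconnect(self) -> None:
        pass

    async def pull_detections(self) -> AsyncIterator[Detection]:
        while True:
            await asyncio.sleep(1)
            yield Detection(
                detection_id=str(uuid.uuid4()),
                timestamp=datetime.now(timezone.utc),
                camera_id=self.camera_id,
                camera_mac="00:00:00:00:00:00",
                plate_text="ABC 123",
                plate_normalized="ABC123",
                confidence=0.99,
            )

    def parse_push_detection(self, payload: bytes, headers: dict) -> Detection:
        raise NotImplementedError("mock adapter is pull-only")
```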
Related Patterns: See Camera Adapters PRP for vendor-specific implementations.
Pattern: Webhook Vendor Routing¶
Problem: When receiving pushed events, the system must identify the camera vendor to parse the payload correctly.
Solution: Include vendor in the webhook URL path. Route to appropriate adapter.
Implementation:
```python
import logging
from typing import Awaitable, Callable

from fastapi import FastAPI, HTTPException, Request

logger = logging.getLogger(__name__)
app = FastAPI()

# Adapter factory
ADAPTERS = {
    "hikvision": HikvisionAdapter,
    "unifi": UnifiAdapter,
}

def get_adapter(vendor: str) -> CameraAdapter:
    """Get adapter instance for vendor."""
    if vendor not in ADAPTERS:
        raise HTTPException(400, f"Unknown vendor: {vendor}")
    # Minimal config: push parsing needs no connection settings, and the
    # camera_id is set from the query parameter after parsing
    return ADAPTERS[vendor]({"id": ""})

# Detection callback - set by service
on_detection: Callable[[Detection], Awaitable[None]] | None = None

@app.post("/api/v1/ingest/{vendor}")
async def receive_webhook(vendor: str, request: Request, camera_id: str | None = None):
    """Receive camera webhook and route to adapter."""
    adapter = get_adapter(vendor)
    payload = await request.body()
    headers = dict(request.headers)
    try:
        detection = adapter.parse_push_detection(payload, headers)
        if camera_id:
            detection.camera_id = camera_id
        if on_detection:
            await on_detection(detection)
        return {"status": "accepted"}
    except Exception as e:
        # Log error but don't expose details
        logger.warning(f"Failed to parse {vendor} webhook: {e}")
        raise HTTPException(400, "Invalid payload")

@app.post("/api/v1/ingest/auto")
async def receive_webhook_auto(request: Request):
    """Auto-detect vendor from payload/headers."""
    payload = await request.body()
    headers = dict(request.headers)

    # Try to detect vendor
    content_type = headers.get("content-type", "")
    if "xml" in content_type:
        vendor = "hikvision"
    elif "json" in content_type and "x-unifi" in str(headers).lower():
        vendor = "unifi"
    else:
        raise HTTPException(400, "Could not detect vendor. Use /api/v1/ingest/{vendor}")

    return await receive_webhook(vendor, request)
```
When to Use:
- Receiving camera webhooks (push mode)
- Central ingest service
Trade-offs:
- Pros: Explicit routing, easy debugging, clear logs
- Cons: Camera must be configured with correct URL
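Simulating a camera push against a running receiver is straightforward. The snippet below targets the edge receiver address used in the cameras.yaml example later in this document; the XML fixture is illustrative (a real test would replay a captured vendor payload):

```python
import httpx

# Illustrative fixture; a real test would load a captured Hikvision payload
payload = b"<EventNotificationAlert>...</EventNotificationAlert>"

resp = httpx.post(
    "http://edge:8080/api/v1/ingest/hikvision",
    params={"camera_id": "cam_parking"},
    content=payload,
    headers={"Content-Type": "application/xml"},
)
assert resp.status_code == 200
```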
Pattern: Detection Buffer¶
Problem: Network outages shouldn't cause detection loss. Edge devices need offline resilience.
Solution: SQLite buffer with configurable retention. Uploader reads from buffer.
Implementation:
```python
import json
import sqlite3
from datetime import datetime, timedelta
from pathlib import Path

class DetectionBuffer:
    """SQLite buffer for detections awaiting upload."""

    def __init__(self, config: dict):
        self.enabled = config.get("enabled", True)
        self.retention_days = config.get("retention_days", 7)
        self.db_path = Path(config.get("path", "/var/lib/alpr/buffer.db"))
        if self.enabled:
            self._init_db()

    def _init_db(self):
        """Initialize database schema."""
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS detections (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    detection_id TEXT UNIQUE,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    uploaded_at TIMESTAMP,
                    data JSON NOT NULL
                )
            """)
            conn.execute("CREATE INDEX IF NOT EXISTS idx_uploaded ON detections(uploaded_at)")
            # Enable WAL mode for better concurrency
            conn.execute("PRAGMA journal_mode=WAL")

    def add(self, detection: Detection) -> None:
        """Add detection to buffer."""
        if not self.enabled:
            return
        with sqlite3.connect(self.db_path) as conn:
            # NOTE: bytes fields (image_full/image_plate) serialize poorly via
            # default=str; base64-encode them in a real implementation
            conn.execute(
                "INSERT OR IGNORE INTO detections (detection_id, data) VALUES (?, ?)",
                (detection.detection_id, json.dumps(detection.__dict__, default=str))
            )

    def get_pending(self, limit: int = 50) -> list[dict]:
        """Get detections not yet uploaded."""
        if not self.enabled:
            return []
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            rows = conn.execute(
                "SELECT id, data FROM detections WHERE uploaded_at IS NULL ORDER BY id LIMIT ?",
                (limit,)
            ).fetchall()
            return [{"id": r["id"], "data": json.loads(r["data"])} for r in rows]

    def mark_uploaded(self, ids: list[int]) -> None:
        """Mark detections as uploaded."""
        if not self.enabled or not ids:
            return
        with sqlite3.connect(self.db_path) as conn:
            placeholders = ",".join("?" * len(ids))
            conn.execute(
                f"UPDATE detections SET uploaded_at = CURRENT_TIMESTAMP WHERE id IN ({placeholders})",
                ids
            )

    def cleanup(self) -> int:
        """Remove old uploaded detections. Returns count deleted."""
        if not self.enabled:
            return 0
        # CURRENT_TIMESTAMP stores UTC "YYYY-MM-DD HH:MM:SS", so compare as a string
        cutoff = (datetime.utcnow() - timedelta(days=self.retention_days)).strftime("%Y-%m-%d %H:%M:%S")
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute(
                "DELETE FROM detections WHERE uploaded_at IS NOT NULL AND uploaded_at < ?",
                (cutoff,)
            )
            return cursor.rowcount

    @property
    def depth(self) -> int:
        """Get count of pending detections."""
        if not self.enabled:
            return 0
        with sqlite3.connect(self.db_path) as conn:
            return conn.execute(
                "SELECT COUNT(*) FROM detections WHERE uploaded_at IS NULL"
            ).fetchone()[0]
```
When to Use:
- Edge mode (always)
- Central mode with unreliable connection to main app
Trade-offs:
- Pros: Offline resilience, no data loss, bounded storage
- Cons: Additional I/O, disk space usage
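Typical lifecycle of a detection through the buffer, using only the methods defined above (the path and the `detection` value are illustrative):

```python
# Illustrative round trip; `detection` is a Detection instance from an adapter
buffer = DetectionBuffer({"enabled": True, "retention_days": 7, "path": "/tmp/buffer.db"})

buffer.add(detection)                            # persist on receipt
batch = buffer.get_pending(limit=50)             # uploader pulls a batch
buffer.mark_uploaded([row["id"] for row in batch])
deleted = buffer.cleanup()                       # enforce retention
print(f"pending={buffer.depth} deleted={deleted}")
```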
Pattern: Batch Uploader with Retry¶
Problem: Detections should be uploaded efficiently with retry on failure.
Solution: Batch upload with exponential backoff retry. Read from buffer.
Implementation:
```python
import asyncio
import logging

import httpx

logger = logging.getLogger(__name__)

class DetectionUploader:
    """Upload detections to central server."""

    def __init__(self, config: dict, buffer: DetectionBuffer):
        self.url = config["url"]
        self.api_key = config["api_key"]
        self.batch_size = config.get("batch_size", 50)
        self.interval = config.get("interval_seconds", 10)
        self.timeout = config.get("timeout_seconds", 30)
        self.buffer = buffer
        self._backoff = 1        # Current backoff in seconds
        self._max_backoff = 300  # Max 5 minutes

    async def run(self):
        """Main upload loop."""
        while True:
            try:
                uploaded = await self._upload_batch()
                if uploaded > 0:
                    self._backoff = 1  # Reset on success
                    continue           # Try another batch immediately
                # No pending detections, wait for interval
                await asyncio.sleep(self.interval)
            except Exception as e:
                logger.error(f"Upload failed: {e}")
                await asyncio.sleep(self._backoff)
                self._backoff = min(self._backoff * 2, self._max_backoff)

    async def _upload_batch(self) -> int:
        """Upload a batch of detections. Returns count uploaded."""
        pending = self.buffer.get_pending(self.batch_size)
        if not pending:
            return 0

        detections = [p["data"] for p in pending]
        ids = [p["id"] for p in pending]

        # NOTE: a long-lived AsyncClient would give connection pooling;
        # one client per batch keeps this sketch simple
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.post(
                self.url,
                json={"detections": detections},
                headers={"X-API-Key": self.api_key}
            )
            response.raise_for_status()

        self.buffer.mark_uploaded(ids)
        logger.info(f"Uploaded {len(ids)} detections")
        return len(ids)
```
When to Use:
- Edge mode upload to central
- Central ingest forward to main app
Trade-offs:
- Pros: Efficient batching, retry on failure, backpressure via buffer
- Cons: Latency from batching interval
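One way to wire the buffer and uploader together with a periodic retention sweep; the hourly interval is an illustrative choice, not something this PRP specifies:

```python
import asyncio

async def run_buffered_pipeline(config: dict) -> None:
    """Sketch: run the uploader alongside an hourly buffer cleanup loop."""
    buffer = DetectionBuffer(config["buffer"])
    uploader = DetectionUploader(config["upload"], buffer)

    async def cleanup_loop() -> None:
        while True:
            buffer.cleanup()
            await asyncio.sleep(3600)  # assumed hourly sweep

    await asyncio.gather(uploader.run(), cleanup_loop())
```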
Anti-Patterns¶
❌ Anti-Pattern: Hardcoded Vendor Detection¶
Problem: Trying to auto-detect vendor from every webhook leads to fragile, unmaintainable code.
Example of BAD code:
```python
@app.post("/webhook")
async def receive_any_webhook(request: Request):
    payload = await request.body()

    # Fragile vendor detection
    if b"<EventNotificationAlert>" in payload:
        vendor = "hikvision"
    elif b'"type": "motion"' in payload:
        vendor = "unifi"
    elif b"licensePlate" in payload:
        vendor = "unknown_lpr"  # Guess!
    else:
        raise HTTPException(400, "Unknown vendor")
    # Parse based on guess...
```
Why It's Wrong:
- Payload formats change between firmware versions
- Multiple vendors may use similar formats
- Hard to debug which vendor was detected
- No clear contract with camera configuration
Correct Approach:
```python
@app.post("/api/v1/ingest/{vendor}")
async def receive_webhook(vendor: str, request: Request):
    # Vendor is explicit in URL
    adapter = get_adapter(vendor)
    # ...
```
How to Fix:
1. Use URL path routing for vendor identification
2. Document webhook URL format for each camera
3. Keep auto-detection only as fallback with logging
❌ Anti-Pattern: Unbounded In-Memory Queue¶
Problem: Buffering detections in memory leads to OOM under load or network outage.
Example of BAD code:
```python
class IngestService:
    def __init__(self):
        self.pending_detections = []  # Unbounded list!

    async def on_detection(self, detection: Detection):
        self.pending_detections.append(detection)

    async def upload_loop(self):
        while True:
            if self.pending_detections:
                batch = self.pending_detections[:50]
                await self.upload(batch)
                self.pending_detections = self.pending_detections[50:]
            await asyncio.sleep(10)
```
Why It's Wrong:
- Network outage: queue grows unbounded
- Process restart: all pending detections lost
- High-volume camera: memory exhaustion
Correct Approach:
```python
# Use SQLite buffer with retention policy
buffer = DetectionBuffer({"enabled": True, "retention_days": 7})

async def on_detection(detection: Detection):
    buffer.add(detection)  # Persisted to disk
```
How to Fix:
1. Use SQLite buffer for persistence
2. Set retention policy for bounded storage
3. Monitor buffer depth for alerting
Configuration¶
Environment Variables¶
| Variable | Required | Default | Description |
|---|---|---|---|
| INGEST_API_KEY | Yes | - | API key for central server authentication |
| CAM_*_PASSWORD | Per camera | - | Camera credentials (e.g., CAM_FRONT_GATE_PASSWORD) |
| UNIFI_API_KEY | If using Unifi | - | Unifi Protect API key |
Configuration Files¶
ingest.yaml:
```yaml
mode: edge  # or "central"

buffer:
  # enabled: true       # Default for edge
  # retention_days: 7   # Default for edge
  path: /var/lib/alpr/buffer.db

upload:
  url: https://central.example.com/api/v1/detections/batch
  api_key: ${INGEST_API_KEY}
  batch_size: 50
  interval_seconds: 10
  timeout_seconds: 30

receiver:
  enabled: true
  host: 0.0.0.0
  port: 8080

logging:
  level: INFO
  format: json
```
cameras.yaml (edge mode only):
```yaml
cameras:
  - id: cam_front_gate
    vendor: hikvision
    mode: pull
    host: 192.168.1.100
    port: 80
    username: admin
    password: ${CAM_FRONT_GATE_PASSWORD}
    channel: 1
    poll_interval_seconds: 2
    timezone: America/New_York

  - id: cam_parking
    vendor: hikvision
    mode: push
    # Camera pushes to: http://edge:8080/api/v1/ingest/hikvision?camera_id=cam_parking
```
Testing Strategies¶
Unit Testing¶
Focus:
- Config loading and validation
- Adapter parsing (with fixture payloads)
- Buffer operations
- Normalizer functions
Example:
```python
from pathlib import Path

def test_layered_config_applies_mode_defaults():
    config = load_config(Path("tests/fixtures/edge_config"))
    assert config["mode"] == "edge"
    assert config["buffer"]["enabled"] is True
    assert config["buffer"]["retention_days"] == 7

def test_central_mode_disables_buffer():
    config = load_config(Path("tests/fixtures/central_config"))
    assert config["mode"] == "central"
    assert config["buffer"]["enabled"] is False
```
Integration Testing¶
Focus:
- Adapter connection to mock camera server
- Buffer + uploader integration
- Webhook receiver end-to-end
Setup:
```python
import pytest

@pytest.fixture
async def mock_camera_server():
    """Start mock Hikvision camera server."""
    app = create_mock_hikvision_app()
    server = await start_test_server(app)
    yield server
    await server.close()

async def test_hikvision_pull_receives_events(mock_camera_server):
    adapter = HikvisionAdapter({
        "id": "test_cam",
        "host": "localhost",
        "port": mock_camera_server.port,
        "username": "admin",
        "password": "test",
    })

    detections = []
    async for detection in adapter.pull_detections():
        detections.append(detection)
        if len(detections) >= 3:
            break

    assert len(detections) == 3
    assert all(d.plate_normalized for d in detections)
```
Edge Cases¶
Test these scenarios:
1. Camera offline during pull (should retry with backoff)
2. Malformed webhook payload (should log and return 400; see the sketch below)
3. Central server offline during upload (should buffer and retry)
4. Buffer at retention limit (should cleanup old entries)
5. Config with missing required fields (should fail fast with clear error)
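A sketch for scenario 2, assuming the FastAPI `app` from the Webhook Vendor Routing pattern is importable in the test module:

```python
from fastapi.testclient import TestClient

def test_malformed_webhook_returns_400():
    """Malformed payload is rejected with 400; the service keeps running."""
    client = TestClient(app)  # `app` from the Webhook Vendor Routing pattern
    resp = client.post("/api/v1/ingest/hikvision", content=b"not-xml")
    assert resp.status_code == 400
```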
Performance Considerations¶
Optimization Strategies¶
Do:
- Use async I/O throughout
- Batch uploads (50 detections default)
- SQLite WAL mode for concurrent access
- Connection pooling for HTTP clients
Avoid:
- Synchronous I/O in async context
- Unbounded in-memory queues
- Per-detection uploads
- Blocking the detection loop
Benchmarks¶
Expected Performance:
- Detection parsing: < 10ms per detection
- Buffer write: < 5ms per detection
- Upload batch (50 detections): < 500ms
- Max throughput: 100 detections/second sustained
Monitoring:
- Buffer depth (alert if > 1000; see the sketch below)
- Upload latency (alert if p99 > 5s)
- Parse error rate (alert if > 1%)
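Buffer depth is the cheapest signal to expose. A minimal sketch, assuming a `/healthz` endpoint is acceptable (the endpoint name and inline threshold are illustrative; a metrics exporter would work equally well):

```python
from fastapi import FastAPI

app = FastAPI()
buffer = DetectionBuffer({"enabled": True})  # from the Detection Buffer pattern

@app.get("/healthz")
async def healthz() -> dict:
    """Report buffer depth so operators can alert on depth > 1000."""
    depth = buffer.depth
    return {"status": "degraded" if depth > 1000 else "ok", "buffer_depth": depth}
```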
Related Documentation¶
Internal References¶
- Ingest Service Architecture - WHAT and WHY
- Camera Adapters Architecture - Adapter design
- Camera Adapters PRP - Vendor-specific implementation
- Global PRP - Cross-cutting patterns
External Resources¶
- Hikvision ISAPI Documentation (partner account required)
- Unifi Protect API
- FastAPI Documentation
- httpx Documentation
Maintenance Notes¶
Review Frequency: After each phase milestone
Last Reviewed: 2025-12-30
Next Review: After Phase 5 implementation
Active Issues:
- Ingest service not yet implemented (Phase 5)
- Vendor adapters to be ported from design
Maintainer: Architecture Team
Quick Checklist¶
When implementing this component:
- [ ] Read this entire PRP
- [ ] Check global.md for cross-cutting concerns
- [ ] Review adapter pattern in camera-adapters-prp.md
- [ ] Implement config loader with layered defaults
- [ ] Implement at least one vendor adapter
- [ ] Implement buffer and uploader
- [ ] Implement webhook receiver
- [ ] Write tests per testing strategies
- [ ] Test both edge and central modes
- [ ] Document deployment steps
Remember: PRPs are living documents. Update as patterns evolve!