Camera Adapters - Project Reference Pattern
Component: Edge Collector Camera Adapters
Status: 🟢 Stable
Created: 2025-12-29
Last Updated: 2025-12-29
Overview
Purpose
This PRP defines implementation patterns for camera vendor adapters on edge collectors. Adapters handle vendor-specific protocols, normalize detections to a common format, and manage camera connections.
Scope
Responsibilities:
- Abstract adapter interface definition
- Vendor-specific implementations (Hikvision, Unifi)
- Detection normalization (plates, timestamps, images)
- Connection management and reconnection

Out of Scope:
- Detection buffering (see edge architecture)
- Central server upload (see detection-batching)
- Image storage (see image-storage-prp)
Dependencies
Requires:
- httpx (async HTTP client)
- websockets (WebSocket client, used by the Unifi adapter)
- Camera vendor access (LAN)

Used By:
- Edge collector main loop
- Detection buffer
Patterns
Pattern: Abstract Adapter Interface
Problem: Multiple camera vendors with different protocols need unified handling.
Solution: Define an abstract base class that all vendor adapters must implement.
Implementation:
# edge/src/adapters/base.py
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Awaitable, Callable

class CameraStatus(Enum):
    """Camera connection status."""
    ONLINE = "online"
    OFFLINE = "offline"
    ERROR = "error"
    CONNECTING = "connecting"

@dataclass
class CameraInfo:
    """Camera identification and metadata."""
    id: str                       # Unique ID within this collector
    mac: str                      # MAC address (normalized, lowercase, no separators)
    name: str | None = None       # Human-readable name
    model: str | None = None      # Camera model
    firmware: str | None = None

@dataclass
class PlateDetection:
    """Normalized plate detection from camera."""
    text: str                     # Raw plate text from camera
    confidence: float             # 0.0 to 1.0
    region: str | None = None     # Plate region/country code

@dataclass
class VehicleInfo:
    """Optional vehicle metadata."""
    direction: str | None = None      # "entering", "exiting", "unknown"
    vehicle_type: str | None = None   # "car", "truck", "motorcycle", "bus", "van"
    color: str | None = None
    brand: str | None = None          # Vehicle make (e.g., "Toyota", "Ford")
    model: str | None = None          # Vehicle model (e.g., "Camry", "F-150")

@dataclass
class ImageData:
    """Image captured during detection."""
    full_scene: bytes | None = None   # Full camera frame (JPEG)
    plate_crop: bytes | None = None   # Cropped plate region (JPEG)

@dataclass
class Detection:
    """Normalized detection from any camera vendor."""
    id: str                           # Unique detection ID (from camera or generated)
    timestamp: datetime               # When detection occurred (camera clock, normalized to UTC)
    camera: CameraInfo
    plate: PlateDetection
    vehicle: VehicleInfo | None = None
    images: ImageData | None = None
    vendor: str = ""                  # "hikvision", "unifi", etc.
    raw_data: dict | None = None      # Original vendor payload (for debugging)
class CameraAdapter(ABC):
    """
    Abstract base class for camera vendor adapters.

    Each adapter implementation must:
    1. Connect to camera(s) using vendor-specific protocol
    2. Subscribe to detections
    3. Normalize to Detection format
    4. Handle reconnection on failure
    """

    def __init__(self, config: dict):
        self.config = config
        self._status: CameraStatus = CameraStatus.OFFLINE
        self._cameras: dict[str, CameraInfo] = {}

    @property
    @abstractmethod
    def vendor_name(self) -> str:
        """Return vendor identifier (e.g., 'hikvision', 'unifi')."""
        pass

    @abstractmethod
    async def connect(self) -> bool:
        """
        Establish connection to camera(s).

        Returns:
            True if connection successful, False otherwise.
        """
        pass

    @abstractmethod
    async def disconnect(self) -> None:
        """Cleanly disconnect from camera(s)."""
        pass

    @abstractmethod
    async def subscribe_detections(
        self,
        callback: Callable[[Detection], Awaitable[None]]
    ) -> None:
        """
        Subscribe to detections.

        Args:
            callback: Async function called for each detection.
        """
        pass

    @abstractmethod
    async def get_cameras(self) -> list[CameraInfo]:
        """Get list of cameras managed by this adapter."""
        pass

    @property
    def status(self) -> CameraStatus:
        """Current connection status."""
        return self._status

    @property
    def cameras(self) -> dict[str, CameraInfo]:
        """Dictionary of cameras by ID."""
        return self._cameras

    async def health_check(self) -> bool:
        """Check if connection is healthy."""
        return self._status == CameraStatus.ONLINE
Pattern: Detection Normalization
Problem: Each vendor returns different data formats; need consistent format for processing.
Solution: Normalize all vendor-specific values to standard formats.
MAC Address Normalization:
def normalize_mac(mac: str) -> str:
    """
    Normalize MAC address to lowercase hex without separators.

    Examples:
        "00:1A:2B:3C:4D:5E" -> "001a2b3c4d5e"
        "00-1A-2B-3C-4D-5E" -> "001a2b3c4d5e"
    """
    return mac.lower().replace(":", "").replace("-", "")
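The normalizer above trusts its input. A stricter variant can reject malformed values before they become camera identifiers. The following is a minimal sketch, not part of the adapter code: the name `normalize_mac_strict` is hypothetical, and accepting dot-separated input (for example `001a.2b3c.4d5e`) is an assumption about what vendors may send.

```python
import re

# Separators that may appear in vendor MAC strings (assumption: colon, dash, dot, whitespace)
_MAC_SEPARATORS = re.compile(r"[:\-.\s]")
_MAC_HEX = re.compile(r"[0-9a-f]{12}")


def normalize_mac_strict(mac: str) -> str:
    """Hypothetical strict variant: normalize, then validate 12 hex digits."""
    cleaned = _MAC_SEPARATORS.sub("", mac).lower()
    if not _MAC_HEX.fullmatch(cleaned):
        raise ValueError(f"Invalid MAC address: {mac!r}")
    return cleaned


print(normalize_mac_strict("00:1A:2B:3C:4D:5E"))  # 001a2b3c4d5e
print(normalize_mac_strict("001a.2b3c.4d5e"))     # 001a2b3c4d5e
```

Raising on bad input keeps a truncated or garbled MAC from silently creating a second identity for the same camera.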
Timestamp Normalization:
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

def normalize_timestamp(camera_time: str, camera_tz: str = "UTC") -> datetime:
    """
    Convert camera timestamp to UTC datetime.

    Args:
        camera_time: Timestamp string from camera
        camera_tz: Camera's configured timezone

    Returns:
        UTC datetime object
    """
    # Parse vendor-specific format (example: Hikvision)
    local_dt = datetime.strptime(camera_time, "%Y-%m-%dT%H:%M:%S")
    # Apply camera timezone
    tz = ZoneInfo(camera_tz)
    local_dt = local_dt.replace(tzinfo=tz)
    # Convert to UTC
    return local_dt.astimezone(timezone.utc)
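As a worked example of the conversion, the sketch below applies the same steps to one winter and one summer wall-clock time in `America/New_York`. The function name `to_utc` is a stand-in used only to keep the example self-contained.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo


def to_utc(camera_time: str, camera_tz: str = "UTC") -> datetime:
    """Same steps as normalize_timestamp: parse, attach zone, convert to UTC."""
    local_dt = datetime.strptime(camera_time, "%Y-%m-%dT%H:%M:%S")
    return local_dt.replace(tzinfo=ZoneInfo(camera_tz)).astimezone(timezone.utc)


winter = to_utc("2025-01-15T14:32:05", "America/New_York")  # EST, UTC-5
summer = to_utc("2025-07-15T14:32:05", "America/New_York")  # EDT, UTC-4
print(winter.isoformat())  # 2025-01-15T19:32:05+00:00
print(summer.isoformat())  # 2025-07-15T18:32:05+00:00
```

During the autumn DST transition one local hour occurs twice. With `replace(tzinfo=...)` the default `fold=0` selects the first occurrence, so detections in that hour may be off by one hour unless the camera also reports its UTC offset.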
Direction Normalization:
DIRECTION_MAP = {
    # Vendor values -> normalized
    "in": "entering",
    "entry": "entering",
    "entering": "entering",
    "approach": "entering",
    "out": "exiting",
    "exit": "exiting",
    "exiting": "exiting",
    "leaving": "exiting",
}

def normalize_direction(vendor_direction: str | None) -> str:
    """Normalize direction to standard values."""
    if not vendor_direction:
        return "unknown"
    return DIRECTION_MAP.get(vendor_direction.lower(), "unknown")
Vehicle Type Normalization:
VEHICLE_TYPE_MAP = {
    "car": "car",
    "sedan": "car",
    "suv": "car",
    "automobile": "car",
    "truck": "truck",
    "lorry": "truck",
    "pickup": "truck",
    "motorcycle": "motorcycle",
    "motorbike": "motorcycle",
    "bike": "motorcycle",
    "bus": "bus",
    "coach": "bus",
    "van": "van",
    "minivan": "van",
}

def normalize_vehicle_type(vendor_type: str | None) -> str | None:
    """Normalize vehicle type or return None if unknown."""
    if not vendor_type:
        return None
    return VEHICLE_TYPE_MAP.get(vendor_type.lower())
Pattern: Hikvision ISAPI Adapter
Problem: Integrate with Hikvision cameras using their proprietary ISAPI protocol.
Solution: HTTP client with Digest Auth, XML parsing, and a long-lived multipart event stream.
Implementation:
# edge/src/adapters/hikvision.py
import base64
import logging
import uuid
from xml.etree import ElementTree

import httpx

from .base import CameraAdapter, Detection, CameraInfo, PlateDetection, VehicleInfo, ImageData, CameraStatus
# The module path for the normalization helpers is an assumption; adjust to the project layout.
from .normalize import normalize_direction, normalize_timestamp, normalize_vehicle_type

logger = logging.getLogger(__name__)

class HikvisionAdapter(CameraAdapter):
    """Hikvision ISAPI camera adapter."""

    def __init__(self, config: dict):
        super().__init__(config)
        # Created in connect(); None until then so disconnect() is always safe to call
        self._client = None

    @property
    def vendor_name(self) -> str:
        return "hikvision"

    async def connect(self) -> bool:
        """Connect to Hikvision camera."""
        self._client = httpx.AsyncClient(
            base_url=f"http://{self.config['host']}:{self.config.get('port', 80)}",
            auth=httpx.DigestAuth(
                self.config['username'],
                self.config['password']
            ),
            timeout=30.0
        )
        try:
            resp = await self._client.get("/ISAPI/System/deviceInfo")
            if resp.status_code == 200:
                self._parse_device_info(resp.content)  # helper not shown
                self._status = CameraStatus.ONLINE
                return True
        except Exception as e:
            logger.warning(f"Hikvision connect failed: {e}")
            self._status = CameraStatus.ERROR
            return False
        return False

    async def disconnect(self) -> None:
        """Disconnect from camera."""
        if self._client is not None:
            await self._client.aclose()
            self._client = None
        self._status = CameraStatus.OFFLINE

    async def subscribe_detections(self, callback):
        """Subscribe to ISAPI traffic detections via the multipart alert stream."""
        url = "/ISAPI/Event/notification/alertStream"
        async with self._client.stream("GET", url) as response:
            buffer = b""
            async for chunk in response.aiter_bytes():
                buffer += chunk
                # ISAPI uses multipart boundaries
                if b"--boundary" in buffer:
                    # _split_multipart (not shown) is assumed to return the complete
                    # parts plus any trailing partial data, which stays in the buffer
                    # so a part split across two chunks is not lost.
                    detections, buffer = self._split_multipart(buffer)
                    for detection_data in detections:
                        detection = self._parse_alert(detection_data)
                        if detection:
                            await callback(detection)

    def _parse_alert(self, data: bytes) -> Detection | None:
        """Parse ISAPI alert XML to Detection."""
        try:
            # Real payloads may declare an XML namespace; these lookups assume none.
            root = ElementTree.fromstring(data)
            plate_elem = root.find(".//plateNumber")
            if plate_elem is None or not plate_elem.text:
                return None
            plate_text = plate_elem.text
            confidence = float(root.findtext(".//confidence", "0.8"))
            # Extract images (base64 encoded in XML)
            full_image = self._decode_image(root.find(".//fullImage"))
            crop_image = self._decode_image(root.find(".//plateImage"))
            # Extract vehicle info
            direction = root.findtext(".//direction", "unknown")
            vehicle_type = root.findtext(".//vehicleType")
            vehicle_color = root.findtext(".//vehicleColor")
            vehicle_brand = root.findtext(".//vehicleBrand")
            vehicle_model = root.findtext(".//vehicleModel")
            return Detection(
                id=root.findtext(".//eventId", str(uuid.uuid4())),
                timestamp=normalize_timestamp(
                    root.findtext(".//dateTime"),
                    self.config.get("timezone", "UTC")
                ),
                camera=self._cameras.get(self.config['cameras'][0]['id']),
                plate=PlateDetection(
                    text=plate_text,
                    confidence=confidence
                ),
                vehicle=VehicleInfo(
                    direction=normalize_direction(direction),
                    vehicle_type=normalize_vehicle_type(vehicle_type),
                    color=vehicle_color,
                    brand=vehicle_brand,
                    model=vehicle_model
                ),
                images=ImageData(
                    full_scene=full_image,
                    plate_crop=crop_image
                ),
                vendor="hikvision"
            )
        except Exception as e:
            # Covers malformed XML, a missing dateTime, and non-numeric confidence
            logger.error(f"Failed to parse Hikvision detection: {e}")
            return None

    def _decode_image(self, elem) -> bytes | None:
        """Decode base64 image from XML element."""
        if elem is None or not elem.text:
            return None
        return base64.b64decode(elem.text)

    async def get_cameras(self) -> list[CameraInfo]:
        return list(self._cameras.values())
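The adapter relies on a `_split_multipart` helper that is not shown. The sketch below illustrates one way such a helper could work, under stated assumptions: the function name `split_multipart`, its `(bodies, remainder)` return shape, and passing the boundary as a parameter (normally taken from the response `Content-Type` header) are all hypothetical, not the project's actual implementation.

```python
def split_multipart(buffer: bytes, boundary: bytes) -> tuple[list[bytes], bytes]:
    """Return (complete part bodies, unconsumed remainder) for a multipart buffer."""
    delimiter = b"--" + boundary
    pieces = buffer.split(delimiter)
    if len(pieces) == 1:
        # No delimiter seen yet: nothing is complete, keep everything buffered
        return [], buffer

    # The last piece has no closing delimiter yet, so it may be incomplete
    tail = pieces.pop()
    bodies = []
    for piece in pieces:
        # Part headers and body are separated by a blank line
        _headers, separator, body = piece.partition(b"\r\n\r\n")
        body = body.removesuffix(b"\r\n")  # drop the line break before the next delimiter
        if separator and body:
            bodies.append(body)
    # Re-attach the delimiter so the remainder parses correctly with the next chunk
    return bodies, delimiter + tail


chunk1 = (b"--boundary\r\nContent-Type: application/xml\r\n\r\n<a/>\r\n"
          b"--boundary\r\nContent-Type: application/xml\r\n\r\n<b")
chunk2 = b"/>\r\n--boundary--\r\n"

parts1, rest = split_multipart(chunk1, b"boundary")
parts2, rest = split_multipart(rest + chunk2, b"boundary")
print(parts1, parts2)  # [b'<a/>'] [b'<b/>']
```

Returning the remainder instead of clearing the buffer is what lets a part that straddles two network chunks be parsed once the rest arrives.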
Hikvision Quirks:
- Uses HTTP Digest authentication (not Basic)
- Event stream uses multipart/x-mixed-replace
- Timestamps in local camera time (configure timezone)
- Some models send duplicate events within 1 second
- Image data is base64 encoded in XML
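The duplicate-event quirk can be handled with a small time-window filter placed in front of the callback. This is a sketch under assumptions: the class name `DuplicateFilter`, the `(camera_id, plate_text)` key, and the 1-second default window are illustrative choices, not something the adapter code above defines.

```python
import time


class DuplicateFilter:
    """Suppress repeats of the same (camera, plate) pair within a time window."""

    def __init__(self, window_seconds: float = 1.0, clock=time.monotonic):
        self._window = window_seconds
        self._clock = clock  # injectable so tests can control time
        self._first_seen: dict[tuple[str, str], float] = {}

    def is_duplicate(self, camera_id: str, plate_text: str) -> bool:
        now = self._clock()
        # Forget entries older than the window so the dict does not grow unbounded
        self._first_seen = {
            key: seen for key, seen in self._first_seen.items()
            if now - seen < self._window
        }
        key = (camera_id, plate_text)
        if key in self._first_seen:
            return True
        # Only the first sighting is recorded, so the window does not slide
        self._first_seen[key] = now
        return False


fake_now = [0.0]
dedup = DuplicateFilter(window_seconds=1.0, clock=lambda: fake_now[0])
first = dedup.is_duplicate("cam-001", "ABC1234")
fake_now[0] = 0.5
second = dedup.is_duplicate("cam-001", "ABC1234")
fake_now[0] = 1.5
third = dedup.is_duplicate("cam-001", "ABC1234")
print(first, second, third)  # False True False
```

Anchoring the window at the first sighting means a vehicle that legitimately reappears after the window is reported again, even if duplicates arrived in between.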
Pattern: Unifi Protect Adapter
Problem: Integrate with Unifi Protect cameras via NVR API.
Solution: REST API for bootstrap, WebSocket for real-time detections.
Implementation:
# edge/src/adapters/unifi.py
import json
import ssl
from datetime import datetime, timezone

import httpx
import websockets

from .base import CameraAdapter, CameraInfo, CameraStatus, Detection, PlateDetection

class UnifiAdapter(CameraAdapter):
    """Unifi Protect camera adapter."""

    def __init__(self, config: dict):
        super().__init__(config)
        # Created in connect(); None until then so disconnect() is always safe to call
        self._client = None

    @property
    def vendor_name(self) -> str:
        return "unifi"

    async def connect(self) -> bool:
        """Connect to Unifi Protect console."""
        self._client = httpx.AsyncClient(
            base_url=self.config['console_url'],
            headers={"Authorization": f"Bearer {self.config['api_key']}"},
            verify=False,  # Self-signed cert on console
            timeout=30.0
        )
        try:
            resp = await self._client.get("/proxy/protect/api/bootstrap")
            if resp.status_code == 200:
                self._parse_bootstrap(resp.json())  # helper not shown
                self._status = CameraStatus.ONLINE
                return True
        except Exception:
            self._status = CameraStatus.ERROR
            return False
        return False

    async def disconnect(self) -> None:
        """Disconnect from the console (required by the abstract interface)."""
        if self._client is not None:
            await self._client.aclose()
            self._client = None
        self._status = CameraStatus.OFFLINE

    async def subscribe_detections(self, callback):
        """Subscribe to Protect WebSocket detections."""
        ws_url = self.config['console_url'].replace("https://", "wss://")
        ws_url = f"{ws_url}/proxy/protect/ws/updates"
        headers = {"Authorization": f"Bearer {self.config['api_key']}"}
        # Self-signed cert: keep TLS but skip verification. Passing ssl=False
        # would disable TLS entirely, which does not work with a wss:// URL.
        ssl_context = ssl.create_default_context()
        ssl_context.check_hostname = False
        ssl_context.verify_mode = ssl.CERT_NONE
        # Newer websockets releases rename extra_headers to additional_headers;
        # use whichever keyword matches the installed version.
        async with websockets.connect(ws_url, extra_headers=headers, ssl=ssl_context) as ws:
            async for message in ws:
                data = json.loads(message)
                if data.get("modelKey") == "event" and data.get("action") == "add":
                    detection = self._parse_detection(data["payload"])
                    if detection:
                        await callback(detection)

    def _parse_detection(self, payload: dict) -> Detection | None:
        """Parse Protect payload to Detection."""
        # Unifi detection payloads vary by camera model
        # Extract plate info if present
        if "licensePlate" not in payload:
            return None
        return Detection(
            id=payload["id"],
            timestamp=datetime.fromtimestamp(
                payload["start"] / 1000, tz=timezone.utc
            ),
            camera=self._cameras.get(payload["camera"]),
            plate=PlateDetection(
                text=payload["licensePlate"]["value"],
                confidence=payload["licensePlate"].get("confidence", 0.8)
            ),
            vendor="unifi"
        )

    async def get_cameras(self) -> list[CameraInfo]:
        return list(self._cameras.values())
Unifi Quirks:
- Uses self-signed SSL certificates (verify=False)
- Detections come through NVR, not directly from camera
- WebSocket connection may drop; need reconnection logic
- Images must be fetched separately via API call
- Rate limiting on image downloads
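Because images are fetched separately and downloads are rate limited, image retrieval may need to be throttled on the collector side. The sketch below shows one possible approach, assuming a caller-supplied async `fetch` function. The class name `ThrottledFetcher`, its parameters, and the limits used are hypothetical, and no real Protect image endpoint is implied.

```python
import asyncio
from typing import Awaitable, Callable


class ThrottledFetcher:
    """Limit concurrency and spacing of calls to an async fetch function."""

    def __init__(
        self,
        fetch: Callable[[str], Awaitable[bytes]],
        max_concurrent: int = 2,
        min_interval: float = 0.5,
    ):
        self._fetch = fetch
        self._semaphore = asyncio.Semaphore(max_concurrent)
        self._min_interval = min_interval
        self._lock = asyncio.Lock()
        self._next_start = 0.0

    async def get(self, event_id: str) -> bytes:
        async with self._semaphore:  # cap the number of in-flight downloads
            async with self._lock:   # serialize start times
                loop = asyncio.get_running_loop()
                delay = self._next_start - loop.time()
                if delay > 0:
                    await asyncio.sleep(delay)
                self._next_start = loop.time() + self._min_interval
            return await self._fetch(event_id)


async def demo() -> tuple[list[bytes], int]:
    active = 0
    peak = 0

    async def fake_fetch(event_id: str) -> bytes:
        # Stand-in for the real image download
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)
        active -= 1
        return f"image-for-{event_id}".encode()

    fetcher = ThrottledFetcher(fake_fetch, max_concurrent=2, min_interval=0.02)
    results = await asyncio.gather(*(fetcher.get(str(i)) for i in range(4)))
    return list(results), peak


if __name__ == "__main__":
    images, peak_concurrency = asyncio.run(demo())
    print(len(images), peak_concurrency)
```

Injecting the fetch function keeps the throttle independent of any particular API call, so the same wrapper can be reused if the download path changes.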
Pattern: Reconnection with Backoff
Problem: Camera connections drop; need automatic reconnection without flooding.
Solution: Exponential backoff with jitter for reconnection attempts.
Implementation:
import asyncio
import logging
import random
from abc import ABC

logger = logging.getLogger(__name__)

class CameraAdapter(ABC):
    # ... other methods ...

    async def run_with_reconnect(
        self,
        callback,
        max_retries: int = -1  # -1 = infinite
    ):
        """
        Run detection subscription with automatic reconnection.

        Args:
            callback: Async function called for each detection
            max_retries: Max reconnection attempts (-1 = infinite)
        """
        retries = 0
        backoff = 1  # seconds
        while max_retries == -1 or retries < max_retries:
            try:
                if await self.connect():
                    logger.info(f"{self.vendor_name} connected successfully")
                    backoff = 1  # Reset backoff on success
                    retries = 0
                    # Returns only when the stream ends; raises on failure
                    await self.subscribe_detections(callback)
            except ConnectionError as e:
                logger.warning(f"{self.vendor_name} connection lost: {e}")
                self._status = CameraStatus.OFFLINE
            except Exception as e:
                logger.error(f"{self.vendor_name} error: {e}")
                self._status = CameraStatus.ERROR

            retries += 1
            if max_retries != -1 and retries >= max_retries:
                break  # Do not sleep when no further attempt will follow

            # Calculate backoff with jitter
            jitter = random.uniform(0, 1)
            wait_time = backoff + jitter
            logger.info(f"{self.vendor_name} reconnecting in {wait_time:.1f}s")
            await asyncio.sleep(wait_time)
            # Exponential backoff, max 60 seconds
            backoff = min(backoff * 2, 60)

        logger.error(f"{self.vendor_name} max retries exceeded")
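To make the schedule concrete, the sketch below computes the waits produced by the same doubling-with-cap rule. The helper `backoff_delays` is hypothetical and exists only for illustration; with jitter disabled the sequence is deterministic.

```python
import random


def backoff_delays(
    attempts: int,
    base: float = 1.0,
    cap: float = 60.0,
    jitter: float = 1.0,
) -> list[float]:
    """Return the wait before each reconnection attempt (doubling, capped, plus jitter)."""
    delays = []
    backoff = base
    for _ in range(attempts):
        delays.append(backoff + random.uniform(0, jitter))
        backoff = min(backoff * 2, cap)
    return delays


print(backoff_delays(8, jitter=0.0))
# [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```

The jitter term spreads out reconnection attempts when several adapters lose the same upstream device at once, so they do not all retry in the same instant.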
Pattern: Adapter Registry
Problem: Need to instantiate adapters by type from configuration.
Solution: Registry pattern for adapter lookup.
Implementation:
# edge/src/adapters/__init__.py
from .hikvision import HikvisionAdapter
from .unifi import UnifiAdapter
from .base import CameraAdapter

ADAPTER_REGISTRY: dict[str, type[CameraAdapter]] = {
    "hikvision": HikvisionAdapter,
    "unifi": UnifiAdapter,
}

def create_adapter(adapter_type: str, config: dict) -> CameraAdapter:
    """
    Factory function to create adapter by type.

    Args:
        adapter_type: Vendor name ("hikvision", "unifi")
        config: Adapter-specific configuration

    Returns:
        Configured adapter instance

    Raises:
        ValueError: Unknown adapter type
    """
    if adapter_type not in ADAPTER_REGISTRY:
        raise ValueError(f"Unknown adapter type: {adapter_type}")
    adapter_class = ADAPTER_REGISTRY[adapter_type]
    return adapter_class(config)

def register_adapter(name: str, adapter_class: type[CameraAdapter]) -> None:
    """Register a new adapter type."""
    ADAPTER_REGISTRY[name] = adapter_class
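A usage sketch for the registry follows. To stay self-contained it uses a trimmed stand-in for `CameraAdapter` and a hypothetical `DemoAdapter`; the shape of `collector_config` (a list of entries with `type` and `config` keys) is also an assumption, not the project's configuration schema.

```python
from abc import ABC, abstractmethod


class CameraAdapter(ABC):
    """Trimmed stand-in for the real base class."""

    def __init__(self, config: dict):
        self.config = config

    @property
    @abstractmethod
    def vendor_name(self) -> str: ...


ADAPTER_REGISTRY: dict[str, type[CameraAdapter]] = {}


def register_adapter(name: str, adapter_class: type[CameraAdapter]) -> None:
    ADAPTER_REGISTRY[name] = adapter_class


def create_adapter(adapter_type: str, config: dict) -> CameraAdapter:
    if adapter_type not in ADAPTER_REGISTRY:
        raise ValueError(f"Unknown adapter type: {adapter_type}")
    return ADAPTER_REGISTRY[adapter_type](config)


class DemoAdapter(CameraAdapter):
    """Hypothetical adapter used only for this example."""

    @property
    def vendor_name(self) -> str:
        return "demo"


register_adapter("demo", DemoAdapter)

# Hypothetical collector configuration: one entry per adapter instance
collector_config = {
    "adapters": [
        {"type": "demo", "config": {"host": "192.0.2.10"}},
        {"type": "demo", "config": {"host": "192.0.2.11"}},
    ]
}

adapters = [
    create_adapter(entry["type"], entry["config"])
    for entry in collector_config["adapters"]
]
print([(a.vendor_name, a.config["host"]) for a in adapters])
# [('demo', '192.0.2.10'), ('demo', '192.0.2.11')]
```

Failing fast with `ValueError` on an unknown type surfaces configuration typos at startup rather than when the first detection is expected.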
Testing Strategies
Unit Testing Adapters
import pytest
from unittest.mock import AsyncMock, Mock, patch

# Import paths are assumptions; adjust them to the project layout.
from adapters.base import CameraStatus
from adapters.hikvision import HikvisionAdapter
from adapters.normalize import normalize_direction, normalize_mac, normalize_vehicle_type

@pytest.fixture
def hikvision_config():
    return {
        "host": "192.168.1.100",
        "port": 80,
        "username": "admin",
        "password": "test",
        "timezone": "America/New_York",
        "cameras": [{"id": "cam-001", "channel": 1}]
    }

@pytest.fixture
def mock_hikvision_response():
    """Sample ISAPI response XML."""
    return b"""
    <EventNotificationAlert>
        <eventId>12345</eventId>
        <dateTime>2025-01-15T14:32:05</dateTime>
        <plateNumber>ABC1234</plateNumber>
        <confidence>0.95</confidence>
        <direction>in</direction>
    </EventNotificationAlert>
    """

# Async tests need an asyncio plugin such as pytest-asyncio
@pytest.mark.asyncio
async def test_hikvision_connect_success(hikvision_config):
    with patch("httpx.AsyncClient") as mock_client:
        mock_client.return_value.get = AsyncMock(
            return_value=Mock(status_code=200, content=b"<device/>")
        )
        adapter = HikvisionAdapter(hikvision_config)
        result = await adapter.connect()
        assert result is True
        assert adapter.status == CameraStatus.ONLINE

def test_hikvision_parse_detection(hikvision_config, mock_hikvision_response):
    # _parse_alert is synchronous, so this test does not need to be async
    adapter = HikvisionAdapter(hikvision_config)
    detection = adapter._parse_alert(mock_hikvision_response)
    assert detection is not None
    assert detection.plate.text == "ABC1234"
    assert detection.plate.confidence == 0.95
    assert detection.vehicle.direction == "entering"

def test_normalization_functions():
    """Test normalization utilities."""
    assert normalize_mac("00:1A:2B:3C:4D:5E") == "001a2b3c4d5e"
    assert normalize_mac("00-1a-2b-3c-4d-5e") == "001a2b3c4d5e"
    assert normalize_direction("in") == "entering"
    assert normalize_direction("EXIT") == "exiting"
    assert normalize_direction(None) == "unknown"
    assert normalize_vehicle_type("sedan") == "car"
    assert normalize_vehicle_type("TRUCK") == "truck"
    assert normalize_vehicle_type("spaceship") is None
Adding New Vendors
Checklist
- [ ] Create adapter class extending CameraAdapter
- [ ] Implement all abstract methods
- [ ] Add normalization for vendor-specific values
- [ ] Handle vendor-specific authentication
- [ ] Implement reconnection logic
- [ ] Add configuration schema
- [ ] Write unit tests with mocked vendor responses
- [ ] Write integration tests (if test camera available)
- [ ] Document vendor quirks and limitations
- [ ] Register adapter in ADAPTER_REGISTRY
Template
# edge/src/adapters/newvendor.py
from .base import CameraAdapter, Detection, CameraInfo, CameraStatus

class NewVendorAdapter(CameraAdapter):
    """
    Adapter for [Vendor Name] cameras.

    Protocol: [HTTP/WebSocket/RTSP/etc.]
    Authentication: [Method]
    Documentation: [Link to vendor docs]
    """

    @property
    def vendor_name(self) -> str:
        return "newvendor"

    async def connect(self) -> bool:
        # 1. Create HTTP/WebSocket client
        # 2. Authenticate
        # 3. Verify connection
        # 4. Discover and populate self._cameras dict
        # 5. Set self._status = CameraStatus.ONLINE
        raise NotImplementedError

    async def disconnect(self) -> None:
        # 1. Unsubscribe from events
        # 2. Close connections
        # 3. Set self._status = CameraStatus.OFFLINE
        raise NotImplementedError

    # Must be named subscribe_detections to satisfy the abstract interface
    async def subscribe_detections(self, callback):
        # 1. Connect to event stream
        # 2. Parse vendor events
        # 3. Normalize to Detection
        # 4. Call callback for each detection
        raise NotImplementedError

    async def get_cameras(self) -> list[CameraInfo]:
        return list(self._cameras.values())
Related Documentation
- Architecture: camera-adapters.md - High-level design
- Detection Batching: detection-batching.md - Upload to central
- PRP: image-storage-prp.md - Image storage patterns for captured plates
- Testing: TEST_CONFIG.md - Test patterns
Maintainer: Development Team
Review Cycle: When adding new vendors