Skip to content

Camera Adapters - Project Reference Pattern

Component: Edge Collector Camera Adapters Status: 🟢 Stable Created: 2025-12-29 Last Updated: 2025-12-29


Overview

Purpose

This PRP defines implementation patterns for camera vendor adapters on edge collectors. Adapters handle vendor-specific protocols, normalize detections to a common format, and manage camera connections.

Scope

Responsibilities: - Abstract adapter interface definition - Vendor-specific implementations (Hikvision, Unifi) - Detection normalization (plates, timestamps, images) - Connection management and reconnection

Out of Scope: - Detection buffering (see edge architecture) - Central server upload (see detection-batching) - Image storage (see image-storage-prp)

Dependencies

Requires: - httpx (async HTTP client) - Camera vendor access (LAN)

Used By: - Edge collector main loop - Detection buffer


Patterns

Pattern: Abstract Adapter Interface

Problem: Multiple camera vendors with different protocols need unified handling.

Solution: Define abstract base class that all vendor adapters must implement.

Implementation:

# edge/src/adapters/base.py
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from typing import Callable, Awaitable
from enum import Enum


class CameraStatus(Enum):
    """Camera connection status."""
    ONLINE = "online"
    OFFLINE = "offline"
    ERROR = "error"
    CONNECTING = "connecting"


@dataclass
class CameraInfo:
    """Camera identification and metadata."""
    id: str                      # Unique ID within this collector
    mac: str                     # MAC address (normalized, lowercase, no separators)
    name: str | None = None      # Human-readable name
    model: str | None = None     # Camera model
    firmware: str | None = None


@dataclass
class PlateDetection:
    """Normalized plate detection from camera."""
    text: str                    # Raw plate text from camera
    confidence: float            # 0.0 to 1.0
    region: str | None = None    # Plate region/country code


@dataclass
class VehicleInfo:
    """Optional vehicle metadata."""
    direction: str | None = None      # "entering", "exiting", "unknown"
    vehicle_type: str | None = None   # "car", "truck", "motorcycle", "bus"
    color: str | None = None
    brand: str | None = None          # Vehicle make (e.g., "Toyota", "Ford")
    model: str | None = None          # Vehicle model (e.g., "Camry", "F-150")


@dataclass
class ImageData:
    """Image captured during detection."""
    full_scene: bytes | None = None   # Full camera frame (JPEG)
    plate_crop: bytes | None = None   # Cropped plate region (JPEG)


@dataclass
class Detection:
    """Normalized detection from any camera vendor."""
    id: str                     # Unique detection ID (from camera or generated)
    timestamp: datetime         # When detection occurred (camera time)
    camera: CameraInfo
    plate: PlateDetection
    vehicle: VehicleInfo | None = None
    images: ImageData | None = None
    vendor: str = ""            # "hikvision", "unifi", etc.
    raw_data: dict | None = None  # Original vendor payload (for debugging)


class CameraAdapter(ABC):
    """
    Abstract base class for camera vendor adapters.

    Each adapter implementation must:
    1. Connect to camera(s) using vendor-specific protocol
    2. Subscribe to detections
    3. Normalize to Detection format
    4. Handle reconnection on failure
    """

    def __init__(self, config: dict):
        self.config = config
        self._status: CameraStatus = CameraStatus.OFFLINE
        self._cameras: dict[str, CameraInfo] = {}

    @property
    @abstractmethod
    def vendor_name(self) -> str:
        """Return vendor identifier (e.g., 'hikvision', 'unifi')."""
        pass

    @abstractmethod
    async def connect(self) -> bool:
        """
        Establish connection to camera(s).

        Returns:
            True if connection successful, False otherwise.
        """
        pass

    @abstractmethod
    async def disconnect(self) -> None:
        """Cleanly disconnect from camera(s)."""
        pass

    @abstractmethod
    async def subscribe_detections(
        self,
        callback: Callable[[Detection], Awaitable[None]]
    ) -> None:
        """
        Subscribe to detections.

        Args:
            callback: Async function called for each detection.
        """
        pass

    @abstractmethod
    async def get_cameras(self) -> list[CameraInfo]:
        """Get list of cameras managed by this adapter."""
        pass

    @property
    def status(self) -> CameraStatus:
        """Current connection status."""
        return self._status

    @property
    def cameras(self) -> dict[str, CameraInfo]:
        """Dictionary of cameras by ID."""
        return self._cameras

    async def health_check(self) -> bool:
        """Check if connection is healthy."""
        return self._status == CameraStatus.ONLINE

Pattern: Detection Normalization

Problem: Each vendor returns different data formats; need consistent format for processing.

Solution: Normalize all vendor-specific values to standard formats.

MAC Address Normalization:

def normalize_mac(mac: str) -> str:
    """
    Normalize MAC address to lowercase hex without separators.

    Examples:
        "00:1A:2B:3C:4D:5E" -> "001a2b3c4d5e"
        "00-1A-2B-3C-4D-5E" -> "001a2b3c4d5e"
    """
    return mac.lower().replace(":", "").replace("-", "")

Timestamp Normalization:

from datetime import datetime, timezone
from zoneinfo import ZoneInfo

def normalize_timestamp(camera_time: str, camera_tz: str = "UTC") -> datetime:
    """
    Convert camera timestamp to UTC datetime.

    Args:
        camera_time: Timestamp string from camera
        camera_tz: Camera's configured timezone

    Returns:
        UTC datetime object
    """
    # Parse vendor-specific format (example: Hikvision)
    local_dt = datetime.strptime(camera_time, "%Y-%m-%dT%H:%M:%S")

    # Apply camera timezone
    tz = ZoneInfo(camera_tz)
    local_dt = local_dt.replace(tzinfo=tz)

    # Convert to UTC
    return local_dt.astimezone(timezone.utc)

Direction Normalization:

DIRECTION_MAP = {
    # Vendor values -> normalized
    "in": "entering",
    "entry": "entering",
    "entering": "entering",
    "approach": "entering",
    "out": "exiting",
    "exit": "exiting",
    "exiting": "exiting",
    "leaving": "exiting",
}

def normalize_direction(vendor_direction: str | None) -> str:
    """Normalize direction to standard values."""
    if not vendor_direction:
        return "unknown"
    return DIRECTION_MAP.get(vendor_direction.lower(), "unknown")

Vehicle Type Normalization:

VEHICLE_TYPE_MAP = {
    "car": "car",
    "sedan": "car",
    "suv": "car",
    "automobile": "car",
    "truck": "truck",
    "lorry": "truck",
    "pickup": "truck",
    "motorcycle": "motorcycle",
    "motorbike": "motorcycle",
    "bike": "motorcycle",
    "bus": "bus",
    "coach": "bus",
    "van": "van",
    "minivan": "van",
}

def normalize_vehicle_type(vendor_type: str | None) -> str | None:
    """Normalize vehicle type or return None if unknown."""
    if not vendor_type:
        return None
    return VEHICLE_TYPE_MAP.get(vendor_type.lower())

Pattern: Hikvision ISAPI Adapter

Problem: Integrate with Hikvision cameras using their proprietary ISAPI protocol.

Solution: HTTP client with Digest Auth, XML parsing, long-poll event stream.

Implementation:

# edge/src/adapters/hikvision.py
import httpx
import uuid
from xml.etree import ElementTree
from .base import CameraAdapter, Detection, CameraInfo, PlateDetection, VehicleInfo, ImageData, CameraStatus

class HikvisionAdapter(CameraAdapter):
    """Hikvision ISAPI camera adapter."""

    @property
    def vendor_name(self) -> str:
        return "hikvision"

    async def connect(self) -> bool:
        """Connect to Hikvision camera."""
        self._client = httpx.AsyncClient(
            base_url=f"http://{self.config['host']}:{self.config.get('port', 80)}",
            auth=httpx.DigestAuth(
                self.config['username'],
                self.config['password']
            ),
            timeout=30.0
        )

        try:
            resp = await self._client.get("/ISAPI/System/deviceInfo")
            if resp.status_code == 200:
                self._parse_device_info(resp.content)
                self._status = CameraStatus.ONLINE
                return True
        except Exception as e:
            self._status = CameraStatus.ERROR
            return False

        return False

    async def disconnect(self) -> None:
        """Disconnect from camera."""
        if self._client:
            await self._client.aclose()
        self._status = CameraStatus.OFFLINE

    async def subscribe_detections(self, callback):
        """Subscribe to ISAPI traffic detections via long-poll."""
        url = "/ISAPI/Event/notification/alertStream"

        async with self._client.stream("GET", url) as response:
            buffer = b""
            async for chunk in response.aiter_bytes():
                buffer += chunk

                # ISAPI uses multipart boundaries
                if b"--boundary" in buffer:
                    detections = self._split_multipart(buffer)
                    for detection_data in detections:
                        detection = self._parse_alert(detection_data)
                        if detection:
                            await callback(detection)
                    buffer = b""

    def _parse_alert(self, data: bytes) -> Detection | None:
        """Parse ISAPI alert XML to Detection."""
        try:
            root = ElementTree.fromstring(data)

            plate_elem = root.find(".//plateNumber")
            if plate_elem is None:
                return None

            plate_text = plate_elem.text
            confidence = float(root.findtext(".//confidence", "0.8"))

            # Extract images (base64 encoded in XML)
            full_image = self._decode_image(root.find(".//fullImage"))
            crop_image = self._decode_image(root.find(".//plateImage"))

            # Extract vehicle info
            direction = root.findtext(".//direction", "unknown")
            vehicle_type = root.findtext(".//vehicleType")
            vehicle_color = root.findtext(".//vehicleColor")
            vehicle_brand = root.findtext(".//vehicleBrand")
            vehicle_model = root.findtext(".//vehicleModel")

            return Detection(
                id=root.findtext(".//eventId", str(uuid.uuid4())),
                timestamp=normalize_timestamp(
                    root.findtext(".//dateTime"),
                    self.config.get("timezone", "UTC")
                ),
                camera=self._cameras.get(self.config['cameras'][0]['id']),
                plate=PlateDetection(
                    text=plate_text,
                    confidence=confidence
                ),
                vehicle=VehicleInfo(
                    direction=normalize_direction(direction),
                    vehicle_type=normalize_vehicle_type(vehicle_type),
                    color=vehicle_color,
                    brand=vehicle_brand,
                    model=vehicle_model
                ),
                images=ImageData(
                    full_scene=full_image,
                    plate_crop=crop_image
                ),
                vendor="hikvision"
            )
        except Exception as e:
            logger.error(f"Failed to parse Hikvision detection: {e}")
            return None

    def _decode_image(self, elem) -> bytes | None:
        """Decode base64 image from XML element."""
        if elem is None or not elem.text:
            return None
        import base64
        return base64.b64decode(elem.text)

    async def get_cameras(self) -> list[CameraInfo]:
        return list(self._cameras.values())

Hikvision Quirks: - Uses HTTP Digest authentication (not Basic) - Event stream uses multipart/x-mixed-replace - Timestamps in local camera time (configure timezone) - Some models send duplicate events within 1 second - Image data is base64 encoded in XML


Pattern: Unifi Protect Adapter

Problem: Integrate with Unifi Protect cameras via NVR API.

Solution: REST API for bootstrap, WebSocket for real-time detections.

Implementation:

# edge/src/adapters/unifi.py
import httpx
import websockets
import json
from .base import CameraAdapter, Detection, CameraStatus

class UnifiAdapter(CameraAdapter):
    """Unifi Protect camera adapter."""

    @property
    def vendor_name(self) -> str:
        return "unifi"

    async def connect(self) -> bool:
        """Connect to Unifi Protect console."""
        self._client = httpx.AsyncClient(
            base_url=self.config['console_url'],
            headers={"Authorization": f"Bearer {self.config['api_key']}"},
            verify=False,  # Self-signed cert on console
            timeout=30.0
        )

        try:
            resp = await self._client.get("/proxy/protect/api/bootstrap")
            if resp.status_code == 200:
                self._parse_bootstrap(resp.json())
                self._status = CameraStatus.ONLINE
                return True
        except Exception:
            self._status = CameraStatus.ERROR
            return False

        return False

    async def subscribe_detections(self, callback):
        """Subscribe to Protect WebSocket detections."""
        ws_url = self.config['console_url'].replace("https://", "wss://")
        ws_url = f"{ws_url}/proxy/protect/ws/updates"

        headers = {"Authorization": f"Bearer {self.config['api_key']}"}

        async with websockets.connect(ws_url, extra_headers=headers, ssl=False) as ws:
            async for message in ws:
                data = json.loads(message)
                if data.get("modelKey") == "event" and data.get("action") == "add":
                    detection = self._parse_detection(data["payload"])
                    if detection and detection.plate:
                        await callback(detection)

    def _parse_detection(self, payload: dict) -> Detection | None:
        """Parse Protect payload to Detection."""
        # Unifi detection payloads vary by camera model
        # Extract plate info if present
        if "licensePlate" not in payload:
            return None

        return Detection(
            id=payload["id"],
            timestamp=datetime.fromtimestamp(
                payload["start"] / 1000, tz=timezone.utc
            ),
            camera=self._cameras.get(payload["camera"]),
            plate=PlateDetection(
                text=payload["licensePlate"]["value"],
                confidence=payload["licensePlate"].get("confidence", 0.8)
            ),
            vendor="unifi"
        )

    async def get_cameras(self) -> list[CameraInfo]:
        return list(self._cameras.values())

Unifi Quirks: - Uses self-signed SSL certificates (verify=False) - Detections come through NVR, not directly from camera - WebSocket connection may drop; need reconnection logic - Images must be fetched separately via API call - Rate limiting on image downloads


Pattern: Reconnection with Backoff

Problem: Camera connections drop; need automatic reconnection without flooding.

Solution: Exponential backoff with jitter for reconnection attempts.

Implementation:

import asyncio
import random
import logging

logger = logging.getLogger(__name__)

class CameraAdapter(ABC):
    # ... other methods ...

    async def run_with_reconnect(
        self,
        callback,
        max_retries: int = -1  # -1 = infinite
    ):
        """
        Run detection subscription with automatic reconnection.

        Args:
            callback: Event handler function
            max_retries: Max reconnection attempts (-1 = infinite)
        """
        retries = 0
        backoff = 1  # seconds

        while max_retries == -1 or retries < max_retries:
            try:
                if await self.connect():
                    logger.info(f"{self.vendor_name} connected successfully")
                    backoff = 1  # Reset backoff on success
                    retries = 0
                    await self.subscribe_detections(callback)

            except ConnectionError as e:
                logger.warning(f"{self.vendor_name} connection lost: {e}")
                self._status = CameraStatus.OFFLINE

            except Exception as e:
                logger.error(f"{self.vendor_name} error: {e}")
                self._status = CameraStatus.ERROR

            # Calculate backoff with jitter
            jitter = random.uniform(0, 1)
            wait_time = backoff + jitter

            logger.info(f"{self.vendor_name} reconnecting in {wait_time:.1f}s")
            await asyncio.sleep(wait_time)

            # Exponential backoff, max 60 seconds
            backoff = min(backoff * 2, 60)
            retries += 1

        logger.error(f"{self.vendor_name} max retries exceeded")

Pattern: Adapter Registry

Problem: Need to instantiate adapters by type from configuration.

Solution: Registry pattern for adapter lookup.

Implementation:

# edge/src/adapters/__init__.py
from .hikvision import HikvisionAdapter
from .unifi import UnifiAdapter
from .base import CameraAdapter

ADAPTER_REGISTRY: dict[str, type[CameraAdapter]] = {
    "hikvision": HikvisionAdapter,
    "unifi": UnifiAdapter,
}

def create_adapter(adapter_type: str, config: dict) -> CameraAdapter:
    """
    Factory function to create adapter by type.

    Args:
        adapter_type: Vendor name ("hikvision", "unifi")
        config: Adapter-specific configuration

    Returns:
        Configured adapter instance

    Raises:
        ValueError: Unknown adapter type
    """
    if adapter_type not in ADAPTER_REGISTRY:
        raise ValueError(f"Unknown adapter type: {adapter_type}")

    adapter_class = ADAPTER_REGISTRY[adapter_type]
    return adapter_class(config)


def register_adapter(name: str, adapter_class: type[CameraAdapter]):
    """Register a new adapter type."""
    ADAPTER_REGISTRY[name] = adapter_class

Testing Strategies

Unit Testing Adapters

import pytest
from unittest.mock import AsyncMock, patch, Mock

@pytest.fixture
def hikvision_config():
    return {
        "host": "192.168.1.100",
        "port": 80,
        "username": "admin",
        "password": "test",
        "timezone": "America/New_York",
        "cameras": [{"id": "cam-001", "channel": 1}]
    }

@pytest.fixture
def mock_hikvision_response():
    """Sample ISAPI response XML."""
    return b"""
    <EventNotificationAlert>
        <eventId>12345</eventId>
        <dateTime>2025-01-15T14:32:05</dateTime>
        <plateNumber>ABC1234</plateNumber>
        <confidence>0.95</confidence>
        <direction>in</direction>
    </EventNotificationAlert>
    """

async def test_hikvision_connect_success(hikvision_config):
    with patch("httpx.AsyncClient") as mock_client:
        mock_client.return_value.get = AsyncMock(
            return_value=Mock(status_code=200, content=b"<device/>")
        )

        adapter = HikvisionAdapter(hikvision_config)
        result = await adapter.connect()

        assert result is True
        assert adapter.status == CameraStatus.ONLINE

async def test_hikvision_parse_detection(hikvision_config, mock_hikvision_response):
    adapter = HikvisionAdapter(hikvision_config)
    detection = adapter._parse_alert(mock_hikvision_response)

    assert detection is not None
    assert detection.plate.text == "ABC1234"
    assert detection.plate.confidence == 0.95
    assert detection.vehicle.direction == "entering"

async def test_normalization_functions():
    """Test normalization utilities."""
    assert normalize_mac("00:1A:2B:3C:4D:5E") == "001a2b3c4d5e"
    assert normalize_mac("00-1a-2b-3c-4d-5e") == "001a2b3c4d5e"

    assert normalize_direction("in") == "entering"
    assert normalize_direction("EXIT") == "exiting"
    assert normalize_direction(None) == "unknown"

    assert normalize_vehicle_type("sedan") == "car"
    assert normalize_vehicle_type("TRUCK") == "truck"
    assert normalize_vehicle_type("spaceship") is None

Adding New Vendors

Checklist

  1. [ ] Create adapter class extending CameraAdapter
  2. [ ] Implement all abstract methods
  3. [ ] Add normalization for vendor-specific values
  4. [ ] Handle vendor-specific authentication
  5. [ ] Implement reconnection logic
  6. [ ] Add configuration schema
  7. [ ] Write unit tests with mocked vendor responses
  8. [ ] Write integration tests (if test camera available)
  9. [ ] Document vendor quirks and limitations
  10. [ ] Register adapter in ADAPTER_REGISTRY

Template

# edge/src/adapters/newvendor.py
from .base import CameraAdapter, Detection, CameraInfo, CameraStatus

class NewVendorAdapter(CameraAdapter):
    """
    Adapter for [Vendor Name] cameras.

    Protocol: [HTTP/WebSocket/RTSP/etc.]
    Authentication: [Method]
    Documentation: [Link to vendor docs]
    """

    @property
    def vendor_name(self) -> str:
        return "newvendor"

    async def connect(self) -> bool:
        # 1. Create HTTP/WebSocket client
        # 2. Authenticate
        # 3. Verify connection
        # 4. Discover and populate self._cameras dict
        # 5. Set self._status = CameraStatus.ONLINE
        raise NotImplementedError

    async def disconnect(self) -> None:
        # 1. Unsubscribe from events
        # 2. Close connections
        # 3. Set self._status = CameraStatus.OFFLINE
        raise NotImplementedError

    async def subscribe_events(self, callback):
        # 1. Connect to event stream
        # 2. Parse vendor events
        # 3. Normalize to Detection
        # 4. Call callback for each detection
        raise NotImplementedError

    async def get_cameras(self) -> list[CameraInfo]:
        return list(self._cameras.values())


Maintainer: Development Team Review Cycle: When adding new vendors