
Async Alert Engine Design

Problem

Synchronous alert matching during detection ingestion creates a bottleneck:

  • 100 detections/minute × 1000 watchlist entries = 100,000 comparisons/minute
  • Database queries per detection slow ingestion
  • High-traffic bursts create backpressure to edge collectors
  • Alert processing failures block detection storage

Solution: Decoupled Async Processing

Separate detection ingestion (fast path) from alert matching (background processing).

Detection Ingestion (Fast Path)          Alert Processing (Background)
─────────────────────────────            ─────────────────────────────

Batch Upload                             Alert Worker
     │                                        │
     ▼                                        ▼
Store to PostgreSQL ────────────▶ Poll Alert Queue
     │                                        │
     ▼                                        ▼
Store images to MinIO                   Match against Watchlist (in-memory)
     │                                        │
     ▼                                        ▼
Enqueue detection IDs                   Create Alert Record
     │                                        │
     ▼                                        ▼
Return 201 (<100ms)                     Push via WebSocket

Architecture

1. Detection Ingestion Endpoint

Responsibilities:

  • Accept detection batch from edge collector
  • Store detection metadata to PostgreSQL
  • Store images to MinIO
  • Enqueue detection IDs for alert processing
  • Return success immediately

Does NOT: Query watchlists, match plates, send notifications

Target latency: <100ms for batch of 50 detections

2. Alert Queue

Simple queue for pending alert checks.

POC Implementation: PostgreSQL table

  • Columns: id, detection_id, status (pending/processing/complete), created_at
  • Worker polls for status = 'pending'
  • Index on (status, created_at)

Future Scaling Options:

  • PostgreSQL LISTEN/NOTIFY to wake workers without polling
  • Redis list for higher throughput
  • Dedicated message queue (RabbitMQ, SQS)

3. Alert Worker

Background process that:

1. Polls alert queue for pending detections
2. Loads detection details from database
3. Checks plate against in-memory watchlist
4. On match: creates alert, notifies subscribers

Scaling: Can run multiple workers for horizontal scaling

4. Watchlist Cache

In-memory set for O(1) plate lookups.

Cache refresh strategy:

  • Full reload every 60 seconds
  • Immediate invalidation on watchlist CRUD via pub/sub

Data structure:

  • Set of normalized plate numbers
  • Map of plate → watchlist entry details (for alert context)

5. Notification Delivery

On watchlist match:

1. Create alert record in database (audit trail)
2. Look up subscribed users for that watchlist
3. Push to connected WebSocket clients
4. Future: Queue push notification for mobile app

Data Flow

1. Edge collector uploads batch
2. /api/v1/detections/batch
   - Validate API key
   - Store detections to PostgreSQL
   - Store images to MinIO
   - INSERT INTO alert_queue (detection_id, status='pending')
   - Return 201 Created
3. Alert Worker (polling every 1s)
   - SELECT detection_id FROM alert_queue WHERE status='pending' ORDER BY created_at LIMIT 100 FOR UPDATE SKIP LOCKED
   - UPDATE status='processing' (SKIP LOCKED lets multiple workers claim disjoint batches)
4. For each detection:
   - Load detection from database
   - Check: normalized_plate IN watchlist_cache?
   - If match:
     - INSERT INTO alerts (detection_id, watchlist_id, ...)
     - Get subscribers for watchlist_id
     - Push to WebSocket manager
   - UPDATE alert_queue SET status='complete'
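
Steps 3 and 4 can be sketched with in-memory stand-ins for the queue and tables. Names are illustrative; a real worker would run the SQL above against PostgreSQL:

```python
def run_once(queue, detections, watchlist, alerts):
    """One polling cycle: claim pending items, match plates, record alerts.

    `queue` is a list of dicts standing in for alert_queue rows,
    `detections` maps detection_id -> detection row, and `watchlist`
    maps normalized plate -> watchlist_id.
    """
    batch = [item for item in queue if item["status"] == "pending"][:100]
    for item in batch:
        item["status"] = "processing"            # claim the row
        det = detections[item["detection_id"]]   # load detection details
        wl_id = watchlist.get(det["plate_normalized"])
        if wl_id is not None:                    # watchlist hit
            alerts.append({"detection_id": det["id"], "watchlist_id": wl_id})
        item["status"] = "complete"
    return len(batch)
```

Keeping the cycle a pure function over its inputs makes the claim/match/complete transitions easy to unit test before wiring in the database.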

Performance Characteristics

Metric              Target
Ingestion latency   <100ms per batch
Alert latency       1-5 seconds after ingestion
Watchlist lookup    O(1) via in-memory set
Worker throughput   1000+ detections/second per worker

Trade-offs

Aspect          Trade-off
Alert latency   1-5s delay vs immediate (acceptable for use case)
Complexity      Additional worker process to manage
Consistency     Brief window where detection exists but alert hasn't fired

Failure Handling

Scenario                 Behavior
Worker crashes           Detections remain in queue, picked up on restart
WebSocket disconnected   Alert stored in DB, user sees on reconnect
Watchlist cache stale    Max 60s stale, acceptable for use case
Database unavailable     Worker retries with backoff

Queue Table Schema

alert_queue
├── id (PK)
├── detection_id (FK → detections.id)
├── status (pending | processing | complete | failed)
├── attempts (int, default 0)
├── last_error (text, nullable)
├── created_at (timestamp)
├── processed_at (timestamp, nullable)

Index: (status, created_at) for efficient polling

Monitoring

Metrics to track:

  • alert_queue_depth - Pending items in queue (gauge)
  • alert_processing_latency_ms - Time from enqueue to complete (histogram)
  • alert_matches_total - Watchlist matches found (counter)
  • alert_notifications_sent_total - WebSocket pushes (counter)
  • watchlist_cache_size - Entries in cache (gauge)
  • watchlist_cache_refresh_ms - Cache reload duration (histogram)

Alerts:

  • Queue depth > 1000 for > 5 minutes
  • Processing latency p99 > 10 seconds
  • Worker not polling for > 30 seconds


Notification Channels

Alert notifications are delivered through multiple channels based on user preferences and system phase.

Channel Overview

Channel              Phase        Use Case                            Delivery         Cost
Web UI (WebSocket)   POC          Operators monitoring dashboard      Real-time        Free
Telegram Bot         POC          Field volunteers, off-duty alerts   Near real-time   Free
Push Notifications   Mobile App   All users with mobile app           Real-time        Free tier

Notification Flow

Alert Worker detects watchlist match
Create alert record in database
Look up user subscriptions for watchlist
        ├──────────────────────────────────────────────────────┐
        │                                                      │
        ▼                                                      ▼
[Web UI Channel]                                    [External Channels]
        │                                                      │
        ▼                                                      ▼
WebSocket Manager                                   Notification Queue
        │                                                      │
        ▼                                                      ▼
Push to connected clients                          Async delivery worker
                                              ┌────────────────┼────────────────┐
                                              │                │                │
                                              ▼                ▼                ▼
                                         Telegram         Push (Future)    Other (Future)

Web UI Notifications (WebSocket)

Real-time updates for operators actively monitoring the dashboard.

Implementation Details: See WebSocket PRP for FastAPI WebSocket implementation patterns, connection manager code, and client reconnection logic.

Purpose

  • Live detection feed showing detections as they arrive
  • Instant alert popups for watchlist matches
  • Collector status updates (online/offline)

Connection Lifecycle

1. User authenticates via normal login flow
2. Browser establishes WebSocket connection to /ws
3. Server validates session from HTTP-only cookie
4. Connection registered with user ID in connection manager
5. Server pushes messages to relevant connections
6. Client reconnects automatically on disconnect
7. Connection removed from manager on close

Message Types

Type               Direction         Purpose
detection          Server → Client   New plate detection (for live feed)
alert              Server → Client   Watchlist match notification
collector_status   Server → Client   Collector online/offline change
ping               Server → Client   Connection health check
pong               Client → Server   Response to ping
subscribe          Client → Server   Subscribe to specific feeds
unsubscribe        Client → Server   Unsubscribe from feeds

Subscription Model

Users can subscribe to different notification feeds:

Feed                    Description                                   Default
alerts:*                All alerts for user's subscribed watchlists   On
alerts:{watchlist_id}   Alerts for specific watchlist                 On (if subscribed)
detections:live         Real-time detection feed (high volume)        Off
collectors:status       Collector online/offline changes              Admin only

Connection Management

Concern           Approach
Authentication    Validate JWT from HTTP-only cookie on connect
Authorization     Check user role for feed subscriptions
Scaling           In-memory manager for POC; Redis pub/sub for multi-instance
Reconnection      Client auto-reconnects with exponential backoff
Missed messages   Client fetches recent alerts via REST on reconnect
Health check      Server pings every 30s; close connection after 3 missed pongs
Max connections   Configurable limit per user (default: 5)
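
The health-check rule (ping every 30s, close after 3 missed pongs) reduces to a small per-connection counter. A minimal sketch; the class name is an assumption:

```python
class HealthTracker:
    """Tracks outstanding pings for one connection; 3 misses means close."""

    MAX_MISSED = 3

    def __init__(self) -> None:
        self.missed = 0

    def on_ping_sent(self) -> None:
        self.missed += 1   # a ping counts as missed until its pong arrives

    def on_pong(self) -> None:
        self.missed = 0    # any pong proves the connection is alive

    def should_close(self) -> bool:
        return self.missed >= self.MAX_MISSED
```

The server's 30s ping loop would call `on_ping_sent()`, check `should_close()`, and tear down the connection when it returns true.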

Mobile Network Handling

For mobile web users on unstable connections:

  • Aggressive reconnection with short initial backoff (1 second)
  • Visibility API detection: reconnect when tab becomes visible
  • Network change detection: reconnect on online event
  • Fetch missed alerts via REST API after reconnection gap

WebSocket Protocol Specification

Connection Endpoint

wss://server.example.com/ws

Authentication via HTTP-only cookie (JWT session) on initial HTTP upgrade.

Protocol Versioning

Protocol version is included in all server messages to support backward compatibility:

Version   Status    Notes
1         Current   Initial protocol

Versioning Strategy:

  • Server includes "protocol_version": 1 in the initial connection acknowledgment
  • Clients should ignore unknown message types for forward compatibility
  • Breaking changes require a new version number and a deprecation period
  • Clients can request a specific version via query param: /ws?v=1
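
The ignore-unknown-types rule can be sketched as a client-side dispatcher; the handler wiring here is an assumption:

```python
def dispatch(message: dict, handlers: dict) -> bool:
    """Route a decoded message to its handler by type.

    Unknown types are silently skipped so a newer server can introduce
    message types without breaking this client. Returns True if handled.
    """
    handler = handlers.get(message.get("type"))
    if handler is None:
        return False   # unknown type: ignored for forward compatibility
    handler(message.get("payload", {}))
    return True
```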

Connection State Machine

┌─────────────┐    HTTP Upgrade     ┌─────────────┐
│ Disconnected├────────────────────▶│ Connecting  │
└──────▲──────┘                     └──────┬──────┘
       │                                   │
       │                          Auth Success
       │                                   │
       │                                   ▼
       │         ┌─────────────┐    ┌─────────────┐
       │◀────────│   Error     │◀───│Authenticating│
       │  Auth   └─────────────┘    └──────┬──────┘
       │  Fail                             │
       │                           Auth OK │
       │                                   ▼
       │                            ┌─────────────┐
       │◀───────────────────────────│  Connected  │◀──────┐
       │     Connection Lost        └──────┬──────┘       │
       │                                   │              │
       │                           Ping/Pong OK          │
       │                                   └──────────────┘

Message Format

All messages are JSON objects with a required type field:

{
  "type": "message_type",
  "payload": { ... },
  "timestamp": "2025-01-15T14:32:05.123Z",
  "id": "msg_abc123"
}
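
A server-side helper that stamps these required fields might look like the following sketch; the message-id scheme is an assumption:

```python
import itertools
import json
from datetime import datetime, timezone

_counter = itertools.count(1)   # per-process sequence for message ids


def make_message(msg_type: str, payload: dict) -> str:
    """Wrap a payload in the envelope above: type, payload, timestamp, id."""
    envelope = {
        "type": msg_type,
        "payload": payload,
        "timestamp": datetime.now(timezone.utc)
        .isoformat(timespec="milliseconds")
        .replace("+00:00", "Z"),
        "id": f"msg_{next(_counter):06d}",
    }
    return json.dumps(envelope)
```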

Server → Client Messages

1. Detection Message (detection)

Real-time plate detection for live feed display.

{
  "type": "detection",
  "payload": {
    "id": "det_12345",
    "plate_number": "ABC1234",
    "plate_normalized": "ABC1234",
    "confidence": 0.95,
    "captured_at": "2025-01-15T14:32:05Z",
    "collector": {
      "id": "col_abc123",
      "name": "Main St Entrance"
    },
    "camera": {
      "id": "cam_001",
      "name": "Camera #3"
    },
    "direction": "entering",
    "image_urls": {
      "full": "https://minio.example.com/images/2025/01/15/det_12345_full.jpg?token=...",
      "crop": "https://minio.example.com/images/2025/01/15/det_12345_crop.jpg?token=..."
    }
  },
  "timestamp": "2025-01-15T14:32:05.456Z",
  "id": "msg_det_001"
}

2. Alert Notification (alert)

Watchlist match notification. The detection_id field references the detection that triggered the alert.

{
  "type": "alert",
  "payload": {
    "alert_id": "alt_67890",
    "detection_id": "det_12345",
    "plate_number": "ABC1234",
    "watchlist": {
      "id": "wl_stolen",
      "name": "Stolen Vehicles",
      "priority": "high"
    },
    "entry": {
      "notes": "Reported stolen 2025-01-10",
      "added_by": "Officer Smith",
      "added_at": "2025-01-10T09:00:00Z"
    },
    "detection": {
      "captured_at": "2025-01-15T14:32:05Z",
      "confidence": 0.95,
      "collector_name": "Main St Entrance",
      "camera_name": "Camera #3",
      "image_urls": {
        "full": "https://minio.example.com/images/...",
        "crop": "https://minio.example.com/images/..."
      }
    }
  },
  "timestamp": "2025-01-15T14:32:06.789Z",
  "id": "msg_alt_001"
}

3. Collector Status (collector_status)

Collector online/offline state change (admin only).

{
  "type": "collector_status",
  "payload": {
    "collector_id": "col_abc123",
    "collector_name": "Main St Entrance",
    "status": "offline",
    "previous_status": "online",
    "last_heartbeat_at": "2025-01-15T14:30:00Z",
    "cameras_online": 0,
    "cameras_total": 2
  },
  "timestamp": "2025-01-15T14:32:00.000Z",
  "id": "msg_col_001"
}

4. Ping (ping)

Health check from server (every 30 seconds).

{
  "type": "ping",
  "payload": {
    "server_time": "2025-01-15T14:32:00.000Z"
  },
  "timestamp": "2025-01-15T14:32:00.000Z",
  "id": "msg_ping_001"
}

5. Subscription Confirmation (subscribed)

Response to client subscription request.

{
  "type": "subscribed",
  "payload": {
    "feed": "alerts:wl_stolen",
    "success": true
  },
  "timestamp": "2025-01-15T14:32:00.000Z",
  "id": "msg_sub_001"
}

6. Error (error)

Server-side error notification.

{
  "type": "error",
  "payload": {
    "code": "SUBSCRIPTION_DENIED",
    "message": "You do not have permission to subscribe to this feed",
    "feed": "collectors:status"
  },
  "timestamp": "2025-01-15T14:32:00.000Z",
  "id": "msg_err_001"
}

Client → Server Messages

1. Pong (pong)

Response to server ping.

{
  "type": "pong",
  "payload": {
    "client_time": "2025-01-15T14:32:00.123Z"
  }
}

2. Subscribe (subscribe)

Subscribe to a notification feed.

{
  "type": "subscribe",
  "payload": {
    "feeds": ["alerts:*", "detections:live"]
  }
}

3. Unsubscribe (unsubscribe)

Unsubscribe from a notification feed.

{
  "type": "unsubscribe",
  "payload": {
    "feeds": ["detections:live"]
  }
}

Error Codes

Code                  Description
AUTH_FAILED           Authentication failed or session expired
AUTH_REQUIRED         Message sent before authentication complete
SUBSCRIPTION_DENIED   User lacks permission for requested feed
INVALID_MESSAGE       Malformed JSON or missing required fields
RATE_LIMITED          Too many messages from client
INTERNAL_ERROR        Server-side error

Reconnection Protocol

  1. On disconnect, wait min(1s * 2^attempt, 60s) + random jitter (0-1s)
  2. Attempt reconnection
  3. On success, send subscribe for previously active feeds
  4. Call GET /api/v1/alerts?since={last_received_timestamp} to fetch missed alerts
  5. Reset attempt counter on successful reconnection
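
The backoff in step 1 computes directly; a sketch:

```python
import random


def reconnect_delay(attempt: int) -> float:
    """Delay before reconnect attempt number `attempt` (0-based):
    min(1s * 2^attempt, 60s) plus 0-1s of random jitter."""
    return min(1.0 * 2 ** attempt, 60.0) + random.random()
```

The jitter spreads reconnections out so a server restart does not produce a synchronized thundering herd of clients.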

Rate Limits

Direction         Limit          Window
Client → Server   10 messages    1 second
Server → Client   100 messages   1 second (per connection)

Clients exceeding rate limits receive RATE_LIMITED error and may be disconnected.


Telegram Bot Notifications

Instant alerts delivered to Telegram for field volunteers and off-duty monitoring.

Purpose

  • Alert delivery when not actively monitoring web UI
  • Field-accessible notifications on mobile devices
  • Group chat support for team alerts
  • Image delivery (plate crop) with alert context

Bot Architecture

┌─────────────────────────────────────────────────────────────┐
│                     Central Server                          │
│                                                             │
│  Alert Worker ──▶ Notification Queue ──▶ Telegram Worker   │
│                                                             │
└─────────────────────────────────────────────────────────────┘
                              │ HTTPS (outbound only)
                    ┌─────────────────┐
                    │  Telegram API   │
                    │  api.telegram.org│
                    └─────────────────┘
                    ┌─────────────────┐
                    │  User Devices   │
                    │  (Telegram App) │
                    └─────────────────┘

User Registration Flow

1. User requests Telegram notifications in web UI
2. System generates unique linking code (expires in 10 minutes)
3. User sends linking code to bot via Telegram
4. Bot validates code and associates Telegram chat_id with user
5. User can now receive alerts via Telegram
6. User can unlink at any time via web UI or /unlink command

Bot Commands

Command            Description
/start             Welcome message with setup instructions
/link {code}       Link Telegram account to ALPR user
/unlink            Disconnect Telegram notifications
/status            Show current subscription status
/mute {duration}   Temporarily mute notifications
/unmute            Resume notifications
/help              List available commands

Alert Message Format

🚨 WATCHLIST ALERT

Plate: ABC-1234
Watchlist: Stolen Vehicles
Time: 2025-01-15 14:32:05
Location: Main St Camera #3
Confidence: 98%

[View Details] (link to web UI)

[Plate Crop Image]

Delivery Guarantees

Concern            Approach
Rate limits        Respect Telegram API limits (see below)
Failures           Retry with exponential backoff (max 3 attempts)
Offline users      Telegram handles delivery when user comes online
Blocked bot        Mark user's Telegram as inactive; notify via web UI
Group throttling   Aggregate multiple alerts within 5 seconds

Telegram API Rate Limits:

  • ~1 message/second to the same chat (single user)
  • 20 messages/minute to the same group
  • ~30 messages/second overall when broadcasting to many chats
  • A 429 Too Many Requests response triggers exponential backoff

Implementation:

  • Queue messages per chat_id with a rate limiter
  • Aggregate burst alerts within a 5-second window
  • Track 429 responses and apply backoff globally
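
The 5-second aggregation window can be sketched as a pure grouping pass over queued alerts; the tuple shape and names are illustrative:

```python
def aggregate_alerts(alerts, window=5.0):
    """Group alerts per chat into batches whose timestamps fall within
    `window` seconds of the batch start, so one Telegram message can
    cover a burst. Each alert is a (chat_id, timestamp_seconds, text) tuple.
    """
    open_batches: dict = {}   # chat_id -> the currently open batch
    batches = []
    for chat_id, ts, text in sorted(alerts, key=lambda a: a[1]):
        batch = open_batches.get(chat_id)
        if batch and ts - batch[0][1] <= window:
            batch.append((chat_id, ts, text))   # still inside the window
        else:
            batch = [(chat_id, ts, text)]        # start a new batch
            open_batches[chat_id] = batch
            batches.append(batch)
    return batches
```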

Security Considerations

Concern                 Mitigation
Linking code guessing   8-character alphanumeric, 10-minute expiry, rate limited
Unauthorized access     Only linked users receive alerts; chat_id validated
Sensitive data          Plate numbers visible; no PII in messages
Bot token security      Stored as secret; never logged or exposed

Database Schema

telegram_links
├── id (PK)
├── user_id (FK → users.id)
├── chat_id (Telegram chat ID, unique)
├── linked_at (timestamp)
├── is_active (boolean, default true)
├── muted_until (timestamp, nullable)
└── last_delivery_at (timestamp, nullable)

telegram_link_codes
├── id (PK)
├── user_id (FK → users.id)
├── code (unique, indexed)
├── created_at (timestamp)
├── expires_at (timestamp)
└── used_at (timestamp, nullable)

Push Notifications (Mobile App Phase)

Native push notifications for the future mobile application.

Phase

This channel will be implemented during the mobile app development phase (post-POC).

Planned Architecture

┌─────────────────────────────────────────────────────────────┐
│                     Central Server                          │
│                                                             │
│  Alert Worker ──▶ Notification Queue ──▶ Push Worker       │
│                                                             │
└─────────────────────────────────────────────────────────────┘
                 ┌────────────┴────────────┐
                 │                         │
                 ▼                         ▼
        ┌─────────────┐           ┌─────────────┐
        │    FCM      │           │    APNs     │
        │  (Android)  │           │   (iOS)     │
        └─────────────┘           └─────────────┘
                 │                         │
                 └────────────┬────────────┘
                    ┌─────────────────┐
                    │  Mobile Devices │
                    └─────────────────┘

Platform Support

Platform   Service                                  Notes
Android    Firebase Cloud Messaging (FCM)           Free tier sufficient
iOS        Apple Push Notification service (APNs)   Requires Apple Developer account

Key Considerations for Implementation

Concern                Approach
Token management       Store device tokens; handle refresh and invalidation
Platform abstraction   Unified push service supporting both FCM and APNs
Silent push            Background sync for non-urgent updates
Rich notifications     Include plate crop image in notification
Action buttons         "View", "Dismiss", "Mark as Seen"
Delivery tracking      Track delivery and open rates for reliability
Battery impact         Batch low-priority notifications; respect OS limits

Database Schema (Planned)

push_tokens
├── id (PK)
├── user_id (FK → users.id)
├── device_id (unique device identifier)
├── platform (ios | android)
├── token (FCM/APNs token)
├── created_at (timestamp)
├── last_used_at (timestamp)
└── is_active (boolean)

User Notification Preferences

Users configure which channels receive which alert types.

Preference Model

notification_preferences
├── id (PK)
├── user_id (FK → users.id)
├── watchlist_id (FK → watchlists.id, nullable for defaults)
├── channel (web | telegram | push)
├── enabled (boolean)
├── quiet_hours_start (time, nullable)
├── quiet_hours_end (time, nullable)
└── updated_at (timestamp)

Default Behavior

Channel    Default State                           Quiet Hours
Web UI     Enabled for all subscribed watchlists   No quiet hours
Telegram   Disabled until linked                   Respects user setting
Push       Enabled when app installed              Respects OS settings

Quiet Hours

  • Per-channel quiet hours configuration
  • Alerts still logged and visible in web UI
  • Option to override for critical watchlists
  • Timezone-aware scheduling

Notification Queue

External channel notifications (Telegram, Push) are processed asynchronously.

Queue Schema

notification_queue
├── id (PK)
├── alert_id (FK → alerts.id)
├── user_id (FK → users.id)
├── channel (telegram | push)
├── status (pending | processing | sent | failed)
├── attempts (int, default 0)
├── last_error (text, nullable)
├── created_at (timestamp)
├── sent_at (timestamp, nullable)
└── next_retry_at (timestamp, nullable)

Processing

  • Dedicated worker per channel (or shared with channel routing)
  • Batch processing for efficiency
  • Exponential backoff on failures (1s, 5s, 30s)
  • Max 3 retry attempts before marking failed
  • Failed notifications logged for manual review

Monitoring

Additional metrics for notification channels:

Metric                                Description
notifications_sent_total{channel}     Successful deliveries by channel
notifications_failed_total{channel}   Failed deliveries by channel
notification_latency_ms{channel}      Time from alert to delivery
telegram_links_active                 Currently linked Telegram users
websocket_connections_active          Current WebSocket connections

Related Documents

  • PRP: WebSocket PRP - WebSocket implementation patterns
  • Architecture: Data Retention - Alert history retention (7 years) and legal hold policies

Decision Date: 2025-12-29
Status: Approved
Rationale: Addresses alert engine scalability concern identified in architecture review