Async Alert Engine Design¶
Problem¶
Synchronous alert matching during detection ingestion creates a bottleneck:
- 100 detections/minute × 1000 watchlist entries = 100,000 comparisons/minute
- Database queries per detection slow ingestion
- High-traffic bursts create backpressure to edge collectors
- Alert processing failures block detection storage
Solution: Decoupled Async Processing¶
Separate detection ingestion (fast path) from alert matching (background processing).
Detection Ingestion (Fast Path)         Alert Processing (Background)
─────────────────────────────           ─────────────────────────────
Batch Upload                            Alert Worker
     │                                       │
     ▼                                       ▼
Store to PostgreSQL ───────────────────▶ Poll Alert Queue
     │                                       │
     ▼                                       ▼
Store images to MinIO                   Match against Watchlist (in-memory)
     │                                       │
     ▼                                       ▼
Enqueue detection IDs                   Create Alert Record
     │                                       │
     ▼                                       ▼
Return 201 (<100ms)                     Push via WebSocket
Architecture¶
1. Detection Ingestion Endpoint¶
Responsibilities:
- Accept detection batch from edge collector
- Store detection metadata to PostgreSQL
- Store images to MinIO
- Enqueue detection IDs for alert processing
- Return success immediately
Does NOT: Query watchlists, match plates, send notifications
Target latency: <100ms for batch of 50 detections
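The fast path can be sketched as a thin function over injected storage and queue operations. This is a sketch only: `store_metadata`, `store_images`, and `enqueue` are hypothetical stand-ins for the PostgreSQL insert, MinIO upload, and alert-queue insert, so the function stays focused on what the endpoint does and does not do.

```python
from dataclasses import dataclass


@dataclass
class IngestResult:
    status_code: int
    detection_ids: list


def ingest_batch(detections, store_metadata, store_images, enqueue):
    """Fast-path ingestion: persist the batch, enqueue IDs, return.

    No watchlist queries, plate matching, or notifications happen
    here -- those are deferred to the alert worker via the queue.
    """
    ids = store_metadata(detections)   # e.g. bulk INSERT into PostgreSQL
    store_images(detections)           # e.g. PUT objects to MinIO
    enqueue(ids)                       # e.g. INSERT INTO alert_queue
    return IngestResult(status_code=201, detection_ids=ids)
```

Because the three dependencies are passed in, the fast path is trivially testable and none of the slow alert-matching work can leak into it.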
2. Alert Queue¶
Simple queue for pending alert checks.
POC Implementation: PostgreSQL table
- id, detection_id, status (pending/processing/complete), created_at
- Worker polls for status = 'pending'
- Index on (status, created_at)
Future Scaling Options:
- PostgreSQL LISTEN/NOTIFY for real-time polling
- Redis list for higher throughput
- Dedicated message queue (RabbitMQ, SQS)
3. Alert Worker¶
Background process that:
1. Polls the alert queue for pending detections
2. Loads detection details from the database
3. Checks the plate against the in-memory watchlist
4. On match: creates an alert and notifies subscribers
Scaling: Can run multiple workers for horizontal scaling
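The worker's outer loop can be sketched as a poll-and-process skeleton. This is a sketch with injected callables (`poll_batch` and `process` are hypothetical names), which keeps the loop independent of the queue backend so the same skeleton works whether polling PostgreSQL, Redis, or a message queue.

```python
import time


def run_worker(poll_batch, process, idle_sleep=1.0, max_iterations=None):
    """Poll-and-process loop for an alert worker.

    `poll_batch` returns a list of pending detection IDs (claiming
    them atomically in the backend); `process` handles one ID.
    `max_iterations` exists only so tests can stop the loop.
    """
    iterations = 0
    while max_iterations is None or iterations < max_iterations:
        iterations += 1
        batch = poll_batch()
        if not batch:
            time.sleep(idle_sleep)  # nothing pending; back off briefly
            continue
        for detection_id in batch:
            process(detection_id)
```

Several copies of this loop can run in parallel as long as `poll_batch` claims rows atomically, which is what makes the horizontal scaling noted above safe.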
4. Watchlist Cache¶
In-memory set for O(1) plate lookups.
Cache refresh strategy:
- Full reload every 60 seconds
- Immediate invalidation on watchlist CRUD via pub/sub
Data structure:
- Set of normalized plate numbers
- Map of plate → watchlist entry details (for alert context)
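The set-plus-map structure above can be sketched as a small class. This is a sketch, not the implementation: `WatchlistCache` and `normalize` are illustrative names, and the normalization rule (uppercase, strip separators) is an assumption about how plates are canonicalized.

```python
def normalize(plate):
    """Uppercase and strip separators so 'abc-1234' matches 'ABC1234'."""
    return "".join(ch for ch in plate.upper() if ch.isalnum())


class WatchlistCache:
    """In-memory watchlist: a set gives O(1) membership checks, and a
    dict maps plate -> entry details for alert context."""

    def __init__(self):
        self._plates = set()
        self._entries = {}

    def refresh(self, rows):
        """Full reload from the database (every ~60s, or on pub/sub
        invalidation). `rows` is an iterable of (plate, details)."""
        entries = {normalize(plate): details for plate, details in rows}
        # Build first, then swap, so lookups never see a half-built cache.
        self._entries = entries
        self._plates = set(entries)

    def match(self, plate):
        """Return entry details on a watchlist hit, else None."""
        plate = normalize(plate)
        return self._entries.get(plate) if plate in self._plates else None
```

Building the new dict before swapping it in means concurrent lookups during a refresh see either the old or the new snapshot, never a partial one.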
5. Notification Delivery¶
On watchlist match:
1. Create alert record in database (audit trail)
2. Look up subscribed users for that watchlist
3. Push to connected WebSocket clients
4. Future: Queue push notification for mobile app
Data Flow¶
1. Edge collector uploads batch
│
▼
2. /api/v1/detections/batch
- Validate API key
- Store detections to PostgreSQL
- Store images to MinIO
- INSERT INTO alert_queue (detection_id, status='pending')
- Return 201 Accepted
│
▼
3. Alert Worker (polling every 1s)
- SELECT detection_id FROM alert_queue WHERE status='pending' LIMIT 100
- UPDATE status='processing'
│
▼
4. For each detection:
- Load detection from database
- Check: normalized_plate IN watchlist_cache?
- If match:
- INSERT INTO alerts (detection_id, watchlist_id, ...)
- Get subscribers for watchlist_id
- Push to WebSocket manager
- UPDATE alert_queue SET status='complete'
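Step 3 above claims rows with a SELECT followed by an UPDATE; with multiple workers those two statements can race on the same rows. A common PostgreSQL refinement (a sketch, assuming the `alert_queue` schema in this document) is to claim a batch in a single `UPDATE ... RETURNING` with `FOR UPDATE SKIP LOCKED`, so each worker skips rows another worker has already locked:

```python
# PostgreSQL claim query for the alert worker (sketch). The inner
# SELECT ... FOR UPDATE SKIP LOCKED locks up to `batch_size` pending
# rows while passing over rows locked by other workers; the outer
# UPDATE flips them to 'processing' and returns their detection IDs
# in one atomic statement.
CLAIM_PENDING_SQL = """
UPDATE alert_queue
SET status = 'processing'
WHERE id IN (
    SELECT id FROM alert_queue
    WHERE status = 'pending'
    ORDER BY created_at
    LIMIT %(batch_size)s
    FOR UPDATE SKIP LOCKED
)
RETURNING detection_id;
"""
```

Executed via any PostgreSQL driver (e.g. `cursor.execute(CLAIM_PENDING_SQL, {"batch_size": 100})`), this replaces the separate SELECT and UPDATE without changing the queue's observable behavior.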
Performance Characteristics¶
| Metric | Target |
|---|---|
| Ingestion latency | <100ms per batch |
| Alert latency | 1-5 seconds after ingestion |
| Watchlist lookup | O(1) via in-memory set |
| Worker throughput | 1000+ detections/second per worker |
Trade-offs¶
| Aspect | Trade-off |
|---|---|
| Alert latency | 1-5s delay vs immediate (acceptable for use case) |
| Complexity | Additional worker process to manage |
| Consistency | Brief window where detection exists but alert hasn't fired |
Failure Handling¶
| Scenario | Behavior |
|---|---|
| Worker crashes | Detections remain in queue, picked up on restart |
| WebSocket disconnected | Alert stored in DB, user sees on reconnect |
| Watchlist cache stale | Max 60s stale, acceptable for use case |
| Database unavailable | Worker retries with backoff |
Queue Table Schema¶
alert_queue
├── id (PK)
├── detection_id (FK → detections.id)
├── status (pending | processing | complete | failed)
├── attempts (int, default 0)
├── last_error (text, nullable)
├── created_at (timestamp)
└── processed_at (timestamp, nullable)
Index: (status, created_at) for efficient polling
Monitoring¶
Metrics to track:
- `alert_queue_depth` - Pending items in queue (gauge)
- `alert_processing_latency_ms` - Time from enqueue to complete (histogram)
- `alert_matches_total` - Watchlist matches found (counter)
- `alert_notifications_sent_total` - WebSocket pushes (counter)
- `watchlist_cache_size` - Entries in cache (gauge)
- `watchlist_cache_refresh_ms` - Cache reload duration (histogram)
Alerts:
- Queue depth > 1000 for > 5 minutes
- Processing latency p99 > 10 seconds
- Worker not polling for > 30 seconds
Notification Channels¶
Alert notifications are delivered through multiple channels based on user preferences and system phase.
Channel Overview¶
| Channel | Phase | Use Case | Delivery | Cost |
|---|---|---|---|---|
| Web UI (WebSocket) | POC | Operators monitoring dashboard | Real-time | Free |
| Telegram Bot | POC | Field volunteers, off-duty alerts | Near real-time | Free |
| Push Notifications | Mobile App | All users with mobile app | Real-time | Free tier |
Notification Flow¶
Alert Worker detects watchlist match
                │
                ▼
Create alert record in database
                │
                ▼
Look up user subscriptions for watchlist
                │
        ┌───────┴──────────────────┐
        │                          │
        ▼                          ▼
[Web UI Channel]          [External Channels]
        │                          │
        ▼                          ▼
WebSocket Manager          Notification Queue
        │                          │
        ▼                          ▼
Push to connected clients  Async delivery worker
                                   │
                  ┌────────────────┼────────────────┐
                  │                │                │
                  ▼                ▼                ▼
              Telegram       Push (Future)    Other (Future)
Web UI Notifications (WebSocket)¶
Real-time updates for operators actively monitoring the dashboard.
Implementation Details: See WebSocket PRP for FastAPI WebSocket implementation patterns, connection manager code, and client reconnection logic.
Purpose¶
- Live detection feed showing detections as they arrive
- Instant alert popups for watchlist matches
- Collector status updates (online/offline)
Connection Lifecycle¶
1. User authenticates via normal login flow
2. Browser establishes WebSocket connection to /ws
3. Server validates session from HTTP-only cookie
4. Connection registered with user ID in connection manager
5. Server pushes messages to relevant connections
6. Client reconnects automatically on disconnect
7. Connection removed from manager on close
Message Types¶
| Type | Direction | Purpose |
|---|---|---|
| `detection` | Server → Client | New plate detection (for live feed) |
| `alert` | Server → Client | Watchlist match notification |
| `collector_status` | Server → Client | Collector online/offline change |
| `ping` | Server → Client | Connection health check |
| `pong` | Client → Server | Response to ping |
| `subscribe` | Client → Server | Subscribe to specific feeds |
| `unsubscribe` | Client → Server | Unsubscribe from feeds |
Subscription Model¶
Users can subscribe to different notification feeds:
| Feed | Description | Default |
|---|---|---|
| `alerts:*` | All alerts for user's subscribed watchlists | On |
| `alerts:{watchlist_id}` | Alerts for specific watchlist | On (if subscribed) |
| `detections:live` | Real-time detection feed (high volume) | Off |
| `collectors:status` | Collector online/offline changes | Admin only |
Connection Management¶
| Concern | Approach |
|---|---|
| Authentication | Validate JWT from HTTP-only cookie on connect |
| Authorization | Check user role for feed subscriptions |
| Scaling | In-memory manager for POC; Redis pub/sub for multi-instance |
| Reconnection | Client auto-reconnects with exponential backoff |
| Missed messages | Client fetches recent alerts via REST on reconnect |
| Health check | Server pings every 30s; close connection after 3 missed pongs |
| Max connections | Configurable limit per user (default: 5) |
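The in-memory manager row above can be sketched as a per-user connection registry with the configurable limit. This is a POC sketch (`ConnectionManager` is an illustrative name; the transport-specific send is injected), and a multi-instance deployment would replace it with Redis pub/sub as noted in the table.

```python
from collections import defaultdict


class ConnectionManager:
    """In-memory registry of WebSocket connections keyed by user ID
    (single-instance POC; not shared across server processes)."""

    def __init__(self, max_per_user=5):
        self.max_per_user = max_per_user
        self._by_user = defaultdict(list)

    def register(self, user_id, connection):
        """Add a connection, enforcing the per-user limit."""
        conns = self._by_user[user_id]
        if len(conns) >= self.max_per_user:
            raise ConnectionRefusedError("per-user connection limit reached")
        conns.append(connection)

    def unregister(self, user_id, connection):
        """Remove a connection on close or failed health check."""
        self._by_user[user_id].remove(connection)
        if not self._by_user[user_id]:
            del self._by_user[user_id]

    def send_to_user(self, user_id, message, send):
        """Push `message` to every connection the user has open;
        `send(connection, message)` is the transport-specific push."""
        for connection in list(self._by_user.get(user_id, [])):
            send(connection, message)
```

Iterating over a copied list in `send_to_user` lets a failing `send` unregister the connection without corrupting the iteration.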
Mobile Network Handling¶
For mobile web users on unstable connections:
- Aggressive reconnection with short initial backoff (1 second)
- Visibility API detection: reconnect when tab becomes visible
- Network change detection: reconnect on online event
- Fetch missed alerts via REST API after reconnection gap
WebSocket Protocol Specification¶
Connection Endpoint¶
Clients connect to `/ws`. Authentication uses the HTTP-only session cookie (JWT) presented on the initial HTTP upgrade request.
Protocol Versioning¶
Protocol version is included in all server messages to support backward compatibility:
| Version | Status | Notes |
|---|---|---|
| `1` | Current | Initial protocol |
Versioning Strategy:
- Server includes `"protocol_version": 1` in the initial connection acknowledgment
- Clients should ignore unknown message types for forward compatibility
- Breaking changes require a new version number and deprecation period
- Clients can request a specific version via query param: `/ws?v=1`
Connection State Machine¶
┌──────────────┐     HTTP Upgrade     ┌──────────────┐
│ Disconnected ├─────────────────────▶│  Connecting  │
└──────▲───────┘                      └──────┬───────┘
       │                                     │
       │                                Upgrade OK
       │                                     │
       │                                     ▼
       │      ┌───────────┐   Auth    ┌──────────────┐
       │◀─────│   Error   │◀──────────│Authenticating│
       │ Auth └───────────┘   Fail    └──────┬───────┘
       │ Fail                                │
       │                                  Auth OK
       │                                     │
       │                                     ▼
       │                              ┌──────────────┐
       │◀─────────────────────────────│  Connected   │◀──────┐
       │      Connection Lost         └──────┬───────┘       │
       │                                     │               │
       │                               Ping/Pong OK          │
       │                                     └───────────────┘
Message Format¶
All messages are JSON objects with a required `type` field:
{
  "type": "message_type",
  "payload": { ... },
  "timestamp": "2025-01-15T14:32:05.123Z",
  "id": "msg_abc123"
}
Server → Client Messages¶
1. Detection Message (detection)
Real-time plate detection for live feed display.
{
  "type": "detection",
  "payload": {
    "id": "det_12345",
    "plate_number": "ABC1234",
    "plate_normalized": "ABC1234",
    "confidence": 0.95,
    "captured_at": "2025-01-15T14:32:05Z",
    "collector": {
      "id": "col_abc123",
      "name": "Main St Entrance"
    },
    "camera": {
      "id": "cam_001",
      "name": "Camera #3"
    },
    "direction": "entering",
    "image_urls": {
      "full": "https://minio.example.com/images/2025/01/15/det_12345_full.jpg?token=...",
      "crop": "https://minio.example.com/images/2025/01/15/det_12345_crop.jpg?token=..."
    }
  },
  "timestamp": "2025-01-15T14:32:05.456Z",
  "id": "msg_det_001"
}
2. Alert Notification (alert)
Watchlist match notification. `detection_id` references the detection that triggered the alert.
{
  "type": "alert",
  "payload": {
    "alert_id": "alt_67890",
    "detection_id": "det_12345",
    "plate_number": "ABC1234",
    "watchlist": {
      "id": "wl_stolen",
      "name": "Stolen Vehicles",
      "priority": "high"
    },
    "entry": {
      "notes": "Reported stolen 2025-01-10",
      "added_by": "Officer Smith",
      "added_at": "2025-01-10T09:00:00Z"
    },
    "detection": {
      "captured_at": "2025-01-15T14:32:05Z",
      "confidence": 0.95,
      "collector_name": "Main St Entrance",
      "camera_name": "Camera #3",
      "image_urls": {
        "full": "https://minio.example.com/images/...",
        "crop": "https://minio.example.com/images/..."
      }
    }
  },
  "timestamp": "2025-01-15T14:32:06.789Z",
  "id": "msg_alt_001"
}
3. Collector Status (collector_status)
Collector online/offline state change (admin only).
{
  "type": "collector_status",
  "payload": {
    "collector_id": "col_abc123",
    "collector_name": "Main St Entrance",
    "status": "offline",
    "previous_status": "online",
    "last_heartbeat_at": "2025-01-15T14:30:00Z",
    "cameras_online": 0,
    "cameras_total": 2
  },
  "timestamp": "2025-01-15T14:32:00.000Z",
  "id": "msg_col_001"
}
4. Ping (ping)
Health check from server (every 30 seconds).
{
  "type": "ping",
  "payload": {
    "server_time": "2025-01-15T14:32:00.000Z"
  },
  "timestamp": "2025-01-15T14:32:00.000Z",
  "id": "msg_ping_001"
}
5. Subscription Confirmation (subscribed)
Response to client subscription request.
{
  "type": "subscribed",
  "payload": {
    "feed": "alerts:wl_stolen",
    "success": true
  },
  "timestamp": "2025-01-15T14:32:00.000Z",
  "id": "msg_sub_001"
}
6. Error (error)
Server-side error notification.
{
  "type": "error",
  "payload": {
    "code": "SUBSCRIPTION_DENIED",
    "message": "You do not have permission to subscribe to this feed",
    "feed": "collectors:status"
  },
  "timestamp": "2025-01-15T14:32:00.000Z",
  "id": "msg_err_001"
}
Client → Server Messages¶
1. Pong (pong)
Response to server ping.
2. Subscribe (subscribe)
Subscribe to a notification feed.
3. Unsubscribe (unsubscribe)
Unsubscribe from a notification feed.
Error Codes¶
| Code | Description |
|---|---|
| `AUTH_FAILED` | Authentication failed or session expired |
| `AUTH_REQUIRED` | Message sent before authentication complete |
| `SUBSCRIPTION_DENIED` | User lacks permission for requested feed |
| `INVALID_MESSAGE` | Malformed JSON or missing required fields |
| `RATE_LIMITED` | Too many messages from client |
| `INTERNAL_ERROR` | Server-side error |
Reconnection Protocol¶
- On disconnect, wait `min(1s * 2^attempt, 60s)` + random jitter (0-1s)
- Attempt reconnection
- On success, send `subscribe` for previously active feeds
- Call `GET /api/v1/alerts?since={last_received_timestamp}` to fetch missed alerts
- Reset attempt counter on successful reconnection
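The backoff rule above reduces to a one-line formula; a sketch (with the random source injected so it can be tested deterministically):

```python
import random


def reconnect_delay(attempt, base=1.0, cap=60.0, rng=random.random):
    """Delay in seconds before reconnect attempt `attempt` (0-based):
    min(base * 2^attempt, cap) plus 0-1s of jitter so many clients
    dropped at once don't reconnect in lockstep."""
    return min(base * (2 ** attempt), cap) + rng()
```

Attempt 0 waits ~1s, attempt 3 waits ~8s, and anything past attempt 6 is capped near 60s plus jitter.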
Rate Limits¶
| Direction | Limit | Window |
|---|---|---|
| Client → Server | 10 messages | 1 second |
| Server → Client | 100 messages | 1 second (per connection) |
Clients exceeding rate limits receive a `RATE_LIMITED` error and may be disconnected.
Telegram Bot Notifications¶
Instant alerts delivered to Telegram for field volunteers and off-duty monitoring.
Purpose¶
- Alert delivery when not actively monitoring web UI
- Field-accessible notifications on mobile devices
- Group chat support for team alerts
- Image delivery (plate crop) with alert context
Bot Architecture¶
┌─────────────────────────────────────────────────────────────┐
│                       Central Server                        │
│                                                             │
│  Alert Worker ──▶ Notification Queue ──▶ Telegram Worker    │
│                                                             │
└─────────────────────────────────────────────────────────────┘
                             │
                             │ HTTPS (outbound only)
                             ▼
                    ┌─────────────────┐
                    │  Telegram API   │
                    │ api.telegram.org│
                    └─────────────────┘
                             │
                             ▼
                    ┌─────────────────┐
                    │  User Devices   │
                    │ (Telegram App)  │
                    └─────────────────┘
User Registration Flow¶
1. User requests Telegram notifications in web UI
2. System generates unique linking code (expires in 10 minutes)
3. User sends linking code to bot via Telegram
4. Bot validates code and associates Telegram chat_id with user
5. User can now receive alerts via Telegram
6. User can unlink at any time via web UI or /unlink command
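Step 2 of the flow above (generating the linking code) can be sketched with the standard library's `secrets` module; the 8-character alphanumeric format and 10-minute expiry match the security table later in this document, but the function and constant names are illustrative.

```python
import secrets
import string
from datetime import datetime, timedelta, timezone

# Uppercase letters and digits: easy to type into a chat, and at
# 36^8 combinations, infeasible to guess within a 10-minute window
# when attempts are rate limited.
CODE_ALPHABET = string.ascii_uppercase + string.digits


def generate_link_code(ttl_minutes=10, length=8):
    """Return a (code, expires_at) pair for the telegram_link_codes
    table. `secrets` provides cryptographic randomness, unlike
    `random`, which must not be used for codes like this."""
    code = "".join(secrets.choice(CODE_ALPHABET) for _ in range(length))
    expires_at = datetime.now(timezone.utc) + timedelta(minutes=ttl_minutes)
    return code, expires_at
```

The caller would persist the pair in `telegram_link_codes` and reject any `/link` attempt after `expires_at` or after the code's `used_at` is set.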
Bot Commands¶
| Command | Description |
|---|---|
| `/start` | Welcome message with setup instructions |
| `/link {code}` | Link Telegram account to ALPR user |
| `/unlink` | Disconnect Telegram notifications |
| `/status` | Show current subscription status |
| `/mute {duration}` | Temporarily mute notifications |
| `/unmute` | Resume notifications |
| `/help` | List available commands |
Alert Message Format¶
🚨 WATCHLIST ALERT
Plate: ABC-1234
Watchlist: Stolen Vehicles
Time: 2025-01-15 14:32:05
Location: Main St Camera #3
Confidence: 98%
[View Details] (link to web UI)
[Plate Crop Image]
Delivery Guarantees¶
| Concern | Approach |
|---|---|
| Rate limits | Respect Telegram API limits (see below) |
| Failures | Retry with exponential backoff (max 3 attempts) |
| Offline users | Telegram handles delivery when user comes online |
| Blocked bot | Mark user's Telegram as inactive; notify via web UI |
| Group throttling | Aggregate multiple alerts within 5 seconds |
Telegram API Rate Limits:
- ~1 message/second sustained to the same chat (short bursts allowed)
- 20 messages/minute to the same group
- ~30 messages/second total when broadcasting to multiple chats
- A 429 Too Many Requests response triggers exponential backoff

Implementation:
- Queue messages per chat_id with a rate limiter
- Aggregate burst alerts within a 5-second window
- Track 429 responses and apply backoff globally
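The 5-second aggregation window can be sketched as a per-chat buffer. This is a sketch under assumptions: `ChatAggregator` is a hypothetical name, and the clock is passed in by the caller so a delivery worker can drive flushing on its own schedule (and so the behavior is testable).

```python
import time
from collections import defaultdict


class ChatAggregator:
    """Buffers alerts per chat_id and releases each buffer as one
    batch once its 5-second aggregation window has elapsed, keeping
    bursts under Telegram's per-chat limits."""

    def __init__(self, window_s=5.0):
        self.window_s = window_s
        self._buffers = defaultdict(list)
        self._opened_at = {}

    def add(self, chat_id, alert, now=None):
        """Buffer an alert; the window opens at the first alert."""
        now = time.monotonic() if now is None else now
        if chat_id not in self._opened_at:
            self._opened_at[chat_id] = now
        self._buffers[chat_id].append(alert)

    def flush_due(self, now=None):
        """Return {chat_id: [alerts]} for every expired window and
        clear those buffers; the caller sends each batch as one
        message."""
        now = time.monotonic() if now is None else now
        due = {}
        for chat_id, opened in list(self._opened_at.items()):
            if now - opened >= self.window_s:
                due[chat_id] = self._buffers.pop(chat_id)
                del self._opened_at[chat_id]
        return due
```

A Telegram worker would call `flush_due()` once per second and format each returned batch into a single message per chat.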
Security Considerations¶
| Concern | Mitigation |
|---|---|
| Linking code guessing | 8-character alphanumeric, 10-minute expiry, rate limited |
| Unauthorized access | Only linked users receive alerts; chat_id validated |
| Sensitive data | Plate numbers visible; no PII in messages |
| Bot token security | Stored as secret; never logged or exposed |
Database Schema¶
telegram_links
├── id (PK)
├── user_id (FK → users.id)
├── chat_id (Telegram chat ID, unique)
├── linked_at (timestamp)
├── is_active (boolean, default true)
├── muted_until (timestamp, nullable)
└── last_delivery_at (timestamp, nullable)
telegram_link_codes
├── id (PK)
├── user_id (FK → users.id)
├── code (unique, indexed)
├── created_at (timestamp)
├── expires_at (timestamp)
└── used_at (timestamp, nullable)
Push Notifications (Mobile App Phase)¶
Native push notifications for the future mobile application.
Phase¶
This channel will be implemented during the mobile app development phase (post-POC).
Planned Architecture¶
┌─────────────────────────────────────────────────────────────┐
│                       Central Server                        │
│                                                             │
│   Alert Worker ──▶ Notification Queue ──▶ Push Worker       │
│                                                             │
└─────────────────────────────────────────────────────────────┘
                             │
                ┌────────────┴────────────┐
                │                         │
                ▼                         ▼
         ┌─────────────┐           ┌─────────────┐
         │     FCM     │           │    APNs     │
         │  (Android)  │           │    (iOS)    │
         └─────────────┘           └─────────────┘
                │                         │
                └────────────┬────────────┘
                             ▼
                    ┌─────────────────┐
                    │ Mobile Devices  │
                    └─────────────────┘
Platform Support¶
| Platform | Service | Notes |
|---|---|---|
| Android | Firebase Cloud Messaging (FCM) | Free tier sufficient |
| iOS | Apple Push Notification service (APNs) | Requires Apple Developer account |
Key Considerations for Implementation¶
| Concern | Approach |
|---|---|
| Token management | Store device tokens; handle refresh and invalidation |
| Platform abstraction | Unified push service supporting both FCM and APNs |
| Silent push | Background sync for non-urgent updates |
| Rich notifications | Include plate crop image in notification |
| Action buttons | "View", "Dismiss", "Mark as Seen" |
| Delivery tracking | Track delivery and open rates for reliability |
| Battery impact | Batch low-priority notifications; respect OS limits |
Database Schema (Planned)¶
push_tokens
├── id (PK)
├── user_id (FK → users.id)
├── device_id (unique device identifier)
├── platform (ios | android)
├── token (FCM/APNs token)
├── created_at (timestamp)
├── last_used_at (timestamp)
└── is_active (boolean)
User Notification Preferences¶
Users configure which channels receive which alert types.
Preference Model¶
notification_preferences
├── id (PK)
├── user_id (FK → users.id)
├── watchlist_id (FK → watchlists.id, nullable for defaults)
├── channel (web | telegram | push)
├── enabled (boolean)
├── quiet_hours_start (time, nullable)
├── quiet_hours_end (time, nullable)
└── updated_at (timestamp)
Default Behavior¶
| Channel | Default State | Quiet Hours |
|---|---|---|
| Web UI | Enabled for all subscribed watchlists | No quiet hours |
| Telegram | Disabled until linked | Respects user setting |
| Push | Enabled when app installed | Respects OS settings |
Quiet Hours¶
- Per-channel quiet hours configuration
- Alerts still logged and visible in web UI
- Option to override for critical watchlists
- Timezone-aware scheduling
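The quiet-hours check reduces to a time-window test that must handle windows crossing midnight (e.g. 22:00-07:00). A sketch, assuming the caller has already converted the current time into the user's timezone per the schema's `quiet_hours_start`/`quiet_hours_end` columns:

```python
from datetime import time as dtime


def in_quiet_hours(local_time, start, end):
    """True if `local_time` (a datetime.time already in the user's
    timezone) falls inside the quiet window [start, end). When the
    window crosses midnight (start > end), it covers late evening
    through early morning."""
    if start <= end:
        return start <= local_time < end
    return local_time >= start or local_time < end
```

A notification worker would skip (or defer) Telegram/push delivery when this returns `True`, unless the watchlist is flagged for the critical override mentioned above; web UI visibility is unaffected.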
Notification Queue¶
External channel notifications (Telegram, Push) are processed asynchronously.
Queue Schema¶
notification_queue
├── id (PK)
├── alert_id (FK → alerts.id)
├── user_id (FK → users.id)
├── channel (telegram | push)
├── status (pending | processing | sent | failed)
├── attempts (int, default 0)
├── last_error (text, nullable)
├── created_at (timestamp)
├── sent_at (timestamp, nullable)
└── next_retry_at (timestamp, nullable)
Processing¶
- Dedicated worker per channel (or shared with channel routing)
- Batch processing for efficiency
- Exponential backoff on failures (1s, 5s, 30s)
- Max 3 retry attempts before marking failed
- Failed notifications logged for manual review
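The fixed backoff schedule above maps directly to a tiny helper that a delivery worker could use to set `next_retry_at` (names are illustrative):

```python
# Backoff schedule from the processing rules above: 1s, 5s, 30s,
# then give up and mark the row failed.
RETRY_DELAYS_S = [1, 5, 30]


def next_retry_delay(attempts):
    """Seconds to wait before the next delivery attempt given how
    many attempts have already been made, or None once the three
    retries are exhausted."""
    if attempts >= len(RETRY_DELAYS_S):
        return None
    return RETRY_DELAYS_S[attempts]
```

A `None` return corresponds to setting `status = 'failed'` and leaving the row for manual review.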
Monitoring¶
Additional metrics for notification channels:
| Metric | Description |
|---|---|
| `notifications_sent_total{channel}` | Successful deliveries by channel |
| `notifications_failed_total{channel}` | Failed deliveries by channel |
| `notification_latency_ms{channel}` | Time from alert to delivery |
| `telegram_links_active` | Currently linked Telegram users |
| `websocket_connections_active` | Current WebSocket connections |
Related Documentation¶
- PRP: WebSocket PRP - WebSocket implementation patterns
- Architecture: Data Retention - Alert history retention (7 years) and legal hold policies
Decision Date: 2025-12-29
Status: Approved
Rationale: Addresses alert engine scalability concern identified in architecture review