
Logging - Project Reference Pattern

Component: Application Logging & Observability
Status: 🟢 Stable
Created: 2025-12-29
Last Updated: 2025-12-29


Overview

Purpose

This PRP provides detailed implementation patterns for application logging across all components (central server, edge collectors, background workers). Consistent, structured logging enables effective troubleshooting, security auditing, and operational monitoring.

All logs MUST be JSON formatted (single line) with timestamp as the first field, enabling easy parsing, filtering, and aggregation by any log management platform.

Scope

Responsibilities:

  • HTTP request/response logging
  • Application event logging
  • External service call logging (object storage, external APIs)
  • Error and exception logging
  • Audit trail logging for user actions
  • Edge collector detection and connectivity logging

Out of Scope:

  • Log aggregation infrastructure selection (see docs/architecture/observability.md)
  • Log rotation and retention policies (see docs/architecture/observability.md)
  • Metrics collection (see docs/architecture/observability.md)

Dependencies

Requires:

  • structlog Python library for structured logging
  • FastAPI middleware support (central server)
  • S3-compatible storage client (MinIO)

Used By:

  • Central server (FastAPI backend)
  • Edge collectors (Python services)
  • Background workers


Quick Reference

Mandatory Log Format

{"timestamp":"2025-01-15T12:34:56.789Z","level":"INFO","service":"central","src_ip":"192.168.1.100","path":"/api/v1/events","method":"POST","status_code":201,"duration_ms":45,"request_id":"550e8400-e29b-41d4-a716-446655440000","message":"Request completed"}

Required Fields

Field        | When Required      | Type   | Description
timestamp    | Always (FIRST)     | string | ISO 8601 with milliseconds, UTC
level        | Always             | string | DEBUG, INFO, WARNING, ERROR, CRITICAL
service      | Always             | string | "central", "edge", "worker"
message      | Always             | string | Human-readable description
src_ip       | HTTP requests      | string | Client IP address
path         | HTTP requests      | string | Request path
method       | HTTP requests      | string | HTTP method
status_code  | HTTP requests      | number | Response status code
duration_ms  | Timed operations   | number | Duration in milliseconds
request_id   | When available     | string | UUID for correlation
user_id      | When authenticated | string | User identifier
collector_id | Edge operations    | string | Collector identifier

Key Patterns

  1. Structured JSON Logging: Use structlog with JSON renderer
  2. Request Context Binding: Bind request metadata at middleware level
  3. Context Unbinding: Use unbind_contextvars() for async safety
  4. DRY Logging Helpers: Extract repetitive logging into helper methods
  5. Rate-Limited Warnings: Prevent log spam for repeated warnings
  6. Service Wrappers: Wrap external service calls with logging
  7. Error Context: Always include stack traces and operation context

Required Imports

import structlog
from structlog.contextvars import bind_contextvars, unbind_contextvars
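
The same bind/unbind pair (Key Patterns 2 and 3) also applies outside HTTP handling, for example in a background worker task. A minimal sketch, assuming a hypothetical process_job coroutine and job_id field:

import structlog
from structlog.contextvars import bind_contextvars, unbind_contextvars

logger = structlog.get_logger(__name__)


async def process_job(job_id: str) -> None:
    # Bind job context so every log line emitted during this task carries it
    bind_contextvars(service="worker", job_id=job_id)
    try:
        logger.info("Job started")
        # ... job work ...
        logger.info("Job completed")
    finally:
        # Unbind only the keys bound above (async-safe; see the middleware pattern)
        unbind_contextvars("service", "job_id")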

Implementation Task List

Use this checklist when implementing logging:

Setup & Configuration

  • [ ] Install structlog: pip install structlog
  • [ ] Create core/logging.py with setup function
  • [ ] Configure JSON formatter with timestamp first
  • [ ] Set up log levels per environment (DEBUG dev, INFO prod)
  • [ ] Configure environment variables (LOG_LEVEL, LOG_FORMAT)

Core Implementation

  • [ ] Implement FastAPI logging middleware
  • [ ] Implement request ID middleware
  • [ ] Create object storage logging wrapper
  • [ ] Set up database query logging (slow query warnings)
  • [ ] Add global exception handler

Error Handling

  • [ ] Include exc_info=True for ERROR/CRITICAL logs
  • [ ] Add context (operation, user, request) to all errors
  • [ ] Extract error_type and error_message as separate fields

Security

  • [ ] Never log passwords, tokens, or API keys
  • [ ] Sanitize PII in logs
  • [ ] Filter sensitive headers (Authorization, Cookie, X-API-Key)
  • [ ] Secure X-Forwarded-For handling (trust only from proxy)

Testing

  • [ ] Unit tests for log formatters
  • [ ] Integration tests verify log output format
  • [ ] Test timestamp is first field in JSON
  • [ ] Test sensitive data sanitization

Patterns

Pattern: Structured JSON Logging Setup

Problem: Default Python logging produces unstructured text that's hard to parse, search, and aggregate.

Solution: Use structlog with a JSON renderer that outputs timestamp as the first field.

Implementation:

# central/src/core/logging.py
import logging
import sys
from datetime import datetime, timezone
from typing import Any

import structlog
from structlog.types import EventDict, Processor


def add_timestamp_first(
    logger: logging.Logger, method_name: str, event_dict: EventDict
) -> EventDict:
    """Ensure timestamp is the first key in the output."""
    timestamp = datetime.now(timezone.utc).isoformat(timespec='milliseconds').replace('+00:00', 'Z')
    return {"timestamp": timestamp, **event_dict}


def setup_logging(log_level: str = "INFO", log_format: str = "json") -> None:
    """
    Configure structured JSON logging for the application.

    Args:
        log_level: Minimum log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
        log_format: Output format ("json" for production, "console" for development)
    """
    shared_processors: list[Processor] = [
        structlog.contextvars.merge_contextvars,
        structlog.stdlib.add_log_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
    ]

    if log_format == "json":
        # Production: Single-line JSON with timestamp first
        processors = shared_processors + [
            add_timestamp_first,
            structlog.processors.JSONRenderer(),
        ]
    else:
        # Development: Pretty-printed console output
        processors = shared_processors + [
            structlog.dev.ConsoleRenderer(colors=True),
        ]

    structlog.configure(
        processors=processors,
        wrapper_class=structlog.stdlib.BoundLogger,
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        cache_logger_on_first_use=True,
    )

    # Configure standard library logging
    logging.basicConfig(
        format="%(message)s",
        stream=sys.stdout,
        level=getattr(logging, log_level.upper()),
    )


def get_logger(name: str) -> structlog.stdlib.BoundLogger:
    """Get a logger instance with the given name."""
    return structlog.get_logger(name)

When to Use: - Application startup (call once before any logging) - All modules should use get_logger(__name__)

Trade-offs: - Pros: Consistent format, easy parsing, context propagation - Cons: Slightly more verbose than print(), requires setup
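
Typical module-level usage once setup_logging() has run at startup; the module path and field names here are illustrative:

# central/src/services/plates.py (illustrative)
from core.logging import get_logger

logger = get_logger(__name__)


def register_plate(plate_number: str, camera_id: str) -> None:
    # Structured key-value fields instead of string interpolation
    logger.info(
        "Plate registered",
        plate_number=plate_number,
        camera_id=camera_id,
    )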


Pattern: FastAPI Logging Middleware

Problem: Need to log every HTTP request/response with consistent fields including timing, status code, and client IP.

Solution: Create middleware that captures request start time, binds context, and logs on response.

Implementation:

# central/src/middleware/logging_middleware.py
import time
import uuid
from typing import Callable

import structlog
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware

from core.logging import get_logger

logger = get_logger(__name__)


def get_client_ip(request: Request) -> str:
    """
    Extract client IP, handling proxied requests.

    SECURITY: Only trust X-Forwarded-For from known reverse proxies.
    """
    # Check X-Forwarded-For header (from reverse proxy).
    # NOTE: as written, the header is trusted unconditionally; deploy only behind a
    # reverse proxy that overwrites it, or add an explicit trusted-proxy check here.
    forwarded_for = request.headers.get("X-Forwarded-For")
    if forwarded_for:
        # Take the first IP (original client)
        return forwarded_for.split(",")[0].strip()

    # Check X-Real-IP header
    real_ip = request.headers.get("X-Real-IP")
    if real_ip:
        return real_ip

    # Fall back to direct client
    if request.client:
        return request.client.host

    return "unknown"


class LoggingMiddleware(BaseHTTPMiddleware):
    """Middleware to log all HTTP requests and responses."""

    async def dispatch(
        self, request: Request, call_next: Callable
    ) -> Response:
        # Generate or extract request ID
        request_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
        request.state.request_id = request_id

        # Extract client IP
        src_ip = get_client_ip(request)

        # Get user ID if authenticated (set by auth middleware)
        user_id = getattr(request.state, "user_id", None)

        # Bind context for this request
        # IMPORTANT: Use unbind at end, not clear (avoids race conditions in async)
        structlog.contextvars.bind_contextvars(
            request_id=request_id,
            src_ip=src_ip,
            path=request.url.path,
            method=request.method,
        )
        if user_id:
            structlog.contextvars.bind_contextvars(user_id=str(user_id))

        # Time the request
        start_time = time.perf_counter()

        try:
            response = await call_next(request)

            # Calculate duration
            duration_ms = round((time.perf_counter() - start_time) * 1000, 2)

            # Log successful request
            logger.info(
                "Request completed",
                status_code=response.status_code,
                duration_ms=duration_ms,
            )

            # Add request ID to response headers for tracing
            response.headers["X-Request-ID"] = request_id

            return response

        except Exception as exc:
            # Calculate duration
            duration_ms = round((time.perf_counter() - start_time) * 1000, 2)

            # Log error
            logger.error(
                "Request failed",
                status_code=500,
                duration_ms=duration_ms,
                error_type=type(exc).__name__,
                error_message=str(exc),
                exc_info=True,
            )
            raise

        finally:
            # CRITICAL: Unbind context to prevent leaking to other requests
            # Use unbind_contextvars (not clear) to avoid race conditions
            structlog.contextvars.unbind_contextvars(
                "request_id", "src_ip", "path", "method"
            )
            if user_id:
                structlog.contextvars.unbind_contextvars("user_id")

Registration in FastAPI:

# central/src/main.py
from fastapi import FastAPI
from core.logging import setup_logging, get_logger
from middleware.logging_middleware import LoggingMiddleware
from config import settings

# Initialize logging BEFORE anything else
setup_logging(log_level=settings.LOG_LEVEL, log_format=settings.LOG_FORMAT)

logger = get_logger(__name__)

app = FastAPI()

# Add logging middleware first (wraps all other middleware)
app.add_middleware(LoggingMiddleware)

logger.info(
    "Application starting",
    service="central",
    environment=settings.ENVIRONMENT,
)

When to Use: - All FastAPI applications in this project

Trade-offs: - Pros: Automatic logging for all requests, consistent format - Cons: Small overhead per request (~1ms)


Pattern: Global Exception Handler

Problem: Unhandled exceptions need to be logged with full context before returning error responses.

Solution: Register a global exception handler that logs and returns consistent error responses.

Implementation:

# central/src/main.py
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

from core.logging import get_logger

logger = get_logger(__name__)


@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """Global exception handler for unhandled errors."""
    logger.error(
        "Unhandled exception",
        error_type=type(exc).__name__,
        error_message=str(exc),
        path=str(request.url.path),
        exc_info=True,
    )
    return JSONResponse(
        status_code=500,
        content={
            "error": {
                "code": "INTERNAL_ERROR",
                "message": "An unexpected error occurred",
            }
        },
    )

When to Use: - All FastAPI applications
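
The same mechanism extends to more specific exception types. A sketch, continuing the same main.py and assuming FastAPI's RequestValidationError, that logs client errors at WARNING rather than ERROR:

from fastapi.exceptions import RequestValidationError


@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    """Validation failures are client errors; log them at WARNING, not ERROR."""
    logger.warning(
        "Request validation failed",
        path=str(request.url.path),
        error_count=len(exc.errors()),
    )
    return JSONResponse(
        status_code=422,
        content={
            "error": {
                "code": "VALIDATION_ERROR",
                "message": "Invalid request payload",
            }
        },
    )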


Pattern: Object Storage (S3) Operation Logging

Problem: S3-compatible storage operations (MinIO) need to be logged with timing and result for troubleshooting.

Solution: Create a wrapper service that logs all operations with timing.

Implementation:

# central/src/services/storage.py
import time
from typing import BinaryIO, Optional

import structlog

logger = structlog.get_logger("storage")


class StorageService:
    """S3-compatible storage service with structured logging."""

    def __init__(self, client, default_bucket: str):
        self.client = client
        self.default_bucket = default_bucket

    def upload_file(
        self,
        file_data: BinaryIO,
        object_name: str,
        bucket: Optional[str] = None,
        content_type: str = "application/octet-stream",
    ) -> dict:
        """Upload a file to object storage with logging."""
        bucket = bucket or self.default_bucket
        start_time = time.perf_counter()

        log_context = {
            "operation": "upload",
            "bucket": bucket,
            "object_name": object_name,
            "content_type": content_type,
            "path": f"s3://{bucket}/{object_name}",
        }

        try:
            # Get file size
            file_data.seek(0, 2)
            file_size = file_data.tell()
            file_data.seek(0)

            log_context["file_size_bytes"] = file_size

            result = self.client.put_object(
                bucket_name=bucket,
                object_name=object_name,
                data=file_data,
                length=file_size,
                content_type=content_type,
            )

            duration_ms = round((time.perf_counter() - start_time) * 1000, 2)
            etag = getattr(result, 'etag', None)

            logger.info(
                "Storage upload completed",
                **log_context,
                status_code=200,
                duration_ms=duration_ms,
                etag=etag,
            )

            return {
                "bucket": bucket,
                "object_name": object_name,
                "etag": etag,
                "size": file_size,
            }

        except Exception as e:
            duration_ms = round((time.perf_counter() - start_time) * 1000, 2)
            status_code = getattr(e, 'code', 500)

            logger.error(
                "Storage upload failed",
                **log_context,
                status_code=status_code,
                duration_ms=duration_ms,
                error_type=type(e).__name__,
                error_message=str(e),
                exc_info=True,
            )
            raise

When to Use: - All object storage operations (MinIO image uploads/downloads)
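
Example usage, assuming the MinIO Python client (minio package); the endpoint, credential settings, and object names are illustrative:

# central/src/services/storage_example.py (illustrative)
from minio import Minio

from config import settings
from services.storage import StorageService

client = Minio(
    "minio.internal:9000",                 # illustrative endpoint
    access_key=settings.MINIO_ACCESS_KEY,  # assumed settings fields
    secret_key=settings.MINIO_SECRET_KEY,
    secure=False,
)
storage = StorageService(client, default_bucket="alpr-images")

with open("/tmp/plate.jpg", "rb") as f:
    result = storage.upload_file(
        f,
        object_name="2025/01/15/abc123.jpg",
        content_type="image/jpeg",
    )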


Pattern: DRY Logging Helpers

Problem: Parsers and services often have repetitive logging patterns, violating DRY and making log message changes tedious.

Solution: Extract logging patterns into private helper methods with consistent formatting.

Implementation:

# edge/src/adapters/hikvision.py
from typing import Optional
import structlog

logger = structlog.get_logger(__name__)


class HikvisionAdapter:
    """Adapter with DRY logging helpers."""

    _timezone_warned_cameras: set[str] = set()  # Rate-limiting tracker

    def _log_parse_error(
        self,
        reason: str,
        camera_mac: Optional[str] = None,
        payload_size: Optional[int] = None,
        **extra_context
    ) -> None:
        """
        Log a parse error with consistent format.

        Args:
            reason: Short description of why parsing failed
            camera_mac: Camera MAC address if available
            payload_size: Size of the payload in bytes
            **extra_context: Additional context fields
        """
        context = {"reason": reason, "vendor": "hikvision"}
        if camera_mac:
            context["camera_mac"] = camera_mac
        if payload_size is not None:
            context["payload_size"] = payload_size
        context.update(extra_context)
        logger.error("Failed to parse detection", **context)

    def _log_timezone_warning(
        self,
        camera_mac: str,
        raw_timestamp: str,
    ) -> None:
        """
        Log timezone warning with rate-limiting per camera.

        Only logs WARNING on first occurrence per camera to prevent log spam.
        Subsequent occurrences logged at DEBUG level.
        """
        context = {
            "camera_mac": camera_mac,
            "raw_timestamp": raw_timestamp,
            "vendor": "hikvision",
        }

        if camera_mac not in self._timezone_warned_cameras:
            self._timezone_warned_cameras.add(camera_mac)
            logger.warning(
                "Detection timestamp missing timezone, assuming UTC (first occurrence)",
                **context
            )
        else:
            logger.debug("Detection timestamp missing timezone, assuming UTC", **context)

    def parse_detection(self, body: bytes) -> dict:
        """Parse with DRY logging."""
        if not body:
            self._log_parse_error("empty body")
            raise ValueError("Empty body")

        # ... parsing logic ...

Benefits:

  • Single point of change for log message format
  • Consistent context fields across all error logs
  • Rate-limiting prevents log spam
  • Cleaner code (less boilerplate)

When to Use: - Parser classes with 3+ similar logging calls - Any module with repetitive logging patterns
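
The rate-limiting idea generalizes beyond a single adapter. A small sketch of a reusable helper; the class and its names are illustrative, not part of the existing codebase:

import structlog

logger = structlog.get_logger(__name__)


class WarnOncePerKey:
    """Log a WARNING the first time a key is seen, DEBUG afterwards."""

    def __init__(self) -> None:
        self._seen: set[str] = set()

    def warn(self, key: str, message: str, **context) -> None:
        if key not in self._seen:
            self._seen.add(key)
            logger.warning(f"{message} (first occurrence)", **context)
        else:
            logger.debug(message, **context)


# Usage: one instance per adapter, keyed by camera MAC
timezone_warnings = WarnOncePerKey()
timezone_warnings.warn(
    "aa:bb:cc:dd:ee:ff",
    "Detection timestamp missing timezone, assuming UTC",
    camera_mac="aa:bb:cc:dd:ee:ff",
)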


Pattern: Edge Collector Logging

Problem: Edge collectors need to log detections, upload status, and connectivity with collector context.

Solution: Bind collector_id to logger context at startup, log all operations with consistent format.

Implementation:

# edge/src/main.py
import time

import structlog

from core.logging import setup_logging, get_logger
from config import settings

# Initialize logging
setup_logging(log_level=settings.LOG_LEVEL)

# Bind collector context globally
structlog.contextvars.bind_contextvars(
    service="edge",
    collector_id=settings.COLLECTOR_ID,
)

logger = get_logger(__name__)


# Detection logging
async def on_detection(detection):
    logger.info(
        "Plate detected",
        camera_id=detection.camera_id,
        plate_number=detection.plate_number,
        confidence=detection.confidence,
        direction=detection.direction,
    )


# Upload logging with retry context
async def upload_batch(events, attempt: int = 1):
    start_time = time.perf_counter()
    try:
        await client.post("/api/v1/events", json=events)
        duration_ms = round((time.perf_counter() - start_time) * 1000, 2)

        logger.info(
            "Batch uploaded",
            events_count=len(events),
            duration_ms=duration_ms,
            status_code=201,
        )
    except ConnectionError as e:
        duration_ms = round((time.perf_counter() - start_time) * 1000, 2)

        logger.warning(
            "Upload failed, will retry",
            events_count=len(events),
            retry_attempt=attempt,
            duration_ms=duration_ms,
            error_type=type(e).__name__,
            error_message=str(e),
        )
        raise


# Connectivity logging
async def heartbeat():
    try:
        await client.post("/api/v1/heartbeat", json=get_status())
        logger.debug("Heartbeat sent")
    except ConnectionError as e:
        logger.warning(
            "Central server unreachable",
            error_type=type(e).__name__,
            error_message=str(e),
            buffer_depth=buffer.count(),
        )
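
A caller can wrap upload_batch in a retry loop so that intermittent failures stay at WARNING and only the final failure is logged at ERROR. A sketch; the backoff delays and attempt limit are illustrative:

import asyncio


async def upload_with_retry(events, max_attempts: int = 3) -> None:
    for attempt in range(1, max_attempts + 1):
        try:
            await upload_batch(events, attempt=attempt)
            return
        except ConnectionError:
            # upload_batch already logged the WARNING with retry_attempt
            if attempt == max_attempts:
                logger.error(
                    "Batch upload failed after retries",
                    events_count=len(events),
                    attempts=max_attempts,
                )
                raise
            await asyncio.sleep(2 ** attempt)  # illustrative exponential backoff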

Pattern: Audit Trail Logging

Problem: Need to track all user-initiated changes for compliance, security, and operational visibility.

Solution: Write audit events to BOTH database AND structured log output.

Implementation:

# central/src/core/audit.py
from datetime import datetime, timezone
from typing import Any, Optional

import structlog
from fastapi import Request
from sqlalchemy.orm import Session

from models.audit import ChangeAuditLog

audit_logger = structlog.get_logger("audit")


class AuditService:
    """Service for recording audit trail events."""

    def __init__(self, db: Session):
        self.db = db

    def log_change(
        self,
        request: Request,
        action: str,
        entity_type: str,
        entity_id: str,
        changes: Optional[dict[str, dict[str, Any]]] = None,
        reason: Optional[str] = None,
    ) -> ChangeAuditLog:
        """
        Record a change audit event to database AND structured log.

        Args:
            request: FastAPI request object
            action: CREATE, UPDATE, DELETE
            entity_type: Table/resource name (e.g., "camera", "user")
            entity_id: ID of the affected record
            changes: Dict of {field: {old: value, new: value}} for updates
            reason: Optional reason for the change
        """
        user_id = getattr(request.state, "user_id", None)
        username = getattr(request.state, "username", "unknown")
        request_id = getattr(request.state, "request_id", None)

        # Get client IP
        src_ip = request.headers.get("X-Forwarded-For")
        if src_ip:
            src_ip = src_ip.split(",")[0].strip()
        else:
            src_ip = request.client.host if request.client else "unknown"

        # 1. Write to database
        audit_record = ChangeAuditLog(
            changed_at=datetime.now(timezone.utc),
            user_id=user_id,
            username=username,
            action=action,
            entity_type=entity_type,
            entity_id=str(entity_id),
            changes=changes,
            src_ip=src_ip,
            request_path=str(request.url.path),
            request_id=request_id,
            reason=reason,
        )
        self.db.add(audit_record)

        # 2. Write to structured log
        audit_logger.info(
            f"Audit: {action} {entity_type}",
            action=action,
            entity_type=entity_type,
            entity_id=str(entity_id),
            user_id=str(user_id) if user_id else None,
            username=username,
            src_ip=src_ip,
            path=str(request.url.path),
            status_code=200,
            duration_ms=0,
            request_id=str(request_id) if request_id else None,
            changes=changes,
            reason=reason,
        )

        return audit_record

When to Use: - All user-initiated data modifications (CREATE, UPDATE, DELETE) - Configuration changes - Permission and role changes
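
Example usage from a route handler; the endpoint, dependency wiring (get_db), and change payload are illustrative:

# central/src/api/cameras.py (illustrative)
from fastapi import APIRouter, Depends, Request
from sqlalchemy.orm import Session

from core.audit import AuditService
from db import get_db  # assumed session dependency

router = APIRouter()


@router.patch("/api/v1/cameras/{camera_id}")
async def update_camera(
    camera_id: str,
    payload: dict,
    request: Request,
    db: Session = Depends(get_db),
):
    # ... apply the update to the camera record ...
    AuditService(db).log_change(
        request=request,
        action="UPDATE",
        entity_type="camera",
        entity_id=camera_id,
        changes={"name": {"old": "Main St Camera", "new": payload.get("name")}},
    )
    db.commit()
    return {"status": "ok"}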


Anti-Patterns

Anti-Pattern: Print Statement Debugging

Problem: Using print() instead of structured logging loses all context.

Example of BAD code:

def process_plate(plate_data: dict) -> None:
    print(f"Processing plate: {plate_data}")  # BAD
    result = detect_plate(plate_data)
    print(f"Result: {result}")  # BAD

Why It's Wrong: - No timestamp, log level, or request context - Can't filter by severity - Won't appear in log aggregation systems

Correct Approach:

def process_plate(plate_data: dict) -> None:
    logger.info("Processing plate", plate_number=plate_data.get("plate"))
    result = detect_plate(plate_data)
    logger.info("Plate detection completed", confidence=result.confidence)


Anti-Pattern: Logging Without Context

Problem: Logging messages without required fields makes troubleshooting impossible.

Example of BAD code:

logger.info("Request completed")  # BAD - no context
logger.error("Database error")    # BAD - no details

Correct Approach:

logger.info(
    "Request completed",
    status_code=200,
    duration_ms=45,
)

logger.error(
    "Database error",
    status_code=500,
    duration_ms=120,
    error_type="IntegrityError",
    error_message="Duplicate key violation",
    exc_info=True,
)


Anti-Pattern: Using clear_contextvars in Async Code

Problem: Using clear_contextvars() in async middleware can clear context from concurrent requests.

Example of BAD code:

async def dispatch(self, request, call_next):
    structlog.contextvars.clear_contextvars()  # BAD - clears ALL context
    structlog.contextvars.bind_contextvars(request_id=request_id)
    # ...

Why It's Wrong: - In async code, multiple requests run concurrently - clear_contextvars() affects all requests, not just the current one - Can cause context from request A to appear in request B's logs

Correct Approach:

async def dispatch(self, request, call_next):
    # Bind context for this request
    structlog.contextvars.bind_contextvars(request_id=request_id)
    try:
        response = await call_next(request)
        return response
    finally:
        # Unbind ONLY the keys we bound (not clear all)
        structlog.contextvars.unbind_contextvars("request_id")


Anti-Pattern: Logging Sensitive Data

Problem: Accidentally logging passwords, tokens, or PII.

Example of BAD code:

logger.info("User login", username=username, password=password)  # BAD
logger.info("API call", headers=dict(request.headers))  # BAD - may include auth

Correct Approach:

logger.info("User login", username=username)  # Password omitted

# Sanitize headers
SENSITIVE_HEADERS = {"authorization", "cookie", "x-api-key"}
safe_headers = {k: v for k, v in request.headers.items()
                if k.lower() not in SENSITIVE_HEADERS}
logger.info("API call", headers=safe_headers)


Anti-Pattern: Multi-Line Log Format

Problem: Multi-line or pretty-printed logs break log aggregation.

Example of BAD code:

logger.error("""
Error occurred:
  User: john@example.com
  Reason: invalid input
""")

Correct Approach:

logger.error("Error occurred", user="john@example.com", reason="invalid_input")


Anti-Pattern: Swallowing Exceptions

Problem: Catching exceptions without logging them hides errors.

Example of BAD code:

try:
    process_data()
except Exception:
    pass  # Silent failure - BAD

Correct Approach:

try:
    process_data()
except Exception as e:
    logger.error(
        "Processing failed",
        error_type=type(e).__name__,
        error_message=str(e),
        exc_info=True,
    )
    raise  # Or handle appropriately


Configuration

Environment Variables

Variable   | Required | Default | Description
LOG_LEVEL  | No       | INFO    | Minimum log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
LOG_FORMAT | No       | json    | Output format ("json" for production, "console" for development)

Configuration Example

# central/src/config.py
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    LOG_LEVEL: str = "INFO"
    LOG_FORMAT: str = "json"  # "json" or "console"

    # Edge collector only
    COLLECTOR_ID: str = ""

    class Config:
        env_file = ".env"
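
A matching .env file might look like this; the values are illustrative:

# .env
LOG_LEVEL=INFO
LOG_FORMAT=json

# Edge collector only
COLLECTOR_ID=col_site01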

Testing Strategies

Unit Testing Log Output

# tests/test_logging.py
import json

import structlog

from core.logging import setup_logging


def test_log_format_is_valid_json(capsys):
    """Verify log output is valid JSON."""
    # Configure the JSON renderer first; structlog's default config is not JSON
    setup_logging(log_level="INFO", log_format="json")
    logger = structlog.get_logger("test")
    logger.info("Test message", key="value")

    captured = capsys.readouterr()
    log_entry = json.loads(captured.out.strip())

    assert "timestamp" in log_entry
    assert log_entry["level"] == "info"
    assert log_entry["key"] == "value"


def test_timestamp_is_first_field(capsys):
    """Verify timestamp is the first field in JSON output."""
    setup_logging(log_level="INFO", log_format="json")
    logger = structlog.get_logger("test")
    logger.info("Test message")

    captured = capsys.readouterr()
    assert captured.out.strip().startswith('{"timestamp"')

Integration Testing

# tests/integration/test_logging_middleware.py
import json
from fastapi.testclient import TestClient


def test_request_logging_includes_required_fields(client: TestClient, capsys):
    """Verify middleware logs all required fields."""
    response = client.get("/api/v1/health")

    captured = capsys.readouterr()
    # The request may emit several log lines; the middleware summary is the last one
    log_entry = json.loads(captured.out.strip().splitlines()[-1])

    required_fields = [
        "timestamp", "level", "src_ip", "path",
        "method", "status_code", "duration_ms", "request_id"
    ]

    for field in required_fields:
        assert field in log_entry, f"Missing required field: {field}"

Examples

Example 1: Complete Request Logging

Scenario: Full request/response cycle with authentication

{"timestamp":"2025-01-15T12:34:56.789Z","level":"INFO","service":"central","src_ip":"192.168.1.100","path":"/api/v1/plates/search","method":"POST","status_code":200,"duration_ms":145,"user_id":"usr_admin_001","request_id":"550e8400-e29b-41d4-a716-446655440000","message":"Request completed","result_count":15}

Example 2: Error with Stack Trace

Scenario: Database connection failure

{"timestamp":"2025-01-15T12:34:56.789Z","level":"ERROR","service":"central","src_ip":"192.168.1.100","path":"/api/v1/cameras","method":"GET","status_code":500,"duration_ms":5023,"user_id":"usr_operator_002","request_id":"uuid-here","message":"Database connection failed","error_type":"OperationalError","error_message":"connection refused","exc_info":"Traceback (most recent call last):\n  File ..."}

Example 3: Edge Collector Detection

Scenario: Plate detected at edge

{"timestamp":"2025-01-15T12:34:56.789Z","level":"INFO","service":"edge","collector_id":"col_site01","camera_id":"cam_entrance","message":"Plate detected","plate_number":"ABC123","confidence":0.95,"direction":"entering"}

Example 4: Storage Operation

Scenario: Image upload to MinIO

{"timestamp":"2025-01-15T12:34:56.789Z","level":"INFO","service":"central","path":"s3://alpr-images/2025/01/15/abc123.jpg","status_code":200,"duration_ms":89,"request_id":"uuid-here","message":"Storage upload completed","operation":"upload","bucket":"alpr-images","object_name":"2025/01/15/abc123.jpg","file_size_bytes":245678}

Example 5: Audit Trail Entry

Scenario: Camera configuration update

{"timestamp":"2025-01-15T12:34:56.789Z","level":"INFO","service":"central","src_ip":"192.168.1.50","path":"/api/v1/cameras/cam_abc123","method":"PATCH","status_code":200,"user_id":"usr_admin_001","request_id":"uuid-here","message":"Audit: UPDATE camera","action":"UPDATE","entity_type":"camera","entity_id":"cam_abc123","changes":{"name":{"old":"Main St Camera","new":"Main Street North"}}}



Quick Checklist

When implementing logging:

  • [ ] Read this entire PRP
  • [ ] Check global.md for cross-cutting concerns
  • [ ] Set up structlog with JSON formatter
  • [ ] Use add_timestamp_first processor
  • [ ] Implement logging middleware with unbind_contextvars
  • [ ] Wrap external service calls with logging
  • [ ] Add DRY helpers for repetitive patterns
  • [ ] Verify all required fields are included
  • [ ] Test that output is valid single-line JSON
  • [ ] Ensure sensitive data is not logged
  • [ ] Set up audit logging for user actions

Remember: PRPs are living documents. Update as patterns evolve!