Skip to content

Data Integrity and Evidence Chain-of-Custody

Overview

Plate detection data may be used as evidence in law enforcement investigations. This document defines integrity controls to ensure data is:

  • Immutable - Cannot be modified after capture
  • Tamper-evident - Any modification is detectable
  • Auditable - All access is logged
  • Legally defensible - Chain of custody is documented

WORM (Write Once Read Many) Implementation

detections table Protection

Three-layer protection for the events table:

Layer 1: Application Level

No UPDATE/DELETE endpoints exist for detection data:

# API endpoints for events
@router.post("/api/v1/events/batch")    # INSERT only
@router.get("/api/v1/events/{id}")      # SELECT only
@router.get("/api/v1/events/search")    # SELECT only

# No PUT, PATCH, DELETE endpoints for events

Layer 2: Database Role Permissions

Application user cannot modify detection data:

-- Application role (used by FastAPI)
CREATE ROLE alpr_app WITH LOGIN PASSWORD 'xxx';
GRANT CONNECT ON DATABASE alpr TO alpr_app;
GRANT USAGE ON SCHEMA public TO alpr_app;

-- detections table: INSERT and SELECT only
GRANT SELECT, INSERT ON events TO alpr_app;
GRANT USAGE, SELECT ON SEQUENCE events_id_seq TO alpr_app;

-- NO UPDATE or DELETE granted on events

-- Retention cleanup role (separate credentials, audited)
CREATE ROLE alpr_retention WITH LOGIN PASSWORD 'yyy';
GRANT SELECT, DELETE ON events TO alpr_retention;
-- Used only by automated retention job

Layer 3: Database Triggers

Reject any modification attempt regardless of permissions:

-- Immutability enforcement trigger
CREATE OR REPLACE FUNCTION enforce_event_immutability()
RETURNS TRIGGER AS $$
DECLARE
    has_legal_hold BOOLEAN;
BEGIN
    IF TG_OP = 'UPDATE' THEN
        RAISE EXCEPTION 'detections table is append-only. UPDATE not allowed. Detection ID: %', OLD.id;
    END IF;

    IF TG_OP = 'DELETE' THEN
        -- Check legal hold via junction table (no column on events)
        SELECT EXISTS(
            SELECT 1 FROM legal_holds
            WHERE detection_id = OLD.id AND released_at IS NULL
        ) INTO has_legal_hold;

        IF has_legal_hold THEN
            RAISE EXCEPTION 'Detection % is under legal hold. DELETE not allowed.', OLD.id;
        END IF;

        -- Check retention period (7 years)
        IF OLD.captured_at > NOW() - INTERVAL '7 years' THEN
            RAISE EXCEPTION 'Detection % is within retention period. DELETE not allowed.', OLD.id;
        END IF;

        -- Log deletion (allowed only for expired records)
        INSERT INTO deletion_audit_log (
            table_name, record_id, deleted_at, deleted_by, reason
        ) VALUES (
            'events', OLD.id, NOW(), current_user, 'Retention policy'
        );
    END IF;

    RETURN OLD;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER events_worm_protection
    BEFORE UPDATE OR DELETE ON events
    FOR EACH ROW
    EXECUTE FUNCTION enforce_event_immutability();

Protected Tables

Table INSERT SELECT UPDATE DELETE
events App App Blocked Retention only*
event_images App App Blocked Retention only*
alerts App App Blocked Retention only*
audit_log App App Blocked Blocked
legal_holds App App Release only** Blocked

Retention deletion allowed only for records past 7-year retention AND not under legal hold. *Only release fields (released_at, released_by, release_reason) can be set on unreleased holds.


Image Integrity

Hash on Ingest

Every image hashed when received from edge collector:

import hashlib

def compute_image_hash(image_bytes: bytes) -> str:
    """Compute SHA-256 hash of image."""
    return hashlib.sha256(image_bytes).hexdigest()

# On detection ingest
detection.full_image_hash = compute_image_hash(full_image_bytes)
detection.crop_image_hash = compute_image_hash(crop_image_bytes)

Database Schema

CREATE TABLE events (
    id BIGSERIAL PRIMARY KEY,
    -- ... other fields ...

    -- Image integrity
    full_image_hash CHAR(64) NOT NULL,    -- SHA-256
    crop_image_hash CHAR(64),              -- SHA-256 (optional)

    -- Metadata integrity
    record_hash CHAR(64) NOT NULL          -- Hash of entire record

    -- NOTE: Legal holds are tracked in separate legal_holds table
    -- to maintain WORM immutability of the detections table
);

Verification on Retrieval

Option to verify image integrity when accessed:

async def get_detection_with_verification(detection_id: int) -> Detection:
    """Retrieve detection and optionally verify image integrity."""
    detection = await db.get_detection(detection_id)

    # Fetch image from MinIO
    image_bytes = await storage.download(detection.full_image_path)

    # Verify hash
    computed_hash = compute_image_hash(image_bytes)
    if computed_hash != detection.full_image_hash:
        logger.critical("Image integrity violation",
            detection_id=detection_id,
            stored_hash=detection.full_image_hash,
            computed_hash=computed_hash)
        raise IntegrityError(f"Image hash mismatch for detection {detection_id}")

    return detection

Record Hash Chain (Optional)

For cryptographic tamper evidence, each record includes hash of previous:

CREATE TABLE events (
    id BIGSERIAL PRIMARY KEY,
    -- ... other fields ...

    prev_record_hash CHAR(64),  -- Hash of previous record (NULL for first)
    record_hash CHAR(64) NOT NULL  -- Hash of this record
);

-- Index for chain verification
CREATE INDEX idx_events_record_hash ON events (record_hash);

Hash Computation

import json
import hashlib

def compute_record_hash(detection: dict, prev_hash: str | None) -> str:
    """Compute SHA-256 hash of detection record."""
    # Canonical JSON representation (sorted keys, no whitespace)
    canonical = json.dumps({
        "id": detection["id"],
        "captured_at": detection["captured_at"].isoformat(),
        "collector_id": detection["collector_id"],
        "camera_id": detection["camera_id"],
        "plate_number": detection["plate_number"],
        "confidence": detection["confidence"],
        "full_image_hash": detection["full_image_hash"],
        "crop_image_hash": detection["crop_image_hash"],
        "prev_hash": prev_hash
    }, sort_keys=True, separators=(',', ':'))

    return hashlib.sha256(canonical.encode()).hexdigest()

Chain Verification

Periodic job to verify chain integrity:

async def verify_hash_chain(start_id: int, end_id: int) -> bool:
    """Verify hash chain integrity for a range of detections."""
    detections = await db.query(Detection).filter(
        Detection.id >= start_id,
        Detection.id <= end_id
    ).order_by(Detection.id).all()

    for i, detection in enumerate(detections):
        # Compute expected hash
        prev_hash = detections[i-1].record_hash if i > 0 else None
        expected = compute_record_hash(detection.to_dict(), prev_hash)

        if expected != detection.record_hash:
            logger.critical("Hash chain broken",
                detection_id=detection.id,
                expected_hash=expected,
                stored_hash=detection.record_hash)
            return False

    return True

Access Audit Logging

What to Log

Action Logged Fields
View detection user_id, detection_id, timestamp, src_ip
Search plates user_id, search_query, result_count, timestamp
Export data user_id, export_type, filter_criteria, record_count
View image user_id, detection_id, image_type, timestamp
Generate report user_id, report_type, parameters, timestamp

Audit Log Schema

CREATE TABLE access_audit_log (
    id BIGSERIAL PRIMARY KEY,
    accessed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    user_id UUID NOT NULL,
    username VARCHAR(255) NOT NULL,
    action VARCHAR(50) NOT NULL,
    resource_type VARCHAR(100) NOT NULL,
    resource_id VARCHAR(255),
    query_params JSONB,
    result_count INTEGER,
    src_ip VARCHAR(45) NOT NULL,
    user_agent TEXT,
    request_id UUID
);

-- Indexes for audit queries
CREATE INDEX idx_access_audit_user ON access_audit_log (user_id, accessed_at DESC);
CREATE INDEX idx_access_audit_resource ON access_audit_log (resource_type, resource_id);
CREATE INDEX idx_access_audit_time ON access_audit_log (accessed_at DESC);

Audit Log Protection

Access audit log is append-only with no deletion:

CREATE TRIGGER access_audit_immutable
    BEFORE UPDATE OR DELETE ON access_audit_log
    FOR EACH ROW
    EXECUTE FUNCTION prevent_modification();  -- Always raises exception

Purpose

Prevent deletion of evidence relevant to active investigations.

Design Decision: Junction Table

Problem: The detections table is WORM-protected (no updates allowed), but legal holds need to be set and released dynamically.

Solution: Use a separate legal_holds junction table instead of columns on the detections table. This: - Maintains true WORM immutability on the detections table - Allows legal holds to be set/released via INSERT (never UPDATE) - Provides complete audit trail of all hold actions - Supports multiple holds per detection (different cases)

Schema

-- Legal holds junction table (not a column on events)
CREATE TABLE legal_holds (
    id BIGSERIAL PRIMARY KEY,
    case_id VARCHAR(100) NOT NULL,
    detection_id BIGINT NOT NULL REFERENCES events(id),
    reason TEXT,

    -- Set information
    set_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    set_by UUID NOT NULL REFERENCES users(id),
    hold_until TIMESTAMPTZ,  -- Optional explicit expiration

    -- Release information (NULL = still active)
    released_at TIMESTAMPTZ,
    released_by UUID REFERENCES users(id),
    release_reason TEXT,

    -- Constraints
    CONSTRAINT unique_active_hold UNIQUE (case_id, detection_id)
);

-- Indexes for efficient lookups
CREATE INDEX idx_legal_holds_event ON legal_holds (detection_id) WHERE released_at IS NULL;
CREATE INDEX idx_legal_holds_case ON legal_holds (case_id);
CREATE INDEX idx_legal_holds_active ON legal_holds (set_at) WHERE released_at IS NULL;
-- Set legal hold on matching events
INSERT INTO legal_holds (case_id, detection_id, reason, set_by, hold_until)
SELECT
    'CASE-2025-001234',
    id,
    'Suspected vehicle in robbery investigation',
    'usr_admin_001'::uuid,
    '2026-01-31'::timestamptz  -- Optional: explicit hold expiration
FROM events
WHERE plate_number = 'ABC123'
  AND captured_at BETWEEN '2025-01-01' AND '2025-01-31'
ON CONFLICT (case_id, detection_id) DO NOTHING;  -- Idempotent
-- Release hold by updating release fields
-- Note: This UPDATE is on legal_holds table, NOT detections table
UPDATE legal_holds
SET released_at = NOW(),
    released_by = 'usr_admin_001'::uuid,
    release_reason = 'Case closed - no charges filed'
WHERE case_id = 'CASE-2025-001234'
  AND released_at IS NULL;
-- Check if detection is under active legal hold
SELECT EXISTS(
    SELECT 1 FROM legal_holds
    WHERE detection_id = 12345
      AND released_at IS NULL
      AND (hold_until IS NULL OR hold_until > NOW())
) AS has_active_hold;

-- List all active holds for a detection
SELECT case_id, reason, set_at, set_by, hold_until
FROM legal_holds
WHERE detection_id = 12345
  AND released_at IS NULL
  AND (hold_until IS NULL OR hold_until > NOW());

The legal_holds table itself is append-only for set records:

-- Protect legal_holds from unauthorized modification
CREATE OR REPLACE FUNCTION enforce_legal_holds_immutability()
RETURNS TRIGGER AS $$
BEGIN
    IF TG_OP = 'DELETE' THEN
        RAISE EXCEPTION 'Legal hold records cannot be deleted. Release the hold instead.';
    END IF;

    IF TG_OP = 'UPDATE' THEN
        -- Only allow setting release fields on unreleased holds
        IF OLD.released_at IS NOT NULL THEN
            RAISE EXCEPTION 'Released legal holds cannot be modified.';
        END IF;

        -- Only allow setting release fields, nothing else
        IF NEW.case_id != OLD.case_id OR
           NEW.detection_id != OLD.detection_id OR
           NEW.reason IS DISTINCT FROM OLD.reason OR
           NEW.set_at != OLD.set_at OR
           NEW.set_by != OLD.set_by OR
           NEW.hold_until IS DISTINCT FROM OLD.hold_until THEN
            RAISE EXCEPTION 'Only release fields can be modified on legal holds.';
        END IF;
    END IF;

    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER legal_holds_protection
    BEFORE UPDATE OR DELETE ON legal_holds
    FOR EACH ROW
    EXECUTE FUNCTION enforce_legal_holds_immutability();

API Endpoints

POST   /api/v1/legal-holds              # Set hold (admin only)
POST   /api/v1/legal-holds/{id}/release # Release hold (admin only)
GET    /api/v1/legal-holds              # List active holds
GET    /api/v1/legal-holds/{id}         # Get hold details with affected detections
GET    /api/v1/detections/{id}/legal-holds  # Get holds for specific detection

Automatic Expiration

Optional background job to auto-release expired holds:

async def release_expired_holds():
    """Release legal holds past their hold_until date."""
    await db.execute("""
        UPDATE legal_holds
        SET released_at = NOW(),
            released_by = NULL,  -- System release
            release_reason = 'Automatic: hold_until date passed'
        WHERE released_at IS NULL
          AND hold_until IS NOT NULL
          AND hold_until < NOW()
    """)

Evidence Export

Export Package

When exporting evidence for law enforcement:

@dataclass
class EvidencePackage:
    export_id: str                    # Unique export identifier
    exported_at: datetime             # Export timestamp
    exported_by: str                  # User who exported
    case_reference: str               # External case number
    detections: List[Detection]       # Detection records
    images: List[ImageFile]           # Image files
    integrity_manifest: dict          # Hashes of all contents
    chain_of_custody: List[dict]      # Access history

Integrity Manifest

{
  "export_id": "exp_20250115_abc123",
  "created_at": "2025-01-15T12:34:56.789Z",
  "created_by": "usr_admin_001",
  "algorithm": "SHA-256",
  "files": [
    {"path": "events.json", "hash": "abc123..."},
    {"path": "images/evt_001_full.jpg", "hash": "def456..."},
    {"path": "images/evt_001_crop.jpg", "hash": "ghi789..."}
  ],
  "package_hash": "xyz987..."
}

Export Audit

Every export logged with full details:

CREATE TABLE evidence_export_log (
    id BIGSERIAL PRIMARY KEY,
    export_id VARCHAR(100) UNIQUE NOT NULL,
    exported_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    exported_by UUID NOT NULL,
    case_reference VARCHAR(100),
    event_count INTEGER NOT NULL,
    detection_ids BIGINT[] NOT NULL,
    date_range_start TIMESTAMPTZ,
    date_range_end TIMESTAMPTZ,
    filter_criteria JSONB,
    package_hash CHAR(64) NOT NULL,
    recipient TEXT,
    notes TEXT
);

Integrity Monitoring

Automated Checks

Check Frequency Action on Failure
Hash chain verification Daily Alert + investigate
Image hash spot-check Daily (sample) Alert + investigate
Trigger presence Hourly Alert + restore
Permission audit Daily Alert + remediate

Monitoring Queries

-- Check for missing triggers
SELECT tgname FROM pg_trigger
WHERE tgrelid = 'events'::regclass
AND tgname = 'events_worm_protection';

-- Check for unexpected permissions
SELECT grantee, privilege_type
FROM information_schema.table_privileges
WHERE table_name = 'events'
AND privilege_type IN ('UPDATE', 'DELETE');

-- Check for broken hash chain (sample)
SELECT e1.id, e1.record_hash, e2.prev_record_hash
FROM events e1
JOIN events e2 ON e2.id = e1.id + 1
WHERE e1.record_hash != e2.prev_record_hash
LIMIT 10;

Summary

Control Layer Purpose
No UPDATE/DELETE API Application Prevent accidental modification
Role permissions Database Restrict capabilities
WORM triggers Database Enforce immutability
Image hashing Application Detect tampering
Record hash chain Database Cryptographic integrity
Access audit log Application Track who viewed what
Legal hold (junction table) Database Preserve evidence without breaking WORM
Evidence export Application Chain of custody

Decision Date: 2025-12-29 Status: Approved Rationale: Evidence integrity is critical for law enforcement use cases; multi-layer protection provides defense in depth