Data Integrity and Evidence Chain-of-Custody
Overview
Plate detection data may be used as evidence in law enforcement investigations. This document defines integrity controls to ensure data is:
- Immutable - Cannot be modified after capture
- Tamper-evident - Any modification is detectable
- Auditable - All access is logged
- Legally defensible - Chain of custody is documented
WORM (Write Once Read Many) Implementation
detections table Protection
Three-layer protection for the detections table (named events in the schema):
Layer 1: Application Level
No UPDATE/DELETE endpoints exist for detection data:
# API endpoints for events
@router.post("/api/v1/events/batch") # INSERT only
@router.get("/api/v1/events/search") # SELECT only; declared before /{id} so "search" is not matched as an id
@router.get("/api/v1/events/{id}") # SELECT only
# No PUT, PATCH, DELETE endpoints for events
Layer 2: Database Role Permissions
The application user cannot modify detection data:
-- Application role (used by FastAPI)
CREATE ROLE alpr_app WITH LOGIN PASSWORD 'xxx';
GRANT CONNECT ON DATABASE alpr TO alpr_app;
GRANT USAGE ON SCHEMA public TO alpr_app;
-- detections table: INSERT and SELECT only
GRANT SELECT, INSERT ON events TO alpr_app;
GRANT USAGE, SELECT ON SEQUENCE events_id_seq TO alpr_app;
-- NO UPDATE or DELETE granted on events
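-- Retention cleanup role (separate credentials, audited)
CREATE ROLE alpr_retention WITH LOGIN PASSWORD 'yyy';
GRANT SELECT, DELETE ON events TO alpr_retention;
-- Used only by the automated retention job.
-- The WORM trigger runs with the caller's privileges, so this role also needs
-- SELECT on legal_holds and INSERT on deletion_audit_log (or the trigger
-- function must be declared SECURITY DEFINER).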
Layer 3: Database Triggers
Triggers reject any modification attempt, regardless of role permissions:
-- Immutability enforcement trigger
CREATE OR REPLACE FUNCTION enforce_event_immutability()
RETURNS TRIGGER AS $$
DECLARE
    has_legal_hold BOOLEAN;
BEGIN
    IF TG_OP = 'UPDATE' THEN
        RAISE EXCEPTION 'detections table is append-only. UPDATE not allowed. Detection ID: %', OLD.id;
    END IF;
    IF TG_OP = 'DELETE' THEN
        -- Check legal hold via junction table (no column on events)
        SELECT EXISTS(
            SELECT 1 FROM legal_holds
            WHERE detection_id = OLD.id AND released_at IS NULL
        ) INTO has_legal_hold;
        IF has_legal_hold THEN
            RAISE EXCEPTION 'Detection % is under legal hold. DELETE not allowed.', OLD.id;
        END IF;
        -- Check retention period (7 years)
        IF OLD.captured_at > NOW() - INTERVAL '7 years' THEN
            RAISE EXCEPTION 'Detection % is within retention period. DELETE not allowed.', OLD.id;
        END IF;
        -- Log deletion (allowed only for expired records)
        INSERT INTO deletion_audit_log (
            table_name, record_id, deleted_at, deleted_by, reason
        ) VALUES (
            'events', OLD.id, NOW(), current_user, 'Retention policy'
        );
    END IF;
    RETURN OLD;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER events_worm_protection
BEFORE UPDATE OR DELETE ON events
FOR EACH ROW
EXECUTE FUNCTION enforce_event_immutability();
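The trigger's decision order (legal hold first, then retention age) can be mirrored in application code, for example so a retention job can skip ineligible rows instead of running into exceptions. The following is a minimal sketch under stated assumptions: `deletion_decision` is a hypothetical helper that is not part of the schema above, and the 7-year period is approximated with a fixed day count, whereas PostgreSQL's `INTERVAL '7 years'` uses calendar arithmetic.

```python
from datetime import datetime, timedelta, timezone

# Assumption: 7 years approximated as 7 * 365.25 days. The database trigger
# remains the authority; this only pre-filters candidates.
RETENTION = timedelta(days=7 * 365.25)


def deletion_decision(captured_at: datetime, active_hold_count: int,
                      now: datetime) -> tuple[bool, str]:
    """Return (allowed, reason), checking holds first and retention second."""
    if active_hold_count > 0:
        return False, "legal hold"
    if captured_at > now - RETENTION:
        return False, "within retention period"
    return True, "retention policy"


now = datetime(2025, 1, 15, tzinfo=timezone.utc)
old = datetime(2015, 6, 1, tzinfo=timezone.utc)      # older than 7 years
recent = datetime(2024, 6, 1, tzinfo=timezone.utc)   # inside retention

print(deletion_decision(old, 0, now))     # (True, 'retention policy')
print(deletion_decision(old, 1, now))     # (False, 'legal hold')
print(deletion_decision(recent, 0, now))  # (False, 'within retention period')
```

Such a pre-check is only a convenience. The trigger still has the final say, so a stale pre-check cannot cause an early deletion.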
Protected Tables
| Table | INSERT | SELECT | UPDATE | DELETE |
|---|---|---|---|---|
| events | App | App | Blocked | Retention only* |
| event_images | App | App | Blocked | Retention only* |
| alerts | App | App | Blocked | Retention only* |
| audit_log | App | App | Blocked | Blocked |
| legal_holds | App | App | Release only** | Blocked |

* Retention deletion is allowed only for records past the 7-year retention period AND not under legal hold.
** Only release fields (released_at, released_by, release_reason) can be set on unreleased holds.
Image Integrity
Hash on Ingest
Every image is hashed when it is received from the edge collector:
import hashlib

def compute_image_hash(image_bytes: bytes) -> str:
    """Compute SHA-256 hash of image."""
    return hashlib.sha256(image_bytes).hexdigest()

# On detection ingest
detection.full_image_hash = compute_image_hash(full_image_bytes)
# The crop is optional (crop_image_hash is nullable in the schema)
if crop_image_bytes is not None:
    detection.crop_image_hash = compute_image_hash(crop_image_bytes)
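For large frames, the same digest can be computed without holding the whole image in memory. The sketch below assumes images are available as a binary stream; `compute_stream_hash` and the 64 KiB chunk size are illustrative choices, not part of the ingest code above.

```python
import hashlib
import io
from typing import BinaryIO


def compute_stream_hash(stream: BinaryIO, chunk_size: int = 64 * 1024) -> str:
    """SHA-256 of a binary stream, read in fixed-size chunks."""
    digest = hashlib.sha256()
    # iter() with a sentinel stops as soon as read() returns empty bytes.
    for chunk in iter(lambda: stream.read(chunk_size), b""):
        digest.update(chunk)
    return digest.hexdigest()


# Stand-in bytes, not a real JPEG; only the hashing behaviour matters here.
payload = b"\xff\xd8\xff" + b"\x00" * 200_000
streamed = compute_stream_hash(io.BytesIO(payload))
one_shot = hashlib.sha256(payload).hexdigest()
print(streamed == one_shot)  # True
```

Because SHA-256 is computed incrementally, the chunked and one-shot digests are identical, so stored hashes stay comparable whichever path produced them.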
Database Schema
CREATE TABLE events (
id BIGSERIAL PRIMARY KEY,
-- ... other fields ...
-- Image integrity
full_image_hash CHAR(64) NOT NULL, -- SHA-256
crop_image_hash CHAR(64), -- SHA-256 (optional)
-- Metadata integrity
record_hash CHAR(64) NOT NULL -- Hash of entire record
-- NOTE: Legal holds are tracked in separate legal_holds table
-- to maintain WORM immutability of the detections table
);
Verification on Retrieval
Image integrity can optionally be verified when a detection is accessed:
async def get_detection_with_verification(detection_id: int, verify: bool = True) -> Detection:
    """Retrieve a detection and, when verify is set, check its image hash."""
    detection = await db.get_detection(detection_id)
    if not verify:
        return detection
    # Fetch image from MinIO
    image_bytes = await storage.download(detection.full_image_path)
    # Verify hash
    computed_hash = compute_image_hash(image_bytes)
    if computed_hash != detection.full_image_hash:
        # Keyword fields assume a structured logger (for example structlog)
        logger.critical("Image integrity violation",
                        detection_id=detection_id,
                        stored_hash=detection.full_image_hash,
                        computed_hash=computed_hash)
        raise IntegrityError(f"Image hash mismatch for detection {detection_id}")
    return detection
Record Hash Chain (Optional)
For cryptographic tamper evidence, each record stores the hash of the previous record:
CREATE TABLE events (
id BIGSERIAL PRIMARY KEY,
-- ... other fields ...
prev_record_hash CHAR(64), -- Hash of previous record (NULL for first)
record_hash CHAR(64) NOT NULL -- Hash of this record
);
-- Index for chain verification
CREATE INDEX idx_events_record_hash ON events (record_hash);
Hash Computation
import json
import hashlib

def compute_record_hash(detection: dict, prev_hash: str | None) -> str:
    """Compute SHA-256 hash of detection record."""
    # Canonical JSON representation (sorted keys, no whitespace)
    canonical = json.dumps({
        "id": detection["id"],
        "captured_at": detection["captured_at"].isoformat(),
        "collector_id": detection["collector_id"],
        "camera_id": detection["camera_id"],
        "plate_number": detection["plate_number"],
        "confidence": detection["confidence"],
        "full_image_hash": detection["full_image_hash"],
        "crop_image_hash": detection["crop_image_hash"],
        "prev_hash": prev_hash
    }, sort_keys=True, separators=(',', ':'))
    return hashlib.sha256(canonical.encode()).hexdigest()
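A record hash is only useful if the same record always serializes to the same bytes. The sketch below illustrates two properties of the canonical form: key order does not matter, and timestamps need a fixed timezone, because `isoformat()` renders the same instant differently under different UTC offsets. `canonical_timestamp` and `hash_fields` are hypothetical helpers, and normalizing to UTC is an assumption rather than something the schema above mandates.

```python
import hashlib
import json
from datetime import datetime, timedelta, timezone


def canonical_timestamp(value: datetime) -> str:
    """Render an aware datetime in UTC so equal instants serialize identically."""
    if value.tzinfo is None:
        raise ValueError("captured_at must be timezone-aware")
    return value.astimezone(timezone.utc).isoformat()


def hash_fields(fields: dict) -> str:
    """SHA-256 over canonical JSON (sorted keys, no whitespace)."""
    canonical = json.dumps(fields, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


utc = datetime(2025, 1, 15, 12, 0, tzinfo=timezone.utc)
local = utc.astimezone(timezone(timedelta(hours=-5)))  # same instant, other offset

a = hash_fields({"plate_number": "ABC123", "captured_at": canonical_timestamp(utc)})
b = hash_fields({"captured_at": canonical_timestamp(local), "plate_number": "ABC123"})
c = hash_fields({"captured_at": canonical_timestamp(utc), "plate_number": "ABC124"})

print(utc.isoformat() == local.isoformat())  # False: raw isoformat differs
print(a == b)                                # True: order and offset do not matter
print(a == c)                                # False: one changed character changes the hash
```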
Chain Verification
A periodic job verifies chain integrity:
async def verify_hash_chain(start_id: int, end_id: int) -> bool:
    """Verify hash chain integrity for a range of detections."""
    detections = await db.query(Detection).filter(
        Detection.id >= start_id,
        Detection.id <= end_id
    ).order_by(Detection.id).all()
    for i, detection in enumerate(detections):
        # The first record in the range may have a predecessor outside it,
        # so its stored prev_record_hash is used as-is. Every later record
        # must link to the record_hash of the row before it.
        prev_hash = detection.prev_record_hash
        linked = i == 0 or prev_hash == detections[i-1].record_hash
        expected = compute_record_hash(detection.to_dict(), prev_hash)
        if not linked or expected != detection.record_hash:
            logger.critical("Hash chain broken",
                            detection_id=detection.id,
                            expected_hash=expected,
                            stored_hash=detection.record_hash)
            return False
    return True
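The chain logic can be exercised without a database. The following in-memory sketch builds a three-record chain, verifies it (including a sub-range that does not start at the first record), and then shows a tampered field being detected. All names here (`record_hash`, `build_chain`, `first_broken_id`) are hypothetical and use a reduced field set for brevity.

```python
import hashlib
import json
from typing import Optional


def record_hash(payload: dict, prev_hash: Optional[str]) -> str:
    """Hash the payload together with the previous record's hash."""
    body = json.dumps({**payload, "prev_hash": prev_hash},
                      sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(body.encode("utf-8")).hexdigest()


def build_chain(payloads: list[dict]) -> list[dict]:
    """Append prev_record_hash and record_hash to each payload in order."""
    rows, prev = [], None
    for payload in payloads:
        row = {**payload, "prev_record_hash": prev}
        row["record_hash"] = record_hash(payload, prev)
        rows.append(row)
        prev = row["record_hash"]
    return rows


def first_broken_id(rows: list[dict]) -> Optional[int]:
    """Return the id of the first row that fails verification, else None."""
    for i, row in enumerate(rows):
        payload = {k: v for k, v in row.items()
                   if k not in ("prev_record_hash", "record_hash")}
        # Link check: each row must point at the row before it.
        if i > 0 and row["prev_record_hash"] != rows[i - 1]["record_hash"]:
            return row["id"]
        # Content check: the stored hash must match the recomputed one.
        if record_hash(payload, row["prev_record_hash"]) != row["record_hash"]:
            return row["id"]
    return None


chain = build_chain([{"id": n, "plate_number": p}
                     for n, p in enumerate(["ABC123", "XYZ789", "JKL456"], start=1)])
print(first_broken_id(chain))        # None: untouched chain verifies
print(first_broken_id(chain[1:]))    # None: a sub-range verifies too
chain[1]["plate_number"] = "XYZ780"  # simulate tampering with record 2
print(first_broken_id(chain))        # 2
```

Note that a chain only proves internal consistency. An attacker able to rewrite every later hash could rebuild it, which is why the design also relies on the trigger and permission layers.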
Access Audit Logging
What to Log
| Action | Logged Fields |
|---|---|
| View detection | user_id, detection_id, timestamp, src_ip |
| Search plates | user_id, search_query, result_count, timestamp |
| Export data | user_id, export_type, filter_criteria, record_count |
| View image | user_id, detection_id, image_type, timestamp |
| Generate report | user_id, report_type, parameters, timestamp |
Audit Log Schema
CREATE TABLE access_audit_log (
id BIGSERIAL PRIMARY KEY,
accessed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
user_id UUID NOT NULL,
username VARCHAR(255) NOT NULL,
action VARCHAR(50) NOT NULL,
resource_type VARCHAR(100) NOT NULL,
resource_id VARCHAR(255),
query_params JSONB,
result_count INTEGER,
src_ip VARCHAR(45) NOT NULL,
user_agent TEXT,
request_id UUID
);
-- Indexes for audit queries
CREATE INDEX idx_access_audit_user ON access_audit_log (user_id, accessed_at DESC);
CREATE INDEX idx_access_audit_resource ON access_audit_log (resource_type, resource_id);
CREATE INDEX idx_access_audit_time ON access_audit_log (accessed_at DESC);
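On the application side, each logged action becomes one row in this table. The sketch below shows one possible way to assemble and validate such a row before insertion. `build_audit_row` and the action name `search_plates` are hypothetical; only the column names and limits come from the schema above.

```python
import ipaddress
import uuid
from datetime import datetime, timezone
from typing import Any, Optional


def build_audit_row(user_id: uuid.UUID, username: str, action: str,
                    resource_type: str, src_ip: str,
                    resource_id: Optional[str] = None,
                    query_params: Optional[dict] = None,
                    result_count: Optional[int] = None) -> dict[str, Any]:
    """Assemble one access_audit_log row; raises ValueError on bad input."""
    if not username or not action or not resource_type:
        raise ValueError("username, action and resource_type are NOT NULL columns")
    if len(action) > 50:
        raise ValueError("action exceeds VARCHAR(50)")
    # ip_address() raises ValueError for anything that is not IPv4 or IPv6.
    normalized_ip = str(ipaddress.ip_address(src_ip))
    return {
        "accessed_at": datetime.now(timezone.utc),
        "user_id": user_id,
        "username": username,
        "action": action,
        "resource_type": resource_type,
        "resource_id": resource_id,
        "query_params": query_params,
        "result_count": result_count,
        "src_ip": normalized_ip,
        "request_id": uuid.uuid4(),
    }


row = build_audit_row(uuid.uuid4(), "jdoe", "search_plates", "events",
                      "203.0.113.7", query_params={"plate": "ABC123"},
                      result_count=3)
print(row["action"], row["src_ip"], row["result_count"])  # search_plates 203.0.113.7 3
```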
Audit Log Protection
The access audit log is append-only; rows can never be updated or deleted:
CREATE TRIGGER access_audit_immutable
BEFORE UPDATE OR DELETE ON access_audit_log
FOR EACH ROW
EXECUTE FUNCTION prevent_modification(); -- Always raises exception
Legal Hold
Purpose
Prevent deletion of evidence relevant to active investigations.
Design Decision: Junction Table
Problem: The detections table is WORM-protected (no updates allowed), but legal holds need to be set and released dynamically.
Solution: Use a separate legal_holds junction table instead of columns on the detections table. This:
- Maintains true WORM immutability on the detections table
- Allows legal holds to be set via INSERT and released via a restricted UPDATE on legal_holds, so detection rows are never updated
- Provides complete audit trail of all hold actions
- Supports multiple holds per detection (different cases)
Schema
-- Legal holds junction table (not a column on events)
CREATE TABLE legal_holds (
id BIGSERIAL PRIMARY KEY,
case_id VARCHAR(100) NOT NULL,
-- NOTE: this FK blocks deleting any event that still has hold rows, released or not
detection_id BIGINT NOT NULL REFERENCES events(id),
reason TEXT,
-- Set information
set_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
set_by UUID NOT NULL REFERENCES users(id),
hold_until TIMESTAMPTZ, -- Optional explicit expiration
-- Release information (NULL = still active)
released_at TIMESTAMPTZ,
released_by UUID REFERENCES users(id),
release_reason TEXT,
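-- Constraints
-- NOTE: despite the name, this also covers released holds, so the same
-- (case_id, detection_id) pair cannot be held again after release
CONSTRAINT unique_active_hold UNIQUE (case_id, detection_id)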
);
-- Indexes for efficient lookups
CREATE INDEX idx_legal_holds_event ON legal_holds (detection_id) WHERE released_at IS NULL;
CREATE INDEX idx_legal_holds_case ON legal_holds (case_id);
CREATE INDEX idx_legal_holds_active ON legal_holds (set_at) WHERE released_at IS NULL;
Setting a Legal Hold (INSERT, never UPDATE)
-- Set legal hold on matching events
INSERT INTO legal_holds (case_id, detection_id, reason, set_by, hold_until)
SELECT
    'CASE-2025-001234',
    id,
    'Suspected vehicle in robbery investigation',
    '00000000-0000-0000-0000-000000000001'::uuid, -- placeholder admin user UUID
    '2026-01-31'::timestamptz -- Optional: explicit hold expiration
FROM events
WHERE plate_number = 'ABC123'
  AND captured_at >= '2025-01-01'
  AND captured_at < '2025-02-01' -- half-open range covers all of January 31
ON CONFLICT (case_id, detection_id) DO NOTHING; -- Idempotent
Releasing a Legal Hold (UPDATE on legal_holds only, never on the detections table)
-- Release hold by setting the release fields
-- Note: This UPDATE is on legal_holds table, NOT detections table
UPDATE legal_holds
SET released_at = NOW(),
    released_by = '00000000-0000-0000-0000-000000000001'::uuid, -- placeholder admin user UUID
    release_reason = 'Case closed - no charges filed'
WHERE case_id = 'CASE-2025-001234'
  AND released_at IS NULL;
Checking Legal Hold Status
-- Check if detection is under active legal hold
SELECT EXISTS(
SELECT 1 FROM legal_holds
WHERE detection_id = 12345
AND released_at IS NULL
AND (hold_until IS NULL OR hold_until > NOW())
) AS has_active_hold;
-- List all active holds for a detection
SELECT case_id, reason, set_at, set_by, hold_until
FROM legal_holds
WHERE detection_id = 12345
AND released_at IS NULL
AND (hold_until IS NULL OR hold_until > NOW());
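The predicate used in both queries (not released, and either no expiration or an expiration in the future) is easy to mirror in application code, for example when filtering holds already loaded into memory. The sketch below uses a hypothetical `Hold` dataclass with only the fields the predicate needs. Note that the deletion trigger on the detections table checks only released_at, so it is stricter than this predicate: an expired but unreleased hold still blocks deletion there.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass(frozen=True)
class Hold:
    case_id: str
    released_at: Optional[datetime] = None
    hold_until: Optional[datetime] = None


def is_active(hold: Hold, now: datetime) -> bool:
    """released_at IS NULL AND (hold_until IS NULL OR hold_until > now)."""
    if hold.released_at is not None:
        return False
    return hold.hold_until is None or hold.hold_until > now


def has_active_hold(holds: list[Hold], now: datetime) -> bool:
    return any(is_active(h, now) for h in holds)


now = datetime(2025, 6, 1, tzinfo=timezone.utc)
holds = [
    Hold("CASE-A", released_at=now - timedelta(days=10)),  # released
    Hold("CASE-B", hold_until=now - timedelta(days=1)),    # expired
    Hold("CASE-C", hold_until=now + timedelta(days=30)),   # still active
]
print([h.case_id for h in holds if is_active(h, now)])  # ['CASE-C']
print(has_active_hold(holds[:2], now))                  # False
```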
Legal Hold Immutability
The legal_holds table itself is append-only, except that the release fields of an unreleased hold may be set:
-- Protect legal_holds from unauthorized modification
CREATE OR REPLACE FUNCTION enforce_legal_holds_immutability()
RETURNS TRIGGER AS $$
BEGIN
    IF TG_OP = 'DELETE' THEN
        RAISE EXCEPTION 'Legal hold records cannot be deleted. Release the hold instead.';
    END IF;
    IF TG_OP = 'UPDATE' THEN
        -- Only allow setting release fields on unreleased holds
        IF OLD.released_at IS NOT NULL THEN
            RAISE EXCEPTION 'Released legal holds cannot be modified.';
        END IF;
        -- Only allow setting release fields, nothing else
        IF NEW.case_id != OLD.case_id OR
           NEW.detection_id != OLD.detection_id OR
           NEW.reason IS DISTINCT FROM OLD.reason OR
           NEW.set_at != OLD.set_at OR
           NEW.set_by != OLD.set_by OR
           NEW.hold_until IS DISTINCT FROM OLD.hold_until THEN
            RAISE EXCEPTION 'Only release fields can be modified on legal holds.';
        END IF;
    END IF;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER legal_holds_protection
BEFORE UPDATE OR DELETE ON legal_holds
FOR EACH ROW
EXECUTE FUNCTION enforce_legal_holds_immutability();
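The UPDATE rules enforced by this trigger can be restated as a small pure function, which is convenient for unit-testing API validation without a database. This is a sketch only: `check_hold_update` is a hypothetical helper, rows are plain dicts, and the values are illustrative. Python's `!=` treats `None` as equal to `None`, which matches the `IS DISTINCT FROM` comparisons on the nullable columns.

```python
from typing import Any, Optional

# Columns written when the hold is set; the trigger rejects changes to them.
SET_FIELDS = ("case_id", "detection_id", "reason", "set_at", "set_by", "hold_until")


def check_hold_update(old: dict[str, Any], new: dict[str, Any]) -> Optional[str]:
    """Return the message the trigger would raise, or None if the update is allowed."""
    if old["released_at"] is not None:
        return "Released legal holds cannot be modified."
    if any(new[field] != old[field] for field in SET_FIELDS):
        return "Only release fields can be modified on legal holds."
    return None


hold = {"case_id": "CASE-2025-001234", "detection_id": 12345, "reason": None,
        "set_at": "2025-01-15T12:00:00Z", "set_by": "admin", "hold_until": None,
        "released_at": None, "released_by": None, "release_reason": None}
release = {**hold, "released_at": "2025-03-01T09:00:00Z",
           "release_reason": "Case closed"}

print(check_hold_update(hold, release))                             # None (allowed)
print(check_hold_update(hold, {**release, "detection_id": 99999}))  # only release fields
print(check_hold_update(release, {**release, "release_reason": "edited"}))  # already released
```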
API Endpoints
POST /api/v1/legal-holds # Set hold (admin only)
POST /api/v1/legal-holds/{id}/release # Release hold (admin only)
GET /api/v1/legal-holds # List active holds
GET /api/v1/legal-holds/{id} # Get hold details with affected detections
GET /api/v1/detections/{id}/legal-holds # Get holds for specific detection
Automatic Expiration
An optional background job auto-releases expired holds:
async def release_expired_holds():
    """Release legal holds past their hold_until date."""
    await db.execute("""
        UPDATE legal_holds
        SET released_at = NOW(),
            released_by = NULL, -- System release
            release_reason = 'Automatic: hold_until date passed'
        WHERE released_at IS NULL
          AND hold_until IS NOT NULL
          AND hold_until < NOW()
    """)
Evidence Export
Export Package
When exporting evidence for law enforcement:
from dataclasses import dataclass
from datetime import datetime
from typing import List

# Detection and ImageFile are the application's own model types
@dataclass
class EvidencePackage:
    export_id: str                # Unique export identifier
    exported_at: datetime         # Export timestamp
    exported_by: str              # User who exported
    case_reference: str           # External case number
    detections: List[Detection]   # Detection records
    images: List[ImageFile]       # Image files
    integrity_manifest: dict      # Hashes of all contents
    chain_of_custody: List[dict]  # Access history
Integrity Manifest
{
"export_id": "exp_20250115_abc123",
"created_at": "2025-01-15T12:34:56.789Z",
"created_by": "usr_admin_001",
"algorithm": "SHA-256",
"files": [
{"path": "events.json", "hash": "abc123..."},
{"path": "images/evt_001_full.jpg", "hash": "def456..."},
{"path": "images/evt_001_crop.jpg", "hash": "ghi789..."}
],
"package_hash": "xyz987..."
}
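A manifest of this shape can be produced and re-checked with a few lines of standard-library code. The sketch below is illustrative only: `build_manifest` and `mismatched_paths` are hypothetical helpers, the file contents are stand-in bytes, and the definition of package_hash (SHA-256 over the canonical JSON of the file entries) is an assumption, since the document does not specify what that hash covers.

```python
import hashlib
import json


def sha256_hex(data: bytes) -> str:
    return hashlib.sha256(data).hexdigest()


def build_manifest(export_id: str, created_at: str, created_by: str,
                   files: dict[str, bytes]) -> dict:
    """Hash every file, then hash the sorted list of file entries."""
    entries = [{"path": path, "hash": sha256_hex(files[path])}
               for path in sorted(files)]
    # Assumption: package_hash covers the canonical JSON of the file entries.
    canonical = json.dumps(entries, sort_keys=True, separators=(",", ":"))
    return {
        "export_id": export_id,
        "created_at": created_at,
        "created_by": created_by,
        "algorithm": "SHA-256",
        "files": entries,
        "package_hash": sha256_hex(canonical.encode("utf-8")),
    }


def mismatched_paths(manifest: dict, files: dict[str, bytes]) -> list[str]:
    """Return manifest paths that are missing or whose bytes have changed."""
    return [entry["path"] for entry in manifest["files"]
            if entry["path"] not in files
            or sha256_hex(files[entry["path"]]) != entry["hash"]]


files = {
    "events.json": b'[{"id":1}]',
    "images/evt_001_full.jpg": b"\xff\xd8 stand-in bytes",
}
manifest = build_manifest("exp_20250115_abc123", "2025-01-15T12:34:56.789Z",
                          "usr_admin_001", files)
print(mismatched_paths(manifest, files))  # []
files["events.json"] = b'[{"id":2}]'      # simulate modification after export
print(mismatched_paths(manifest, files))  # ['events.json']
```

A recipient can run the same check on the received package to confirm that nothing changed in transit.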
Export Audit
Every export is logged with full details:
CREATE TABLE evidence_export_log (
id BIGSERIAL PRIMARY KEY,
export_id VARCHAR(100) UNIQUE NOT NULL,
exported_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
exported_by UUID NOT NULL,
case_reference VARCHAR(100),
event_count INTEGER NOT NULL,
detection_ids BIGINT[] NOT NULL,
date_range_start TIMESTAMPTZ,
date_range_end TIMESTAMPTZ,
filter_criteria JSONB,
package_hash CHAR(64) NOT NULL,
recipient TEXT,
notes TEXT
);
Integrity Monitoring
Automated Checks
| Check | Frequency | Action on Failure |
|---|---|---|
| Hash chain verification | Daily | Alert + investigate |
| Image hash spot-check | Daily (sample) | Alert + investigate |
| Trigger presence | Hourly | Alert + restore |
| Permission audit | Daily | Alert + remediate |
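Monitoring Queries
-- Check that the WORM trigger exists and is enabled (expect exactly one row)
SELECT tgname FROM pg_trigger
WHERE tgrelid = 'events'::regclass
  AND tgname = 'events_worm_protection'
  AND tgenabled <> 'D';
-- Check for unexpected permissions (the retention role's DELETE grant is expected)
SELECT grantee, privilege_type
FROM information_schema.table_privileges
WHERE table_name = 'events'
  AND privilege_type IN ('UPDATE', 'DELETE');
-- Check for broken hash chain (sample); LAG tolerates gaps in the id sequence
SELECT id, prev_record_hash, expected_prev_hash
FROM (
    SELECT id, prev_record_hash,
           LAG(record_hash) OVER (ORDER BY id) AS expected_prev_hash
    FROM events
) chained
WHERE expected_prev_hash IS NOT NULL
  AND prev_record_hash IS DISTINCT FROM expected_prev_hash
LIMIT 10;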
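The permission query returns every UPDATE or DELETE grant on the table, including expected ones, so the monitoring job needs an allowlist to decide what to alert on. A minimal sketch follows, assuming the rows have already been fetched as (grantee, privilege_type) tuples. `unexpected_grants` is a hypothetical helper, and the allowlist reflects only the role grants shown in this document; rows for the table owner may also appear and would need to be added.

```python
from typing import Iterable

Grant = tuple[str, str]  # (grantee, privilege_type)


def unexpected_grants(rows: Iterable[Grant], expected: set[Grant]) -> list[Grant]:
    """Return grants that are not on the allowlist, sorted for stable alerts."""
    return sorted(set(rows) - expected)


# Assumption: only the retention role may hold DELETE; nobody may hold UPDATE.
EXPECTED = {("alpr_retention", "DELETE")}

rows = [("alpr_retention", "DELETE"), ("alpr_app", "UPDATE")]
print(unexpected_grants(rows, EXPECTED))  # [('alpr_app', 'UPDATE')]
```

An empty result means the grants match the design; anything else should trigger the "Alert + remediate" action from the checks table.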
Summary
| Control | Layer | Purpose |
|---|---|---|
| No UPDATE/DELETE API | Application | Prevent accidental modification |
| Role permissions | Database | Restrict capabilities |
| WORM triggers | Database | Enforce immutability |
| Image hashing | Application | Detect tampering |
| Record hash chain | Database | Cryptographic integrity |
| Access audit log | Application | Track who viewed what |
| Legal hold (junction table) | Database | Preserve evidence without breaking WORM |
| Evidence export | Application | Chain of custody |
Decision Date: 2025-12-29
Status: Approved
Rationale: Evidence integrity is critical for law enforcement use cases; multi-layer protection provides defense in depth.