This document describes the two-factor authentication architecture for the Chaverim ALPR Platform. 2FA adds a critical security layer by requiring users to verify their identity with both something they know (password) and something they have (authenticator app).
Document Status: Approved
Last Updated: 2025-12-29
sequenceDiagram
participant U as User
participant B as Browser
participant A as FastAPI
participant D as Database
U->>B: Enter email + password
B->>A: POST /auth/login
A->>D: Verify credentials (Argon2id)
alt Invalid credentials
A->>B: 401 Unauthorized
else Valid credentials
A->>D: Check 2FA status
alt 2FA not enrolled
A->>B: 302 Redirect to /auth/2fa/setup
Note over U,B: User must enroll before access
else 2FA enrolled
A->>B: 200 + partial_token (short-lived)
B->>U: Show TOTP input form
U->>B: Enter 6-digit code
B->>A: POST /auth/2fa/verify
A->>D: Validate TOTP code
alt Valid code
A->>D: Log successful auth
A->>B: Set JWT cookie + 302 to dashboard
else Invalid code
A->>D: Log failed attempt
A->>B: 401 Invalid code
end
end
end
sequenceDiagram
participant U as User
participant B as Browser
participant A as FastAPI
participant D as Database
Note over U,D: User authenticated with password, needs 2FA setup
U->>B: Navigate to /auth/2fa/setup
B->>A: GET /auth/2fa/setup
A->>A: Generate TOTP secret
A->>D: Store encrypted secret (pending)
A->>B: Return QR code + manual key
U->>U: Scan QR with authenticator app
U->>B: Enter verification code
B->>A: POST /auth/2fa/setup/verify
A->>D: Validate TOTP code
alt Valid code
A->>D: Mark 2FA as active
A->>A: Generate 10 backup codes
A->>D: Store hashed backup codes
A->>B: Display backup codes (one time only)
U->>U: Save backup codes securely
B->>A: POST /auth/2fa/setup/confirm
A->>B: 302 Redirect to dashboard
else Invalid code
A->>B: 400 Invalid code, retry
end
sequenceDiagram
participant U as User
participant B as Browser
participant A as FastAPI
participant D as Database
U->>B: Click "Lost access to authenticator?"
B->>A: GET /auth/2fa/recovery
A->>B: Show backup code input
U->>B: Enter backup code
B->>A: POST /auth/2fa/recovery
A->>D: Validate + invalidate backup code
alt Valid backup code
A->>D: Log recovery event
A->>D: Disable current 2FA
A->>B: 302 Redirect to /auth/2fa/setup
Note over U,B: User must set up new 2FA immediately
else Invalid code
A->>D: Log failed attempt
A->>B: 401 Invalid backup code
end
-- Add 2FA status tracking to users tableALTERTABLEauth.usersADDCOLUMNtwofa_requiredBOOLEANNOTNULLDEFAULTTRUE;ALTERTABLEauth.usersADDCOLUMNtwofa_enrolledBOOLEANNOTNULLDEFAULTFALSE;ALTERTABLEauth.usersADDCOLUMNtwofa_enrolled_atTIMESTAMPTZ;
classLoginResponse(BaseModel):"""Response when password is valid but 2FA needed."""requires_2fa:bool=Truepartial_token:str# Short-lived token for 2FA stepexpires_in:int# Seconds until partial token expires (300)classPartialToken(BaseModel):"""JWT claims for partial authentication."""sub:str# User IDtype:Literal["partial"]exp:datetime# 5 minutes from issueiat:datetime
classTwoFASetupResponse(BaseModel):"""Response with QR code for authenticator app."""qr_code_uri:str# data:image/png;base64,...manual_entry_key:str# Base32 encoded secret for manual entryissuer:str# "Chaverim ALPR"account_name:str# User's emailclassBackupCodesResponse(BaseModel):"""One-time display of backup codes after enrollment."""codes:list[str]# 10 plaintext codes (display once only)warning:str# "Save these codes securely..."
# src/services/totp_service.pyimportpyotpimportqrcodeimportioimportbase64importhashlibimportsecretsfromcryptography.fernetimportFernetfromdatetimeimportdatetime,timedeltaclassTOTPService:"""TOTP generation and verification service."""ISSUER="Chaverim ALPR"DIGITS=6INTERVAL=30# secondsVALID_WINDOW=1# Allow 1 interval before/after for clock skewdef__init__(self,encryption_key:bytes):self.fernet=Fernet(encryption_key)defgenerate_secret(self)->tuple[str,bytes,bytes]:""" Generate a new TOTP secret. Returns: (plaintext_secret, encrypted_secret, salt) """secret=pyotp.random_base32()salt=secrets.token_bytes(16)encrypted=self._encrypt_secret(secret,salt)returnsecret,encrypted,saltdef_encrypt_secret(self,secret:str,salt:bytes)->bytes:"""Encrypt TOTP secret with user-specific salt."""salted=salt+secret.encode()returnself.fernet.encrypt(salted)def_decrypt_secret(self,encrypted:bytes,salt:bytes)->str:"""Decrypt TOTP secret."""decrypted=self.fernet.decrypt(encrypted)returndecrypted[len(salt):].decode()defgenerate_qr_code(self,secret:str,email:str)->str:""" Generate QR code as base64 data URI. """totp=pyotp.TOTP(secret)uri=totp.provisioning_uri(name=email,issuer_name=self.ISSUER)qr=qrcode.QRCode(version=1,box_size=10,border=5)qr.add_data(uri)qr.make(fit=True)img=qr.make_image(fill_color="black",back_color="white")buffer=io.BytesIO()img.save(buffer,format="PNG")b64=base64.b64encode(buffer.getvalue()).decode()returnf"data:image/png;base64,{b64}"defverify_code(self,code:str,encrypted_secret:bytes,salt:bytes,last_used_code:str|None=None)->bool:""" Verify a TOTP code. Args: code: 6-digit code from authenticator encrypted_secret: Encrypted TOTP secret salt: User's salt last_used_code: Last successfully used code (replay protection) Returns: True if valid, False otherwise """# Replay protectioniflast_used_codeandcode==last_used_code:returnFalsesecret=self._decrypt_secret(encrypted_secret,salt)totp=pyotp.TOTP(secret)# Verify with window for clock skewreturntotp.verify(code,valid_window=self.VALID_WINDOW)defgenerate_backup_codes(self,count:int=10)->list[tuple[str,str]]:""" Generate backup codes. Returns: List of (plaintext_code, sha256_hash) tuples """codes=[]for_inrange(count):# Format: XXXX-XXXX (8 chars, easy to read)code=f"{secrets.token_hex(2).upper()}-{secrets.token_hex(2).upper()}"code_hash=hashlib.sha256(code.encode()).hexdigest()codes.append((code,code_hash))returncodesdefverify_backup_code(self,code:str,stored_hash:str)->bool:"""Verify a backup code against stored hash."""code_hash=hashlib.sha256(code.upper().replace("-","").encode()).hexdigest()returnsecrets.compare_digest(code_hash,stored_hash)
# src/services/rate_limiter.pyfromdatetimeimportdatetime,timedeltafromsqlalchemyimportselect,funcfromsqlalchemy.ext.asyncioimportAsyncSessionclassTwoFARateLimiter:"""Rate limiting for 2FA attempts."""MAX_ATTEMPTS=5WINDOW_MINUTES=5LOCKOUT_MINUTES=15asyncdefcheck_rate_limit(self,db:AsyncSession,user_id:str)->tuple[bool,int|None]:""" Check if user is rate limited. Returns: (is_allowed, seconds_until_unlock or None) """window_start=datetime.utcnow()-timedelta(minutes=self.WINDOW_MINUTES)# Count failed attempts in windowresult=awaitdb.execute(select(func.count()).select_from(TwoFAAttempt).where(TwoFAAttempt.user_id==user_id).where(TwoFAAttempt.success==False).where(TwoFAAttempt.created_at>=window_start))failed_count=result.scalar()iffailed_count>=self.MAX_ATTEMPTS:# Calculate lockout remaininglockout_end=window_start+timedelta(minutes=self.WINDOW_MINUTES+self.LOCKOUT_MINUTES)remaining=(lockout_end-datetime.utcnow()).total_seconds()returnFalse,max(0,int(remaining))returnTrue,Noneasyncdefrecord_attempt(self,db:AsyncSession,user_id:str,attempt_type:str,success:bool,ip_address:str|None=None,user_agent:str|None=None,failure_reason:str|None=None):"""Record a 2FA attempt for rate limiting and audit."""attempt=TwoFAAttempt(user_id=user_id,attempt_type=attempt_type,success=success,ip_address=ip_address,user_agent=user_agent,failure_reason=failure_reason)db.add(attempt)awaitdb.commit()
# src/auth/dependencies.pyfromfastapiimportDepends,HTTPException,status,Requestfromjoseimportjwt,JWTErrorasyncdefget_current_user(request:Request,db:AsyncSession=Depends(get_db))->User:""" Validate JWT and ensure 2FA is complete. """token=request.cookies.get("access_token")ifnottoken:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Not authenticated")try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])token_type=payload.get("type")# Reject partial tokens (2FA not completed)iftoken_type=="partial":raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="2FA verification required",headers={"X-2FA-Required":"true"})user_id=payload.get("sub")ifnotuser_id:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Invalid token")exceptJWTError:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Invalid token")user=awaitget_user_by_id(db,user_id)ifnotuserornotuser.is_active:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="User not found or inactive")# Ensure 2FA is enrolled (mandatory for all users)ifnotuser.twofa_enrolled:raiseHTTPException(status_code=status.HTTP_403_FORBIDDEN,detail="2FA enrollment required",headers={"X-2FA-Setup-Required":"true"})returnuser
┌─────────────────────────────────────────────────────────────┐
│ Save Your Backup Codes │
│─────────────────────────────────────────────────────────────│
│ │
│ ⚠️ IMPORTANT: Save these codes in a safe place! │
│ │
│ If you lose access to your authenticator app, you can │
│ use one of these codes to recover your account. │
│ │
│ Each code can only be used once. │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ A7B2-C8D9 E4F5-G6H7 J8K9-L0M1 │ │
│ │ N2P3-Q4R5 S6T7-U8V9 W0X1-Y2Z3 │ │
│ │ A1B2-C3D4 E5F6-G7H8 J9K0-L1M2 │ │
│ │ N3P4-Q5R6 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ [ Copy to Clipboard ] [ Download as File ] │
│ │
│ ☑️ I have saved my backup codes │
│ │
│ [ Continue to Dashboard ] │
│ │
└─────────────────────────────────────────────────────────────┘