Passa al contenuto principale

MFA, Recovery Codes & Account Policies — Akira

Riferimento documento: Akira v5.2, cap. 5.2 (Auth & MFA) Versione: 1.0 — 2026-05-13 Owner: Security Engineering

1. Sintesi

Akira implementa autenticazione a più fattori con TOTP standard, codici di recovery hashati Argon2id, lockout progressivo per fail di login (user + IP), sessioni con idle/absolute timeout, password policy NIST-aligned e alerting di sicurezza via Telegram/email.

2. Recovery codes

2.1 Generazione

Alla prima enrollment TOTP MFA per un utente, il backend genera 10 recovery codes monouso:

  • Formato: 8 caratteri alphanumerici raggruppati 4+4 (es. K3M9-PX7Q).
  • Case-insensitive in input (normalizzati upper prima dell'hash compare).
  • Charset: ABCDEFGHJKLMNPQRSTUVWXYZ23456789 (no 0/O, 1/I/L per leggibilità).
  • Sorgente entropia: secrets.token_bytes (cryptographically secure).
import secrets
ALPHABET = "ABCDEFGHJKLMNPQRSTUVWXYZ23456789"

def gen_recovery_code() -> str:
raw = "".join(secrets.choice(ALPHABET) for _ in range(8))
return f"{raw[:4]}-{raw[4:]}"

2.2 Storage (Argon2id)

CREATE TABLE recovery_codes (
id BIGSERIAL PRIMARY KEY,
user_account_id BIGINT NOT NULL REFERENCES user_accounts(id) ON DELETE CASCADE,
code_hash_argon2id TEXT NOT NULL,
hash_algo VARCHAR(16) NOT NULL DEFAULT 'argon2id',
used_at TIMESTAMPTZ NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
used_from_ip INET NULL
);

CREATE INDEX idx_recovery_user_unused
ON recovery_codes(user_account_id) WHERE used_at IS NULL;

Hash:

from argon2 import PasswordHasher

hasher = PasswordHasher(
time_cost=3, # iterazioni
memory_cost=65536, # 64 MiB
parallelism=4,
hash_len=32,
salt_len=16,
)

def hash_code(plain: str) -> str:
return hasher.hash(plain.upper())

def verify_code(plain: str, hashed: str) -> bool:
try:
return hasher.verify(hashed, plain.upper())
except Exception:
return False

2.3 UI flow

  • Mostrati una sola volta dopo l'enrollment, con:
    • Download .txt (akira-recovery-codes-<user>-<yyyymmdd>.txt)
    • Bottone "Copia tutti"
    • Stampa fisica consigliata
  • Avviso: "Conserva in modo sicuro. Non potrai vederli di nuovo. Ogni codice è utilizzabile una sola volta."
  • Nessuna API espone i codici in chiaro dopo la generazione.

2.4 Login con recovery code

1. Utente: username + password → ok
2. Backend: chiede TOTP
3. Utente: link "Use recovery code" → input "K3M9-PX7Q"
4. Backend: per ogni recovery code non usato del user_id → verify(code)
5. Match: UPDATE recovery_codes SET used_at=now(), used_from_ip=:ip WHERE id=:id
6. Login completa.
7. Notifica: email + Telegram all'utente "Recovery code utilizzato da IP X
(geoip: Y) alle Z. Se non sei stato tu, contatta admin."

2.5 Re-enroll forzato

Dopo 5 codici usati (su 10 totali), al successivo login il backend forza:

  • generazione 10 nuovi codici;
  • invalidazione di quelli rimasti;
  • UI: "Hai usato 5 dei tuoi recovery code. Ne stiamo generando 10 nuovi. Salvali subito."
-- Check soglia: chiamato dopo ogni login
SELECT COUNT(*) FILTER (WHERE used_at IS NOT NULL) AS used,
COUNT(*) AS total
FROM recovery_codes
WHERE user_account_id = :uid;
-- Se used >= 5 → trigger re-enroll

3. Account lock policy

3.1 Soglie

TriggerAction
5 fail consecutive in 15 min sullo stesso user_account_idLock account
10 fail in 5 min sulla stessa source_ip (qualsiasi user)Lock IP per 1h

3.2 Durata lock (exponential per user)

Lock streakDurata
15 min
30 min
60 min
4°+24 h

Streak si resetta a 0 dopo 7 giorni senza ulteriori lock.

3.3 Schema DB

ALTER TABLE user_accounts
ADD COLUMN failed_login_count INT DEFAULT 0,
ADD COLUMN locked_until TIMESTAMPTZ NULL,
ADD COLUMN lock_streak INT DEFAULT 0,
ADD COLUMN last_lock_at TIMESTAMPTZ NULL;

CREATE TABLE login_attempts (
id BIGSERIAL PRIMARY KEY,
user_account_id BIGINT REFERENCES user_accounts(id),
source_ip INET,
success BOOLEAN NOT NULL,
failure_reason VARCHAR(64), -- 'bad_password','no_user','expired','locked','mfa_failed','recovery_failed'
user_agent TEXT,
attempted_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX idx_login_attempts_ip_ts ON login_attempts(source_ip, attempted_at DESC);
CREATE INDEX idx_login_attempts_user_ts ON login_attempts(user_account_id, attempted_at DESC);

-- IP-level lock (separato dal user lock)
CREATE TABLE ip_locks (
source_ip INET PRIMARY KEY,
locked_until TIMESTAMPTZ NOT NULL,
reason VARCHAR(64) NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

3.4 Logica check (pseudo Python)

async def check_login(username: str, password: str, ip: str) -> LoginResult:
if await is_ip_locked(ip):
await record_attempt(None, ip, success=False, reason="ip_locked")
raise LoginError("ip_locked")

user = await find_user(username)
if user and user.locked_until and user.locked_until > now():
await record_attempt(user.id, ip, success=False, reason="locked")
raise LoginError("account_locked", retry_at=user.locked_until)

if not user or not verify_password(password, user.password_hash):
await record_attempt(user.id if user else None, ip, success=False,
reason="bad_password" if user else "no_user")
if user:
await bump_user_failures(user.id)
await maybe_lock_ip(ip)
raise LoginError("invalid_credentials")

# password ok → reset user counter, prosegui con MFA
await reset_user_failures(user.id)
return LoginResult(user_id=user.id, mfa_required=True)
async def bump_user_failures(user_id: int):
# Conta fail negli ultimi 15 min
fails = await db.scalar("""
SELECT COUNT(*) FROM login_attempts
WHERE user_account_id = :uid
AND success = false
AND attempted_at > now() - interval '15 minutes'
""", {"uid": user_id})
if fails >= 5:
streak = await compute_streak(user_id)
duration = [15, 30, 60, 1440][min(streak, 3)] # minuti
await db.execute("""
UPDATE user_accounts SET
locked_until = now() + (:dur || ' minutes')::interval,
lock_streak = :streak,
last_lock_at = now()
WHERE id = :uid
""", {"uid": user_id, "dur": duration, "streak": streak + 1})
await notify_lock(user_id)
async def maybe_lock_ip(ip: str):
fails = await db.scalar("""
SELECT COUNT(*) FROM login_attempts
WHERE source_ip = :ip
AND success = false
AND attempted_at > now() - interval '5 minutes'
""", {"ip": ip})
if fails >= 10:
await db.execute("""
INSERT INTO ip_locks(source_ip, locked_until, reason)
VALUES (:ip, now() + interval '1 hour', 'brute_force')
ON CONFLICT (source_ip)
DO UPDATE SET locked_until = EXCLUDED.locked_until
""", {"ip": ip})

3.5 Unlock manuale

Admin con system.users.manage può forzare unlock via API:

POST /api/admin/users/{id}/unlock
→ UPDATE user_accounts SET locked_until=NULL, failed_login_count=0 WHERE id=:id
→ audit log obbligatorio (motivo richiesto in body)

4. Session timeout

4.1 Regole

  • Idle timeout: 30 minuti senza API call.
    • Ogni API call rinnova users_sessions.last_activity_at.
    • Refresh token rinnova l'access token finché entro la finestra idle.
  • Absolute timeout: 8 ore dal login, indipendente dall'attività.
  • Logout forzato: se users_sessions.invalidated_at IS NOT NULL (revocato manualmente da admin, password change, MFA reset).

4.2 Schema

CREATE TABLE users_sessions (
id BIGSERIAL PRIMARY KEY,
user_account_id BIGINT NOT NULL REFERENCES user_accounts(id),
session_token UUID NOT NULL UNIQUE,
refresh_token UUID NOT NULL UNIQUE,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
last_activity_at TIMESTAMPTZ NOT NULL DEFAULT now(),
expires_at TIMESTAMPTZ NOT NULL, -- absolute: created_at + 8h
invalidated_at TIMESTAMPTZ NULL,
source_ip INET,
user_agent TEXT
);
CREATE INDEX idx_users_sessions_user ON users_sessions(user_account_id) WHERE invalidated_at IS NULL;

4.3 Middleware FastAPI

async def session_guard(request: Request):
sess = await load_session(request.headers["Authorization"])
if not sess or sess.invalidated_at:
raise HTTPException(401, "session_invalid")
if sess.expires_at < now():
raise HTTPException(401, "session_absolute_expired")
if (now() - sess.last_activity_at).total_seconds() > 1800:
raise HTTPException(401, "session_idle_expired")
await db.execute(
"UPDATE users_sessions SET last_activity_at = now() WHERE id = :id",
{"id": sess.id},
)
return sess

5. Password policy

Allineata a NIST SP 800-63B (no rotation forzata).

RequisitoValore
Lunghezza minima12 caratteri
Lunghezza massima128 caratteri
Charsetqualsiasi (compresi spazi e Unicode)
Complessità impostanessuna (no "almeno 1 maiuscola/numero/simbolo")
HashArgon2id, memory_cost=64 MiB, time_cost=3, parallelism=4
Check breachedPwned Passwords k-anonymity API (ON di default)
RotationNON forzata; solo su compromissione sospetta
Re-use checkultime 5 password hash conservate, no riuso

5.1 Configurazione Argon2id

from argon2 import PasswordHasher
password_hasher = PasswordHasher(
time_cost=3,
memory_cost=65536, # 64 MiB
parallelism=4,
hash_len=32,
salt_len=16,
)

5.2 Pwned Passwords check

import hashlib, httpx

async def is_pwned(password: str) -> bool:
sha1 = hashlib.sha1(password.encode()).hexdigest().upper()
prefix, suffix = sha1[:5], sha1[5:]
async with httpx.AsyncClient(timeout=2.5) as c:
r = await c.get(f"https://api.pwnedpasswords.com/range/{prefix}")
if r.status_code != 200:
return False # fail-open: API down non blocca il login
return any(line.split(":")[0] == suffix for line in r.text.splitlines())

Setting SECURITY_PWNED_CHECK_ENABLED=true (default).

6. MFA TOTP

ParametroValore
IssuerAkira-staging.asheep.it (env MFA_ISSUER)
AlgoritmoSHA-1 (compat. Google Authenticator / Authy / 1Password)
Digits6
Period30 s
Window±1 step (60 s totali di tolleranza)
Backup codes10 recovery codes Argon2id (sezione 2)
Hardware MFAWebAuthn/YubiKey → roadmap v2 (via Authlib)

6.1 Enrollment

import pyotp

secret = pyotp.random_base32()
uri = pyotp.totp.TOTP(secret).provisioning_uri(
name=user.email,
issuer_name=settings.MFA_ISSUER,
)
# user_accounts.mfa_secret_encrypted = encrypt(secret, kms_key)

QR code mostrato in UI, secret salvato cifrato a riposo (KMS o pgcrypto).

6.2 Verifica

totp = pyotp.TOTP(decrypt(user.mfa_secret_encrypted))
ok = totp.verify(code, valid_window=1)

6.3 Reset MFA

  • Self-service: con recovery code valido.
  • Admin: con system.users.manage + MFA re-challenge admin. Genera notifica Telegram + email all'utente target.

7. Alert security

EventoNotifica
Login da nuova IP (geoip MaxMind Lite)email user + Telegram admin
Login da nuovo paeseemail user + Telegram admin
3+ injection attempt AgentCore in 1 hTelegram admin
Account lockemail user + Telegram admin
IP lockTelegram admin
Recovery code usatoemail user (per conferma "sono stato io")
MFA resetemail user + Telegram admin
Password changeemail user
Admin unlock di un altro utenteTelegram admin (audit visibile)

7.1 Detection nuova IP

async def is_new_ip_for_user(user_id: int, ip: str) -> bool:
seen = await db.scalar("""
SELECT 1 FROM login_attempts
WHERE user_account_id = :uid AND source_ip = :ip AND success = true
LIMIT 1
""", {"uid": user_id, "ip": ip})
return seen is None

7.2 Geoip lookup

import geoip2.database
reader = geoip2.database.Reader("/var/lib/geoip/GeoLite2-City.mmdb")

def lookup(ip: str) -> dict:
try:
r = reader.city(ip)
return {"country": r.country.iso_code, "city": r.city.name}
except Exception:
return {"country": None, "city": None}

8. Configurazione di default (env)

MFA_ISSUER=Akira-staging.asheep.it
SECURITY_PWNED_CHECK_ENABLED=true
SESSION_IDLE_TIMEOUT_MIN=30
SESSION_ABSOLUTE_TIMEOUT_HOURS=8
LOCK_USER_THRESHOLD=5
LOCK_USER_WINDOW_MIN=15
LOCK_IP_THRESHOLD=10
LOCK_IP_WINDOW_MIN=5
LOCK_IP_DURATION_MIN=60
RECOVERY_CODES_COUNT=10
RECOVERY_REENROLL_AFTER_USED=5

9. Checklist deploy security

  • recovery_codes, login_attempts, ip_locks, users_sessions create.
  • Argon2id parametri verificati su VPS target (memory_cost compatibile RAM).
  • GeoLite2-City.mmdb aggiornato (cron settimanale geoipupdate).
  • Pwned Passwords API raggiungibile, timeout 2.5 s.
  • Telegram bot chat_id_admin configurato in settings.telegram.config.
  • Email SMTP noreply@asheep.it testata per notifiche security.
  • Test E2E: lock dopo 5 fail, exponential su 2° lock, IP lock su 10 fail multi-user.
  • Test recovery code: enrollment, login con code, marcatura used_at, re-enroll su 5 usati.
  • Test session: idle 30m, absolute 8h, admin revoke.