MFA, Recovery Codes & Account Policies — Akira
Document reference: Akira v5.2, ch. 5.2 (Auth & MFA). Version: 1.0, 2026-05-13. Owner: Security Engineering.
1. Summary
Akira implements multi-factor authentication with standard TOTP, recovery codes hashed with Argon2id, progressive lockout on failed logins (per user and per IP), sessions with idle and absolute timeouts, a NIST-aligned password policy, and security alerting via Telegram and email.
2. Recovery codes
2.1 Generation
On a user's first TOTP MFA enrollment, the backend generates 10 single-use recovery codes:
- Format: 8 alphanumeric characters grouped 4+4 (e.g. K3M9-PX7Q).
- Case-insensitive on input (normalized to uppercase before the hash comparison).
- Charset: ABCDEFGHJKLMNPQRSTUVWXYZ23456789 (32 symbols; 0/O and 1/I are excluded for readability).
- Entropy source: Python's secrets module (cryptographically secure); the generator uses secrets.choice.
import secrets

ALPHABET = "ABCDEFGHJKLMNPQRSTUVWXYZ23456789"

def gen_recovery_code() -> str:
    raw = "".join(secrets.choice(ALPHABET) for _ in range(8))
    return f"{raw[:4]}-{raw[4:]}"
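The format rules above imply that user input has to be brought back to the canonical XXXX-XXXX form before it is hashed or compared. A minimal sketch of that step follows; the function name normalize_recovery_code and the decision to tolerate missing hyphens and stray whitespace are assumptions for illustration, not part of the spec. The last line works out the search space of a single code.

```python
import math
import re
from typing import Optional

ALPHABET = "ABCDEFGHJKLMNPQRSTUVWXYZ23456789"  # same charset as the generator
_CODE_RE = re.compile(f"[{ALPHABET}]{{8}}")

def normalize_recovery_code(user_input: str) -> Optional[str]:
    """Return the canonical XXXX-XXXX form, or None if the input is malformed.

    Hypothetical helper: whitespace and hyphens are dropped, letters are
    uppercased, and anything outside the charset is rejected.
    """
    compact = re.sub(r"[\s-]", "", user_input).upper()
    if not _CODE_RE.fullmatch(compact):
        return None
    return f"{compact[:4]}-{compact[4:]}"

# 32 symbols over 8 positions: 8 * log2(32) = 40 bits per code
ENTROPY_BITS = 8 * math.log2(len(ALPHABET))
```

Rejecting malformed input before hashing also avoids spending an Argon2id verification on strings that can never match.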
2.2 Storage (Argon2id)
CREATE TABLE recovery_codes (
    id BIGSERIAL PRIMARY KEY,
    user_account_id BIGINT NOT NULL REFERENCES user_accounts(id) ON DELETE CASCADE,
    code_hash_argon2id TEXT NOT NULL,
    hash_algo VARCHAR(16) NOT NULL DEFAULT 'argon2id',
    used_at TIMESTAMPTZ NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    used_from_ip INET NULL
);
CREATE INDEX idx_recovery_user_unused
    ON recovery_codes(user_account_id) WHERE used_at IS NULL;
Hash:
from argon2 import PasswordHasher

hasher = PasswordHasher(
    time_cost=3,        # iterations
    memory_cost=65536,  # in KiB, i.e. 64 MiB
    parallelism=4,
    hash_len=32,
    salt_len=16,
)

def hash_code(plain: str) -> str:
    # Uppercase first so that input is case-insensitive
    return hasher.hash(plain.upper())

def verify_code(plain: str, hashed: str) -> bool:
    try:
        return hasher.verify(hashed, plain.upper())
    except Exception:
        # argon2 raises on a mismatch or a malformed hash; both count as "no match"
        return False
2.3 UI flow
- Shown only once after enrollment, with:
  - Download as .txt (akira-recovery-codes-<user>-<yyyymmdd>.txt)
  - "Copy all" button
  - Physical printout recommended
- Warning: "Store them securely. You will not be able to see them again. Each code can be used only once."
- No API exposes the plaintext codes after generation.
2.4 Login with a recovery code
1. User: username + password → ok
2. Backend: asks for TOTP
3. User: "Use recovery code" link → input "K3M9-PX7Q"
4. Backend: for each unused recovery code of the user_id → verify(code)
5. Match: UPDATE recovery_codes SET used_at=now(), used_from_ip=:ip WHERE id=:id
6. Login completes.
7. Notification: email + Telegram to the user: "Recovery code used from IP X (geoip: Y) at Z. If this was not you, contact an admin."
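Steps 4 and 5 above can be sketched as a loop over the user's unused rows. This is an in-memory illustration under stated assumptions: RecoveryCodeRow and redeem_recovery_code are hypothetical names, and demo_verify is a SHA-256 stand-in used only so the example runs without argon2 installed; in Akira the verifier would be the Argon2id verify_code from section 2.2.

```python
import hashlib
import hmac
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Callable, Iterable, Optional

@dataclass
class RecoveryCodeRow:
    """In-memory stand-in for one recovery_codes row (hypothetical type)."""
    id: int
    code_hash: str
    used_at: Optional[datetime] = None
    used_from_ip: Optional[str] = None

def redeem_recovery_code(
    rows: Iterable[RecoveryCodeRow],
    submitted: str,
    ip: str,
    verify: Callable[[str, str], bool],
) -> Optional[RecoveryCodeRow]:
    """Try the submitted code against every unused row; mark the first match used."""
    for row in rows:
        if row.used_at is not None:
            continue  # single-use: spent codes are never checked again
        if verify(submitted, row.code_hash):
            row.used_at = datetime.now(timezone.utc)
            row.used_from_ip = ip
            return row
    return None

def demo_verify(plain: str, hashed: str) -> bool:
    # Demo-only verifier (assumption): NOT a replacement for Argon2id.
    digest = hashlib.sha256(plain.upper().encode()).hexdigest()
    return hmac.compare_digest(digest, hashed)
```

Because every stored hash has its own salt, the backend cannot look a code up by hash and has to run up to 10 verifications per attempt. In the real UPDATE, adding AND used_at IS NULL to the WHERE clause would be one way to keep two concurrent requests from spending the same code.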
2.5 Forced re-enrollment
Once 5 codes have been used (out of 10), the backend forces the following at the next login:
- generation of 10 new codes;
- invalidation of the remaining ones;
- UI: "You have used 5 of your recovery codes. We are generating 10 new ones. Save them now."
-- Threshold check: called after every login
SELECT COUNT(*) FILTER (WHERE used_at IS NOT NULL) AS used,
       COUNT(*) AS total
FROM recovery_codes
WHERE user_account_id = :uid;
-- If used >= 5 → trigger re-enrollment
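The application side of this threshold check can be sketched as follows. The helper names needs_reenroll and new_code_batch are hypothetical, and deduplicating within a batch is an assumption the spec does not state; the defaults mirror RECOVERY_CODES_COUNT and RECOVERY_REENROLL_AFTER_USED.

```python
import secrets
from typing import List, Set

ALPHABET = "ABCDEFGHJKLMNPQRSTUVWXYZ23456789"

def gen_recovery_code() -> str:
    raw = "".join(secrets.choice(ALPHABET) for _ in range(8))
    return f"{raw[:4]}-{raw[4:]}"

def needs_reenroll(used: int, threshold: int = 5) -> bool:
    """True once the number of spent codes reaches the re-enrollment threshold."""
    return used >= threshold

def new_code_batch(count: int = 10) -> List[str]:
    """Generate `count` distinct codes (distinctness is an assumption)."""
    codes: Set[str] = set()
    while len(codes) < count:
        codes.add(gen_recovery_code())
    return list(codes)
```

A caller would feed the `used` column of the query above into needs_reenroll and, when it returns True, replace the user's rows with the hashes of a fresh batch.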
3. Account lock policy
3.1 Thresholds
| Trigger | Action |
|---|---|
| 5 consecutive failures within 15 min on the same user_account_id | Lock account |
| 10 failures within 5 min from the same source_ip (any user) | Lock IP for 1 h |
3.2 Lock duration (escalating per user)
| Lock streak | Duration |
|---|---|
| 1st | 15 min |
| 2nd | 30 min |
| 3rd | 60 min |
| 4th and later | 24 h |
The streak resets to 0 after 7 days without further locks.
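The table and the reset rule can be expressed as two small pure functions. This is a sketch under assumptions: effective_streak and lock_duration are hypothetical names, and treating exactly 7 days as already reset is a choice the text leaves open.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

LOCK_MINUTES = [15, 30, 60, 1440]   # 1st, 2nd, 3rd, 4th and later
STREAK_RESET = timedelta(days=7)

def effective_streak(lock_streak: int, last_lock_at: Optional[datetime],
                     now: datetime) -> int:
    """Number of prior locks that still count toward escalation."""
    if last_lock_at is None or now - last_lock_at >= STREAK_RESET:
        return 0
    return lock_streak

def lock_duration(prior_streak: int) -> timedelta:
    """Duration of the next lock, given how many prior locks still count."""
    index = min(prior_streak, len(LOCK_MINUTES) - 1)
    return timedelta(minutes=LOCK_MINUTES[index])
```

For example, a user locked twice in the last week gets a 60-minute lock on the next trigger, while a user whose last lock is 8 days old starts again at 15 minutes.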
3.3 DB schema
ALTER TABLE user_accounts
    ADD COLUMN failed_login_count INT DEFAULT 0,
    ADD COLUMN locked_until TIMESTAMPTZ NULL,
    ADD COLUMN lock_streak INT DEFAULT 0,
    ADD COLUMN last_lock_at TIMESTAMPTZ NULL;
CREATE TABLE login_attempts (
    id BIGSERIAL PRIMARY KEY,
    user_account_id BIGINT REFERENCES user_accounts(id),
    source_ip INET,
    success BOOLEAN NOT NULL,
    failure_reason VARCHAR(64), -- 'bad_password','no_user','expired','locked','ip_locked','mfa_failed','recovery_failed'
    user_agent TEXT,
    attempted_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX idx_login_attempts_ip_ts ON login_attempts(source_ip, attempted_at DESC);
CREATE INDEX idx_login_attempts_user_ts ON login_attempts(user_account_id, attempted_at DESC);
-- IP-level lock (separate from the user lock)
CREATE TABLE ip_locks (
    source_ip INET PRIMARY KEY,
    locked_until TIMESTAMPTZ NOT NULL,
    reason VARCHAR(64) NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
3.4 Check logic (pseudo-Python)
async def check_login(username: str, password: str, ip: str) -> LoginResult:
    # 1. IP-level lock is checked first, before any user lookup
    if await is_ip_locked(ip):
        await record_attempt(None, ip, success=False, reason="ip_locked")
        raise LoginError("ip_locked")

    # 2. User-level lock
    user = await find_user(username)
    if user and user.locked_until and user.locked_until > now():
        await record_attempt(user.id, ip, success=False, reason="locked")
        raise LoginError("account_locked", retry_at=user.locked_until)

    # 3. Unknown user and wrong password return the same error to the client
    if not user or not verify_password(password, user.password_hash):
        await record_attempt(
            user.id if user else None, ip, success=False,
            reason="bad_password" if user else "no_user",
        )
        if user:
            await bump_user_failures(user.id)
        await maybe_lock_ip(ip)
        raise LoginError("invalid_credentials")

    # Password ok → reset the user counter, continue with MFA
    await reset_user_failures(user.id)
    return LoginResult(user_id=user.id, mfa_required=True)
async def bump_user_failures(user_id: int):
    # Count failures in the last 15 minutes
    fails = await db.scalar("""
        SELECT COUNT(*) FROM login_attempts
        WHERE user_account_id = :uid
          AND success = false
          AND attempted_at > now() - interval '15 minutes'
    """, {"uid": user_id})
    if fails >= 5:
        # Locks that still count (expected to be 0 after 7 days without locks, see 3.2)
        streak = await compute_streak(user_id)
        duration = [15, 30, 60, 1440][min(streak, 3)]  # minutes
        # make_interval avoids concatenating a string with an integer bind parameter
        await db.execute("""
            UPDATE user_accounts SET
                locked_until = now() + make_interval(mins => :dur),
                lock_streak = :streak,
                last_lock_at = now()
            WHERE id = :uid
        """, {"uid": user_id, "dur": duration, "streak": streak + 1})
        await notify_lock(user_id)
async def maybe_lock_ip(ip: str):
    fails = await db.scalar("""
        SELECT COUNT(*) FROM login_attempts
        WHERE source_ip = :ip
          AND success = false
          AND attempted_at > now() - interval '5 minutes'
    """, {"ip": ip})
    if fails >= 10:
        await db.execute("""
            INSERT INTO ip_locks(source_ip, locked_until, reason)
            VALUES (:ip, now() + interval '1 hour', 'brute_force')
            ON CONFLICT (source_ip)
            DO UPDATE SET locked_until = EXCLUDED.locked_until
        """, {"ip": ip})
3.5 Manual unlock
An admin with system.users.manage can force an unlock via the API:
POST /api/admin/users/{id}/unlock
→ UPDATE user_accounts SET locked_until=NULL, failed_login_count=0 WHERE id=:id
→ mandatory audit log (reason required in the request body)
4. Session timeout
4.1 Rules
- Idle timeout: 30 minutes without an API call.
  - Every API call refreshes users_sessions.last_activity_at.
  - The refresh token renews the access token as long as the session is within the idle window.
- Absolute timeout: 8 hours from login, regardless of activity.
- Forced logout: if users_sessions.invalidated_at IS NOT NULL (revoked manually by an admin, password change, MFA reset).
4.2 Schema
CREATE TABLE users_sessions (
    id BIGSERIAL PRIMARY KEY,
    user_account_id BIGINT NOT NULL REFERENCES user_accounts(id),
    session_token UUID NOT NULL UNIQUE,
    refresh_token UUID NOT NULL UNIQUE,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    last_activity_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    expires_at TIMESTAMPTZ NOT NULL, -- absolute: created_at + 8h
    invalidated_at TIMESTAMPTZ NULL,
    source_ip INET,
    user_agent TEXT
);
CREATE INDEX idx_users_sessions_user ON users_sessions(user_account_id) WHERE invalidated_at IS NULL;
4.3 FastAPI middleware
async def session_guard(request: Request):
    # A missing header must yield 401, not a KeyError (HTTP 500)
    auth = request.headers.get("Authorization")
    sess = await load_session(auth) if auth else None
    if not sess or sess.invalidated_at:
        raise HTTPException(401, "session_invalid")
    if sess.expires_at < now():
        raise HTTPException(401, "session_absolute_expired")
    if (now() - sess.last_activity_at).total_seconds() > 1800:  # 30 min idle
        raise HTTPException(401, "session_idle_expired")
    # Valid request: refresh the idle window
    await db.execute(
        "UPDATE users_sessions SET last_activity_at = now() WHERE id = :id",
        {"id": sess.id},
    )
    return sess
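The order of the three checks in the middleware can be isolated in a pure function, which makes the idle and absolute timeouts easy to unit-test without a database. This is a sketch only: the Session dataclass and session_status are hypothetical names, and the status strings reuse the error codes from the middleware.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional

IDLE_TIMEOUT = timedelta(minutes=30)   # SESSION_IDLE_TIMEOUT_MIN

@dataclass
class Session:
    """Subset of users_sessions columns needed for the checks (hypothetical type)."""
    created_at: datetime
    last_activity_at: datetime
    expires_at: datetime
    invalidated_at: Optional[datetime] = None

def session_status(sess: Optional[Session], now: datetime) -> str:
    """Classify a session: revocation first, then absolute, then idle timeout."""
    if sess is None or sess.invalidated_at is not None:
        return "session_invalid"
    if sess.expires_at < now:
        return "session_absolute_expired"
    if now - sess.last_activity_at > IDLE_TIMEOUT:
        return "session_idle_expired"
    return "ok"
```

Passing `now` explicitly, instead of reading the clock inside the function, is what allows a test to simulate 30 minutes of idleness or an 8-hour session instantly.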
5. Password policy
Aligned with NIST SP 800-63B (no forced rotation).
| Requirement | Value |
|---|---|
| Minimum length | 12 characters |
| Maximum length | 128 characters |
| Charset | any (including spaces and Unicode) |
| Enforced complexity | none (no "at least 1 uppercase/digit/symbol" rules) |
| Hash | Argon2id, memory_cost=64 MiB, time_cost=3, parallelism=4 |
| Breach check | Pwned Passwords k-anonymity API (ON by default) |
| Rotation | NOT forced; only on suspected compromise |
| Reuse check | last 5 password hashes retained, no reuse |
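As an illustration of how the rows of this table could combine at password-change time, here is a minimal sketch. The function policy_violations, its violation labels, and the injected verify and is_breached callables are all hypothetical; length is counted in Unicode code points, which is an assumption the table does not settle.

```python
from typing import Callable, List, Sequence

MIN_LEN, MAX_LEN = 12, 128

def policy_violations(
    password: str,
    previous_hashes: Sequence[str],
    verify: Callable[[str, str], bool],
    is_breached: Callable[[str], bool],
) -> List[str]:
    """Return the list of violated rules; an empty list means the password is accepted.

    `previous_hashes` is expected to hold the last 5 stored hashes,
    `verify` the Argon2id check and `is_breached` the Pwned Passwords lookup.
    """
    problems: List[str] = []
    if len(password) < MIN_LEN:
        problems.append("too_short")
    if len(password) > MAX_LEN:
        problems.append("too_long")
    # No composition rules on purpose: any charset is allowed.
    if is_breached(password):
        problems.append("breached")
    if any(verify(password, h) for h in previous_hashes):
        problems.append("reused")
    return problems
```

Injecting the verifier and the breach check keeps the policy itself free of network and hashing dependencies, so it can be tested with simple stand-ins.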
5.1 Argon2id configuration
from argon2 import PasswordHasher

password_hasher = PasswordHasher(
    time_cost=3,
    memory_cost=65536,  # in KiB, i.e. 64 MiB
    parallelism=4,
    hash_len=32,
    salt_len=16,
)
5.2 Pwned Passwords check
import hashlib

import httpx

async def is_pwned(password: str) -> bool:
    sha1 = hashlib.sha1(password.encode()).hexdigest().upper()
    prefix, suffix = sha1[:5], sha1[5:]  # only the 5-character prefix is sent to the API
    try:
        async with httpx.AsyncClient(timeout=2.5) as c:
            r = await c.get(f"https://api.pwnedpasswords.com/range/{prefix}")
    except httpx.HTTPError:
        return False  # fail-open: a timeout or network error does not block the login
    if r.status_code != 200:
        return False  # fail-open: an API outage does not block the login
    return any(line.split(":")[0] == suffix for line in r.text.splitlines())
Controlled by the setting SECURITY_PWNED_CHECK_ENABLED=true (default).
6. MFA TOTP
| Parameter | Value |
|---|---|
| Issuer | Akira-staging.asheep.it (env MFA_ISSUER) |
| Algorithm | SHA-1 (compatible with Google Authenticator / Authy / 1Password) |
| Digits | 6 |
| Period | 30 s |
| Window | ±1 step (60 s of total tolerance) |
| Backup codes | 10 recovery codes, Argon2id (section 2) |
| Hardware MFA | WebAuthn/YubiKey → roadmap v2 (via Authlib) |
6.1 Enrollment
import pyotp

secret = pyotp.random_base32()
uri = pyotp.totp.TOTP(secret).provisioning_uri(
    name=user.email,
    issuer_name=settings.MFA_ISSUER,
)
# user_accounts.mfa_secret_encrypted = encrypt(secret, kms_key)
The QR code is shown in the UI; the secret is stored encrypted at rest (KMS or pgcrypto).
6.2 Verifica
totp = pyotp.TOTP(decrypt(user.mfa_secret_encrypted))
ok = totp.verify(code, valid_window=1)
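To make the parameters in the table concrete (SHA-1, 6 digits, 30 s period, ±1 step), here is a standard-library sketch of what the verification computes, following RFC 6238. It is meant for understanding and testing only; the function names totp_at and verify_totp are hypothetical, and the production path remains pyotp.

```python
import base64
import hashlib
import hmac
import struct

def totp_at(secret_b32: str, unix_time: int, period: int = 30, digits: int = 6) -> str:
    """RFC 6238 TOTP with HMAC-SHA-1 for the time step containing unix_time."""
    key = base64.b32decode(secret_b32, casefold=True)
    counter = struct.pack(">Q", unix_time // period)       # 8-byte big-endian step
    mac = hmac.new(key, counter, hashlib.sha1).digest()
    offset = mac[-1] & 0x0F                                # dynamic truncation
    value = struct.unpack(">I", mac[offset:offset + 4])[0] & 0x7FFFFFFF
    return str(value % 10 ** digits).zfill(digits)

def verify_totp(secret_b32: str, code: str, unix_time: int,
                window: int = 1, period: int = 30) -> bool:
    """Accept the code of the current step and of `window` steps on either side."""
    return any(
        hmac.compare_digest(
            totp_at(secret_b32, unix_time + step * period, period).encode(),
            code.encode(),
        )
        for step in range(-window, window + 1)
    )
```

With the RFC test secret (the ASCII bytes 12345678901234567890), the code for time 59 is 287082; it is still accepted one step later at time 89, but not at time 149.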
6.3 MFA reset
- Self-service: with a valid recovery code.
- Admin: with system.users.manage + an MFA re-challenge of the admin. Generates a Telegram + email notification to the target user.
7. Security alerts
| Event | Notification |
|---|---|
| Login from a new IP (geoip MaxMind Lite) | email user + Telegram admin |
| Login from a new country | email user + Telegram admin |
| 3+ AgentCore injection attempts in 1 h | Telegram admin |
| Account lock | email user + Telegram admin |
| IP lock | Telegram admin |
| Recovery code used | email user (to confirm "it was me") |
| MFA reset | email user + Telegram admin |
| Password change | email user |
| Admin unlock of another user | Telegram admin (visible in the audit log) |
7.1 New-IP detection
async def is_new_ip_for_user(user_id: int, ip: str) -> bool:
    # Must run before the current successful attempt is recorded,
    # otherwise the attempt itself makes every IP look already seen.
    seen = await db.scalar("""
        SELECT 1 FROM login_attempts
        WHERE user_account_id = :uid AND source_ip = :ip AND success = true
        LIMIT 1
    """, {"uid": user_id, "ip": ip})
    return seen is None
7.2 Geoip lookup
import geoip2.database

reader = geoip2.database.Reader("/var/lib/geoip/GeoLite2-City.mmdb")

def lookup(ip: str) -> dict:
    try:
        r = reader.city(ip)
        return {"country": r.country.iso_code, "city": r.city.name}
    except Exception:
        return {"country": None, "city": None}
8. Default configuration (env)
MFA_ISSUER=Akira-staging.asheep.it
SECURITY_PWNED_CHECK_ENABLED=true
SESSION_IDLE_TIMEOUT_MIN=30
SESSION_ABSOLUTE_TIMEOUT_HOURS=8
LOCK_USER_THRESHOLD=5
LOCK_USER_WINDOW_MIN=15
LOCK_IP_THRESHOLD=10
LOCK_IP_WINDOW_MIN=5
LOCK_IP_DURATION_MIN=60
RECOVERY_CODES_COUNT=10
RECOVERY_REENROLL_AFTER_USED=5
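One way to read these variables with the documented defaults is a small typed settings object. This is a sketch, not Akira's actual settings module: the SecuritySettings class, load_settings, and the accepted boolean spellings are assumptions; the variable names and default values are the ones listed above.

```python
import os
from dataclasses import dataclass, fields
from typing import Mapping

@dataclass(frozen=True)
class SecuritySettings:
    # Field names are the lowercase form of the env variable names
    mfa_issuer: str = "Akira-staging.asheep.it"
    security_pwned_check_enabled: bool = True
    session_idle_timeout_min: int = 30
    session_absolute_timeout_hours: int = 8
    lock_user_threshold: int = 5
    lock_user_window_min: int = 15
    lock_ip_threshold: int = 10
    lock_ip_window_min: int = 5
    lock_ip_duration_min: int = 60
    recovery_codes_count: int = 10
    recovery_reenroll_after_used: int = 5

def load_settings(env: Mapping[str, str] = os.environ) -> SecuritySettings:
    """Build settings from env, falling back to the documented defaults."""
    overrides = {}
    for f in fields(SecuritySettings):
        raw = env.get(f.name.upper())
        if raw is None:
            continue
        if isinstance(f.default, bool):      # check bool before int: bool is an int subclass
            overrides[f.name] = raw.strip().lower() in ("1", "true", "yes", "on")
        elif isinstance(f.default, int):
            overrides[f.name] = int(raw)     # raises ValueError on a malformed number
        else:
            overrides[f.name] = raw
    return SecuritySettings(**overrides)
```

Failing fast with a ValueError on a malformed number at startup is a deliberate choice here: a silently ignored LOCK_USER_THRESHOLD would weaken the lock policy without anyone noticing.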
9. Security deploy checklist
- Tables recovery_codes, login_attempts, ip_locks, users_sessions created.
- Argon2id parameters verified on the target VPS (memory_cost compatible with the available RAM).
- GeoLite2-City.mmdb up to date (weekly geoipupdate cron job).
- Pwned Passwords API reachable, timeout 2.5 s.
- Telegram bot chat_id_admin configured in settings.telegram.config.
- SMTP sender noreply@asheep.it tested for security notifications.
- E2E test: lock after 5 failures, escalation on the 2nd lock, IP lock on 10 failures across multiple users.
- Recovery code test: enrollment, login with a code, used_at marking, re-enrollment at 5 used.
- Session test: 30 min idle, 8 h absolute, admin revoke.