
Convention: Error Handling & Correlation ID

Documentation convention (not a formal ADR), binding for all code under apps/ in Akira.

Purpose

Ensure that every error is:

  1. Traceable end-to-end via correlation_id (HTTP → DB → NATS → AgentCore → log).
  2. Classified in a hierarchy of typed exceptions.
  3. Mapped consistently to HTTP responses and structured logs.

1. AkiraException hierarchy

All domain exceptions derive from AkiraException. No scattered raise Exception("...").

# apps/api/akira/errors.py

class AkiraException(Exception):
    """Base class. Subclasses set http_status and error_code."""
    http_status: int = 500
    error_code: str = "AKIRA_INTERNAL_ERROR"

    def __init__(self, message: str, *, details: dict | None = None):
        super().__init__(message)
        self.message = message
        self.details = details or {}

class AkiraValidationError(AkiraException):
    http_status = 422
    error_code = "AKIRA_VALIDATION_ERROR"

class AkiraNotFound(AkiraException):
    http_status = 404
    error_code = "AKIRA_NOT_FOUND"

class AkiraUnauthorized(AkiraException):
    http_status = 401
    error_code = "AKIRA_UNAUTHORIZED"

class AkiraForbidden(AkiraException):
    http_status = 403
    error_code = "AKIRA_FORBIDDEN"

class AkiraConflict(AkiraException):
    http_status = 409
    error_code = "AKIRA_CONFLICT"

class AkiraExternalServiceError(AkiraException):
    http_status = 502
    error_code = "AKIRA_EXTERNAL_SERVICE_ERROR"

Usage rules

  • The domain layer (services) always raises an AkiraException subclass, never an HTTP-aware exception.
  • The HTTP layer (routers) does NOT catch these exceptions; the global exception handler takes care of them.
  • External service wrappers (Kamailio RPC, RTPengine NG, Revolut API, AgentCore) raise AkiraExternalServiceError with details={"upstream": ..., "upstream_code": ...}.
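
The rules above can be sketched as follows. This is a minimal illustration, not code from the Akira repository: the in-memory _COMPANIES store, get_company, reload_dispatcher, and the injected send_rpc callable are hypothetical names, and the exception classes are trimmed copies of the ones in section 1 so the snippet runs on its own.

```python
from __future__ import annotations


class AkiraException(Exception):
    """Trimmed copy of the base class from section 1."""
    http_status: int = 500
    error_code: str = "AKIRA_INTERNAL_ERROR"

    def __init__(self, message: str, *, details: dict | None = None):
        super().__init__(message)
        self.message = message
        self.details = details or {}


class AkiraNotFound(AkiraException):
    http_status = 404
    error_code = "AKIRA_NOT_FOUND"


class AkiraExternalServiceError(AkiraException):
    http_status = 502
    error_code = "AKIRA_EXTERNAL_SERVICE_ERROR"


# Hypothetical in-memory store standing in for a real repository.
_COMPANIES = {"abc-123": {"name": "Example Srl"}}


def get_company(company_id: str) -> dict:
    """Domain service: raises a typed exception and knows nothing about HTTP."""
    company = _COMPANIES.get(company_id)
    if company is None:
        raise AkiraNotFound("Company not found", details={"company_id": company_id})
    return company


def reload_dispatcher(send_rpc) -> dict:
    """External wrapper: translates transport failures into AkiraExternalServiceError."""
    try:
        return send_rpc("dispatcher.reload")
    except (ConnectionError, TimeoutError) as exc:
        raise AkiraExternalServiceError(
            "Kamailio RPC failed",
            details={"upstream": "kamailio", "upstream_code": type(exc).__name__},
        ) from exc
```

Neither function imports anything from FastAPI, so the HTTP status is decided only by the exception class, and a router calling get_company needs no try/except of its own.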

2. FastAPI exception handler middleware

# apps/api/akira/middleware/error_handler.py

from fastapi import Request
from fastapi.responses import JSONResponse
import structlog

from akira.errors import AkiraException  # module path inferred from apps/api/akira/errors.py

log = structlog.get_logger()

async def akira_exception_handler(request: Request, exc: AkiraException):
    # Set by CorrelationIdMiddleware (section 3); the log line itself gets
    # correlation_id from the structlog processor.
    correlation_id = request.state.correlation_id
    log.error(
        "akira_exception",
        error_code=exc.error_code,
        message=exc.message,
        details=exc.details,
        path=request.url.path,
    )
    return JSONResponse(
        status_code=exc.http_status,
        content={
            "error_code": exc.error_code,
            "message": exc.message,
            "details": exc.details,
            "correlation_id": correlation_id,
        },
        headers={"X-Correlation-Id": correlation_id},
    )

Registration:

app.add_exception_handler(AkiraException, akira_exception_handler)

3. Correlation ID propagation

FastAPI middleware

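# apps/api/akira/middleware/correlation_id.py
import uuid
from contextvars import ContextVar

from starlette.middleware.base import BaseHTTPMiddleware

correlation_id_ctx: ContextVar[str] = ContextVar("correlation_id", default="")

class CorrelationIdMiddleware(BaseHTTPMiddleware):
    # Register with app.add_middleware(CorrelationIdMiddleware).
    async def dispatch(self, request, call_next):
        # Reuse the caller's ID when present, otherwise generate a new one.
        cid = request.headers.get("X-Correlation-Id") or str(uuid.uuid4())
        request.state.correlation_id = cid
        token = correlation_id_ctx.set(cid)
        try:
            response = await call_next(request)
            response.headers["X-Correlation-Id"] = cid
            return response
        finally:
            # Restore the previous value so the ID never leaks into another request.
            correlation_id_ctx.reset(token)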

structlog integration

# apps/api/akira/logging.py
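import structlog

from akira.middleware.correlation_id import correlation_id_ctx  # module path inferred from the file layout above

def add_correlation_id(logger, method_name, event_dict):
    # structlog processor: copy the current correlation ID into every event.
    cid = correlation_id_ctx.get()
    if cid:
        event_dict["correlation_id"] = cid
    return event_dict

structlog.configure(
    processors=[
        add_correlation_id,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ],
)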

Result: every log line emitted during a request automatically includes correlation_id.
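
The reason this works under concurrency is that each asyncio task gets its own copy of the context. The sketch below uses only the standard library to show that behaviour: handle_request is a hypothetical stand-in for the middleware plus endpoint, and the processor is called directly instead of going through structlog.

```python
import asyncio
from contextvars import ContextVar

correlation_id_ctx: ContextVar[str] = ContextVar("correlation_id", default="")


def add_correlation_id(logger, method_name, event_dict):
    """Same processor shape as above: copy the context value into the event."""
    cid = correlation_id_ctx.get()
    if cid:
        event_dict["correlation_id"] = cid
    return event_dict


async def handle_request(cid: str) -> dict:
    """Hypothetical request: set the ID, do some work, emit one log event."""
    token = correlation_id_ctx.set(cid)
    try:
        await asyncio.sleep(0)  # yield so the two requests interleave
        return add_correlation_id(None, "info", {"event": "request_done"})
    finally:
        correlation_id_ctx.reset(token)


async def main() -> list:
    # gather() runs each coroutine in its own task, each with its own context copy.
    return await asyncio.gather(handle_request("cid-a"), handle_request("cid-b"))


events = asyncio.run(main())
```

Each event carries the ID of the request that produced it, even though both requests were in flight at the same time, and the value outside the tasks stays at its empty default.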

4. AgentCore tool calls: propagation

AgentCore session → Akira MCP tool → backend services → log/DB.

# apps/agentcore-bridge/akira_mcp/handlers.py

async def handle_tool_call(payload: ToolCallPayload):
    session_cid = payload.session.correlation_id  # propagated by AgentCore
    token = correlation_id_ctx.set(session_cid)
    try:
        # every downstream call inherits session_cid
        return await akira_api.call(payload.tool, payload.args)
    finally:
        correlation_id_ctx.reset(token)

The X-Correlation-Id: <session_cid> header is also propagated on outbound HTTP calls (httpx client with an event hook).
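
A request hook for this could look like the sketch below. It is an assumption about how the hook is written, not the actual bridge code: inject_correlation_id is a hypothetical name, and the demo uses a SimpleNamespace in place of a real httpx request so the snippet runs without httpx installed.

```python
import asyncio
from contextvars import ContextVar
from types import SimpleNamespace

correlation_id_ctx: ContextVar[str] = ContextVar("correlation_id", default="")


async def inject_correlation_id(request) -> None:
    """Request hook: copy the current correlation ID into the outbound headers."""
    cid = correlation_id_ctx.get()
    if cid:
        request.headers["X-Correlation-Id"] = cid


# With httpx installed, the hook would be attached roughly like this:
#   client = httpx.AsyncClient(event_hooks={"request": [inject_correlation_id]})


async def demo() -> dict:
    # Stand-in for an httpx request: only the mutable .headers mapping matters here.
    fake_request = SimpleNamespace(headers={})
    token = correlation_id_ctx.set("7c4f6d2a-1b3e-4a8c-9f1d-2b5e8c4f6d2a")
    try:
        await inject_correlation_id(fake_request)
    finally:
        correlation_id_ctx.reset(token)
    return fake_request.headers


headers = asyncio.run(demo())
```

Because the hook reads the context variable at send time, callers never pass the ID explicitly; any request made inside handle_tool_call picks it up.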

5. CDR audit: propagation

Every CDR record written to TimescaleDB includes the correlation_id of the originating task, for end-to-end debugging across Kamailio → NATS → worker → DB.

  • Schema: each cdr_* table has a correlation_id UUID NULL column.
  • Kamailio injects the X-Akira-Correlation-Id header into acc_json records when available (e.g. an AgentCore tool call that originated a test call).
  • cdr-worker extracts and persists it.
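
The extraction step in cdr-worker might be sketched as below. The JSON key under which Kamailio writes the header value is not specified here, so the default field name "correlation_id" is an assumption, as is the function name extract_correlation_id.

```python
from __future__ import annotations

import json
import uuid


def extract_correlation_id(acc_json_line: str, field: str = "correlation_id") -> uuid.UUID | None:
    """Return the correlation ID from one acc_json record, or None.

    The column is nullable, so a missing or malformed value maps to None
    instead of failing the whole CDR insert.
    """
    record = json.loads(acc_json_line)
    raw = record.get(field)  # `field` is a hypothetical key name
    if not raw:
        return None
    try:
        return uuid.UUID(raw)
    except (ValueError, AttributeError, TypeError):
        return None
```

Returning None for bad input keeps ordinary calls (those not originated by a tool call) flowing into the cdr_* tables with a NULL correlation_id.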

6. Retry pattern: tenacity

Standard for any I/O call to external services that can fail transiently.

from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=8),
    retry=retry_if_exception_type(AkiraExternalServiceError),
    reraise=True,
)
async def call_kamailio_rpc(method: str, params: dict):
    ...

Standard parameters:

  • Max attempts: 3.
  • Backoff: exponential, starting at 1s and doubling (1s / 2s / 4s), capped at 8s. With 3 attempts, only two waits actually occur between tries.
  • Retry only: AkiraExternalServiceError (transient). Do NOT retry on AkiraValidationError/AkiraNotFound/AkiraConflict.
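
To make the policy concrete, the sketch below hand-rolls the same behaviour with the standard library only. It is not how tenacity is implemented; call_with_retry, fake_sleep, and flaky are hypothetical names, and the two exception classes are stand-ins for the ones in section 1.

```python
import asyncio


class AkiraExternalServiceError(Exception):
    """Stand-in for the transient error type from section 1."""


class AkiraValidationError(Exception):
    """Stand-in for a non-transient error type."""


async def call_with_retry(func, *, attempts=3, base=1.0, cap=8.0, sleep=asyncio.sleep):
    """Retry func on AkiraExternalServiceError only, doubling the wait each time."""
    for attempt in range(1, attempts + 1):
        try:
            return await func()
        except AkiraExternalServiceError:
            if attempt == attempts:
                raise  # like reraise=True: the last error surfaces unchanged
            await sleep(min(base * 2 ** (attempt - 1), cap))


waits = []


async def fake_sleep(seconds):
    waits.append(seconds)  # record the wait instead of sleeping


calls = {"n": 0}


async def flaky():
    """Fails twice with a transient error, then succeeds."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise AkiraExternalServiceError("upstream timeout")
    return "ok"


result = asyncio.run(call_with_retry(flaky, sleep=fake_sleep))
```

Any exception other than AkiraExternalServiceError escapes on the first attempt, which is the behaviour the "retry only" rule asks for.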

7. Circuit breaker: pybreaker

For calls to external services with prolonged failures (avoids cascading failures).

import pybreaker

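kamailio_breaker = pybreaker.CircuitBreaker(
    fail_max=5,
    reset_timeout=60,
    name="kamailio_rpc",
)

# Caveat: pybreaker's decorator targets synchronous callables; confirm that
# failures raised inside the awaited coroutine are actually counted.
@kamailio_breaker
async def call_kamailio_rpc(method: str, params: dict):
    ...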

Standard config:

  • fail_max: 5 consecutive failures.
  • reset_timeout: 60s (open → half-open).
  • Applied to: Kamailio RPC, RTPengine NG control socket, Revolut API, AgentCore HTTP.

When the breaker is open, calls fail immediately with AkiraExternalServiceError(message="circuit breaker open: ..."). pybreaker itself raises CircuitBreakerError in that state, so the client wrapper has to translate it.
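
The closed → open → half-open cycle can be illustrated with a small standard-library state machine. This is a teaching sketch, not pybreaker and not a replacement for it: SimpleBreaker and its injectable clock are hypothetical, and the exception class is a stand-in for the one in section 1.

```python
import time


class AkiraExternalServiceError(Exception):
    """Stand-in for the exception class from section 1."""


class SimpleBreaker:
    """Closed, then open after fail_max consecutive failures, then half-open after reset_timeout."""

    def __init__(self, name, fail_max=5, reset_timeout=60.0, clock=time.monotonic):
        self.name = name
        self.fail_max = fail_max
        self.reset_timeout = reset_timeout
        self._clock = clock
        self._failures = 0
        self._opened_at = None  # None means the circuit is closed

    def call(self, func, *args, **kwargs):
        if self._opened_at is not None:
            if self._clock() - self._opened_at < self.reset_timeout:
                # Open: fail fast without touching the upstream.
                raise AkiraExternalServiceError(f"circuit breaker open: {self.name}")
            # Timeout elapsed: half-open, let one trial call through.
        try:
            result = func(*args, **kwargs)
        except Exception:
            self._failures += 1
            if self._opened_at is not None or self._failures >= self.fail_max:
                self._opened_at = self._clock()  # open, or re-open after a failed trial
            raise
        self._failures = 0
        self._opened_at = None  # success closes the circuit
        return result
```

While open, the breaker never invokes the wrapped function, which is what protects a struggling upstream from a pile-up of retries.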

8. Code review checklist

For every PR that touches a service, handler, or external client:

  • All domain exceptions are AkiraException subclasses.
  • No except Exception that swallows the error: log and re-raise.
  • External calls are protected by @retry + @kamailio_breaker (or equivalent).
  • New routes have a test that verifies the error response schema (error_code, message, correlation_id present).
  • Structured logs: snake_case keys, no f-strings in messages (use kwargs).
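
For the route-test item, a shared assertion helper keeps the schema check uniform across test files. The sketch below is one possible shape; assert_error_schema and REQUIRED_ERROR_KEYS are hypothetical names, and the helper takes the already-decoded JSON body so it does not depend on any particular test client.

```python
REQUIRED_ERROR_KEYS = {"error_code", "message", "correlation_id"}


def assert_error_schema(body: dict, *, expected_code: str) -> None:
    """Fail with a readable message if an error body breaks the convention."""
    missing = REQUIRED_ERROR_KEYS - body.keys()
    assert not missing, f"missing keys in error body: {sorted(missing)}"
    assert body["error_code"] == expected_code, (
        f"expected {expected_code}, got {body['error_code']}"
    )
    assert body["correlation_id"], "correlation_id must be non-empty"
```

In a route test this would be called on the parsed response, for example assert_error_schema(response.json(), expected_code="AKIRA_NOT_FOUND").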

9. Example error response

HTTP/1.1 404 Not Found
Content-Type: application/json
X-Correlation-Id: 7c4f6d2a-1b3e-4a8c-9f1d-2b5e8c4f6d2a

{
  "error_code": "AKIRA_NOT_FOUND",
  "message": "Company not found",
  "details": {"company_id": "abc-123"},
  "correlation_id": "7c4f6d2a-1b3e-4a8c-9f1d-2b5e8c4f6d2a"
}