ADR-0028 - Pattern Analyzer / Fraud Detection engine

  • Status: Accepted (2026-06-08)
  • Deciders: Massimo Bagnoli
  • Implementation tasks: TASK-509, TASK-510, TASK-511, TASK-512
  • Depends on: ADR-0007, ADR-0010, ADR-0015, ADR-0016, ADR-0027
  • Supersedes: frontend-only Pattern Analyzer localStorage runs, legacy cdr_patterns as the target model and the ADR-0007 legacy note that assigned "Pattern Analyzer scheduled" to arq + Redis
  • Superseded by: none

Context

The /pattern-analyzer page currently exposes a frontend-only fraud catalog, sample detections and run history stored in localStorage. That is useful for UI shape, but it cannot inspect real CDR traffic, cannot be scheduled reliably and cannot provide auditable drill-down from a finding to the CDR rows that triggered it.

Akira already has the CDR ingestion path the engine requires: Kamailio emits CDR JSONL through the sidecar defined by ADR-0010, and NATS JetStream plus cdr-worker carry the records into the TimescaleDB cdr hypertable per ADR-0007 and ADR-0016. ADR-0027 also established an anti-noise principle that applies here: thresholds must include min_samples, and evaluation windows must be explicit rather than inferred from frontend state.

This ADR defines the canonical backend model and detection contract for the 9 Pattern Analyzer detections present in the frontend catalog. Implementation is deferred to TASK-510 and TASK-511; frontend wiring is deferred to TASK-512.

Decision

Pattern Analyzer becomes a persistent backend engine over the cdr hypertable. The frontend catalog and localStorage run history are replaced by backend catalog, run and finding resources.

The legacy cdr_patterns table from the baseline schema is not extended. It may be retained as a compatibility/source-history table until TASK-510 migrates or deprecates it, but the canonical target model for new implementation is pattern_detections, pattern_runs and pattern_findings.

ADR-0028 explicitly supersedes the ADR-0007 line "arq + Redis resta per / Pattern Analyzer scheduled" (that is, arq + Redis remains the mechanism for scheduled Pattern Analyzer runs). That note is legacy guidance written before the ADR-0015 worker model was accepted and before the production Benchmark runner established the current durable-work idiom. Scheduled Pattern Analyzer runs must therefore use Postgres-backed run state and worker claiming, not Redis job state.

Data model

pattern_detections

Catalog table. Rows are seeded and can be enabled/disabled or tuned by admins.

Required columns:

  • id BIGSERIAL PRIMARY KEY
  • kind VARCHAR(64) UNIQUE NOT NULL
  • label VARCHAR(128) NOT NULL
  • description TEXT NOT NULL
  • default_params JSONB NOT NULL
  • enabled BOOLEAN NOT NULL DEFAULT TRUE
  • created_at TIMESTAMPTZ NOT NULL DEFAULT now()
  • updated_at TIMESTAMPTZ NOT NULL DEFAULT now()

kind values are wangiri, irsf, sim_box, ping_calls, msrn_range, auto_call_center, anomalous_cli, concentration_risk and temporal_anomaly.

pattern_runs

Run table. A run is immutable after creation except for status, the lease fields (lease_owner, lease_until), summary, error and timestamps.

Required columns:

  • id UUID PRIMARY KEY
  • requested_by BIGINT NULL
  • trigger_kind VARCHAR(32) NOT NULL with values on_demand | scheduled
  • window_from TIMESTAMPTZ NOT NULL
  • window_to TIMESTAMPTZ NOT NULL
  • scope JSONB NOT NULL
  • detections JSONB NOT NULL
  • params_override JSONB NOT NULL DEFAULT '{}'::jsonb
  • status VARCHAR(32) NOT NULL with values queued | running | succeeded | failed | canceled
  • idempotency_key VARCHAR(128) UNIQUE
  • lease_owner VARCHAR(128)
  • lease_until TIMESTAMPTZ
  • started_at TIMESTAMPTZ
  • ended_at TIMESTAMPTZ
  • summary JSONB NOT NULL DEFAULT '{}'::jsonb
  • error TEXT
  • created_at TIMESTAMPTZ NOT NULL DEFAULT now()

queued is the API/database enum value for the state the lead review calls pending. lease_owner and lease_until are nullable because only claimed runs hold an active worker lease.

scope supports additive filters: originator_ids, terminator_ids, destination_ids, dst_prefixes, src_prefixes and include_test_traffic. The default include_test_traffic is false, consistent with ADR-0026 test traffic isolation.
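A minimal Python sketch of how a worker might turn scope into SQL predicates that are applied before aggregation. RunScope, sql_predicates and the is_test_traffic column are hypothetical names, and the sketch assumes that each non-empty filter narrows the run (AND) while the values inside one filter are alternatives (OR); prefixes are assumed to be plain digit strings.

```python
from dataclasses import dataclass, fields


@dataclass(frozen=True)
class RunScope:
    """Hypothetical typed view of pattern_runs.scope."""

    originator_ids: tuple[int, ...] = ()
    terminator_ids: tuple[int, ...] = ()
    destination_ids: tuple[int, ...] = ()
    dst_prefixes: tuple[str, ...] = ()
    src_prefixes: tuple[str, ...] = ()
    include_test_traffic: bool = False  # default mirrors ADR-0026 isolation

    @classmethod
    def from_json(cls, raw: dict) -> "RunScope":
        allowed = {f.name for f in fields(cls)}
        unknown = set(raw) - allowed
        if unknown:
            raise ValueError(f"unknown scope keys: {sorted(unknown)}")
        # JSON arrays become tuples so the scope stays immutable.
        return cls(**{k: tuple(v) if isinstance(v, list) else v for k, v in raw.items()})

    def sql_predicates(self) -> tuple[list[str], dict]:
        """WHERE fragments (ANDed with the started_at window) plus bind params."""
        clauses: list[str] = []
        params: dict = {}
        for name, column in (
            ("originator_ids", "originator_id"),
            ("terminator_ids", "terminator_id"),
            ("destination_ids", "destination_id"),
        ):
            values = getattr(self, name)
            if values:
                clauses.append(f"{column} = ANY(:{name})")
                params[name] = list(values)
        for name, column in (("dst_prefixes", "dst"), ("src_prefixes", "src")):
            values = getattr(self, name)
            if values:
                clauses.append(f"{column} LIKE ANY(:{name}_like)")
                params[f"{name}_like"] = [p + "%" for p in values]
        if not self.include_test_traffic:
            clauses.append("NOT is_test_traffic")  # hypothetical column name
        return clauses, params
```

With the scope from the POST example, {"originator_ids": [10], "include_test_traffic": false}, this yields originator_id = ANY(:originator_ids) and NOT is_test_traffic.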

pattern_findings

Finding table. Each row represents one suspicious entity within a run, not one suspicious CDR.

Required columns:

  • id UUID PRIMARY KEY
  • run_id UUID NOT NULL REFERENCES pattern_runs(id) ON DELETE CASCADE
  • detection_kind VARCHAR(64) NOT NULL
  • entity_type VARCHAR(32) NOT NULL
  • entity_ref JSONB NOT NULL
  • severity VARCHAR(16) NOT NULL with values low | medium | high | critical
  • confidence NUMERIC(5,2) NOT NULL
  • score NUMERIC(7,2) NOT NULL
  • metrics JSONB NOT NULL
  • params_used JSONB NOT NULL
  • evidence_cdr_refs JSONB NOT NULL
  • first_seen_at TIMESTAMPTZ
  • last_seen_at TIMESTAMPTZ
  • created_at TIMESTAMPTZ NOT NULL DEFAULT now()

entity_type values are originator, terminator, destination, cli, dst_prefix, src_prefix, route and time_bucket.

evidence_cdr_refs stores a capped array of CDR references, not full CDR rows:

[
{"id": 12345, "call_id": "abc", "started_at": "2026-06-08T08:12:00Z"}
]

The cap is 100 references per finding by default. Drill-down uses those refs and the existing CDR browser/trace endpoints, never duplicated CDR payloads.
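One possible way to build evidence_cdr_refs from the CDR rows a detection matched, sketched in Python. The helper name and the choice to keep the earliest rows, ordered by (started_at, id), are assumptions; a stable order keeps the refs identical across reruns, which fits the determinism requirement of the detection framework.

```python
from datetime import datetime, timezone

MAX_EVIDENCE_REFS = 100  # default cap per finding


def evidence_refs(cdr_rows: list[dict], cap: int = MAX_EVIDENCE_REFS) -> list[dict]:
    """Reduce matched CDR rows to {id, call_id, started_at} refs.

    started_at is expected to be a timezone-aware datetime.
    """
    # Stable order so the same window/scope/params always yields the same refs.
    ordered = sorted(cdr_rows, key=lambda r: (r["started_at"], r["id"]))
    return [
        {
            "id": row["id"],
            "call_id": row["call_id"],
            "started_at": row["started_at"].astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        }
        for row in ordered[:cap]
    ]
```

Every other CDR column (disposition, billsec and so on) is dropped; drill-down fetches it through the CDR browser/trace endpoints.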

Detection framework

Each detection is a pure analyzer function:

analyze(window_from, window_to, scope, params) -> list[PatternFinding]

The function must:

  • apply the run window and scope before aggregation;
  • use only indexed predicates on cdr.started_at, scoped entity ids or prefixable src/dst filters;
  • reject or fail the run if the requested window exceeds the configured maximum lookback;
  • require min_samples before producing a finding, following ADR-0027;
  • return bounded evidence CDR refs for drill-down;
  • return deterministic findings for the same window/scope/params.
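The contract can be expressed as a typing.Protocol, sketched here in Python. PatternFinding carries only a subset of the pattern_findings columns (id, run_id and severity are left to the worker), and stable_order is a hypothetical helper that makes output order deterministic by sorting on score and on a canonical JSON form of entity_ref.

```python
import json
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Protocol


@dataclass(frozen=True)
class PatternFinding:
    # Subset of pattern_findings; id, run_id and severity are filled in by the worker.
    detection_kind: str
    entity_type: str
    entity_ref: dict[str, Any]
    score: float
    confidence: float
    metrics: dict[str, Any]
    params_used: dict[str, Any]
    evidence_cdr_refs: list[dict[str, Any]] = field(default_factory=list)


class Analyzer(Protocol):
    """analyze(window_from, window_to, scope, params) -> list[PatternFinding]"""

    def __call__(self, window_from: datetime, window_to: datetime,
                 scope: dict[str, Any], params: dict[str, Any]) -> list[PatternFinding]: ...


def stable_order(findings: list[PatternFinding]) -> list[PatternFinding]:
    """Deterministic output: highest score first, ties broken by a canonical entity key."""
    return sorted(
        findings,
        key=lambda f: (-f.score, f.entity_type, json.dumps(f.entity_ref, sort_keys=True)),
    )
```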

Default cost caps:

  • on-demand maximum lookback: 7 days;
  • scheduled maximum lookback: 24 hours unless explicitly configured;
  • maximum detections per run: 9;
  • maximum findings returned per detection: 500, ordered by severity/score;
  • maximum evidence CDR refs per finding: 100;
  • compressed chunks older than 7 days are not queried by default.
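A hedged sketch of how the API or worker could enforce these caps before any query runs. It reads "lookback" as the width of [window_from, window_to), which is an interpretation; the function names are hypothetical, and errors are returned as a list so the API can report all of them at once.

```python
from datetime import datetime, timedelta

# Defaults from the cost caps above; a scheduled lookback may be configured differently.
MAX_LOOKBACK = {"on_demand": timedelta(days=7), "scheduled": timedelta(hours=24)}
MAX_DETECTIONS_PER_RUN = 9
MAX_FINDINGS_PER_DETECTION = 500
SEVERITY_RANK = {"low": 0, "medium": 1, "high": 2, "critical": 3}


def run_window_errors(trigger_kind: str, window_from: datetime, window_to: datetime,
                      detections: list[str]) -> list[str]:
    """Return cap violations for a requested run; an empty list means it may be queued."""
    errors = []
    if window_to <= window_from:
        errors.append("window_to must be later than window_from")
    elif window_to - window_from > MAX_LOOKBACK[trigger_kind]:
        errors.append(f"{trigger_kind} window exceeds {MAX_LOOKBACK[trigger_kind]}")
    if len(detections) > MAX_DETECTIONS_PER_RUN:
        errors.append(f"at most {MAX_DETECTIONS_PER_RUN} detections per run")
    return errors


def cap_findings(findings: list[dict]) -> list[dict]:
    """Keep the top findings of one detection, ordered by severity, then score."""
    ranked = sorted(findings, key=lambda f: (SEVERITY_RANK[f["severity"]], f["score"]), reverse=True)
    return ranked[:MAX_FINDINGS_PER_DETECTION]
```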

Severity is derived from score unless a detection declares a stronger rule:

  • low: score < 30
  • medium: 30 <= score < 50
  • high: 50 <= score < 75
  • critical: score >= 75

score is computed as:

score = base_weight * (1 + ln(observed / threshold))

where observed >= threshold; the result is clamped to 100. confidence is based on sample size, signal agreement and evidence completeness and is clamped to 0-100.
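A direct Python transcription of the score formula and the default severity bands. Which parameter plays the role of threshold is detection-specific and not fixed by this ADR; the worked example assumes a Wangiri finding scored on attempts against min_samples: 60 attempts against min_samples 30 with base_weight 35 gives 35 * (1 + ln 2) ≈ 59.26, which falls in the high band.

```python
import math


def pattern_score(base_weight: float, observed: float, threshold: float) -> float:
    """score = base_weight * (1 + ln(observed / threshold)), clamped to 100."""
    if threshold <= 0 or observed < threshold:
        raise ValueError("score is defined only for observed >= threshold > 0")
    return min(100.0, base_weight * (1.0 + math.log(observed / threshold)))


def severity_for(score: float) -> str:
    """Default bands; a detection may declare a stronger rule."""
    if score >= 75:
        return "critical"
    if score >= 50:
        return "high"
    if score >= 30:
        return "medium"
    return "low"
```

Because observed >= threshold, the logarithm is never negative, so a finding that only just crosses its threshold scores exactly base_weight.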

Detection catalog

The following reference SQL is illustrative. TASK-510 may adapt column names to the actual SQLAlchemy model, but must preserve the heuristic, thresholds and scope/window behavior.

1. Wangiri

Heuristic: many unanswered or very short calls toward international/premium destinations, low ASR and average duration <= 4 seconds. Entity is usually originator + dst_prefix or cli.

Default params:

{
"window_seconds": 3600,
"min_samples": 30,
"max_short_duration_sec": 4,
"max_asr": 0.05,
"premium_or_international_only": true,
"base_weight": 35
}

Reference query:

SELECT originator_id,
left(dst, 6) AS dst_prefix,
COUNT(*) AS attempts,
COUNT(*) FILTER (WHERE disposition = 'ANSWERED')::numeric / COUNT(*) AS asr,
AVG(COALESCE(billsec, duration_sec, 0)) AS avg_duration_sec
FROM cdr
WHERE started_at >= :window_from
AND started_at < :window_to
AND dst IS NOT NULL
GROUP BY originator_id, left(dst, 6)
HAVING COUNT(*) >= :min_samples
AND COUNT(*) FILTER (WHERE disposition = 'ANSWERED')::numeric / COUNT(*) <= :max_asr
AND AVG(COALESCE(billsec, duration_sec, 0)) <= :max_short_duration_sec;

2. IRSF

Heuristic: spike toward premium or high-cost prefixes. Prefix lists are configuration data, not hardcoded code constants.

Default params:

{
"window_seconds": 3600,
"baseline_days": 14,
"min_samples": 20,
"min_attempts": 20,
"spike_ratio": 3.0,
"premium_prefix_source": "pattern_params.premium_prefixes",
"base_weight": 45
}

Reference query:

WITH current AS (
SELECT originator_id, left(dst, 6) AS dst_prefix, COUNT(*) AS attempts
FROM cdr
WHERE started_at >= :window_from
AND started_at < :window_to
AND dst LIKE ANY(:premium_prefix_like)
GROUP BY originator_id, left(dst, 6)
),
baseline AS (
SELECT originator_id, left(dst, 6) AS dst_prefix,
COUNT(*)::numeric / :baseline_windows AS avg_attempts
FROM cdr
WHERE started_at >= :baseline_from
AND started_at < :window_from
AND dst LIKE ANY(:premium_prefix_like)
GROUP BY originator_id, left(dst, 6)
)
SELECT c.originator_id, c.dst_prefix, c.attempts, COALESCE(b.avg_attempts, 0) AS baseline_attempts
FROM current c
LEFT JOIN baseline b USING (originator_id, dst_prefix)
WHERE c.attempts >= :min_attempts
AND c.attempts >= GREATEST(:min_samples, COALESCE(b.avg_attempts, 0) * :spike_ratio);
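The reference query binds :baseline_from, :baseline_windows and :premium_prefix_like, which do not appear in default_params. A hedged Python sketch of how they might be derived: it assumes baseline_windows counts how many run-window-sized slots fit in baseline_days, so the current count and the baseline average cover the same duration, and that configured prefixes are escaped before being turned into LIKE patterns. The function names and the premium_prefixes input are hypothetical.

```python
from datetime import datetime, timedelta


def like_patterns(prefixes: list[str]) -> list[str]:
    """Prefix list -> LIKE ANY(...) patterns; escapes backslash (the default LIKE escape) and wildcards."""
    escaped = (p.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_") for p in prefixes)
    return [p + "%" for p in escaped]


def irsf_bind_params(window_from: datetime, window_to: datetime,
                     params: dict, premium_prefixes: list[str]) -> dict:
    """Derive the bind parameters used by the IRSF reference query."""
    baseline = timedelta(days=params["baseline_days"])
    run_width = window_to - window_from
    return {
        "window_from": window_from,
        "window_to": window_to,
        "baseline_from": window_from - baseline,
        # Run-sized slots in the baseline period: 336 for 14 days of 1-hour runs.
        "baseline_windows": baseline / run_width,
        "premium_prefix_like": like_patterns(premium_prefixes),
        "min_samples": params["min_samples"],
        "min_attempts": params["min_attempts"],
        "spike_ratio": params["spike_ratio"],
    }
```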

3. SIM-box

Heuristic: bypass pattern with off-net to on-net traffic, many distinct CLI on a route, low ASR and anomalous ACD.

Default params:

{
"window_seconds": 3600,
"min_samples": 100,
"min_distinct_cli": 25,
"max_asr": 0.35,
"max_acd_sec": 35,
"same_country_required": true,
"base_weight": 40
}

Reference query:

SELECT terminator_id,
destination_id,
COUNT(*) AS attempts,
COUNT(DISTINCT src) AS distinct_cli,
COUNT(*) FILTER (WHERE disposition = 'ANSWERED')::numeric / COUNT(*) AS asr,
AVG(billsec) FILTER (WHERE disposition = 'ANSWERED') AS acd_sec
FROM cdr
WHERE started_at >= :window_from
AND started_at < :window_to
AND terminator_id IS NOT NULL
GROUP BY terminator_id, destination_id
HAVING COUNT(*) >= :min_samples
AND COUNT(DISTINCT src) >= :min_distinct_cli
AND COUNT(*) FILTER (WHERE disposition = 'ANSWERED')::numeric / COUNT(*) <= :max_asr
AND COALESCE(AVG(billsec) FILTER (WHERE disposition = 'ANSWERED'), 0) <= :max_acd_sec;

4. Ping calls

Heuristic: high volume of ultra-short calls, often automated, lasting at most 3 seconds (max_duration_sec) and with very low answer quality.

Default params:

{
"window_seconds": 900,
"min_samples": 100,
"max_duration_sec": 3,
"min_short_ratio": 0.25,
"base_weight": 30
}

Reference query:

SELECT originator_id,
destination_id,
COUNT(*) AS attempts,
COUNT(*) FILTER (WHERE COALESCE(billsec, duration_sec, 0) <= :max_duration_sec)::numeric / COUNT(*) AS short_ratio
FROM cdr
WHERE started_at >= :window_from
AND started_at < :window_to
GROUP BY originator_id, destination_id
HAVING COUNT(*) >= :min_samples
AND COUNT(*) FILTER (WHERE COALESCE(billsec, duration_sec, 0) <= :max_duration_sec)::numeric / COUNT(*) >= :min_short_ratio;

5. MSRN range

Heuristic: calls toward configured MSRN or roaming ranges. Ranges are managed as configurable prefixes because MSRN allocations are operator-specific.

Default params:

{
"window_seconds": 3600,
"min_samples": 10,
"msrn_prefixes": [],
"min_attempts": 10,
"base_weight": 35
}

Reference query:

SELECT originator_id,
left(dst, 8) AS msrn_prefix,
COUNT(*) AS attempts,
COUNT(DISTINCT dst) AS distinct_numbers
FROM cdr
WHERE started_at >= :window_from
AND started_at < :window_to
AND dst LIKE ANY(:msrn_prefix_like)
GROUP BY originator_id, left(dst, 8)
HAVING COUNT(*) >= :min_samples
AND COUNT(*) >= :min_attempts;

6. Auto call-center

Heuristic: machine burst with regular inter-call intervals and nearly identical durations, typical of automated dialers. Legitimate call-center traffic is not blocked automatically.

Default params:

{
"window_seconds": 1800,
"min_samples": 200,
"max_interval_cv": 0.20,
"max_duration_cv": 0.25,
"min_distinct_dst": 100,
"base_weight": 25
}

Reference query:

WITH ordered AS (
SELECT id, call_id, started_at, originator_id, dst,
COALESCE(billsec, duration_sec, 0) AS duration_sec,
EXTRACT(EPOCH FROM started_at - LAG(started_at) OVER (PARTITION BY originator_id ORDER BY started_at)) AS interval_sec
FROM cdr
WHERE started_at >= :window_from
AND started_at < :window_to
),
stats AS (
SELECT originator_id,
COUNT(*) AS attempts,
COUNT(DISTINCT dst) AS distinct_dst,
stddev_pop(interval_sec) / NULLIF(avg(interval_sec), 0) AS interval_cv,
stddev_pop(duration_sec) / NULLIF(avg(duration_sec), 0) AS duration_cv
FROM ordered
GROUP BY originator_id
)
SELECT *
FROM stats
WHERE attempts >= :min_samples
AND distinct_dst >= :min_distinct_dst
AND interval_cv <= :max_interval_cv
AND duration_cv <= :max_duration_cv;
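To make the two coefficient-of-variation tests concrete, here is a small Python mirror of the CV part of the stats CTE for a single originator: gaps between consecutive calls play the role of LAG(...) OVER (PARTITION BY originator_id ORDER BY started_at), and statistics.pstdev matches stddev_pop. The helper names are hypothetical. A dialer firing every 9 seconds with identical 20-second calls has both CVs at 0, well under the 0.20 and 0.25 defaults.

```python
from statistics import fmean, pstdev
from typing import Optional


def coefficient_of_variation(values: list[float]) -> Optional[float]:
    """stddev_pop / avg; None when there is no data or the mean is 0 (like NULLIF in SQL)."""
    if not values:
        return None
    mean = fmean(values)
    if mean == 0:
        return None
    return pstdev(values) / mean


def dialer_stats(started_at_epoch: list[float], durations_sec: list[float]) -> dict:
    """CV stats for one originator's calls inside the run window."""
    ts = sorted(started_at_epoch)
    # Gap to the previous call; the first call has none (LAG returns NULL in SQL).
    gaps = [later - earlier for earlier, later in zip(ts, ts[1:])]
    return {
        "attempts": len(ts),
        "interval_cv": coefficient_of_variation(gaps),
        "duration_cv": coefficient_of_variation(durations_sec),
    }
```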

7. Anomalous CLI

Heuristic: malformed, spoofed or invalid CLI: null/empty, non-numeric after normalization, all-zero, implausible length or not matching an originator CLI policy when such a policy exists.

Default params:

{
"window_seconds": 3600,
"min_samples": 20,
"min_invalid_ratio": 0.10,
"min_invalid_calls": 20,
"base_weight": 30
}

Reference query:

WITH classified AS (
SELECT originator_id,
(src IS NULL
OR src = ''
OR src !~ '^\+?[0-9]{6,15}$'
OR src ~ '^\+?0+$') AS is_invalid_cli
FROM cdr
WHERE started_at >= :window_from
AND started_at < :window_to
)
SELECT originator_id,
COUNT(*) AS attempts,
COUNT(*) FILTER (WHERE is_invalid_cli) AS invalid_cli
FROM classified
GROUP BY originator_id
HAVING COUNT(*) >= :min_samples
AND COUNT(*) FILTER (WHERE is_invalid_cli) >= :min_invalid_calls
AND COUNT(*) FILTER (WHERE is_invalid_cli)::numeric / COUNT(*) >= :min_invalid_ratio;
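The same rule in Python can help unit-test the SQL predicate or label evidence rows. This is a sketch: the function names are hypothetical, re.fullmatch stands in for the anchored ^...$ PostgreSQL patterns, and the threshold defaults are copied from default_params.

```python
import re
from typing import Optional

VALID_CLI = re.compile(r"\+?[0-9]{6,15}")  # optional +, then 6 to 15 digits
ALL_ZERO_CLI = re.compile(r"\+?0+")


def is_invalid_cli(src: Optional[str]) -> bool:
    """Null/empty, wrong shape or all zeros."""
    if not src:
        return True
    return VALID_CLI.fullmatch(src) is None or ALL_ZERO_CLI.fullmatch(src) is not None


def flags_originator(srcs: list[Optional[str]], min_samples: int = 20,
                     min_invalid_calls: int = 20, min_invalid_ratio: float = 0.10) -> bool:
    """Apply the HAVING thresholds to one originator's CLIs."""
    invalid = sum(is_invalid_cli(s) for s in srcs)
    return (len(srcs) >= min_samples
            and invalid >= min_invalid_calls
            and invalid / len(srcs) >= min_invalid_ratio)
```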

8. Concentration risk

Heuristic: excessive share of traffic or cost on one destination or route. This is a business concentration risk even when calls are legitimate.

Default params:

{
"window_seconds": 3600,
"min_samples": 100,
"max_destination_share": 0.60,
"max_route_share": 0.70,
"base_weight": 25
}

Reference query:

WITH grouped AS (
SELECT originator_id, destination_id, terminator_id, COUNT(*) AS attempts
FROM cdr
WHERE started_at >= :window_from
AND started_at < :window_to
GROUP BY originator_id, destination_id, terminator_id
),
totals AS (
SELECT originator_id, SUM(attempts) AS total_attempts
FROM grouped
GROUP BY originator_id
)
SELECT g.originator_id,
g.destination_id,
g.terminator_id,
g.attempts,
g.attempts::numeric / t.total_attempts AS share
FROM grouped g
JOIN totals t USING (originator_id)
WHERE t.total_attempts >= :min_samples
AND g.attempts::numeric / t.total_attempts >= :max_destination_share;

9. Temporal anomaly

Heuristic: traffic in anomalous hours versus baseline for the same originator and destination. Baseline compares the current bucket with recent buckets for the same hour-of-week.

Default params:

{
"window_seconds": 3600,
"baseline_days": 28,
"min_samples": 30,
"z_score_threshold": 3.0,
"min_spike_ratio": 2.5,
"base_weight": 35
}

Reference query:

WITH current AS (
SELECT originator_id,
destination_id,
date_trunc('hour', started_at) AS bucket,
COUNT(*) AS attempts
FROM cdr
WHERE started_at >= :window_from
AND started_at < :window_to
GROUP BY originator_id, destination_id, date_trunc('hour', started_at)
),
baseline AS (
SELECT originator_id,
destination_id,
EXTRACT(ISODOW FROM bucket) AS dow,
EXTRACT(HOUR FROM bucket) AS hour,
AVG(hourly_attempts) AS avg_attempts,
STDDEV_POP(hourly_attempts) AS stddev_attempts
FROM (
SELECT originator_id,
destination_id,
date_trunc('hour', started_at) AS bucket,
COUNT(*) AS hourly_attempts
FROM cdr
WHERE started_at >= :baseline_from
AND started_at < :window_from
GROUP BY originator_id, destination_id, date_trunc('hour', started_at)
) h
GROUP BY originator_id, destination_id, EXTRACT(ISODOW FROM bucket), EXTRACT(HOUR FROM bucket)
)
SELECT c.originator_id,
c.destination_id,
c.bucket,
c.attempts,
b.avg_attempts,
b.stddev_attempts
FROM current c
JOIN baseline b
ON b.originator_id = c.originator_id
AND b.destination_id = c.destination_id
AND b.dow = EXTRACT(ISODOW FROM c.bucket)
AND b.hour = EXTRACT(HOUR FROM c.bucket)
WHERE c.attempts >= :min_samples
AND c.attempts >= b.avg_attempts * :min_spike_ratio
AND (c.attempts - b.avg_attempts) / NULLIF(b.stddev_attempts, 0) >= :z_score_threshold;
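A per-bucket Python mirror of the final filter, sketched under the same assumptions as the reference query. It reproduces two consequences of that SQL: with baseline_days = 28, each hour-of-week has at most four baseline buckets, and hours with no traffic produce no row, so they are absent from baseline_hourly rather than counted as zero. The function name is hypothetical.

```python
from statistics import fmean, pstdev
from typing import Optional


def temporal_anomaly(current: int, baseline_hourly: list[int], params: dict) -> Optional[dict]:
    """Compare one hourly bucket with past buckets at the same ISO weekday and hour."""
    if current < params["min_samples"] or not baseline_hourly:
        return None  # below min_samples, or no baseline row (the SQL inner JOIN drops it)
    avg = fmean(baseline_hourly)
    sd = pstdev(baseline_hourly)
    if sd == 0:
        return None  # NULLIF(stddev, 0) makes the z-score NULL in SQL
    z = (current - avg) / sd
    if current >= avg * params["min_spike_ratio"] and z >= params["z_score_threshold"]:
        return {"attempts": current, "avg_attempts": avg, "z_score": z}
    return None
```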

Execution model

Pattern Analyzer runs are backend jobs with Postgres as source of truth.

On-demand execution:

  1. API creates a pattern_runs row with status queued.
  2. API may publish a NATS wakeup notification such as pattern.runs.requested, but that message is only a hint. It is not the durable job record and is not required for correctness.
  3. pattern-analyzer-worker claims the run from Postgres with a FOR UPDATE SKIP LOCKED lease, using the same durable-work idiom as the Benchmark runner claim_next_job path.
  4. The worker executes selected detections, writes pattern_findings, updates pattern_runs.summary and marks the run succeeded or failed.

Scheduled execution:

  1. A systemd timer for pattern-analyzer-worker enqueues due runs using the same internal service path as on-demand runs.
  2. Each scheduled fire creates one pattern_runs row with a deterministic idempotency_key.
  3. Duplicate enqueue attempts for the same schedule/window are ignored by the unique idempotency key.
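A sketch of one way to make the scheduled idempotency_key deterministic. The schedule_id argument and the "sched:" plus SHA-256 layout are hypothetical; the only properties that matter are that the same schedule and window always produce the same key and that the key fits in VARCHAR(128).

```python
import hashlib
import json
from datetime import datetime, timezone


def scheduled_idempotency_key(schedule_id: str, window_from: datetime, window_to: datetime,
                              detections: list[str]) -> str:
    """Same schedule + window + detection set => same key, regardless of list order."""
    payload = json.dumps(
        {
            "schedule": schedule_id,
            "from": window_from.astimezone(timezone.utc).isoformat(),
            "to": window_to.astimezone(timezone.utc).isoformat(),
            "detections": sorted(detections),
        },
        sort_keys=True,
        separators=(",", ":"),
    )
    # 6-character prefix + 64 hex digits = 70 characters.
    return "sched:" + hashlib.sha256(payload.encode()).hexdigest()
```

The enqueue path can then lean on the unique constraint, for example with INSERT INTO pattern_runs (...) VALUES (...) ON CONFLICT (idempotency_key) DO NOTHING, so a timer that fires twice for the same window creates only one row.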

The worker is a systemd-managed Python service aligned with ADR-0015 and with the Benchmark runner already in production: Postgres is the source of truth for run state, findings and leases; NATS is an optional wakeup/notify channel, not a second broker for job state. This ADR intentionally does not introduce Celery, RQ, Dramatiq, APScheduler or arq/Redis for Pattern Analyzer runs.

Claim and lease requirements:

  • workers claim only queued runs whose lease is absent or expired, or running runs whose lease has expired;
  • the claim happens in one transaction with row locking and FOR UPDATE SKIP LOCKED;
  • the claim sets status = 'running', started_at, lease_owner and lease_until;
  • long detections refresh lease_until periodically;
  • expired running leases can be reclaimed only after the configured timeout;
  • every run has an explicit bounded window, so the worker never performs a full-table scan.
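A sketch of the claim as a single Postgres statement issued through a DB-API 2.0 connection using the pyformat paramstyle (as psycopg does). The SQL text, claim_next_run and is_claimable (a pure-Python restatement of the eligibility rule, handy for tests) are assumptions, not the Benchmark runner's actual claim_next_job code.

```python
from datetime import datetime, timedelta
from typing import Optional

# Hypothetical claim statement: lock one eligible row, skip rows other workers hold.
CLAIM_SQL = """
UPDATE pattern_runs
SET status = 'running',
    started_at = now(),
    lease_owner = %(worker_id)s,
    lease_until = now() + %(lease)s
WHERE id = (
    SELECT id
    FROM pattern_runs
    WHERE (status = 'queued' AND (lease_until IS NULL OR lease_until < now()))
       OR (status = 'running' AND lease_until < now())
    ORDER BY created_at
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
RETURNING id, window_from, window_to, scope, detections, params_override
"""


def is_claimable(status: str, lease_until: Optional[datetime], now: datetime) -> bool:
    """Same eligibility rule as the inner WHERE clause above."""
    if status == "queued":
        return lease_until is None or lease_until < now
    return status == "running" and lease_until is not None and lease_until < now


def claim_next_run(conn, worker_id: str, lease: timedelta):
    """Claim one run in its own transaction; returns the row, or None if nothing is due."""
    cur = conn.cursor()
    try:
        cur.execute(CLAIM_SQL, {"worker_id": worker_id, "lease": lease})
        row = cur.fetchone()
        conn.commit()
        return row
    except Exception:
        conn.rollback()
        raise
    finally:
        cur.close()
```

A long detection would refresh its lease with a similar UPDATE that sets lease_until = now() + lease WHERE id = the run id AND lease_owner = the worker id, so a worker that has already lost its lease cannot extend it.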

Retry behavior:

  • transient worker failures may retry by reclaiming the same queued or lease-expired run;
  • the database idempotency key prevents duplicate run rows;
  • pattern_findings are replaced inside a transaction for a rerun of the same run_id;
  • failed runs retain error and partial summary, but partial findings are not visible unless the run succeeds.
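A runnable sketch of the replace-on-rerun rule, using the standard-library sqlite3 module as a stand-in for Postgres and a cut-down schema (only run_id, detection_kind, score and metrics; demo_db is scaffolding). Deleting the old findings, inserting the new ones and marking the run succeeded in one transaction means readers never see a partial set: if anything fails, the transaction rolls back and the database is left as it was before the attempt.

```python
import json
import sqlite3


def demo_db() -> sqlite3.Connection:
    """In-memory stand-in schema with one run in progress."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE pattern_runs (id TEXT PRIMARY KEY, status TEXT NOT NULL);
        CREATE TABLE pattern_findings (run_id TEXT NOT NULL REFERENCES pattern_runs(id),
            detection_kind TEXT NOT NULL, score REAL NOT NULL, metrics TEXT NOT NULL);
        INSERT INTO pattern_runs VALUES ('r1', 'running');
    """)
    return conn


def replace_findings(conn: sqlite3.Connection, run_id: str, findings: list[dict]) -> None:
    """Swap a run's findings and mark it succeeded atomically."""
    with conn:  # sqlite3: commit on success, roll back on any exception
        conn.execute("DELETE FROM pattern_findings WHERE run_id = ?", (run_id,))
        conn.executemany(
            "INSERT INTO pattern_findings (run_id, detection_kind, score, metrics) VALUES (?, ?, ?, ?)",
            [(run_id, f["detection_kind"], f["score"], json.dumps(f["metrics"])) for f in findings],
        )
        conn.execute("UPDATE pattern_runs SET status = 'succeeded' WHERE id = ?", (run_id,))
```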

API contract

All endpoints live under /api/v1/pattern.

GET /api/v1/pattern/detections

Returns the catalog ordered by label. Requires pattern.read.

{
"items": [
{
"kind": "wangiri",
"label": "Wangiri",
"description": "...",
"default_params": {"window_seconds": 3600, "min_samples": 30},
"enabled": true
}
]
}

POST /api/v1/pattern/runs

Starts a run. Requires pattern.run.

{
"window_from": "2026-06-08T07:00:00Z",
"window_to": "2026-06-08T08:00:00Z",
"detections": ["wangiri", "irsf"],
"scope": {"originator_ids": [10], "include_test_traffic": false},
"params_override": {"wangiri": {"min_samples": 50}},
"idempotency_key": "optional-client-key"
}
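A sketch of how the API might resolve the params_used for each requested detection from the catalog default_params and params_override. The shallow per-key merge and the rejection of unknown kinds or keys are assumptions, not rules stated in this ADR; deepcopy keeps the catalog defaults from being mutated.

```python
import copy


def effective_params(default_params: dict[str, dict], detections: list[str],
                     params_override: dict[str, dict]) -> dict[str, dict]:
    """Catalog defaults per detection, then per-kind overrides on top."""
    unknown = set(params_override) - set(detections)
    if unknown:
        raise ValueError(f"overrides for detections not in this run: {sorted(unknown)}")
    resolved = {}
    for kind in detections:
        merged = copy.deepcopy(default_params[kind])
        for key, value in params_override.get(kind, {}).items():
            if key not in merged:
                raise ValueError(f"{kind}: unknown parameter {key!r}")
            merged[key] = value
        resolved[kind] = merged
    return resolved
```

For the request above, wangiri would run with min_samples 50 and every other Wangiri default unchanged, while irsf would run with its catalog defaults.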

Response is 202 Accepted:

{
"id": "0c4764ce-07a8-4af4-9b41-7df55c6021ea",
"status": "queued"
}

GET /api/v1/pattern/runs

Lists runs with filters: status, trigger_kind, detection_kind, window_from, window_to, limit and offset. Requires pattern.read.

GET /api/v1/pattern/runs/{id}

Returns run detail including summary and selected detection kinds. It does not inline all findings by default. Requires pattern.read.

GET /api/v1/pattern/findings?run_id=...

Lists findings for one run. Optional filters are detection_kind, severity, entity_type, limit and offset. Requires pattern.read.

RBAC and actions

Pattern Analyzer permissions are:

  • pattern.read: list catalog, runs and findings.
  • pattern.run: start on-demand runs and manage scheduled runs.

Suggested actions shown by the UI are navigation/actions into other domains, not automatic enforcement by the analyzer:

  • block originator or CLI range;
  • block premium/MSRN prefix;
  • rate-limit originator;
  • flag terminator or route;
  • open ticket;
  • schedule audit.

Executing those actions requires the target domain permission and must write the normal audit trail. Pattern Analyzer may prefill action context from a finding, but it must not silently block traffic, mutate access policies or open tickets without the explicit target action.

Consequences

Positive

  • Pattern Analyzer results become auditable and reproducible.
  • Frontend localStorage no longer owns operational state.
  • Drill-down to CDR is possible without copying full CDR payloads into finding rows.
  • min_samples, windows and scope are backend-enforced, matching ADR-0027.
  • Scheduled and on-demand runs share one execution path.

Negative

  • TASK-510 must introduce an additive migration and seed catalog data.
  • Detection queries can be expensive if scope/window caps are misconfigured.
  • Some detections depend on configurable prefix lists or future richer metadata; until those lists are curated, false positives are expected.
  • The existing cdr_patterns table becomes legacy state and needs a clear migration/deprecation decision in implementation.

References

  • ADR-0007: CDR pipeline NATS JetStream.
  • ADR-0010: Kamailio CDR emit pattern.
  • ADR-0015: Background job framework.
  • ADR-0016: CDR pipeline implementation.
  • ADR-0026: Benchmark traffic isolation.
  • ADR-0027: Quality policy min_samples and evaluation windows.
  • docs/security/fraud-detection-patterns.md: chapter 46, fraud pattern background.
  • TASK-509: ADR drafting.
  • TASK-510/TASK-511: backend implementation.
  • TASK-512: frontend wiring.