Scaling a Fraud Detection System¶
~4 minute read
Prerequisites: Components, Metrics
Fraud detection is one of the most scaling-intensive ML systems. Visa handles up to 65,000 TPS at peak (Black Friday), and every transaction must be scored in < 100 ms with 99.99% availability (52 minutes of downtime per year). In practice that means: 20+ stateless scoring pods with auto-scaling, a 20-shard Redis cluster (200 GB, 2M ops/sec) as the feature store, and a latency budget allocated stage by stage: 15 ms for features, 5 ms for rules, 30 ms for ML inference, 18 ms for the graph, 3 ms for the decision and 3 ms for the response (74 ms p99 against the 100 ms budget). The infrastructure runs at ~$57K/month, while a single hour of downtime costs $2-5M in losses from missed fraud.
Traffic Patterns¶
Transaction Volume¶
graph LR
subgraph DAILY["Daily Pattern (TPS)"]
NIGHT["00:00-06:00<br/>10K TPS<br/>(off-peak)"]
MORNING["06:00-12:00<br/>20-30K TPS<br/>(ramp-up)"]
PEAK["12:00-20:00<br/>35-50K TPS<br/>(peak)"]
EVENING["20:00-00:00<br/>20-30K TPS<br/>(wind-down)"]
NIGHT --> MORNING --> PEAK --> EVENING
end
style NIGHT fill:#e8eaf6,stroke:#3f51b5
style MORNING fill:#fff3e0,stroke:#ef6c00
style PEAK fill:#fce4ec,stroke:#c62828
style EVENING fill:#fff3e0,stroke:#ef6c00
Peak: 50K TPS (Black Friday) | Normal peak: 35K TPS (evening) | Off-peak: 10K TPS (night)
Latency Budget¶
Total budget: 100ms (p99)
+----------------------+--------+--------------+------------------+
| Stage | Budget | Actual (p50) | Actual (p99) |
+----------------------+--------+--------------+------------------+
| Feature Retrieval | 20ms | 5ms | 15ms |
| Rules Engine | 10ms | 2ms | 5ms |
| ML Model Inference | 40ms | 15ms | 30ms |
| Graph Query | 20ms | 8ms | 18ms |
| Decision Engine | 5ms | 1ms | 3ms |
| Response | 5ms | 1ms | 3ms |
+----------------------+--------+--------------+------------------+
| TOTAL | 100ms | 32ms | 74ms |
+----------------------+--------+--------------+------------------+
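One way to keep each stage inside its slice of the budget is to wrap every dependency call in an explicit per-stage timeout, so a slow component fails fast instead of consuming the whole 100 ms. A minimal sketch, assuming hypothetical async stage clients (get_features, run_rules, run_model, query_graph, decide) and the budgets from the table above:

import asyncio

# Per-stage timeouts taken from the latency budget table (seconds).
STAGE_BUDGETS = {
    "features": 0.020,
    "rules": 0.010,
    "model": 0.040,
    "graph": 0.020,
    "decision": 0.005,
}

async def with_budget(stage: str, coro):
    """Run a stage coroutine, failing fast if it exceeds its slice of the budget."""
    try:
        return await asyncio.wait_for(coro, timeout=STAGE_BUDGETS[stage])
    except asyncio.TimeoutError:
        # Surface the offending stage so failover logic can pick the right fallback.
        raise TimeoutError(f"stage '{stage}' exceeded {STAGE_BUDGETS[stage] * 1000:.0f}ms")

async def score(txn, clients):
    # clients.* are hypothetical async stubs for the pipeline stages.
    features = await with_budget("features", clients.get_features(txn))
    rules = await with_budget("rules", clients.run_rules(txn, features))
    ml_score = await with_budget("model", clients.run_model(features))
    graph = await with_budget("graph", clients.query_graph(txn))
    return await with_budget("decision", clients.decide(txn, rules, ml_score, graph))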
Horizontal Scaling¶
Service Scaling¶
# HPA for fraud scoring service
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: fraud-scoring-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: fraud-scoring
  minReplicas: 20
  maxReplicas: 200
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 60
    - type: Pods
      pods:
        metric:
          name: request_latency_p99
        target:
          type: AverageValue
          averageValue: "80ms"
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 30
      policies:
        - type: Pods
          value: 10
          periodSeconds: 30
    scaleDown:
      stabilizationWindowSeconds: 300
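The min/max replica bounds follow from simple capacity arithmetic. A back-of-the-envelope sketch; the ~500 TPS per pod figure comes from the interview section below, and the 40% headroom is an illustrative assumption:

import math

def required_replicas(expected_tps: float, per_pod_tps: float = 500,
                      headroom: float = 0.4) -> int:
    """Pods needed to serve expected_tps with spare headroom for spikes."""
    return math.ceil(expected_tps * (1 + headroom) / per_pod_tps)

print(required_replicas(10_000))   # off-peak night -> 28
print(required_replicas(35_000))   # normal peak    -> 98
print(required_replicas(50_000))   # Black Friday   -> 140

All three values land between minReplicas: 20 and maxReplicas: 200, so the HPA has room both to shrink overnight and to absorb a Black Friday spike.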
Database Scaling¶
| Component | Strategy | Capacity |
|---|---|---|
| Feature Store (Redis) | Cluster, 20 shards | 200GB, 2M ops/s |
| Graph DB (Neo4j) | Causal cluster, 5 nodes | 100M nodes, 50K qps |
| Rules DB (Postgres) | Read replicas | 10K qps |
| Model Store (S3) | Multi-region | 100GB models |
| Event Store (Kafka) | 20 partitions | 100K msg/s |
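The Kafka topic is split into 20 partitions; keying events by user_id keeps all of a user's transactions in one partition, which is what lets the downstream Flink job aggregate per-user velocity in order. A minimal producer sketch using kafka-python; the topic name and broker addresses are placeholders:

import json
from kafka import KafkaProducer  # pip install kafka-python

producer = KafkaProducer(
    bootstrap_servers=["kafka-1:9092", "kafka-2:9092"],
    key_serializer=lambda k: k.encode("utf-8"),
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    acks="all",      # don't lose fraud signals on broker failover
    linger_ms=5,     # small client-side batching, negligible added latency
)

def publish_transaction(txn: dict) -> None:
    # Keying by user_id: the same user always lands in the same partition,
    # so per-user velocity windows see that user's events in order.
    producer.send("transactions", key=txn["user_id"], value=txn)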
Real-time Feature Computation¶
Streaming Architecture¶
graph LR
TXN["Transaction<br/>Events"] --> KAFKA["Kafka<br/>(buffer)"] --> FLINK["Flink<br/>(compute)"]
FLINK --> VEL["Velocity Counters<br/>(Redis)"]
FLINK --> GRAPH["Graph Updates<br/>(Neo4j)"]
FLINK --> PROF["Profile Updates<br/>(Redis)"]
style TXN fill:#e8eaf6,stroke:#3f51b5
style KAFKA fill:#fff3e0,stroke:#ef6c00
style FLINK fill:#f3e5f5,stroke:#9c27b0
style VEL fill:#e8f5e9,stroke:#4caf50
style GRAPH fill:#e8f5e9,stroke:#4caf50
style PROF fill:#e8f5e9,stroke:#4caf50
Flink Job for Velocity Features¶
class VelocityComputation(FlinkJob):
    """
    Real-time velocity feature computation
    """
    def process(self):
        # Read from Kafka
        transactions = self.env.add_source(
            FlinkKafkaConsumer(
                "transactions",
                TransactionSchema(),
                kafka_properties
            )
        )

        # Compute velocity in sliding windows
        velocity_1h = (
            transactions
            .key_by(lambda t: t.user_id)
            .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(1)))
            .aggregate(VelocityAggregator())
        )

        # Write to Redis
        velocity_1h.add_sink(
            RedisSink(
                key_template="user:{user_id}:velocity_1h",
                ttl_seconds=3600
            )
        )


class VelocityAggregator(AggregateFunction):
    def create_accumulator(self):
        return {"count": 0, "amount": 0.0, "merchants": set()}

    def add(self, transaction, accumulator):
        accumulator["count"] += 1
        accumulator["amount"] += transaction.amount
        accumulator["merchants"].add(transaction.merchant_id)
        return accumulator

    def get_result(self, accumulator):
        return {
            "txn_count_1h": accumulator["count"],
            "txn_amount_1h": accumulator["amount"],
            "unique_merchants_1h": len(accumulator["merchants"]),
        }
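At scoring time the service only reads the pre-computed counters back by key; nothing is aggregated on the hot path. A sketch using redis-py in cluster mode, following the user:{user_id}:velocity_1h key template above and assuming the sink stores the aggregate as a JSON string (host name is a placeholder):

import json
from redis.cluster import RedisCluster  # redis-py >= 4.1

rc = RedisCluster(host="feature-store.internal", port=6379)

def get_velocity_features(user_id: str) -> dict:
    """Read the pre-aggregated velocity counters written by the Flink job."""
    raw = rc.get(f"user:{user_id}:velocity_1h")
    if raw is None:
        # Cold user: return zeroed features rather than failing the score.
        return {"txn_count_1h": 0, "txn_amount_1h": 0.0, "unique_merchants_1h": 0}
    return json.loads(raw)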
High Availability¶
Multi-Region Setup¶
graph TD
GLB["GLOBAL LOAD BALANCER"] --> US["US-EAST<br/>(Primary)"]
GLB --> EU["EU-WEST<br/>(Secondary)"]
GLB --> AP["AP-SOUTH<br/>(Secondary)"]
US --> FS1["Feature Store<br/>(Primary)"]
EU --> FS2["Feature Store<br/>(Replica)"]
AP --> FS3["Feature Store<br/>(Replica)"]
FS1 <-.->|"replication"| FS2
FS1 <-.->|"replication"| FS3
style GLB fill:#e8eaf6,stroke:#3f51b5
style US fill:#e8f5e9,stroke:#4caf50
style EU fill:#fff3e0,stroke:#ef6c00
style AP fill:#fff3e0,stroke:#ef6c00
style FS1 fill:#f3e5f5,stroke:#9c27b0
style FS2 fill:#f3e5f5,stroke:#9c27b0
style FS3 fill:#f3e5f5,stroke:#9c27b0
Failover Strategy¶
import logging

logger = logging.getLogger(__name__)


class FailoverManager:
    """
    Handle component failures gracefully
    """
    def __init__(self):
        self.fallback_strategies = {
            "ml_model": self.fallback_to_rules,
            "feature_store": self.fallback_to_cache,
            "graph_db": self.skip_graph_features,
        }

    async def score_with_failover(self, transaction):
        """
        Score transaction with automatic failover
        """
        try:
            # Try full pipeline
            return await self.full_scoring_pipeline(transaction)
        except FeatureStoreTimeout:
            # Fallback: use cached features
            return await self.score_with_cached_features(transaction)
        except MLModelTimeout:
            # Fallback: rules only
            return await self.rules_only_scoring(transaction)
        except Exception as e:
            # Ultimate fallback: default decision
            logger.error(f"Scoring failed: {e}")
            return self.default_decision(transaction)

    def default_decision(self, transaction):
        """
        Conservative default when everything fails
        """
        if transaction.amount > 1000:
            return Decision(action="review", reason="system_fallback")
        else:
            return Decision(action="approve", reason="low_amount_fallback")
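The exception types caught above (FeatureStoreTimeout, MLModelTimeout) are not standard library classes; a minimal sketch of how they might be declared so each dependency's timeout maps to its own fallback path:

# Raised by per-stage timeout wrappers such as with_budget() from the
# latency-budget sketch earlier; names here are illustrative.

class FeatureStoreTimeout(TimeoutError):
    """Feature store exceeded its slice of the latency budget."""

class MLModelTimeout(TimeoutError):
    """Model server exceeded its slice of the latency budget."""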
Circuit Breaker¶
from circuitbreaker import circuit


class FraudScoringService:
    @circuit(
        failure_threshold=5,
        recovery_timeout=30,
        expected_exception=TimeoutError
    )
    async def get_ml_prediction(self, features):
        """ML prediction with circuit breaker"""
        return await self.ml_client.predict(features)

    @circuit(
        failure_threshold=10,
        recovery_timeout=60,
    )
    async def get_graph_features(self, user_id, device_id):
        """Graph features with circuit breaker"""
        return await self.graph_client.query(user_id, device_id)
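When the breaker is open, calls fail immediately with CircuitBreakerError instead of waiting on a dead dependency. A sketch, assuming the circuitbreaker package used above, of routing that failure into the rules-only fallback:

from circuitbreaker import CircuitBreakerError

async def ml_score_or_fallback(service, features, rules_score: float) -> float:
    """Prefer the ML score; fall back to the rules score if the breaker is open."""
    try:
        return await service.get_ml_prediction(features)
    except (CircuitBreakerError, TimeoutError):
        # Open breaker fails fast (no 30s recovery wait on the hot path);
        # the rules-engine score keeps the transaction flowing.
        return rules_score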
Model Serving Optimization¶
Model Quantization¶
def optimize_fraud_model(model_path: str) -> str:
    """
    Quantize model for faster inference
    """
    import onnx
    from onnxruntime.quantization import quantize_dynamic, QuantType

    # Validate the ONNX model before quantizing
    onnx.checker.check_model(onnx.load(model_path))

    # Dynamic quantization (INT8 weights)
    quantized_path = model_path.replace(".onnx", "_quantized.onnx")
    quantize_dynamic(
        model_path,
        quantized_path,
        weight_type=QuantType.QUInt8
    )

    # Compare original vs. quantized latency
    original_latency = benchmark(model_path)
    quantized_latency = benchmark(quantized_path)
    print(f"Speedup: {original_latency / quantized_latency:.2f}x")
    return quantized_path
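benchmark() above is left undefined; one way to implement it with onnxruntime. Input handling is simplified and assumes a single float32 input tensor:

import time
import numpy as np
import onnxruntime as ort

def benchmark(model_path: str, n_runs: int = 1000) -> float:
    """Average single-inference latency in milliseconds."""
    sess = ort.InferenceSession(model_path, providers=["CPUExecutionProvider"])
    inp = sess.get_inputs()[0]
    # Replace dynamic dimensions (None / symbolic) with batch size 1.
    shape = [d if isinstance(d, int) else 1 for d in inp.shape]
    x = np.random.rand(*shape).astype(np.float32)

    start = time.perf_counter()
    for _ in range(n_runs):
        sess.run(None, {inp.name: x})
    return (time.perf_counter() - start) / n_runs * 1000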
Batching Strategy¶
import asyncio
import time


class BatchedModelServer:
    """
    Batch predictions for efficiency
    """
    def __init__(self, model=None):
        self.model = model             # any object exposing predict_batch(list) -> list
        self.batch_queue = asyncio.Queue()
        self.batch_size = 32
        self.batch_timeout_ms = 5      # Max wait time before flushing a partial batch

    async def predict_single(self, features) -> float:
        """
        Single prediction (batched internally)
        """
        future = asyncio.get_running_loop().create_future()
        await self.batch_queue.put((features, future))
        return await future

    async def batch_worker(self):
        """
        Background worker that processes batches
        """
        while True:
            batch = []
            deadline = time.monotonic() + self.batch_timeout_ms / 1000

            # Collect up to batch_size items or until the deadline passes
            while len(batch) < self.batch_size:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break
                try:
                    item = await asyncio.wait_for(
                        self.batch_queue.get(),
                        timeout=remaining
                    )
                    batch.append(item)
                except asyncio.TimeoutError:
                    break

            if not batch:
                continue

            # Run one model call for the whole batch
            features = [f for f, _ in batch]
            predictions = self.model.predict_batch(features)

            # Resolve each caller's future with its own prediction
            for (_, future), pred in zip(batch, predictions):
                future.set_result(pred)
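Wiring it up: the worker runs as a background task and concurrent callers simply await their own future. A usage sketch; model is an assumed object with a predict_batch method:

import asyncio

async def serve_requests(model, feature_vectors):
    server = BatchedModelServer(model=model)
    worker = asyncio.create_task(server.batch_worker())

    # 100 concurrent callers share at most ceil(100 / 32) = 4 batched model calls.
    scores = await asyncio.gather(
        *(server.predict_single(f) for f in feature_vectors)
    )

    worker.cancel()
    return scores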
Cost Optimization¶
Cost Breakdown¶
| Component | Type | Count | Cost/Month |
|---|---|---|---|
| Fraud Scoring Pods | c5.2xlarge | 50 | $12,000 |
| ML Model Serving | g4dn.xlarge | 10 | $5,200 |
| Feature Store (Redis) | r5.4xlarge | 20 | $16,000 |
| Graph DB (Neo4j) | r5.8xlarge | 5 | $10,000 |
| Kafka Cluster | kafka.m5.large | 10 | $5,000 |
| Flink Cluster | c5.4xlarge | 10 | $6,000 |
| Data Transfer | - | - | $3,000 |
| TOTAL | | | ~$57,200 |
Cost Reduction Strategies¶
- Tiered Scoring: Simple rules for low-risk transactions
- Spot Instances: For batch processing jobs
- Reserved Instances: For baseline capacity
- Model Pruning: Smaller models for faster inference
- Feature Selection: Fewer features = less storage
class TieredScoring:
    """
    Different scoring tiers based on transaction risk
    """
    def score(self, transaction):
        # Tier 1: Rules only (fast, cheap)
        if transaction.amount < 10:
            return self.rules_only_score(transaction)

        # Tier 2: Rules + simple ML
        if transaction.amount < 100:
            return self.simple_ml_score(transaction)

        # Tier 3: Full pipeline (expensive)
        return self.full_pipeline_score(transaction)
Disaster Recovery¶
Recovery Procedures¶
| Scenario | RTO | RPO | Recovery Action |
|---|---|---|---|
| Pod failure | 30s | 0 | K8s auto-restart |
| Node failure | 2min | 0 | Pod rescheduling |
| Redis shard failure | 5min | 1min | Failover to replica |
| Graph DB failure | 10min | 5min | Failover to read replica |
| Region failure | 15min | 1min | DNS failover |
| Model corruption | 30min | N/A | Rollback to previous version |
Backup Strategy¶
# Backup schedule
backup_schedule:
  feature_store:
    type: redis_snapshot
    frequency: every_4_hours
    retention: 7_days
  graph_db:
    type: neo4j_backup
    frequency: daily
    retention: 30_days
  training_data:
    type: s3_versioning
    frequency: continuous
    retention: 90_days
  models:
    type: mlflow_registry
    retention: all_versions
Misconception: horizontal scaling solves every latency problem
Adding pods will not help if the bottleneck is the Feature Store or the Graph DB. At 50K TPS every Redis lookup takes 5-15 ms. If the Redis cluster slows down (network jitter, a hot key), latency rises on all 200 pods at once. The fix: cache pre-warming, consistent hashing for even key distribution, and a local in-memory LRU cache on every pod for the top-1000 users, which cuts p99 from 15 ms to 1 ms (see the sketch below).
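A minimal sketch of the per-pod local cache in front of Redis, using the cachetools library and reusing get_velocity_features() from the Redis read sketch above; the size and TTL are illustrative:

from cachetools import TTLCache  # pip install cachetools

# Per-pod cache for the hottest users: bounded size, short TTL so
# velocity counters never get more than a few seconds stale.
local_cache = TTLCache(maxsize=1000, ttl=5)

def get_features_cached(user_id: str) -> dict:
    hit = local_cache.get(user_id)
    if hit is not None:
        return hit                                   # in-process, no network hop
    features = get_velocity_features(user_id)        # Redis cluster call
    local_cache[user_id] = features
    return features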
Misconception: one scoring tier for all transactions
A $5 transaction and a $50,000 transaction should not go through the same pipeline. Tiered scoring: transactions under $10 pass only the Rules Engine (2 ms, $0.0001 per transaction), $10-$100 get Rules plus lightweight ML (10 ms), and over $100 run the full pipeline including the graph (60 ms). This cuts infrastructure cost by 40-60% while keeping the same detection quality for high-value transactions.
Misconception: if the ML model fails, all transactions must be blocked
Blocking every transaction when ML is down is a denial of service against your own customers. The right strategy is graceful degradation. ML is down -> switch to Rules Engine only (catches 60-80% of fraud). Feature Store is down -> use stale cached features. Graph DB is down -> skip graph features and tighten the review threshold. Total outage -> approve < $100, review > $100. Every fallback level must be tested in production (chaos engineering).
Interview Section¶
Question: "How would you scale the system to 50K TPS?"
Weak answer: "Add more servers."
Сильный ответ: "Пять ключевых решений: (1) Stateless scoring service на K8s -- 100 подов, каждый обрабатывает 500 TPS, HPA по CPU и latency p99. (2) Redis Cluster из 20 шардов для feature store -- 2M ops/sec, < 5 мс на запрос, consistent hashing для равномерной нагрузки. (3) Flink для real-time feature computation -- velocity-фичи вычисляются в sliding windows и пишутся в Redis, не нужно считать at scoring time. (4) Tiered scoring -- 60% транзакций (< $10) проходят только Rules Engine за 2 мс, экономя 40% compute. (5) Batched ML inference -- собираем 32 запроса за 5 мс и делаем один batch predict, latency каждого запроса = 5 мс ожидания + 10 мс inference / 32 = 5.3 мс вместо 30 мс поштучно."
Вопрос: "Что произойдёт при отказе региона?"
Слабый ответ: "Переключим трафик на другой регион."
Сильный ответ: "Multi-region active-passive: primary (US-East) обрабатывает весь трафик, secondary (EU-West, AP-South) получают реплики Feature Store с задержкой < 1 сек. При отказе primary -- DNS failover за 15 мин (RTO). RPO = 1 мин (последняя реплика). Нюансы: (1) velocity-фичи в secondary регионе будут stale на время failover -- повышаем пороги review; (2) Graph DB -- causal cluster Neo4j, read replica в каждом регионе; (3) Kafka -- MirrorMaker для cross-region replication; (4) модели хранятся в S3 с multi-region replication, загружаются при старте пода. Chaos engineering: раз в квартал тестируем region failover в production."