Перейти к содержанию

Масштабирование системы обнаружения мошенничества

~4 минуты чтения

Предварительно: Компоненты, Метрики

Fraud detection -- одна из самых требовательных к масштабированию ML-систем. Visa обрабатывает 65,000 TPS в пике (Black Friday), при этом каждая транзакция должна быть оценена за < 100 мс с availability 99.99% (52 мин downtime в год). Это означает: 20+ stateless scoring pods с auto-scaling, Redis cluster из 20 шардов (200 GB, 2M ops/sec) для feature store, и latency budget распределённый побайтно: 15 мс на фичи, 5 мс на правила, 30 мс на ML inference, 18 мс на граф, 3 мс на решение. При этом инфраструктура стоит ~$57K/мес, а один час простоя -- $2-5M потерь от пропущенного фрода.

Traffic Patterns

Transaction Volume

graph LR
    subgraph DAILY["Daily Pattern (TPS)"]
        NIGHT["00:00-06:00<br/>10K TPS<br/>(off-peak)"]
        MORNING["06:00-12:00<br/>20-30K TPS<br/>(ramp-up)"]
        PEAK["12:00-20:00<br/>35-50K TPS<br/>(peak)"]
        EVENING["20:00-00:00<br/>20-30K TPS<br/>(wind-down)"]

        NIGHT --> MORNING --> PEAK --> EVENING
    end

    style NIGHT fill:#e8eaf6,stroke:#3f51b5
    style MORNING fill:#fff3e0,stroke:#ef6c00
    style PEAK fill:#fce4ec,stroke:#c62828
    style EVENING fill:#fff3e0,stroke:#ef6c00

Peak: 50K TPS (Black Friday) | Normal peak: 35K TPS (evening) | Off-peak: 10K TPS (night)

Latency Budget

Total budget: 100ms (p99)

+----------------------+--------+--------------+------------------+
| Stage                | Budget | Actual (p50) | Actual (p99)     |
+----------------------+--------+--------------+------------------+
| Feature Retrieval    | 20ms   | 5ms          | 15ms             |
| Rules Engine         | 10ms   | 2ms          | 5ms              |
| ML Model Inference   | 40ms   | 15ms         | 30ms             |
| Graph Query          | 20ms   | 8ms          | 18ms             |
| Decision Engine      | 5ms    | 1ms          | 3ms              |
| Response             | 5ms    | 1ms          | 3ms              |
+----------------------+--------+--------------+------------------+
| TOTAL                | 100ms  | 32ms         | 74ms             |
+----------------------+--------+--------------+------------------+

Horizontal Scaling

Service Scaling

# HPA for fraud scoring service
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: fraud-scoring-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: fraud-scoring
  minReplicas: 20
  maxReplicas: 200
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 60
    - type: Pods
      pods:
        metric:
          name: request_latency_p99
        target:
          type: AverageValue
          averageValue: "80ms"
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 30
      policies:
        - type: Pods
          value: 10
          periodSeconds: 30
    scaleDown:
      stabilizationWindowSeconds: 300

Database Scaling

Component Strategy Capacity
Feature Store (Redis) Cluster, 20 shards 200GB, 2M ops/s
Graph DB (Neo4j) Causal cluster, 5 nodes 100M nodes, 50K qps
Rules DB (Postgres) Read replicas 10K qps
Model Store (S3) Multi-region 100GB models
Event Store (Kafka) 20 partitions 100K msg/s

Real-time Feature Computation

Streaming Architecture

graph LR
    TXN["Transaction<br/>Events"] --> KAFKA["Kafka<br/>(buffer)"] --> FLINK["Flink<br/>(compute)"]
    FLINK --> VEL["Velocity Counters<br/>(Redis)"]
    FLINK --> GRAPH["Graph Updates<br/>(Neo4j)"]
    FLINK --> PROF["Profile Updates<br/>(Redis)"]

    style TXN fill:#e8eaf6,stroke:#3f51b5
    style KAFKA fill:#fff3e0,stroke:#ef6c00
    style FLINK fill:#f3e5f5,stroke:#9c27b0
    style VEL fill:#e8f5e9,stroke:#4caf50
    style GRAPH fill:#e8f5e9,stroke:#4caf50
    style PROF fill:#e8f5e9,stroke:#4caf50
class VelocityComputation(FlinkJob):
    """
    Real-time velocity feature computation
    """

    def process(self):
        # Read from Kafka
        transactions = self.env.add_source(
            FlinkKafkaConsumer(
                "transactions",
                TransactionSchema(),
                kafka_properties
            )
        )

        # Compute velocity in sliding windows
        velocity_1h = (
            transactions
            .key_by(lambda t: t.user_id)
            .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(1)))
            .aggregate(VelocityAggregator())
        )

        # Write to Redis
        velocity_1h.add_sink(
            RedisSink(
                key_template="user:{user_id}:velocity_1h",
                ttl_seconds=3600
            )
        )

class VelocityAggregator(AggregateFunction):
    def create_accumulator(self):
        return {"count": 0, "amount": 0.0, "merchants": set()}

    def add(self, transaction, accumulator):
        accumulator["count"] += 1
        accumulator["amount"] += transaction.amount
        accumulator["merchants"].add(transaction.merchant_id)
        return accumulator

    def get_result(self, accumulator):
        return {
            "txn_count_1h": accumulator["count"],
            "txn_amount_1h": accumulator["amount"],
            "unique_merchants_1h": len(accumulator["merchants"]),
        }

High Availability

Multi-Region Setup

graph TD
    GLB["GLOBAL LOAD BALANCER"] --> US["US-EAST<br/>(Primary)"]
    GLB --> EU["EU-WEST<br/>(Secondary)"]
    GLB --> AP["AP-SOUTH<br/>(Secondary)"]

    US --> FS1["Feature Store<br/>(Primary)"]
    EU --> FS2["Feature Store<br/>(Replica)"]
    AP --> FS3["Feature Store<br/>(Replica)"]

    FS1 <-.->|"replication"| FS2
    FS1 <-.->|"replication"| FS3

    style GLB fill:#e8eaf6,stroke:#3f51b5
    style US fill:#e8f5e9,stroke:#4caf50
    style EU fill:#fff3e0,stroke:#ef6c00
    style AP fill:#fff3e0,stroke:#ef6c00
    style FS1 fill:#f3e5f5,stroke:#9c27b0
    style FS2 fill:#f3e5f5,stroke:#9c27b0
    style FS3 fill:#f3e5f5,stroke:#9c27b0

Failover Strategy

class FailoverManager:
    """
    Handle component failures gracefully
    """

    def __init__(self):
        self.fallback_strategies = {
            "ml_model": self.fallback_to_rules,
            "feature_store": self.fallback_to_cache,
            "graph_db": self.skip_graph_features,
        }

    async def score_with_failover(self, transaction):
        """
        Score transaction with automatic failover
        """
        try:
            # Try full pipeline
            return await self.full_scoring_pipeline(transaction)

        except FeatureStoreTimeout:
            # Fallback: use cached features
            return await self.score_with_cached_features(transaction)

        except MLModelTimeout:
            # Fallback: rules only
            return await self.rules_only_scoring(transaction)

        except Exception as e:
            # Ultimate fallback: default decision
            logger.error(f"Scoring failed: {e}")
            return self.default_decision(transaction)

    def default_decision(self, transaction):
        """
        Conservative default when everything fails
        """
        if transaction.amount > 1000:
            return Decision(action="review", reason="system_fallback")
        else:
            return Decision(action="approve", reason="low_amount_fallback")

Circuit Breaker

from circuitbreaker import circuit

class FraudScoringService:

    @circuit(
        failure_threshold=5,
        recovery_timeout=30,
        expected_exception=TimeoutError
    )
    async def get_ml_prediction(self, features):
        """ML prediction with circuit breaker"""
        return await self.ml_client.predict(features)

    @circuit(
        failure_threshold=10,
        recovery_timeout=60,
    )
    async def get_graph_features(self, user_id, device_id):
        """Graph features with circuit breaker"""
        return await self.graph_client.query(user_id, device_id)

Model Serving Optimization

Model Quantization

def optimize_fraud_model(model_path: str) -> str:
    """
    Quantize model for faster inference
    """
    import onnx
    from onnxruntime.quantization import quantize_dynamic

    # Load ONNX model
    model = onnx.load(model_path)

    # Dynamic quantization (INT8)
    quantized_path = model_path.replace(".onnx", "_quantized.onnx")
    quantize_dynamic(
        model_path,
        quantized_path,
        weight_type=QuantType.QUInt8
    )

    # Benchmark
    original_latency = benchmark(model_path)
    quantized_latency = benchmark(quantized_path)

    print(f"Speedup: {original_latency / quantized_latency:.2f}x")

    return quantized_path

Batching Strategy

class BatchedModelServer:
    """
    Batch predictions for efficiency
    """

    def __init__(self):
        self.batch_queue = asyncio.Queue()
        self.batch_size = 32
        self.batch_timeout_ms = 5  # Max wait time

    async def predict_single(self, features) -> float:
        """
        Single prediction (batched internally)
        """
        future = asyncio.Future()
        await self.batch_queue.put((features, future))
        return await future

    async def batch_worker(self):
        """
        Background worker that processes batches
        """
        while True:
            batch = []
            deadline = time.monotonic() + self.batch_timeout_ms / 1000

            # Collect batch
            while len(batch) < self.batch_size:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break
                try:
                    item = await asyncio.wait_for(
                        self.batch_queue.get(),
                        timeout=remaining
                    )
                    batch.append(item)
                except asyncio.TimeoutError:
                    break

            if not batch:
                continue

            # Process batch
            features = [f for f, _ in batch]
            predictions = self.model.predict_batch(features)

            # Return results
            for (_, future), pred in zip(batch, predictions):
                future.set_result(pred)

Cost Optimization

Cost Breakdown

Component Type Count Cost/Month
Fraud Scoring Pods c5.2xlarge 50 $12,000
ML Model Serving g4dn.xlarge 10 $5,200
Feature Store (Redis) r5.4xlarge 20 $16,000
Graph DB (Neo4j) r5.8xlarge 5 $10,000
Kafka Cluster kafka.m5.large 10 $5,000
Flink Cluster c5.4xlarge 10 $6,000
Data Transfer - - $3,000
TOTAL ~$57,200

Cost Reduction Strategies

  1. Tiered Scoring: Simple rules for low-risk transactions
  2. Spot Instances: For batch processing jobs
  3. Reserved Instances: For baseline capacity
  4. Model Pruning: Smaller models for faster inference
  5. Feature Selection: Fewer features = less storage
class TieredScoring:
    """
    Different scoring tiers based on transaction risk
    """

    def score(self, transaction):
        # Tier 1: Rules only (fast, cheap)
        if transaction.amount < 10:
            return self.rules_only_score(transaction)

        # Tier 2: Rules + simple ML
        if transaction.amount < 100:
            return self.simple_ml_score(transaction)

        # Tier 3: Full pipeline (expensive)
        return self.full_pipeline_score(transaction)

Disaster Recovery

Recovery Procedures

Scenario RTO RPO Recovery Action
Pod failure 30s 0 K8s auto-restart
Node failure 2min 0 Pod rescheduling
Redis shard failure 5min 1min Failover to replica
Graph DB failure 10min 5min Failover to read replica
Region failure 15min 1min DNS failover
Model corruption 30min N/A Rollback to previous version

Backup Strategy

# Daily backups
backup_schedule:
  feature_store:
    type: redis_snapshot
    frequency: every_4_hours
    retention: 7_days

  graph_db:
    type: neo4j_backup
    frequency: daily
    retention: 30_days

  training_data:
    type: s3_versioning
    frequency: continuous
    retention: 90_days

  models:
    type: mlflow_registry
    retention: all_versions

Заблуждение: горизонтальное масштабирование решает все проблемы с латентностью

Добавление подов не поможет, если bottleneck -- Feature Store или Graph DB. При 50K TPS каждый запрос к Redis занимает 5-15 мс. Если Redis cluster отвечает медленно (сетевой джиттер, hot key), латентность всех 200 подов вырастет одновременно. Решение: pre-warming кэша, consistent hashing для равномерного распределения, local in-memory cache (LRU) на каждом поде для top-1000 пользователей -- снижает p99 с 15 мс до 1 мс.

Заблуждение: один scoring tier для всех транзакций

Транзакция на $5 и на $50,000 не должны проходить одинаковый pipeline. Tiered scoring: транзакции < $10 проходят только Rules Engine (2 мс, $0.0001 за транзакцию), \(10-\)100 -- Rules + lightweight ML (10 мс), > $100 -- полный pipeline с графом (60 мс). Это сокращает инфраструктурные расходы на 40-60% при том же качестве детекции для high-value транзакций.

Заблуждение: при отказе ML-модели нужно блокировать все транзакции

Блокировка всех транзакций при отказе ML -- это denial of service для собственных клиентов. Правильная стратегия: graceful degradation. ML упал -> переключиться на Rules Engine only (ловит 60-80% фрода). Feature Store упал -> использовать stale cached features. Graph DB упал -> пропустить граф-фичи, повысить порог для review. Полный отказ -> approve < $100, review > $100. Каждый fallback уровень должен быть протестирован в production (chaos engineering).

Секция для интервью

Вопрос: "Как масштабировать систему до 50K TPS?"

❌ Слабый ответ: "Добавить больше серверов."

✅ Сильный ответ: "Пять ключевых решений: (1) Stateless scoring service на K8s -- 100 подов, каждый обрабатывает 500 TPS, HPA по CPU и latency p99. (2) Redis Cluster из 20 шардов для feature store -- 2M ops/sec, < 5 мс на запрос, consistent hashing для равномерной нагрузки. (3) Flink для real-time feature computation -- velocity-фичи вычисляются в sliding windows и пишутся в Redis, не нужно считать at scoring time. (4) Tiered scoring -- 60% транзакций (< $10) проходят только Rules Engine за 2 мс, экономя 40% compute. (5) Batched ML inference -- собираем 32 запроса за 5 мс и делаем один batch predict, latency каждого запроса = 5 мс ожидания + 10 мс inference / 32 = 5.3 мс вместо 30 мс поштучно."

Вопрос: "Что произойдёт при отказе региона?"

❌ Слабый ответ: "Переключим трафик на другой регион."

✅ Сильный ответ: "Multi-region active-passive: primary (US-East) обрабатывает весь трафик, secondary (EU-West, AP-South) получают реплики Feature Store с задержкой < 1 сек. При отказе primary -- DNS failover за 15 мин (RTO). RPO = 1 мин (последняя реплика). Нюансы: (1) velocity-фичи в secondary регионе будут stale на время failover -- повышаем пороги review; (2) Graph DB -- causal cluster Neo4j, read replica в каждом регионе; (3) Kafka -- MirrorMaker для cross-region replication; (4) модели хранятся в S3 с multi-region replication, загружаются при старте пода. Chaos engineering: раз в квартал тестируем region failover в production."