Перейти к содержанию

Компоненты системы обнаружения мошенничества

~2 минуты чтения

Предварительно: Определение задачи, Ансамблевые методы

Архитектура production fraud detection -- это не одна модель, а оркестрация 4-5 независимых компонентов. Rules Engine обрабатывает 60-80% транзакций за 2-5 мс (blocklists, velocity checks), ML-ансамбль покрывает сложные паттерны за 15-30 мс, граф-анализ выявляет связи между сущностями (fraud rings из 10-50 аккаунтов), а Decision Engine комбинирует все сигналы и маршрутизирует 1-3% транзакций на ручной review. Каждый компонент имеет circuit breaker и fallback -- система не должна падать из-за одного сломанного модуля.

High-Level Architecture

graph TD
    A["Transaction Request<br/>POST /transactions/authorize"] --> B["API Gateway<br/>Auth, Rate Limiting"]
    B --> C["Fraud Scoring Service"]
    B --> D["Payment Engine<br/>(parallel)"]

    C --> E["Rules Engine"]
    C --> F["ML Models"]
    C --> G["Graph Analysis"]
    C --> H["3rd Party"]

    E & F & G & H --> I["Decision Engine<br/>(Orchestrator)"]

    I -->|score < 0.3| J["APPROVE"]
    I -->|0.3 - 0.7| K["REVIEW"]
    I -->|score > 0.7| L["DECLINE"]

    K --> M["Case Management"]
    L -.-> M

    style I fill:#e8eaf6,stroke:#3f51b5
    style J fill:#e8f5e9,stroke:#4caf50
    style K fill:#fff3e0,stroke:#ff9800
    style L fill:#ffebee,stroke:#f44336

Component Details

1. Rules Engine

Цель: Быстрые детерминистические правила для очевидного фрода

class RulesEngine:
    """
    Deterministic rules for clear-cut fraud cases
    """

    def __init__(self):
        self.rules = [
            BlocklistRule(),           # Blocked users/devices/IPs
            VelocityRule(),            # Too many transactions
            AmountRule(),              # Unusual amounts
            GeoRule(),                 # Impossible travel
            CardTestingRule(),         # Small amounts testing
            MerchantRiskRule(),        # High-risk merchants
        ]

    def evaluate(self, transaction: Transaction) -> RuleResult:
        """
        Evaluate all rules in order
        """
        triggered_rules = []

        for rule in self.rules:
            result = rule.check(transaction)
            if result.triggered:
                triggered_rules.append(result)

                if result.action == "block":
                    return RuleResult(
                        decision="block",
                        reason=result.reason,
                        triggered_rules=triggered_rules
                    )

        return RuleResult(
            decision="continue",  # pass to ML
            triggered_rules=triggered_rules
        )

class VelocityRule:
    """Example: Block if too many transactions"""

    def check(self, txn: Transaction) -> RuleCheckResult:
        user_velocity = self.get_user_velocity(txn.user_id)

        # More than 10 transactions in 1 hour
        if user_velocity.count_1h > 10:
            return RuleCheckResult(
                triggered=True,
                action="block",
                reason="velocity_exceeded",
                details={"count_1h": user_velocity.count_1h}
            )

        # More than 5 transactions to same merchant in 1 hour
        if user_velocity.same_merchant_1h > 5:
            return RuleCheckResult(
                triggered=True,
                action="review",
                reason="merchant_velocity",
                details={"same_merchant_1h": user_velocity.same_merchant_1h}
            )

        return RuleCheckResult(triggered=False)

class ImpossibleTravelRule:
    """Detect physically impossible travel"""

    def check(self, txn: Transaction) -> RuleCheckResult:
        last_txn = self.get_last_transaction(txn.user_id)

        if not last_txn:
            return RuleCheckResult(triggered=False)

        distance_km = haversine(
            (txn.lat, txn.lon),
            (last_txn.lat, last_txn.lon)
        )
        time_diff_hours = (txn.timestamp - last_txn.timestamp).total_seconds() / 3600

        # Max human travel speed ~900 km/h (airplane)
        if distance_km / time_diff_hours > 1000:
            return RuleCheckResult(
                triggered=True,
                action="block",
                reason="impossible_travel",
                details={
                    "distance_km": distance_km,
                    "time_diff_hours": time_diff_hours
                }
            )

        return RuleCheckResult(triggered=False)

2. ML Models

Цель: Ловить сложные паттерны, которые не покрываются правилами

class FraudMLModels:
    """
    Ensemble of ML models for fraud detection
    """

    def __init__(self):
        self.models = {
            "xgboost": XGBoostClassifier(),      # Primary model
            "neural": NeuralNetwork(),            # Deep learning
            "isolation_forest": IsolationForest(), # Anomaly detection
            "autoencoder": Autoencoder(),         # Reconstruction error
        }
        self.ensemble_weights = {
            "xgboost": 0.4,
            "neural": 0.3,
            "isolation_forest": 0.15,
            "autoencoder": 0.15,
        }

    def predict(self, features: dict) -> FraudPrediction:
        """
        Ensemble prediction with model diversity
        """
        predictions = {}

        for name, model in self.models.items():
            pred = model.predict_proba(features)
            predictions[name] = pred

        # Weighted ensemble
        final_score = sum(
            self.ensemble_weights[name] * pred
            for name, pred in predictions.items()
        )

        return FraudPrediction(
            score=final_score,
            model_scores=predictions,
            top_features=self.get_feature_importance(features)
        )

class XGBoostFraudModel:
    """
    Gradient boosted trees - main workhorse
    """

    def __init__(self):
        self.model = xgb.XGBClassifier(
            n_estimators=500,
            max_depth=8,
            learning_rate=0.05,
            scale_pos_weight=100,  # Handle imbalance
            eval_metric="aucpr",
        )

    def train(self, X, y, sample_weight=None):
        """
        Train with cost-sensitive learning
        """
        # Higher weight for fraud cases
        if sample_weight is None:
            sample_weight = np.where(y == 1, 100, 1)

        self.model.fit(
            X, y,
            sample_weight=sample_weight,
            eval_set=[(X_val, y_val)],
            early_stopping_rounds=50,
        )

class AnomalyDetector:
    """
    Unsupervised anomaly detection for novel fraud patterns
    """

    def __init__(self):
        self.isolation_forest = IsolationForest(
            n_estimators=100,
            contamination=0.01,
            random_state=42
        )
        self.autoencoder = self.build_autoencoder()

    def build_autoencoder(self):
        """Reconstruction-based anomaly detection"""
        input_dim = 100

        encoder = nn.Sequential(
            nn.Linear(input_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 16),
        )

        decoder = nn.Sequential(
            nn.Linear(16, 32),
            nn.ReLU(),
            nn.Linear(32, 64),
            nn.ReLU(),
            nn.Linear(64, input_dim),
        )

        return nn.Sequential(encoder, decoder)

    def get_anomaly_score(self, features):
        """
        Combine IF and AE scores
        """
        # Isolation Forest: -1 for anomaly, 1 for normal
        if_score = -self.isolation_forest.score_samples(features)

        # Autoencoder: reconstruction error
        reconstruction = self.autoencoder(features)
        ae_score = F.mse_loss(reconstruction, features, reduction='none').mean(dim=1)

        # Normalize and combine
        combined = 0.5 * normalize(if_score) + 0.5 * normalize(ae_score)
        return combined

3. Graph Analysis

Цель: Обнаружение связей между подозрительными сущностями

class FraudGraphAnalyzer:
    """
    Graph-based fraud detection using entity relationships
    """

    def __init__(self):
        self.graph = Neo4jClient()

    def analyze_transaction(self, txn: Transaction) -> GraphFeatures:
        """
        Extract graph-based features
        """
        # Find connected entities
        device_connections = self.get_device_graph(txn.device_id)
        ip_connections = self.get_ip_graph(txn.ip_address)
        user_connections = self.get_user_graph(txn.user_id)

        return GraphFeatures(
            # Device sharing
            users_sharing_device=len(device_connections.users),
            fraud_users_on_device=device_connections.fraud_count,
            device_first_seen_days=device_connections.age_days,

            # IP analysis
            users_on_ip=len(ip_connections.users),
            fraud_users_on_ip=ip_connections.fraud_count,
            is_datacenter_ip=ip_connections.is_datacenter,

            # User network
            user_degree=user_connections.degree,
            user_community_fraud_rate=user_connections.community_fraud_rate,
            hops_to_known_fraudster=user_connections.fraud_distance,

            # Risk propagation
            neighbor_risk_score=self.propagate_risk(txn.user_id),
        )

    def propagate_risk(self, user_id: str, hops: int = 2) -> float:
        """
        PageRank-style risk propagation
        """
        query = """
        MATCH (u:User {id: $user_id})-[:CONNECTED*1..2]-(neighbor:User)
        WHERE neighbor.is_fraud = true
        RETURN count(neighbor) as fraud_neighbors,
               avg(neighbor.fraud_score) as avg_fraud_score
        """
        result = self.graph.query(query, user_id=user_id)
        return result.avg_fraud_score * (1 + 0.1 * result.fraud_neighbors)

    def detect_fraud_rings(self) -> List[FraudRing]:
        """
        Community detection for fraud ring identification
        """
        query = """
        CALL gds.louvain.stream('fraud-graph')
        YIELD nodeId, communityId
        WITH communityId, collect(nodeId) as members
        WHERE size(members) > 5
        MATCH (u:User) WHERE id(u) IN members AND u.is_fraud = true
        WITH communityId, members, fraud_count
        WHERE fraud_count > 2
        RETURN communityId, members, fraud_count
        """
        return self.graph.query(query)

4. Decision Engine

Цель: Оркестрация всех сигналов и финальное решение

class DecisionEngine:
    """
    Orchestrate all signals and make final decision
    """

    def __init__(self):
        self.thresholds = DecisionThresholds(
            auto_approve=0.1,
            review=0.5,
            auto_decline=0.9,
        )
        self.action_handlers = {
            "approve": self.handle_approve,
            "review": self.handle_review,
            "decline": self.handle_decline,
            "challenge": self.handle_challenge,  # 3DS, OTP
        }

    def decide(
        self,
        transaction: Transaction,
        rule_result: RuleResult,
        ml_prediction: FraudPrediction,
        graph_features: GraphFeatures,
    ) -> Decision:
        """
        Make final decision based on all signals
        """
        # Rule-based override
        if rule_result.decision == "block":
            return Decision(
                action="decline",
                reason=rule_result.reason,
                score=1.0,
                confidence="high"
            )

        # Combine scores
        final_score = self.combine_scores(
            ml_score=ml_prediction.score,
            graph_score=self.score_from_graph(graph_features),
            rule_score=self.score_from_rules(rule_result)
        )

        # Apply thresholds
        if final_score < self.thresholds.auto_approve:
            action = "approve"
        elif final_score < self.thresholds.review:
            action = "approve"  # with monitoring
        elif final_score < self.thresholds.auto_decline:
            action = "review"
        else:
            action = "decline"

        # Special handling for high-value transactions
        if transaction.amount > 10000 and action == "approve":
            action = "review"

        return Decision(
            action=action,
            score=final_score,
            reason=self.generate_reason(ml_prediction, graph_features),
            confidence=self.calculate_confidence(ml_prediction),
            requires_challenge=action == "challenge"
        )

    def combine_scores(self, ml_score, graph_score, rule_score):
        """
        Weighted combination with confidence adjustment
        """
        weights = {
            "ml": 0.5,
            "graph": 0.3,
            "rules": 0.2,
        }

        return (
            weights["ml"] * ml_score +
            weights["graph"] * graph_score +
            weights["rules"] * rule_score
        )

5. Case Management System

Цель: Workflow для ручных расследований

class CaseManagementSystem:
    """
    Handle manual review workflow
    """

    def __init__(self):
        self.queue = CaseQueue()
        self.assignment = CaseAssignment()

    def create_case(
        self,
        transaction: Transaction,
        decision: Decision
    ) -> Case:
        """
        Create case for manual review
        """
        case = Case(
            id=uuid4(),
            transaction=transaction,
            fraud_score=decision.score,
            reason=decision.reason,
            priority=self.calculate_priority(transaction, decision),
            assigned_to=None,
            status="pending",
            created_at=datetime.now(),
            sla_deadline=self.calculate_sla(transaction),
        )

        # Add to queue
        self.queue.add(case)

        # Auto-assign if available analysts
        self.try_assign(case)

        return case

    def calculate_priority(self, txn, decision) -> int:
        """
        Priority based on amount and score
        """
        amount_score = min(txn.amount / 10000, 1.0)
        fraud_score = decision.score

        priority = int((amount_score * 0.5 + fraud_score * 0.5) * 10)
        return min(priority, 10)  # 1-10 scale

    def submit_decision(
        self,
        case_id: str,
        analyst_id: str,
        decision: str,  # "fraud" or "legitimate"
        notes: str
    ):
        """
        Analyst submits decision
        """
        case = self.get_case(case_id)
        case.decision = decision
        case.decided_by = analyst_id
        case.decided_at = datetime.now()
        case.notes = notes
        case.status = "completed"

        # Trigger actions
        if decision == "fraud":
            self.handle_fraud_confirmed(case)
        else:
            self.handle_legitimate_confirmed(case)

        # Update ML labels
        self.send_to_label_store(case)

Infrastructure

Real-time Serving

apiVersion: apps/v1
kind: Deployment
metadata:
  name: fraud-scoring-service
spec:
  replicas: 20
  template:
    spec:
      containers:
        - name: fraud-scorer
          image: fraud-scoring:v2.1
          resources:
            requests:
              memory: "4Gi"
              cpu: "2"
            limits:
              memory: "8Gi"
              cpu: "4"
          env:
            - name: MODEL_PATH
              value: "s3://models/fraud/v2.1"
            - name: FEATURE_STORE_URL
              value: "redis-cluster:6379"
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 10
            periodSeconds: 5

Feature Store

class FraudFeatureStore:
    """
    Real-time feature serving for fraud detection
    """

    def __init__(self):
        self.redis = RedisCluster()
        self.ttl = {
            "user_velocity": 3600,      # 1 hour
            "device_features": 86400,   # 1 day
            "user_profile": 86400,      # 1 day
        }

    async def get_features(self, user_id: str, device_id: str) -> dict:
        """
        Get all features for scoring
        """
        # Parallel feature fetch
        user_vel, device_feat, user_prof = await asyncio.gather(
            self.get_user_velocity(user_id),
            self.get_device_features(device_id),
            self.get_user_profile(user_id),
        )

        return {
            **user_vel,
            **device_feat,
            **user_prof,
        }

    async def update_velocity(self, user_id: str, txn: Transaction):
        """
        Update velocity counters in real-time
        """
        pipe = self.redis.pipeline()

        # Increment counters
        pipe.incr(f"user:{user_id}:txn_count_1h")
        pipe.expire(f"user:{user_id}:txn_count_1h", 3600)

        pipe.incrbyfloat(f"user:{user_id}:txn_amount_1h", txn.amount)
        pipe.expire(f"user:{user_id}:txn_amount_1h", 3600)

        # Add to sets for unique counts
        pipe.sadd(f"user:{user_id}:merchants_24h", txn.merchant_id)
        pipe.expire(f"user:{user_id}:merchants_24h", 86400)

        await pipe.execute()

Заблуждение: ML-модель должна быть одна и самая точная

В production fraud detection ансамбль из 3-4 моделей (XGBoost + нейросеть + Isolation Forest) работает лучше, чем одна сложная модель. Причина: разные модели ловят разные типы фрода. XGBoost отлично работает с табличными данными и velocity-фичами (recall 95%+), Isolation Forest находит novel-паттерны без лейблов, а нейросеть захватывает нелинейные взаимодействия. При отказе одной модели ансамбль деградирует gracefully.

Заблуждение: граф-анализ можно делать всегда в реальном времени

Запрос к Neo4j на 2 хопа (найти соседей соседей) занимает 8-18 мс. Но при глубоких запросах (3+ хопа) или больших community detection (Louvain) латентность вырастает до 100+ мс. В production граф-фичи предвычисляются batch-pipeline (каждые 15-30 мин), а real-time запросы ограничивают 1-2 хопами с таймаутом 20 мс и fallback на кэшированные значения.

Заблуждение: Decision Engine -- это просто if/else по порогам

Кроме порогов, Decision Engine учитывает: сумму транзакции (> $10K -- автоматически на review даже при низком скоре), merchant risk category, время суток, историю пользователя, regulatory requirements (PCI DSS для определённых MCC-кодов). В Stripe/PayPal Decision Engine содержит 50-100 правил помимо ML-скора.

Секция для интервью

Вопрос: "Зачем Rules Engine если есть ML?"

❌ Слабый ответ: "Для скорости."

✅ Сильный ответ: "Rules Engine решает 3 задачи, которые ML не может: (1) мгновенная блокировка -- blocklists и impossible travel отрабатывают за 2-5 мс без ожидания фичей и модели; (2) детерминистичность -- регулятор может потребовать объяснить каждое решение, правило 'более 10 транзакций в час' объяснимо, вероятность 0.73 -- нет; (3) оперативность -- новый fraud-паттерн можно закрыть правилом за минуты, а переобучение модели занимает часы. В Stripe Rules Engine обрабатывает 60-80% транзакций и только оставшиеся 20-40% идут в ML-пайплайн."

Вопрос: "Как обрабатывать отказ одного из компонентов?"

❌ Слабый ответ: "Логировать ошибку и вернуть ответ по умолчанию."

✅ Сильный ответ: "Каждый компонент обёрнут в circuit breaker. Если ML-модель не отвечает 5 раз подряд -- переключаемся на rules-only scoring (деградация, не отказ). Если Feature Store недоступен -- используем кэшированные фичи с пометкой 'stale'. Если Graph DB упал -- пропускаем граф-фичи, но повышаем порог для review. Для транзакций > $1000 при любом отказе -- auto-review. Default decision при полном отказе: approve для сумм < $100, review для остального. Всё это настраивается через FailoverManager с приоритетами."