Компоненты системы обнаружения мошенничества¶
~2 минуты чтения
Предварительно: Определение задачи, Ансамблевые методы
Архитектура production fraud detection -- это не одна модель, а оркестрация 4-5 независимых компонентов. Rules Engine обрабатывает 60-80% транзакций за 2-5 мс (blocklists, velocity checks), ML-ансамбль покрывает сложные паттерны за 15-30 мс, граф-анализ выявляет связи между сущностями (fraud rings из 10-50 аккаунтов), а Decision Engine комбинирует все сигналы и маршрутизирует 1-3% транзакций на ручной review. Каждый компонент имеет circuit breaker и fallback -- система не должна падать из-за одного сломанного модуля.
High-Level Architecture¶
%% Request fan-out: the API gateway calls fraud scoring and the payment
%% engine in parallel; four signal sources feed a single Decision Engine.
graph TD
A["Transaction Request<br/>POST /transactions/authorize"] --> B["API Gateway<br/>Auth, Rate Limiting"]
B --> C["Fraud Scoring Service"]
B --> D["Payment Engine<br/>(parallel)"]
C --> E["Rules Engine"]
C --> F["ML Models"]
C --> G["Graph Analysis"]
C --> H["3rd Party"]
E & F & G & H --> I["Decision Engine<br/>(Orchestrator)"]
%% NOTE(review): the 0.3 / 0.7 bands below differ from the DecisionEngine
%% code (0.1 / 0.5 / 0.9) -- presumably simplified for the diagram; confirm
%% which set is canonical.
I -->|score < 0.3| J["APPROVE"]
I -->|0.3 - 0.7| K["REVIEW"]
I -->|score > 0.7| L["DECLINE"]
K --> M["Case Management"]
%% Dotted edge: declines also reach case management (exact semantics --
%% audit trail vs. appeal flow -- cannot be told from the diagram; verify).
L -.-> M
style I fill:#e8eaf6,stroke:#3f51b5
style J fill:#e8f5e9,stroke:#4caf50
style K fill:#fff3e0,stroke:#ff9800
style L fill:#ffebee,stroke:#f44336
Component Details¶
1. Rules Engine¶
Цель: Быстрые детерминистические правила для очевидного фрода
class RulesEngine:
    """Deterministic first-pass screening for clear-cut fraud cases.

    Runs an ordered list of independent rule objects; a single "block"
    verdict short-circuits evaluation and skips the ML stage entirely.
    """

    def __init__(self):
        # Evaluation order is the list order.
        self.rules = [
            BlocklistRule(),      # Blocked users/devices/IPs
            VelocityRule(),       # Too many transactions
            AmountRule(),         # Unusual amounts
            GeoRule(),            # Impossible travel
            CardTestingRule(),    # Small amounts testing
            MerchantRiskRule(),   # High-risk merchants
        ]

    def evaluate(self, transaction: Transaction) -> RuleResult:
        """Run every rule in order; stop early on the first hard block.

        Returns a RuleResult whose decision is "block" (short-circuit)
        or "continue" (hand the transaction over to the ML pipeline).
        """
        fired = []
        for rule in self.rules:
            outcome = rule.check(transaction)
            if not outcome.triggered:
                continue
            fired.append(outcome)
            if outcome.action == "block":
                # Hard stop: no point scoring an already-blocked transaction.
                return RuleResult(
                    decision="block",
                    reason=outcome.reason,
                    triggered_rules=fired,
                )
        return RuleResult(
            decision="continue",  # pass to ML
            triggered_rules=fired,
        )
class VelocityRule:
    """Flag accounts that transact faster than a human plausibly would."""

    def check(self, txn: Transaction) -> RuleCheckResult:
        """Check rolling per-user counters against fixed velocity limits."""
        stats = self.get_user_velocity(txn.user_id)

        # Hard block: more than 10 transactions in the last hour.
        if stats.count_1h > 10:
            return RuleCheckResult(
                triggered=True,
                action="block",
                reason="velocity_exceeded",
                details={"count_1h": stats.count_1h},
            )

        # Softer signal: more than 5 hits on a single merchant within an
        # hour is routed to manual review rather than blocked outright.
        if stats.same_merchant_1h > 5:
            return RuleCheckResult(
                triggered=True,
                action="review",
                reason="merchant_velocity",
                details={"same_merchant_1h": stats.same_merchant_1h},
            )

        return RuleCheckResult(triggered=False)
class ImpossibleTravelRule:
    """Detect physically impossible travel between consecutive transactions.

    Compares the great-circle distance from the previous transaction's
    location against the elapsed time; an implied speed above
    MAX_SPEED_KMH cannot be legitimate travel.
    """

    # Commercial aircraft cruise around ~900 km/h; the threshold sits
    # slightly above that to avoid borderline false positives.
    MAX_SPEED_KMH = 1000

    def check(self, txn: Transaction) -> RuleCheckResult:
        last_txn = self.get_last_transaction(txn.user_id)
        if not last_txn:
            # First transaction seen for this user: nothing to compare against.
            return RuleCheckResult(triggered=False)

        distance_km = haversine(
            (txn.lat, txn.lon),
            (last_txn.lat, last_txn.lon)
        )
        time_diff_hours = (txn.timestamp - last_txn.timestamp).total_seconds() / 3600

        # BUG FIX: the original divided by time_diff_hours unconditionally,
        # raising ZeroDivisionError for same-timestamp transactions.  Zero
        # (or negative, e.g. clock-skewed) elapsed time combined with any
        # real displacement is itself impossible travel.
        if time_diff_hours <= 0:
            impossible = distance_km > 0
        else:
            impossible = distance_km / time_diff_hours > self.MAX_SPEED_KMH

        if impossible:
            return RuleCheckResult(
                triggered=True,
                action="block",
                reason="impossible_travel",
                details={
                    "distance_km": distance_km,
                    "time_diff_hours": time_diff_hours
                }
            )
        return RuleCheckResult(triggered=False)
2. ML Models¶
Цель: Ловить сложные паттерны, которые не покрываются правилами
class FraudMLModels:
    """Diverse model ensemble: each member targets a different fraud type."""

    def __init__(self):
        self.models = {
            "xgboost": XGBoostClassifier(),        # Primary model
            "neural": NeuralNetwork(),             # Deep learning
            "isolation_forest": IsolationForest(), # Anomaly detection
            "autoencoder": Autoencoder(),          # Reconstruction error
        }
        # Fixed linear blend; the tree model carries the largest weight.
        self.ensemble_weights = {
            "xgboost": 0.4,
            "neural": 0.3,
            "isolation_forest": 0.15,
            "autoencoder": 0.15,
        }

    def predict(self, features: dict) -> FraudPrediction:
        """Score `features` with every model and blend by fixed weights."""
        per_model = {
            name: model.predict_proba(features)
            for name, model in self.models.items()
        }
        blended = sum(
            self.ensemble_weights[name] * score
            for name, score in per_model.items()
        )
        return FraudPrediction(
            score=blended,
            model_scores=per_model,
            top_features=self.get_feature_importance(features)
        )
class XGBoostFraudModel:
    """Gradient-boosted trees -- the main workhorse model.

    Class imbalance is handled twice: via `scale_pos_weight` on the
    booster and via per-sample weights at fit time.
    """

    def __init__(self):
        self.model = xgb.XGBClassifier(
            n_estimators=500,
            max_depth=8,
            learning_rate=0.05,
            scale_pos_weight=100,  # Handle imbalance
            eval_metric="aucpr",   # PR-AUC suits heavily skewed labels
        )

    def train(self, X, y, sample_weight=None, eval_set=None):
        """Fit with cost-sensitive weighting and optional early stopping.

        Args:
            X, y: training features and binary labels (1 = fraud).
            sample_weight: optional per-row weights; defaults to 100x
                for fraud rows, mirroring `scale_pos_weight`.
            eval_set: optional list of (X_val, y_val) pairs.  BUG FIX: the
                original referenced undefined globals `X_val`/`y_val`
                (NameError at call time); validation data is now an
                explicit, optional argument.
        """
        if sample_weight is None:
            # Higher weight for fraud cases
            sample_weight = np.where(y == 1, 100, 1)

        fit_kwargs = {"sample_weight": sample_weight}
        if eval_set is not None:
            # Early stopping only makes sense when validation data exists.
            fit_kwargs["eval_set"] = eval_set
            fit_kwargs["early_stopping_rounds"] = 50
        self.model.fit(X, y, **fit_kwargs)
class AnomalyDetector:
    """Unsupervised anomaly detection for novel fraud patterns.

    Combines two complementary detectors: Isolation Forest (partition
    depth) and an autoencoder (reconstruction error).
    """

    def __init__(self):
        self.isolation_forest = IsolationForest(
            n_estimators=100,
            contamination=0.01,  # expect ~1% anomalies in training data
            random_state=42
        )
        self.autoencoder = self.build_autoencoder()

    def build_autoencoder(self, input_dim: int = 100):
        """Build a symmetric bottleneck autoencoder.

        Args:
            input_dim: width of the feature vector.  Generalized from the
                original hard-coded 100 (default preserved, so existing
                callers are unaffected).
        """
        encoder = nn.Sequential(
            nn.Linear(input_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 16),  # 16-dim bottleneck
        )
        decoder = nn.Sequential(
            nn.Linear(16, 32),
            nn.ReLU(),
            nn.Linear(32, 64),
            nn.ReLU(),
            nn.Linear(64, input_dim),
        )
        return nn.Sequential(encoder, decoder)

    def get_anomaly_score(self, features):
        """Blend Isolation Forest and autoencoder scores 50/50.

        Higher values mean more anomalous for both components.
        """
        # score_samples: higher = more normal, so negate.
        if_score = -self.isolation_forest.score_samples(features)
        # Per-row mean reconstruction error.
        reconstruction = self.autoencoder(features)
        ae_score = F.mse_loss(reconstruction, features, reduction='none').mean(dim=1)
        # NOTE(review): `normalize` is not defined in this module --
        # presumably a min-max scaler; confirm before relying on this method.
        combined = 0.5 * normalize(if_score) + 0.5 * normalize(ae_score)
        return combined
3. Graph Analysis¶
Цель: Обнаружение связей между подозрительными сущностями
class FraudGraphAnalyzer:
    """Graph-based fraud detection over entity relationships in Neo4j."""

    def __init__(self):
        self.graph = Neo4jClient()

    def analyze_transaction(self, txn: Transaction) -> GraphFeatures:
        """Extract graph-derived features for a single transaction."""
        # Find connected entities
        device_connections = self.get_device_graph(txn.device_id)
        ip_connections = self.get_ip_graph(txn.ip_address)
        user_connections = self.get_user_graph(txn.user_id)
        return GraphFeatures(
            # Device sharing
            users_sharing_device=len(device_connections.users),
            fraud_users_on_device=device_connections.fraud_count,
            device_first_seen_days=device_connections.age_days,
            # IP analysis
            users_on_ip=len(ip_connections.users),
            fraud_users_on_ip=ip_connections.fraud_count,
            is_datacenter_ip=ip_connections.is_datacenter,
            # User network
            user_degree=user_connections.degree,
            user_community_fraud_rate=user_connections.community_fraud_rate,
            hops_to_known_fraudster=user_connections.fraud_distance,
            # Risk propagation
            neighbor_risk_score=self.propagate_risk(txn.user_id),
        )

    def propagate_risk(self, user_id: str, hops: int = 2) -> float:
        """PageRank-style risk propagation over fraudulent neighbours.

        BUG FIX: the original ignored the `hops` argument (path length was
        hard-coded to 2) and raised TypeError when the user had no fraud
        neighbours (Cypher avg() returns null -> None).
        """
        # Cypher cannot parameterize variable-length path bounds, so the
        # hop count is validated as a strict int and inlined.
        max_hops = max(1, int(hops))
        query = f"""
        MATCH (u:User {{id: $user_id}})-[:CONNECTED*1..{max_hops}]-(neighbor:User)
        WHERE neighbor.is_fraud = true
        RETURN count(neighbor) as fraud_neighbors,
               avg(neighbor.fraud_score) as avg_fraud_score
        """
        result = self.graph.query(query, user_id=user_id)
        if not result.fraud_neighbors or result.avg_fraud_score is None:
            return 0.0  # no fraudulent neighbours -> no propagated risk
        # Each extra fraud neighbour inflates the average score by 10%.
        return result.avg_fraud_score * (1 + 0.1 * result.fraud_neighbors)

    def detect_fraud_rings(self) -> List[FraudRing]:
        """Louvain community detection for fraud-ring identification.

        BUG FIX: the original Cypher referenced `fraud_count` in a WITH
        clause before ever computing it; the aggregation is now explicit
        (`count(u) as fraud_count`).
        """
        query = """
        CALL gds.louvain.stream('fraud-graph')
        YIELD nodeId, communityId
        WITH communityId, collect(nodeId) as members
        WHERE size(members) > 5
        MATCH (u:User) WHERE id(u) IN members AND u.is_fraud = true
        WITH communityId, members, count(u) as fraud_count
        WHERE fraud_count > 2
        RETURN communityId, members, fraud_count
        """
        return self.graph.query(query)
4. Decision Engine¶
Цель: Оркестрация всех сигналов и финальное решение
class DecisionEngine:
    """Fuse rule, ML and graph signals into a single routing decision."""

    def __init__(self):
        self.thresholds = DecisionThresholds(
            auto_approve=0.1,
            review=0.5,
            auto_decline=0.9,
        )
        # Dispatch table for post-decision side effects.
        self.action_handlers = {
            "approve": self.handle_approve,
            "review": self.handle_review,
            "decline": self.handle_decline,
            "challenge": self.handle_challenge,  # 3DS, OTP
        }

    def decide(
        self,
        transaction: Transaction,
        rule_result: RuleResult,
        ml_prediction: FraudPrediction,
        graph_features: GraphFeatures,
    ) -> Decision:
        """Combine all upstream signals into a routed Decision."""
        # A hard rule block overrides every other signal.
        if rule_result.decision == "block":
            return Decision(
                action="decline",
                reason=rule_result.reason,
                score=1.0,
                confidence="high"
            )

        final_score = self.combine_scores(
            ml_score=ml_prediction.score,
            graph_score=self.score_from_graph(graph_features),
            rule_score=self.score_from_rules(rule_result)
        )

        # Route by score band, checked from the riskiest band downwards.
        bounds = self.thresholds
        if final_score >= bounds.auto_decline:
            action = "decline"
        elif final_score >= bounds.review:
            action = "review"
        else:
            # Everything under the review bound is approved; the band
            # between auto_approve and review is approved with monitoring.
            action = "approve"

        # High-value transactions never auto-approve.
        if transaction.amount > 10000 and action == "approve":
            action = "review"

        return Decision(
            action=action,
            score=final_score,
            reason=self.generate_reason(ml_prediction, graph_features),
            confidence=self.calculate_confidence(ml_prediction),
            requires_challenge=action == "challenge"
        )

    def combine_scores(self, ml_score, graph_score, rule_score):
        """Fixed linear blend: ML dominates, then graph, then rules."""
        contributions = (
            0.5 * ml_score,     # ml
            0.3 * graph_score,  # graph
            0.2 * rule_score,   # rules
        )
        return sum(contributions)
5. Case Management System¶
Цель: Workflow для ручных расследований
class CaseManagementSystem:
    """Workflow engine for manual fraud-review cases."""

    def __init__(self):
        self.queue = CaseQueue()
        self.assignment = CaseAssignment()

    def create_case(
        self,
        transaction: Transaction,
        decision: Decision
    ) -> Case:
        """Open a review case, enqueue it, and attempt auto-assignment."""
        new_case = Case(
            id=uuid4(),
            transaction=transaction,
            fraud_score=decision.score,
            reason=decision.reason,
            priority=self.calculate_priority(transaction, decision),
            assigned_to=None,
            status="pending",
            created_at=datetime.now(),
            sla_deadline=self.calculate_sla(transaction),
        )
        self.queue.add(new_case)
        # Hand the case to an analyst immediately when one is free.
        self.try_assign(new_case)
        return new_case

    def calculate_priority(self, txn, decision) -> int:
        """Blend normalized amount and fraud score into a 0-10 priority."""
        capped_amount = min(txn.amount / 10000, 1.0)  # saturate at $10k
        blended = capped_amount * 0.5 + decision.score * 0.5
        return min(int(blended * 10), 10)

    def submit_decision(
        self,
        case_id: str,
        analyst_id: str,
        decision: str,  # "fraud" or "legitimate"
        notes: str
    ):
        """Record an analyst verdict and trigger the follow-up actions."""
        case = self.get_case(case_id)
        case.decision = decision
        case.decided_by = analyst_id
        case.decided_at = datetime.now()
        case.notes = notes
        case.status = "completed"

        # Confirmed fraud and confirmed-legitimate take different follow-up
        # paths, but both outcomes feed the ML label store.
        if decision == "fraud":
            self.handle_fraud_confirmed(case)
        else:
            self.handle_legitimate_confirmed(case)
        self.send_to_label_store(case)
Infrastructure¶
Real-time Serving¶
# Real-time fraud-scoring deployment: 20 replicas behind the API gateway.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: fraud-scoring-service
spec:
  replicas: 20
  # FIX: apps/v1 Deployments require spec.selector, and it must match the
  # pod template labels -- the original manifest omitted both, which the
  # API server rejects at apply time.
  selector:
    matchLabels:
      app: fraud-scoring-service
  template:
    metadata:
      labels:
        app: fraud-scoring-service
    spec:
      containers:
        - name: fraud-scorer
          image: fraud-scoring:v2.1
          resources:
            requests:
              memory: "4Gi"
              cpu: "2"
            limits:
              memory: "8Gi"
              cpu: "4"
          env:
            - name: MODEL_PATH
              value: "s3://models/fraud/v2.1"
            - name: FEATURE_STORE_URL
              value: "redis-cluster:6379"
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 10
            periodSeconds: 5
Feature Store¶
class FraudFeatureStore:
    """Low-latency feature serving for the fraud scoring path (Redis-backed)."""

    def __init__(self):
        self.redis = RedisCluster()
        # Per-feature-group expiry so stale counters age out on their own.
        self.ttl = {
            "user_velocity": 3600,    # 1 hour
            "device_features": 86400, # 1 day
            "user_profile": 86400,    # 1 day
        }

    async def get_features(self, user_id: str, device_id: str) -> dict:
        """Fetch every feature group concurrently and merge into one dict."""
        velocity, device, profile = await asyncio.gather(
            self.get_user_velocity(user_id),
            self.get_device_features(device_id),
            self.get_user_profile(user_id),
        )
        merged = {**velocity, **device, **profile}
        return merged

    async def update_velocity(self, user_id: str, txn: Transaction):
        """Bump the user's rolling counters in a single Redis round-trip."""
        pipe = self.redis.pipeline()
        # Rolling 1-hour counters (count + amount), TTL refreshed per hit.
        pipe.incr(f"user:{user_id}:txn_count_1h")
        pipe.expire(f"user:{user_id}:txn_count_1h", 3600)
        pipe.incrbyfloat(f"user:{user_id}:txn_amount_1h", txn.amount)
        pipe.expire(f"user:{user_id}:txn_amount_1h", 3600)
        # Distinct merchants over 24h tracked via a Redis set.
        pipe.sadd(f"user:{user_id}:merchants_24h", txn.merchant_id)
        pipe.expire(f"user:{user_id}:merchants_24h", 86400)
        await pipe.execute()
Заблуждение: ML-модель должна быть одна и самая точная
В production fraud detection ансамбль из 3-4 моделей (XGBoost + нейросеть + Isolation Forest) работает лучше, чем одна сложная модель. Причина: разные модели ловят разные типы фрода. XGBoost отлично работает с табличными данными и velocity-фичами (recall 95%+), Isolation Forest находит novel-паттерны без лейблов, а нейросеть захватывает нелинейные взаимодействия. При отказе одной модели ансамбль деградирует gracefully.
Заблуждение: граф-анализ можно делать всегда в реальном времени
Запрос к Neo4j на 2 хопа (найти соседей соседей) занимает 8-18 мс. Но при глубоких запросах (3+ хопа) или больших community detection (Louvain) латентность вырастает до 100+ мс. В production граф-фичи предвычисляются batch-pipeline (каждые 15-30 мин), а real-time запросы ограничивают 1-2 хопами с таймаутом 20 мс и fallback на кэшированные значения.
Заблуждение: Decision Engine -- это просто if/else по порогам
Кроме порогов, Decision Engine учитывает: сумму транзакции (> $10K -- автоматически на review даже при низком скоре), merchant risk category, время суток, историю пользователя, regulatory requirements (PCI DSS для определённых MCC-кодов). В Stripe/PayPal Decision Engine содержит 50-100 правил помимо ML-скора.
Секция для интервью¶
Вопрос: "Зачем Rules Engine если есть ML?"
Слабый ответ: "Для скорости."
Сильный ответ: "Rules Engine решает 3 задачи, которые ML не может: (1) мгновенная блокировка -- blocklists и impossible travel отрабатывают за 2-5 мс без ожидания фичей и модели; (2) детерминистичность -- регулятор может потребовать объяснить каждое решение, правило 'более 10 транзакций в час' объяснимо, вероятность 0.73 -- нет; (3) оперативность -- новый fraud-паттерн можно закрыть правилом за минуты, а переобучение модели занимает часы. В Stripe Rules Engine обрабатывает 60-80% транзакций и только оставшиеся 20-40% идут в ML-пайплайн."
Вопрос: "Как обрабатывать отказ одного из компонентов?"
Слабый ответ: "Логировать ошибку и вернуть ответ по умолчанию."
Сильный ответ: "Каждый компонент обёрнут в circuit breaker. Если ML-модель не отвечает 5 раз подряд -- переключаемся на rules-only scoring (деградация, не отказ). Если Feature Store недоступен -- используем кэшированные фичи с пометкой 'stale'. Если Graph DB упал -- пропускаем граф-фичи, но повышаем порог для review. Для транзакций > $1000 при любом отказе -- auto-review. Default decision при полном отказе: approve для сумм < $100, review для остального. Всё это настраивается через FailoverManager с приоритетами."