Детекция спама: компоненты системы¶
~3 минуты чтения
Предварительно: Определение задачи | Материалы MLSD
Архитектура спам-детекции строится как трёхуровневый pipeline: Rule Engine отсекает 60-70% спама за < 0.1ms, быстрая модель (XGBoost, < 1ms) обрабатывает основной поток, а тяжёлая модель (DistilBERT, ~10ms) подключается только для неуверенных случаев. Такой cascade routing позволяет обрабатывать 1M+ сообщений/сек при суммарном latency < 50ms.
High-Level Architecture¶
graph TD
A["Message Input<br/>Email / SMS / Comment / Review"] --> B["Preprocessing<br/>Tokenization, URL extraction"]
B --> C["Rule Engine<br/>blocklist, regex"]
B --> D["ML Models<br/>text, user behavior"]
B --> E["Network Analysis<br/>IP, domain reputation"]
C & D & E --> F["Decision Engine<br/>Aggregate scores, threshold"]
F -->|clean| G["DELIVER<br/>(inbox)"]
F -->|suspicious| H["QUARANTINE<br/>(review)"]
F -->|spam| I["BLOCK<br/>(reject)"]
style F fill:#e8eaf6,stroke:#3f51b5
style G fill:#e8f5e9,stroke:#4caf50
style H fill:#fff3e0,stroke:#ff9800
style I fill:#ffebee,stroke:#f44336
Component Details¶
1. Preprocessing¶
class SpamPreprocessor:
    """Extract structured features from a raw message.

    Turns a RawMessage into a ProcessedMessage carrying header, text,
    URL and attachment features for the downstream rule engine and
    ML models.
    """

    # Precompiled patterns, hoisted so extract_text_features does not
    # recompile them on every message.
    _URL_RE = re.compile(r'https?://\S+')
    _PHONE_RE = re.compile(r'\+?\d[\d\s-]{7,}')
    # BUG FIX: the original pattern [$EUR] is a character class matching
    # the literal letters E, U and R (a mangled Euro sign), so ordinary
    # uppercase text inflated the count. Match actual currency symbols.
    _CURRENCY_RE = re.compile(r'[$€£¥₽]')
    _UNSUBSCRIBE_RE = re.compile(r'unsubscribe|opt.?out', re.I)

    def process(self, message: RawMessage) -> ProcessedMessage:
        """Run all feature extractors and bundle them into a ProcessedMessage."""
        # Header analysis (email) — parse_headers is defined elsewhere in the file.
        header_features = self.parse_headers(message.headers)
        # Cheap per-message text statistics (see extract_text_features).
        text_features = self.extract_text_features(message.body)
        # URL extraction + per-URL features; helpers defined elsewhere.
        urls = self.extract_urls(message.body)
        url_features = self.analyze_urls(urls)
        # Attachment analysis; helper defined elsewhere.
        attachment_features = self.analyze_attachments(
            message.attachments
        )
        return ProcessedMessage(
            header=header_features,
            text=text_features,
            urls=url_features,
            attachments=attachment_features,
            raw_text=message.body,
        )

    def extract_text_features(self, text: str) -> dict:
        """Compute handcrafted text statistics for the fast model.

        Safe on empty input: ratio denominators use max(len, 1) to
        avoid ZeroDivisionError.
        """
        denom = max(len(text), 1)
        return {
            "length": len(text),
            "uppercase_ratio": sum(1 for c in text if c.isupper()) / denom,
            "special_char_ratio": sum(1 for c in text if not c.isalnum()) / denom,
            "url_count": len(self._URL_RE.findall(text)),
            "phone_count": len(self._PHONE_RE.findall(text)),
            "currency_symbols": len(self._CURRENCY_RE.findall(text)),
            "exclamation_count": text.count('!'),
            "has_unsubscribe": bool(self._UNSUBSCRIBE_RE.search(text)),
        }
2. Rule Engine¶
class RuleEngine:
    """Fast deterministic rules — the first line of defense.

    Checks run in a fixed priority order and the first rule that fires
    decides. NOTE(review): the allowlist is consulted *after* the spam
    checks, so a trusted sender can still be blocked by a pattern match
    or an auth failure — confirm this ordering is intentional.
    """

    def evaluate(self, message: ProcessedMessage) -> RuleResult:
        """Return a RuleResult; spam=None means no rule matched."""
        sender = message.sender

        # 1. Hard block for known-bad senders (O(1) lookup).
        if sender in self.blocklist:
            return RuleResult(spam=True, reason="blocklist", confidence=1.0)

        # 2. Precompiled spam regexes against the raw text; first hit wins.
        matched = next(
            (p for p in self.spam_patterns if p.search(message.raw_text)),
            None,
        )
        if matched is not None:
            return RuleResult(
                spam=True,
                reason=f"pattern:{matched.pattern}",
                confidence=0.95,
            )

        # 3. Email authentication failures (SPF / DKIM).
        header = message.header
        if header.spf_fail or header.dkim_fail:
            return RuleResult(spam=True, reason="auth_fail", confidence=0.9)

        # 4. Trusted senders pass straight through.
        if sender in self.allowlist:
            return RuleResult(spam=False, reason="allowlist", confidence=1.0)

        # Nothing fired — defer the decision to the ML pipeline.
        return RuleResult(spam=None, reason="no_rule_match", confidence=0.0)
3. ML Models¶
class SpamClassifier:
    """Multi-model ensemble for spam detection."""

    def __init__(self):
        # Heavy text classifier (fine-tuned BERT / DistilBERT).
        self.text_model = TextSpamClassifier()
        # Sender/user behavioral patterns.
        self.behavior_model = BehaviorModel()
        # Cheap real-time model (XGBoost on handcrafted features).
        self.fast_model = FastSpamModel()

    def predict(self, message: ProcessedMessage) -> SpamPrediction:
        """Cascade: fast model first, heavy ensemble only when uncertain."""
        # The fast model always runs (<1ms on CPU).
        fast_score = self.fast_model.predict(message.text)

        # Confident either way -> skip the expensive models entirely.
        confident = fast_score > 0.95 or fast_score < 0.05
        if confident:
            return SpamPrediction(
                score=fast_score,
                model="fast",
            )

        # Uncertain case: bring in the deep text model and the behavior model.
        text_score = self.text_model.predict(message.raw_text)
        behavior_score = self.behavior_model.predict(message)

        # Fixed expert-chosen weights; ideally these are tuned on a
        # validation set rather than hand-set.
        weighted = ((0.5, text_score), (0.3, behavior_score), (0.2, fast_score))
        final_score = sum(w * s for w, s in weighted)
        return SpamPrediction(score=final_score, model="ensemble")
class FastSpamModel:
    """XGBoost over handcrafted features (<1ms inference)."""

    # Feature order must match what the underlying model was trained with.
    _FEATURE_ORDER = (
        "length",
        "uppercase_ratio",
        "special_char_ratio",
        "url_count",
        "has_unsubscribe",
        "currency_symbols",
        "exclamation_count",
    )

    def predict(self, features: dict) -> float:
        """Return P(spam) for one message's feature dict."""
        vector = [features[name] for name in self._FEATURE_ORDER]
        # predict_proba yields [[P(ham), P(spam)]]; take the spam column.
        return self.model.predict_proba([vector])[0][1]
class BehaviorModel:
    """Sender/user behavior analysis."""

    def predict(self, message: ProcessedMessage) -> float:
        """Score the sender's recent behavior; returns P(spam)."""
        stats = self.get_sender_features(message.sender)
        # Dict insertion order is preserved by list(...), so this order
        # must match the model's training feature order.
        features = {
            "account_age_days": stats.account_age,
            "messages_last_hour": stats.recent_volume,
            "unique_recipients_last_hour": stats.unique_recipients,
            "bounce_rate": stats.bounce_rate,
            "complaint_rate": stats.complaint_rate,
            # Accounts younger than a week count as new senders.
            "is_new_sender": stats.account_age < 7,
        }
        vector = list(features.values())
        return self.model.predict_proba([vector])[0][1]
4. Network Analysis¶
class NetworkAnalyzer:
    """IP, domain, and sender network reputation."""

    def analyze(self, message: ProcessedMessage) -> NetworkScore:
        """Aggregate reputation signals; the worst (max) score wins."""
        header = message.header

        # Per-source reputation lookups.
        ip_score = self.ip_reputation.score(header.source_ip)
        domain_score = self.domain_reputation.score(header.sender_domain)

        # Worst-offending URL in the body (0.0 when there are none).
        url_score = max(
            (self.url_reputation.score(url) for url in message.urls),
            default=0.0,
        )

        # Graph check: is the sender part of a known spam network?
        network_score = self.spam_graph.check_membership(message.sender)

        return NetworkScore(
            ip_score=ip_score,
            domain_score=domain_score,
            url_score=url_score,
            network_score=network_score,
            # A single bad signal is enough to flag the message.
            aggregate=max(ip_score, domain_score, url_score, network_score),
        )
5. Decision Engine¶
class SpamDecisionEngine:
    """Combine all signals into the final verdict."""

    # Blend weights for the ML ensemble vs. network reputation.
    _ML_WEIGHT = 0.6
    _NETWORK_WEIGHT = 0.4
    # Score cut-offs for the three-way action split.
    _BLOCK_THRESHOLD = 0.9
    _QUARANTINE_THRESHOLD = 0.5

    def decide(
        self,
        rule_result: RuleResult,
        ml_prediction: SpamPrediction,
        network_score: NetworkScore,
    ) -> SpamDecision:
        """Return the action to take: block / quarantine / deliver."""
        # Deterministic rules win outright (spam is None when no rule fired).
        if rule_result.spam is not None:
            action = "block" if rule_result.spam else "deliver"
            return SpamDecision(
                action=action,
                reason=rule_result.reason,
                confidence=rule_result.confidence,
            )

        # Weighted blend of the ML ensemble and network reputation scores.
        combined = (
            self._ML_WEIGHT * ml_prediction.score
            + self._NETWORK_WEIGHT * network_score.aggregate
        )

        # Three-way threshold split; quarantine lowers the cost of errors.
        if combined > self._BLOCK_THRESHOLD:
            return SpamDecision(action="block", confidence=combined)
        if combined > self._QUARANTINE_THRESHOLD:
            return SpamDecision(action="quarantine", confidence=combined)
        # Clean path: report confidence that the message is ham.
        return SpamDecision(action="deliver", confidence=1 - combined)
Infrastructure¶
| Component | Technology | Scale |
|---|---|---|
| Rule Engine | In-memory (Bloom filter + regex) | <0.1ms |
| Fast Model | XGBoost (CPU) | <1ms |
| Deep Model | DistilBERT on GPU (Triton) | ~10ms |
| IP Reputation | Redis (IP -> score) | <1ms |
| Domain Reputation | Redis + daily batch update | <1ms |
| Spam Graph | Neo4j / in-memory graph | <5ms |
| Event Stream | Kafka (user feedback: mark as spam) | Async |
| Training Pipeline | Spark + PyTorch | Daily retrain |
Latency Budget¶
Total classification: <50ms (email), <10ms (real-time chat)
Preprocessing: 5ms
Rule engine: 0.1ms
Fast model: 1ms
Network analysis: 3ms
Deep model (if needed): 10ms
Decision: 0.5ms
Типичные заблуждения¶
Заблуждение: ensemble веса (0.5/0.3/0.2) можно задать экспертно
Веса моделей в ensemble должны оптимизироваться на validation set, а не задаваться вручную. Оптимальные веса зависят от распределения спама: для phishing-heavy трафика текстовая модель важнее (0.6+), для bot spam -- behavioral model (0.5+). Используйте stacking (meta-learner) или bayesian optimization для подбора весов.
Заблуждение: threshold 0.5 -- разумное значение для спам-классификации
При spam rate 2% (SMS) и threshold 0.5, даже модель с AUC 0.99 даёт неприемлемое количество FP. Правильный подход: выбирать threshold по precision-recall curve для target precision (например, 99.5%). Для email (45% spam) и SMS (2% spam) оптимальные thresholds будут существенно отличаться.
Заблуждение: IP blocklist достаточно обновлять раз в день
Спамеры меняют IP каждые 15-30 минут через botnets и proxy chains. Batch-обновление blocklist раз в день означает 24h окно уязвимости. Нужен real-time IP reputation scoring через streaming pipeline (Kafka -> Flink), дополненный batch-обновлением RBL (Realtime Blackhole Lists).
Вопросы с оценкой ответов¶
Зачем нужен cascade из fast model + deep model, а не просто deep model?
"Для экономии GPU ресурсов" -- верно, но не раскрывает главную причину
"Latency budget: при 1M msg/sec пропускать каждое сообщение через DistilBERT (~10ms на GPU) потребует сотни GPU. Fast model (XGBoost, < 1ms на CPU) уверенно классифицирует 70-80% сообщений (score > 0.95 или < 0.05). Deep model подключается только для оставшихся 20-30% неуверенных случаев. Итого: средний latency ~3ms вместо ~15ms, потребность в GPU сокращается в 3-5x."
Как Decision Engine агрегирует сигналы от Rule Engine, ML и Network Analysis?
"Простое голосование большинством" -- теряет информацию о confidence
"Rule Engine имеет абсолютный приоритет -- если sender в blocklist (confidence 1.0) или allowlist, решение принимается мгновенно без ML. Для остальных: weighted combination ML score (0.6) + Network score (0.4) с тремя порогами: > 0.9 = block, 0.5-0.9 = quarantine, < 0.5 = deliver. Веса и пороги калибруются на validation set. Quarantine -- ключевой элемент: снижает цену ошибки, позволяя пользователю проверить."