
Spam Detection: System Components

~3 minute read

Prerequisites: Problem Definition | MLSD Materials

The spam-detection architecture is built as a three-tier pipeline: a Rule Engine filters out 60-70% of spam in < 0.1ms, a fast model (XGBoost, < 1ms) handles the bulk of the traffic, and a heavy model (DistilBERT, ~10ms) is invoked only for uncertain cases. This cascade routing lets the system process 1M+ messages/sec with a total latency of < 50ms.

High-Level Architecture

graph TD
    A["Message Input<br/>Email / SMS / Comment / Review"] --> B["Preprocessing<br/>Tokenization, URL extraction"]

    B --> C["Rule Engine<br/>blocklist, regex"]
    B --> D["ML Models<br/>text, user behavior"]
    B --> E["Network Analysis<br/>IP, domain reputation"]

    C & D & E --> F["Decision Engine<br/>Aggregate scores, threshold"]

    F -->|clean| G["DELIVER<br/>(inbox)"]
    F -->|suspicious| H["QUARANTINE<br/>(review)"]
    F -->|spam| I["BLOCK<br/>(reject)"]

    style F fill:#e8eaf6,stroke:#3f51b5
    style G fill:#e8f5e9,stroke:#4caf50
    style H fill:#fff3e0,stroke:#ff9800
    style I fill:#ffebee,stroke:#f44336

Component Details

1. Preprocessing

import re


class SpamPreprocessor:
    """
    Extract structured features from raw message
    """

    def process(self, message: RawMessage) -> ProcessedMessage:
        # Header analysis (email)
        header_features = self.parse_headers(message.headers)

        # Text features
        text_features = self.extract_text_features(message.body)

        # URL analysis
        urls = self.extract_urls(message.body)
        url_features = self.analyze_urls(urls)

        # Attachment analysis
        attachment_features = self.analyze_attachments(
            message.attachments
        )

        return ProcessedMessage(
            header=header_features,
            text=text_features,
            urls=url_features,
            attachments=attachment_features,
            raw_text=message.body,
        )

    def extract_text_features(self, text: str) -> dict:
        n = max(len(text), 1)
        return {
            "length": len(text),
            "uppercase_ratio": sum(1 for c in text if c.isupper()) / n,
            # exclude whitespace so ordinary spaces don't inflate the ratio
            "special_char_ratio": sum(1 for c in text if not c.isalnum() and not c.isspace()) / n,
            "url_count": len(re.findall(r'https?://\S+', text)),
            "phone_count": len(re.findall(r'\+?\d[\d\s-]{7,}', text)),
            # note: [$EUR] would match the letters E, U, R -- use actual currency symbols
            "currency_symbols": len(re.findall(r'[$€£¥₽]', text)),
            "exclamation_count": text.count('!'),
            "has_unsubscribe": bool(re.search(r'unsubscribe|opt.?out', text, re.I)),
        }
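
As a quick sanity check, the text-feature extraction can be exercised standalone on a spam-looking sample (a minimal sketch; the sample message and the trimmed feature set are illustrative, not from a real dataset):

```python
import re

def extract_text_features(text: str) -> dict:
    # Standalone version of the extractor above, trimmed to a few features
    n = max(len(text), 1)
    return {
        "uppercase_ratio": sum(1 for c in text if c.isupper()) / n,
        "url_count": len(re.findall(r'https?://\S+', text)),
        "currency_symbols": len(re.findall(r'[$€£¥]', text)),
        "exclamation_count": text.count('!'),
        "has_unsubscribe": bool(re.search(r'unsubscribe|opt.?out', text, re.I)),
    }

sample = "WIN $1000 NOW!!! Click http://bit.ly/xyz to claim. Reply STOP to opt-out."
features = extract_text_features(sample)
print(features["url_count"], features["exclamation_count"], features["has_unsubscribe"])
```

Signals like these are individually weak, but together they give the fast model enough separation for the bulk of obvious spam.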

2. Rule Engine

class RuleEngine:
    """
    Fast deterministic rules (first line of defense)
    """

    def evaluate(self, message: ProcessedMessage) -> RuleResult:
        # Blocklist check (O(1) set lookup)
        if message.sender in self.blocklist:
            return RuleResult(spam=True, reason="blocklist", confidence=1.0)

        # Allowlist first, so trusted senders bypass content rules
        if message.sender in self.allowlist:
            return RuleResult(spam=False, reason="allowlist", confidence=1.0)

        # Known spam patterns (precompiled regex)
        for pattern in self.spam_patterns:
            if pattern.search(message.raw_text):
                return RuleResult(spam=True, reason=f"pattern:{pattern.pattern}", confidence=0.95)

        # SPF/DKIM/DMARC authentication check (email only)
        if message.header.spf_fail or message.header.dkim_fail:
            return RuleResult(spam=True, reason="auth_fail", confidence=0.9)

        # No rule fired: defer to the ML models
        return RuleResult(spam=None, reason="no_rule_match", confidence=0.0)
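
A runnable sketch of the same idea, with the blocklist/allowlist as plain sets and patterns compiled once at startup (the sender addresses and patterns are made up for illustration; results are (verdict, reason, confidence) tuples instead of a RuleResult dataclass):

```python
import re

class RuleEngine:
    """Minimal sketch: set-based block/allow lists + precompiled patterns."""

    def __init__(self, blocklist, allowlist, patterns):
        self.blocklist = set(blocklist)   # O(1) membership checks
        self.allowlist = set(allowlist)
        # compile once at startup, not per message
        self.spam_patterns = [re.compile(p, re.I) for p in patterns]

    def evaluate(self, sender: str, text: str):
        if sender in self.blocklist:
            return ("spam", "blocklist", 1.0)
        if sender in self.allowlist:      # trusted senders bypass content rules
            return ("clean", "allowlist", 1.0)
        for pat in self.spam_patterns:
            if pat.search(text):
                return ("spam", f"pattern:{pat.pattern}", 0.95)
        return (None, "no_rule_match", 0.0)

engine = RuleEngine(
    blocklist={"spammer@bad.example"},
    allowlist={"boss@corp.example"},
    patterns=[r"free\s+viagra", r"you\s+won\s+\$\d+"],
)
print(engine.evaluate("someone@x.example", "You won $500 today"))
```

A `None` verdict means no rule fired and the message falls through to the ML models.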

3. ML Models

class SpamClassifier:
    """
    Multi-model ensemble for spam detection
    """

    def __init__(self):
        # Text classifier (fine-tuned BERT or DistilBERT)
        self.text_model = TextSpamClassifier()

        # Behavioral model (user/sender patterns)
        self.behavior_model = BehaviorModel()

        # Lightweight model (for real-time, XGBoost on handcrafted features)
        self.fast_model = FastSpamModel()

    def predict(self, message: ProcessedMessage) -> SpamPrediction:
        # Fast model (always runs, <1ms)
        fast_score = self.fast_model.predict(message.text)

        # If fast model is confident, skip heavy models
        if fast_score > 0.95 or fast_score < 0.05:
            return SpamPrediction(
                score=fast_score,
                model="fast",
            )

        # Deep model (runs for uncertain cases)
        text_score = self.text_model.predict(message.raw_text)
        behavior_score = self.behavior_model.predict(message)

        # Ensemble
        final_score = (
            0.5 * text_score +
            0.3 * behavior_score +
            0.2 * fast_score
        )

        return SpamPrediction(score=final_score, model="ensemble")


class FastSpamModel:
    """
    XGBoost on handcrafted features (<1ms inference)
    """

    def predict(self, features: dict) -> float:
        # Order must match the feature order used at training time
        feature_vector = [
            features["length"],
            features["uppercase_ratio"],
            features["special_char_ratio"],
            features["url_count"],
            float(features["has_unsubscribe"]),  # bool -> 0.0/1.0
            features["currency_symbols"],
            features["exclamation_count"],
        ]
        return self.model.predict_proba([feature_vector])[0][1]


class BehaviorModel:
    """
    Sender/user behavior analysis
    """

    def predict(self, message: ProcessedMessage) -> float:
        sender_features = self.get_sender_features(message.sender)

        features = {
            "account_age_days": sender_features.account_age,
            "messages_last_hour": sender_features.recent_volume,
            "unique_recipients_last_hour": sender_features.unique_recipients,
            "bounce_rate": sender_features.bounce_rate,
            "complaint_rate": sender_features.complaint_rate,
            "is_new_sender": sender_features.account_age < 7,
        }

        return self.model.predict_proba([list(features.values())])[0][1]
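
The payoff of the confidence gate in SpamClassifier.predict is easy to quantify: confident messages pay only the fast model's latency, uncertain ones pay fast + deep. A small sketch of the expected-latency arithmetic (the 1ms/10ms figures are the budget numbers from this article):

```python
def cascade_avg_latency(fast_ms: float, deep_ms: float, fast_coverage: float) -> float:
    """Expected per-message latency of a two-stage cascade.

    fast_coverage: fraction of messages the fast model classifies
    confidently (score > 0.95 or < 0.05), skipping the deep model.
    """
    return fast_coverage * fast_ms + (1 - fast_coverage) * (fast_ms + deep_ms)

for coverage in (0.70, 0.80, 0.95):
    print(f"coverage={coverage:.2f} -> avg latency {cascade_avg_latency(1.0, 10.0, coverage):.1f}ms")
```

At 70-80% fast-model coverage the average lands around 3-4ms per message, versus 11ms if everything went through the deep model.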

4. Network Analysis

class NetworkAnalyzer:
    """
    IP, domain, and sender network reputation
    """

    def analyze(self, message: ProcessedMessage) -> NetworkScore:
        scores = {}

        # IP reputation
        scores["ip"] = self.ip_reputation.score(message.header.source_ip)

        # Domain reputation (sender domain)
        scores["domain"] = self.domain_reputation.score(
            message.header.sender_domain
        )

        # URL reputation (links in message body)
        url_scores = [
            self.url_reputation.score(url) for url in message.urls
        ]
        scores["urls"] = max(url_scores) if url_scores else 0.0

        # Graph-based: is sender part of known spam network?
        scores["network"] = self.spam_graph.check_membership(
            message.sender
        )

        return NetworkScore(
            ip_score=scores["ip"],
            domain_score=scores["domain"],
            url_score=scores["urls"],
            network_score=scores["network"],
            aggregate=max(scores.values()),
        )

5. Decision Engine

class SpamDecisionEngine:
    """
    Combine all signals into final decision
    """

    def decide(
        self,
        rule_result: RuleResult,
        ml_prediction: SpamPrediction,
        network_score: NetworkScore,
    ) -> SpamDecision:

        # Rule engine has priority (deterministic)
        if rule_result.spam is not None:
            return SpamDecision(
                action="block" if rule_result.spam else "deliver",
                reason=rule_result.reason,
                confidence=rule_result.confidence,
            )

        # Combine ML and network scores
        combined = (
            0.6 * ml_prediction.score +
            0.4 * network_score.aggregate
        )

        # Thresholds
        if combined > 0.9:
            return SpamDecision(action="block", confidence=combined)
        elif combined > 0.5:
            return SpamDecision(action="quarantine", confidence=combined)
        else:
            return SpamDecision(action="deliver", confidence=1 - combined)

Infrastructure

Component           Technology                            Latency / Cadence
Rule Engine         In-memory (Bloom filter + regex)      <0.1ms
Fast Model          XGBoost (CPU)                         <1ms
Deep Model          DistilBERT on GPU (Triton)            ~10ms
IP Reputation       Redis (IP -> score)                   <1ms
Domain Reputation   Redis + daily batch update            <1ms
Spam Graph          Neo4j / in-memory graph               <5ms
Event Stream        Kafka (user feedback: mark as spam)   Async
Training Pipeline   Spark + PyTorch                       Daily retrain

Latency Budget

Total classification: <50ms (email), <10ms (real-time chat)

Preprocessing:            5ms
Rule engine:              0.1ms
Fast model:               1ms
Network analysis:         3ms
Deep model (if needed):   10ms
Decision:                 0.5ms
Worst case (all stages): ~19.6ms -- within the email budget, but over the <10ms chat budget, so the real-time chat path must skip the deep model

Common Misconceptions

Misconception: the ensemble weights (0.5/0.3/0.2) can be set by expert judgment

Ensemble model weights should be optimized on a validation set, not hand-picked. The optimal weights depend on the spam distribution: for phishing-heavy traffic the text model matters more (0.6+), for bot spam the behavioral model does (0.5+). Use stacking (a meta-learner) or Bayesian optimization to tune the weights.
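
The simplest version of that tuning is a brute-force grid over weight triples summing to 1, scored on held-out data (a sketch with a made-up toy validation set; real tuning would use proper cross-validation and a calibrated metric rather than raw accuracy):

```python
def grid_search_weights(scores, labels, n_steps=10):
    """Brute-force search over (w_text, w_behavior, w_fast) summing to 1,
    maximizing accuracy at a fixed 0.5 decision threshold.

    scores: list of (text_score, behavior_score, fast_score) triples.
    """
    best_w, best_acc = None, -1.0
    for i in range(n_steps + 1):
        for j in range(n_steps + 1 - i):
            w1, w2 = i / n_steps, j / n_steps
            w3 = 1 - w1 - w2
            preds = [w1 * t + w2 * b + w3 * f >= 0.5 for t, b, f in scores]
            acc = sum(p == bool(y) for p, y in zip(preds, labels)) / len(labels)
            if acc > best_acc:
                best_w, best_acc = (w1, w2, w3), acc
    return best_w, best_acc

# Toy validation set where only the text-model score separates spam from ham
val_scores = [(0.9, 0.5, 0.5), (0.8, 0.5, 0.5), (0.1, 0.5, 0.5), (0.2, 0.5, 0.5)]
val_labels = [1, 1, 0, 0]
weights, acc = grid_search_weights(val_scores, val_labels)
print(weights, acc)
```

On this toy data the search correctly shifts weight onto the text model; on real traffic the winning weights change with the spam mix, which is exactly why they should not be frozen by hand.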

Misconception: a threshold of 0.5 is a reasonable default for spam classification

At a 2% spam rate (SMS) and a 0.5 threshold, even a model with AUC 0.99 produces an unacceptable number of false positives. The right approach is to pick the threshold from the precision-recall curve for a target precision (e.g., 99.5%). The optimal thresholds for email (45% spam) and SMS (2% spam) will differ substantially.
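
Picking a threshold for a target precision can be sketched in a few lines (a minimal pure-Python version; the score/label arrays are illustrative, and production code would typically use a library routine such as a precision-recall curve over a large validation set):

```python
def threshold_for_precision(scores, labels, target_precision):
    """Lowest score threshold whose precision meets the target.

    Lowest qualifying threshold = highest recall at that precision.
    Candidates are the unique observed scores, scanned ascending.
    """
    for t in sorted(set(scores)):
        tp = sum(1 for s, y in zip(scores, labels) if s >= t and y == 1)
        fp = sum(1 for s, y in zip(scores, labels) if s >= t and y == 0)
        if tp + fp == 0:
            continue
        if tp / (tp + fp) >= target_precision:
            return t
    return None  # target precision unreachable on this data

val_scores = [0.1, 0.2, 0.3, 0.6, 0.9, 0.95]
val_labels = [0, 0, 0, 0, 1, 1]
print(threshold_for_precision(val_scores, val_labels, 0.99))
```

Note how the chosen threshold rises as the target precision rises, which is the mechanism behind the email-vs-SMS difference: the rarer spam is, the higher the threshold must sit to hold the same precision.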

Misconception: refreshing the IP blocklist once a day is enough

Spammers rotate IPs every 15-30 minutes via botnets and proxy chains. Refreshing the blocklist in a daily batch leaves a 24-hour vulnerability window. What's needed is real-time IP reputation scoring through a streaming pipeline (Kafka -> Flink), supplemented by batch updates from RBLs (Realtime Blackhole Lists).
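
The core of such a streaming reputation score is just an exponentially decaying counter per IP, updated on each spam report (a sketch with an in-memory dict; a real pipeline would keep this state in Redis or Flink keyed state, and the 30-minute half-life is an assumed tuning value):

```python
import math

class DecayingIPReputation:
    """Per-IP spam score that decays exponentially between reports.

    Each spam report bumps the score; with a 30-min half-life an IP that
    goes quiet is effectively forgotten within a couple of hours, so the
    table tracks currently-active spam sources rather than stale history.
    """

    def __init__(self, half_life_s: float = 1800.0):
        self.decay = math.log(2) / half_life_s
        self.state = {}  # ip -> (score, last_update_ts)

    def score(self, ip: str, ts: float) -> float:
        if ip not in self.state:
            return 0.0
        s, last = self.state[ip]
        return s * math.exp(-self.decay * (ts - last))

    def report_spam(self, ip: str, ts: float, weight: float = 1.0) -> None:
        # decay the old score to "now", then add the new report
        self.state[ip] = (self.score(ip, ts) + weight, ts)

rep = DecayingIPReputation()
rep.report_spam("203.0.113.7", ts=0)
print(rep.score("203.0.113.7", ts=1800))  # one half-life later
```

Reports arriving over a Kafka topic would call report_spam, while the classification path reads score, so a freshly active botnet IP starts scoring high within minutes instead of the next day.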

Questions with Graded Answers

Why a cascade of fast model + deep model rather than just the deep model?

❌ "To save GPU resources" -- true, but misses the main reason

✅ "Latency budget: at 1M msg/sec, running every message through DistilBERT (~10ms on GPU) would require hundreds of GPUs. The fast model (XGBoost, < 1ms on CPU) confidently classifies 70-80% of messages (score > 0.95 or < 0.05). The deep model is invoked only for the remaining 20-30% of uncertain cases. Net result: average latency of ~3ms instead of ~15ms, and GPU demand drops 3-5x."

How does the Decision Engine aggregate signals from the Rule Engine, ML, and Network Analysis?

❌ "Simple majority voting" -- discards the confidence information

✅ "Rule Engine имеет абсолютный приоритет -- если sender в blocklist (confidence 1.0) или allowlist, решение принимается мгновенно без ML. Для остальных: weighted combination ML score (0.6) + Network score (0.4) с тремя порогами: > 0.9 = block, 0.5-0.9 = quarantine, < 0.5 = deliver. Веса и пороги калибруются на validation set. Quarantine -- ключевой элемент: снижает цену ошибки, позволяя пользователю проверить."