
Content Moderation: System Components

~3 minute read

Prerequisites: Problem Definition | MLSD Materials

The content moderation system consists of five key components: Text Moderation (BERT, 5 parallel classifiers), Image Moderation (ResNet NSFW + YOLO hate symbols + OCR), Video Moderation (smart frame sampling + audio transcription), a Decision Engine (policy-aware aggregation), and a Human Review System (priority queue with SLAs). All ML models run in parallel via a ThreadPoolExecutor, which keeps end-to-end processing of multi-modal content under 500 ms for text and under 5 s for video.

High-Level Architecture

graph TD
    A["Content Submission<br/>POST /content (text, image, video)"] --> B["Content Ingestion<br/>Preprocessing, Feature Extraction"]

    B --> C["Text Moderation"]
    B --> D["Image Moderation"]
    B --> E["Video Moderation"]

    C & D & E --> F["Decision Engine<br/>Aggregate Scores, Apply Policies"]

    F -->|safe| G["APPROVE<br/>(Publish)"]
    F -->|uncertain| H["REVIEW<br/>(Queue)"]
    F -->|violation| I["REJECT<br/>(Block)"]

    H --> J["Human Moderator"]

    style F fill:#e8eaf6,stroke:#3f51b5
    style G fill:#e8f5e9,stroke:#4caf50
    style H fill:#fff3e0,stroke:#ff9800
    style I fill:#ffebee,stroke:#f44336
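
The diagram maps to a thin orchestration layer on top of the per-modality services. A minimal sketch of that layer, assuming the service classes defined in the sections below (the ModerationPipeline wrapper itself is hypothetical, not part of the original design):

from concurrent.futures import ThreadPoolExecutor

class ModerationPipeline:
    """
    Hypothetical top-level wrapper: run each present modality in parallel,
    then hand all results to the Decision Engine
    """

    def __init__(self):
        self.text = TextModerationService()
        self.image = ImageModerationService()
        self.video = VideoModerationService()
        self.engine = DecisionEngine()

    def moderate(self, text=None, image=None, video_path=None, context=None):
        context = context or {}
        with ThreadPoolExecutor() as executor:
            # Submit only the modalities that are actually present
            text_f = executor.submit(self.text.moderate, text, context) if text else None
            image_f = executor.submit(self.image.moderate, image) if image else None
            video_f = executor.submit(self.video.moderate, video_path) if video_path else None

            text_res = text_f.result() if text_f else None
            image_res = image_f.result() if image_f else None
            video_res = video_f.result() if video_f else None

        return self.engine.decide(text_res, image_res, video_res, context)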

Component Details

1. Text Moderation

from concurrent.futures import ThreadPoolExecutor

class TextModerationService:
    """
    Multi-model text content moderation
    """

    def __init__(self):
        self.preprocessor = TextPreprocessor()
        self.classifiers = {
            "toxicity": ToxicityClassifier(),
            "hate_speech": HateSpeechClassifier(),
            "spam": SpamClassifier(),
            "self_harm": SelfHarmClassifier(),
            "threat": ThreatClassifier(),
        }
        self.context_analyzer = ContextAnalyzer()

    def moderate(self, text: str, context: dict) -> ModerationResult:
        """
        Full text moderation pipeline
        """
        # Preprocessing
        clean_text = self.preprocessor.clean(text)
        normalized = self.preprocessor.normalize(clean_text)

        # Run classifiers in parallel
        scores = {}
        with ThreadPoolExecutor() as executor:
            futures = {
                name: executor.submit(clf.predict, normalized)
                for name, clf in self.classifiers.items()
            }
            for name, future in futures.items():
                scores[name] = future.result()

        # Context analysis
        context_adjustment = self.context_analyzer.adjust_scores(
            scores, context
        )

        # Report the top violation on context-adjusted scores: the adjustment
        # exists precisely to override raw classifier output
        return ModerationResult(
            raw_scores=scores,
            adjusted_scores=context_adjustment,
            top_violation=max(context_adjustment, key=context_adjustment.get),
            confidence=max(context_adjustment.values()),
        )

import torch
from transformers import AutoModelForSequenceClassification, AutoTokenizer

class ToxicityClassifier:
    """
    BERT-based toxicity classifier
    """

    def __init__(self):
        self.model = AutoModelForSequenceClassification.from_pretrained(
            "unitary/toxic-bert"
        )
        self.tokenizer = AutoTokenizer.from_pretrained("unitary/toxic-bert")

    def predict(self, text: str) -> float:
        inputs = self.tokenizer(
            text,
            truncation=True,
            max_length=512,
            return_tensors="pt"
        )

        with torch.no_grad():
            outputs = self.model(**inputs)
            # toxic-bert is a multi-label model: per-label sigmoid, not softmax
            probs = torch.sigmoid(outputs.logits)

        return probs[0][0].item()  # probability of the "toxic" label

class TextPreprocessor:
    """
    Handle adversarial text manipulation
    """

    def normalize(self, text: str) -> str:
        # Handle l33t speak: h4te → hate
        text = self.decode_leet(text)

        # Handle obfuscation: f*ck → fuck
        text = self.decode_obfuscation(text)

        # Handle unicode tricks
        text = self.normalize_unicode(text)

        # Handle repetition: haaaate → hate
        text = self.normalize_repetition(text)

        return text.lower()

    def decode_leet(self, text: str) -> str:
        # Naive global substitution: it also rewrites legitimate digits
        # ("2024" -> "2o2a"), so production systems apply the map only
        # when it yields a known lexicon word
        leet_map = {
            '0': 'o', '1': 'i', '3': 'e', '4': 'a',
            '5': 's', '7': 't', '@': 'a', '$': 's'
        }
        for leet, char in leet_map.items():
            text = text.replace(leet, char)
        return text
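
The ModerationResult container returned above is never defined in this section; a minimal sketch consistent with how the code uses it (the field set is inferred from usage, not from the original design):

from dataclasses import dataclass

@dataclass
class ModerationResult:
    raw_scores: dict        # classifier name -> probability
    adjusted_scores: dict   # scores after context adjustment
    top_violation: str      # category with the highest adjusted score
    confidence: float       # highest score across categories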

2. Image Moderation

class ImageModerationService:
    """
    Multi-label image classification for content moderation
    """

    def __init__(self):
        self.nsfw_classifier = NSFWClassifier()
        self.violence_classifier = ViolenceClassifier()
        self.hate_symbol_detector = HateSymbolDetector()
        self.ocr_engine = OCREngine()
        self.text_moderator = TextModerationService()

    def moderate(self, image: bytes) -> ImageModerationResult:
        """
        Full image moderation pipeline
        """
        # Decode image
        img = self.decode_image(image)

        # Run classifiers in parallel
        with ThreadPoolExecutor() as executor:
            nsfw_future = executor.submit(self.nsfw_classifier.predict, img)
            violence_future = executor.submit(self.violence_classifier.predict, img)
            symbol_future = executor.submit(self.hate_symbol_detector.detect, img)
            ocr_future = executor.submit(self.ocr_engine.extract_text, img)

            nsfw_score = nsfw_future.result()
            violence_score = violence_future.result()
            hate_symbols = symbol_future.result()
            extracted_text = ocr_future.result()

        # Moderate extracted text
        text_result = None
        if extracted_text:
            text_result = self.text_moderator.moderate(extracted_text, {})

        return ImageModerationResult(
            nsfw_score=nsfw_score,
            violence_score=violence_score,
            hate_symbols=hate_symbols,
            text_moderation=text_result,
        )

import torch
import torch.nn as nn
import PIL.Image
from torchvision import transforms
from torchvision.models import resnet50

class NSFWClassifier:
    """
    ResNet-based NSFW classifier
    """

    def __init__(self):
        self.model = resnet50(weights=None)  # fine-tuned weights are loaded below
        self.model.fc = nn.Linear(2048, 5)  # 5 NSFW categories
        self.model.load_state_dict(torch.load("nsfw_model.pth"))
        self.model.eval()

        self.transform = transforms.Compose([
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                               std=[0.229, 0.224, 0.225]),
        ])

    def predict(self, image: PIL.Image.Image) -> dict:
        """
        Predict NSFW categories
        """
        tensor = self.transform(image).unsqueeze(0)

        with torch.no_grad():
            outputs = self.model(tensor)
            probs = torch.softmax(outputs, dim=-1)[0]

        # Label order must match the training head; this is the common
        # open NSFW category set
        categories = ["drawings", "hentai", "neutral", "porn", "sexy"]
        return {cat: prob.item() for cat, prob in zip(categories, probs)}

from ultralytics import YOLO

class HateSymbolDetector:
    """
    Object detection for hate symbols
    """

    def __init__(self):
        # ultralytics exposes a single YOLO class; v8 weights load through it
        self.model = YOLO("hate_symbols_yolo.pt")
        self.known_symbols = self.load_symbol_database()

    def detect(self, image: PIL.Image) -> List[HateSymbol]:
        """
        Detect hate symbols in image
        """
        results = self.model(image)[0]  # ultralytics returns a list of Results

        detected = []
        for box in results.boxes:
            if box.conf > 0.5:
                symbol_class = self.model.names[int(box.cls)]
                detected.append(HateSymbol(
                    name=symbol_class,
                    confidence=box.conf.item(),
                    bbox=box.xyxy.tolist(),
                ))

        return detected

3. Video Moderation

class VideoModerationService:
    """
    Video content moderation with frame sampling
    """

    def __init__(self):
        self.frame_sampler = FrameSampler()
        self.image_moderator = ImageModerationService()
        self.audio_moderator = AudioModerationService()
        self.scene_detector = SceneDetector()

    def moderate(self, video_path: str) -> VideoModerationResult:
        """
        Full video moderation pipeline
        """
        # Extract key frames
        frames = self.frame_sampler.sample(video_path)

        # Extract audio
        audio = self.extract_audio(video_path)

        # Moderate in parallel
        with ThreadPoolExecutor() as executor:
            # Moderate frames
            frame_futures = [
                executor.submit(self.image_moderator.moderate, frame)
                for frame in frames
            ]

            # Moderate audio
            audio_future = executor.submit(self.audio_moderator.moderate, audio)

            # Collect results
            frame_results = [f.result() for f in frame_futures]
            audio_result = audio_future.result()

        # Aggregate scores
        aggregated = self.aggregate_frame_results(frame_results)

        return VideoModerationResult(
            frame_results=frame_results,
            audio_result=audio_result,
            aggregated_scores=aggregated,
            flagged_timestamps=self.get_flagged_timestamps(frame_results),
        )

import cv2
import numpy as np
from typing import List

class FrameSampler:
    """
    Smart frame sampling for video analysis
    """

    def __init__(self, strategy: str = "scene_change"):
        self.strategy = strategy
        self.scene_detector = SceneDetector()

    def sample(self, video_path: str, max_frames: int = 30) -> List[np.ndarray]:
        """
        Sample key frames from video
        """
        if self.strategy == "uniform":
            return self.uniform_sample(video_path, max_frames)
        elif self.strategy == "scene_change":
            return self.scene_change_sample(video_path, max_frames)
        raise ValueError(f"Unknown sampling strategy: {self.strategy}")

    def scene_change_sample(self, video_path: str, max_frames: int) -> List[np.ndarray]:
        """
        Sample frames at scene changes
        """
        scenes = self.scene_detector.detect(video_path)

        frames = []
        cap = cv2.VideoCapture(video_path)

        for scene in scenes[:max_frames]:
            cap.set(cv2.CAP_PROP_POS_FRAMES, scene.start_frame)
            ret, frame = cap.read()
            if ret:
                frames.append(frame)

        cap.release()
        return frames

class AudioModerationService:
    """
    Audio/speech content moderation
    """

    def __init__(self):
        self.speech_recognizer = SpeechRecognizer()
        self.text_moderator = TextModerationService()
        self.audio_classifier = AudioClassifier()

    def moderate(self, audio_path: str) -> AudioModerationResult:
        """
        Moderate audio content
        """
        # Speech to text
        transcript = self.speech_recognizer.transcribe(audio_path)

        # Moderate transcript
        text_result = self.text_moderator.moderate(transcript, {})

        # Audio-only analysis (screaming, gunshots, etc.)
        audio_scores = self.audio_classifier.classify(audio_path)

        return AudioModerationResult(
            transcript=transcript,
            text_moderation=text_result,
            audio_event_scores=audio_scores,
        )

4. Decision Engine

from collections import defaultdict
from typing import Optional

class DecisionEngine:
    """
    Aggregate scores and make final decision
    """

    def __init__(self):
        self.policies = self.load_policies()
        self.thresholds = self.load_thresholds()

    def decide(
        self,
        text_result: Optional[ModerationResult],
        image_result: Optional[ImageModerationResult],
        video_result: Optional[VideoModerationResult],
        context: dict
    ) -> Decision:
        """
        Make final moderation decision
        """
        # Aggregate all scores
        all_scores = self.aggregate_scores(text_result, image_result, video_result)

        # Apply policy rules
        for policy in self.policies:
            if policy.matches(all_scores, context):
                return Decision(
                    action=policy.action,
                    reason=policy.name,
                    scores=all_scores,
                    confidence=self.calculate_confidence(all_scores)
                )

        # Default threshold-based decision; guard against the case where
        # no modality produced any signal
        if not all_scores:
            return Decision(action="approve", reason="no_signals",
                            scores={}, confidence=0.0)

        max_score = max(all_scores.values())
        max_category = max(all_scores, key=all_scores.get)

        if max_score > self.thresholds.reject:
            action = "reject"
        elif max_score > self.thresholds.review:
            action = "review"
        else:
            action = "approve"

        return Decision(
            action=action,
            reason=max_category,
            scores=all_scores,
            confidence=max_score
        )

    def aggregate_scores(self, text_result, image_result, video_result) -> dict:
        """
        Combine scores from all modalities
        """
        scores = defaultdict(float)

        if text_result:
            for category, score in text_result.raw_scores.items():
                scores[category] = max(scores[category], score)

        if image_result:
            scores["nsfw"] = max(scores["nsfw"], image_result.nsfw_score["porn"])
            scores["violence"] = max(scores["violence"], image_result.violence_score)

        if video_result:
            for category, score in video_result.aggregated_scores.items():
                scores[category] = max(scores[category], score)

        return dict(scores)

class PolicyEngine:
    """
    Rule-based policy application
    """

    def __init__(self):
        self.policies = [
            # Zero tolerance
            Policy(
                name="child_safety",
                condition=lambda s, c: s.get("child_exploitation", 0) > 0.5,
                action="reject",
                priority=1
            ),
            Policy(
                name="terrorism",
                condition=lambda s, c: s.get("terrorism", 0) > 0.8,
                action="reject",
                priority=2
            ),

            # Context-dependent
            Policy(
                name="news_exception",
                condition=lambda s, c: (
                    s.get("violence", 0) > 0.7 and
                    c.get("is_news_account", False)
                ),
                action="review",  # Don't auto-reject news
                priority=3
            ),

            # Regional
            Policy(
                name="germany_hate_speech",
                condition=lambda s, c: (
                    s.get("hate_speech", 0) > 0.5 and
                    c.get("region") == "DE"
                ),
                action="reject",  # Stricter in Germany
                priority=4
            ),
        ]
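
The Policy objects above only need to expose matches, given how DecisionEngine.decide iterates over them; a minimal sketch (field names inferred from usage):

from dataclasses import dataclass
from typing import Callable

@dataclass
class Policy:
    name: str
    condition: Callable[[dict, dict], bool]  # (scores, context) -> bool
    action: str     # "approve" | "review" | "reject"
    priority: int   # lower value = checked first

    def matches(self, scores: dict, context: dict) -> bool:
        return self.condition(scores, context)

For the priorities to matter, load_policies should return the list sorted by priority ascending before the first-match loop runs.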

5. Human Review System

class HumanReviewSystem:
    """
    Queue and workflow for human moderators
    """

    def __init__(self):
        self.queue = PriorityQueue()  # in practice, a queue with specialization-aware dequeue
        self.assignment = ModeratorAssignment()
        self.quality_control = QualityControl()

    def enqueue(self, content: Content, decision: Decision):
        """
        Add content to review queue
        """
        priority = self.calculate_priority(decision)

        review_item = ReviewItem(
            content_id=content.id,
            content_type=content.type,
            ml_decision=decision,
            priority=priority,
            created_at=datetime.now(),
            sla_deadline=self.calculate_sla(priority),
        )

        self.queue.put(review_item)

    def calculate_priority(self, decision: Decision) -> int:
        """
        Higher priority for more severe violations
        """
        severity_weights = {
            "child_exploitation": 10,
            "terrorism": 9,
            "self_harm": 8,
            "violence": 7,
            "hate_speech": 6,
            "nsfw": 5,
            "harassment": 4,
            "spam": 2,
        }

        base_priority = severity_weights.get(decision.reason, 1)

        # Boost for high-reach content (reach is attached to the decision upstream)
        if getattr(decision, "content_reach", 0) > 10000:
            base_priority += 2

        return base_priority

    def assign_to_moderator(self, moderator_id: str) -> ReviewItem:
        """
        Get next item for moderator
        """
        # Consider moderator specialization
        moderator = self.get_moderator(moderator_id)

        # Get suitable item from queue
        item = self.queue.get_for_specialization(moderator.specialization)

        # Lock item
        item.assigned_to = moderator_id
        item.assigned_at = datetime.now()

        return item

    def submit_decision(
        self,
        item_id: str,
        moderator_id: str,
        decision: str,
        notes: str
    ):
        """
        Moderator submits their decision
        """
        item = self.get_item(item_id)

        # Quality control check (random sample)
        if self.quality_control.should_audit(moderator_id):
            self.quality_control.queue_for_audit(item, decision)

        # Record decision
        item.human_decision = decision
        item.decided_at = datetime.now()
        item.notes = notes

        # Apply action
        self.apply_moderation_action(item, decision)

        # Update ML feedback
        self.send_feedback_to_ml(item)

Infrastructure

Service Architecture

apiVersion: apps/v1
kind: Deployment
metadata:
  name: content-moderation
spec:
  replicas: 50
  template:
    spec:
      containers:
        - name: moderation-api
          image: moderation-service:v2.1
          resources:
            requests:
              memory: "4Gi"
              cpu: "2"
            limits:
              memory: "8Gi"
              cpu: "4"

        - name: text-model
          image: text-moderation-model:v1.5
          resources:
            requests:
              memory: "8Gi"
              cpu: "4"
              nvidia.com/gpu: "1"

        - name: image-model
          image: image-moderation-model:v1.3
          resources:
            requests:
              memory: "8Gi"
              cpu: "4"
              nvidia.com/gpu: "1"

Processing Pipeline

graph TD
    UP["Content Upload"] --> KF["Kafka Topic"]
    KF --> WP["Worker Pool"]
    WP --> MLM["ML Models"]
    MLM --> DE["Decision Engine"]
    MLM --> MS["Model Serving<br/>(TF Serving + Triton)"]

    style UP fill:#e8eaf6,stroke:#3f51b5
    style KF fill:#fff3e0,stroke:#ef6c00
    style WP fill:#e8eaf6,stroke:#3f51b5
    style MLM fill:#f3e5f5,stroke:#9c27b0
    style DE fill:#e8f5e9,stroke:#4caf50
    style MS fill:#fff3e0,stroke:#ef6c00
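
A worker from the pool above could look like this sketch using kafka-python (the topic, consumer group, and helper names are assumptions):

import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "content-uploads",                  # assumed topic name
    group_id="moderation-workers",      # assumed consumer group
    bootstrap_servers=["kafka:9092"],
    enable_auto_commit=False,           # acknowledge only after a decision is recorded
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

pipeline = ModerationPipeline()  # hypothetical orchestrator sketched earlier

for message in consumer:
    payload = message.value
    decision = pipeline.moderate(
        text=payload.get("text"),
        image=payload.get("image_bytes"),
        video_path=payload.get("video_path"),
        context=payload.get("context", {}),
    )
    record_decision(payload["content_id"], decision)  # hypothetical persistence helper
    consumer.commit()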

Common Misconceptions

Misconception: OCR for image moderation is an optional nice-to-have

Memes with overlaid text (image macros) account for 30-40% of hate speech content on social platforms. Without an OCR + text moderation pipeline for the extracted text, the system misses a significant share of violations. The latency impact is minimal: modern OCR engines (PaddleOCR, EasyOCR) process an image in 5-15 ms.
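
A hedged sketch of that OCR branch with EasyOCR (the confidence threshold is an assumption; PaddleOCR can be swapped in the same way):

import easyocr

reader = easyocr.Reader(["en"])  # load once: initialization is expensive

def moderate_image_text(image_path: str, text_moderator):
    # readtext returns (bbox, text, confidence) triples
    fragments = reader.readtext(image_path)
    extracted = " ".join(text for _, text, conf in fragments if conf > 0.4)
    if not extracted.strip():
        return None
    # Route the overlay text through the same pipeline as regular posts
    return text_moderator.moderate(extracted, {})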

Misconception: for video, analyzing a few random frames is enough

Uniform sampling misses 15-25% of violations that occur in short scenes. Scene-change sampling (detect scene cuts, then analyze a key frame from each scene) catches about 20% more violations with the same frame budget (~30 frames for a 5-minute video). The audio channel is also mandatory: hate speech in spoken content, gunshots, explicit lyrics.
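
The SceneDetector used by FrameSampler above could be backed by PySceneDetect; a minimal sketch (the threshold value is an assumption):

from scenedetect import detect, ContentDetector

def scene_start_frames(video_path: str, max_frames: int = 30) -> list:
    # detect() returns (start_timecode, end_timecode) pairs per scene
    scenes = detect(video_path, ContentDetector(threshold=27.0))
    # Keep the first frame index of each scene as the key frame
    return [start.get_frames() for start, _ in scenes][:max_frames]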

Misconception: zero-tolerance policies don't need Human Review

Even for child safety (zero tolerance), auto-reject without human review creates legal risk: (1) false accusations of a serious crime, (2) removal of legitimate medical or educational content. The right approach: auto-remove the content from public access immediately, but queue it for human review with an SLA of under 1 hour to confirm the decision and, where warranted, report to law enforcement.
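
A sketch of that escalation path (unpublish and schedule_escalation are hypothetical helpers):

from datetime import datetime, timedelta

def handle_zero_tolerance(content, decision, review_system):
    # Hide from public access immediately, without waiting for a human
    unpublish(content.id)
    # ...but a human still confirms within the SLA before permanent action
    review_system.enqueue(content, decision)
    # Page on-call if the 1-hour review SLA is about to be missed
    schedule_escalation(content.id, deadline=datetime.now() + timedelta(hours=1))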

Questions with Graded Answers

How would you organize the priority queue for the Human Review System?

❌ "FIFO -- первый пришёл, первый обработан" -- игнорирует severity и reach

✅ "Priority по двум осям: severity (child safety=10, terrorism=9, self-harm=8, ..., spam=2) и reach (контент с 10K+ views получает +2 к приоритету). SLA: severity 8+ = review за 1 час, severity 5-7 = 4 часа, severity < 5 = 24 часа. Moderator specialization: не каждый модератор обрабатывает child safety -- нужна специализация + rotation для mental health. Quality control: 5% решений проходят аудит вторым модератором."

How do you build a feedback loop from Human Review back into the ML models?

❌ "Человеческие решения автоматически добавляются в training set" -- без quality control это деградирует модель

✅ "Трёхступенчатый feedback loop: (1) Решения модераторов логируются с confidence score и notes, (2) Quality-filtered: только решения с inter-annotator agreement > 80% идут в training, (3) Active learning: модель запрашивает human labels для cases с максимальной uncertainty (confidence 0.4-0.6). Weekly model retraining на augmented dataset. Мониторинг: если appeal success rate > 10% на category -- сигнал для пересмотра policy или модели."