Content Moderation: System Components
~3 min read
Prerequisites: Problem Definition | MLSD Materials
The content moderation system consists of five key components: Text Moderation (BERT, 5 parallel classifiers), Image Moderation (ResNet NSFW + YOLO hate-symbol detection + OCR), Video Moderation (smart frame sampling + audio transcription), Decision Engine (policy-aware aggregation), and Human Review System (priority queue with SLAs). All ML models run in parallel via ThreadPoolExecutor, which keeps multi-modal processing under 500 ms for text and under 5 s for video.
High-Level Architecture
graph TD
A["Content Submission<br/>POST /content (text, image, video)"] --> B["Content Ingestion<br/>Preprocessing, Feature Extraction"]
B --> C["Text Moderation"]
B --> D["Image Moderation"]
B --> E["Video Moderation"]
C & D & E --> F["Decision Engine<br/>Aggregate Scores, Apply Policies"]
F -->|safe| G["APPROVE<br/>(Publish)"]
F -->|uncertain| H["REVIEW<br/>(Queue)"]
F -->|violation| I["REJECT<br/>(Block)"]
H --> J["Human Moderator"]
style F fill:#e8eaf6,stroke:#3f51b5
style G fill:#e8f5e9,stroke:#4caf50
style H fill:#fff3e0,stroke:#ff9800
style I fill:#ffebee,stroke:#f44336
Component Details
1. Text Moderation
from concurrent.futures import ThreadPoolExecutor

import torch
from transformers import AutoModelForSequenceClassification, AutoTokenizer

class TextModerationService:
"""
Multi-model text content moderation
"""
def __init__(self):
self.preprocessor = TextPreprocessor()
self.classifiers = {
"toxicity": ToxicityClassifier(),
"hate_speech": HateSpeechClassifier(),
"spam": SpamClassifier(),
"self_harm": SelfHarmClassifier(),
"threat": ThreatClassifier(),
}
self.context_analyzer = ContextAnalyzer()
def moderate(self, text: str, context: dict) -> ModerationResult:
"""
Full text moderation pipeline
"""
# Preprocessing
clean_text = self.preprocessor.clean(text)
normalized = self.preprocessor.normalize(clean_text)
# Run classifiers in parallel
scores = {}
with ThreadPoolExecutor() as executor:
futures = {
name: executor.submit(clf.predict, normalized)
for name, clf in self.classifiers.items()
}
for name, future in futures.items():
scores[name] = future.result()
# Context analysis
context_adjustment = self.context_analyzer.adjust_scores(
scores, context
)
        return ModerationResult(
            raw_scores=scores,
            adjusted_scores=context_adjustment,
            # Derive the verdict fields from the context-adjusted scores,
            # not the raw ones -- that is the point of the context analyzer
            top_violation=max(context_adjustment, key=context_adjustment.get),
            confidence=max(context_adjustment.values()),
        )
class ToxicityClassifier:
"""
BERT-based toxicity classifier
"""
def __init__(self):
self.model = AutoModelForSequenceClassification.from_pretrained(
"unitary/toxic-bert"
)
self.tokenizer = AutoTokenizer.from_pretrained("unitary/toxic-bert")
def predict(self, text: str) -> float:
inputs = self.tokenizer(
text,
truncation=True,
max_length=512,
return_tensors="pt"
)
with torch.no_grad():
outputs = self.model(**inputs)
probs = torch.softmax(outputs.logits, dim=-1)
return probs[0][1].item() # Probability of toxic
class TextPreprocessor:
"""
Handle adversarial text manipulation
"""
def normalize(self, text: str) -> str:
# Handle l33t speak: h4te → hate
text = self.decode_leet(text)
# Handle obfuscation: f*ck → fuck
text = self.decode_obfuscation(text)
# Handle unicode tricks
text = self.normalize_unicode(text)
# Handle repetition: haaaate → hate
text = self.normalize_repetition(text)
return text.lower()
    def decode_leet(self, text: str) -> str:
        # Naive global substitution: fine as a sketch, but it also mangles
        # legitimate numbers ("10" -> "io"); a production system applies
        # the map only to tokens already flagged as suspicious
        leet_map = {
            '0': 'o', '1': 'i', '3': 'e', '4': 'a',
            '5': 's', '7': 't', '@': 'a', '$': 's'
        }
for leet, char in leet_map.items():
text = text.replace(leet, char)
return text
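`decode_obfuscation` and `normalize_repetition` are called above but not defined. A minimal sketch of both, assuming simple regex heuristics -- the patterns here are illustrative, not a production ruleset:
import re

# Illustrative patterns only; a real system derives these from its policy lexicon
OBFUSCATION_PATTERNS = {
    r"f[\*#@!]+ck": "fuck",
    r"b[\*#@!]+tch": "bitch",
}

def decode_obfuscation(text: str) -> str:
    """Replace censored spellings (f*ck -> fuck) with canonical forms."""
    for pattern, replacement in OBFUSCATION_PATTERNS.items():
        text = re.sub(pattern, replacement, text, flags=re.IGNORECASE)
    return text

def normalize_repetition(text: str) -> str:
    """Collapse character runs: haaaate -> haate. Keeping two characters
    preserves legitimate doubles ("soon"); classifiers tolerate the residue."""
    return re.sub(r"(.)\1{2,}", r"\1\1", text)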
2. Image Moderation
from concurrent.futures import ThreadPoolExecutor
from typing import List

import PIL.Image
import torch
import torch.nn as nn
from torchvision import transforms
from torchvision.models import resnet50
from ultralytics import YOLO

class ImageModerationService:
"""
Multi-label image classification for content moderation
"""
def __init__(self):
self.nsfw_classifier = NSFWClassifier()
self.violence_classifier = ViolenceClassifier()
self.hate_symbol_detector = HateSymbolDetector()
self.ocr_engine = OCREngine()
self.text_moderator = TextModerationService()
def moderate(self, image: bytes) -> ImageModerationResult:
"""
Full image moderation pipeline
"""
# Decode image
img = self.decode_image(image)
# Run classifiers in parallel
with ThreadPoolExecutor() as executor:
nsfw_future = executor.submit(self.nsfw_classifier.predict, img)
violence_future = executor.submit(self.violence_classifier.predict, img)
symbol_future = executor.submit(self.hate_symbol_detector.detect, img)
ocr_future = executor.submit(self.ocr_engine.extract_text, img)
nsfw_score = nsfw_future.result()
violence_score = violence_future.result()
hate_symbols = symbol_future.result()
extracted_text = ocr_future.result()
# Moderate extracted text
text_result = None
if extracted_text:
text_result = self.text_moderator.moderate(extracted_text, {})
return ImageModerationResult(
nsfw_score=nsfw_score,
violence_score=violence_score,
hate_symbols=hate_symbols,
text_moderation=text_result,
)
class NSFWClassifier:
"""
ResNet-based NSFW classifier
"""
def __init__(self):
        self.model = resnet50(weights=None)  # architecture only; fine-tuned weights below
        self.model.fc = nn.Linear(2048, 5)  # 5 NSFW categories
        self.model.load_state_dict(torch.load("nsfw_model.pth", map_location="cpu"))
self.model.eval()
self.transform = transforms.Compose([
transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225]),
])
    def predict(self, image: PIL.Image.Image) -> dict:
"""
Predict NSFW categories
"""
tensor = self.transform(image).unsqueeze(0)
with torch.no_grad():
outputs = self.model(tensor)
probs = torch.softmax(outputs, dim=-1)[0]
categories = ["safe", "suggestive", "porn", "hentai", "neutral"]
return {cat: prob.item() for cat, prob in zip(categories, probs)}
class HateSymbolDetector:
"""
Object detection for hate symbols
"""
    def __init__(self):
        # ultralytics API: YOLO(...) loads the custom-trained checkpoint
        self.model = YOLO("hate_symbols_yolo.pt")
        self.known_symbols = self.load_symbol_database()
    def detect(self, image: PIL.Image.Image) -> List[HateSymbol]:
"""
Detect hate symbols in image
"""
        results = self.model(image)
        detected = []
        # ultralytics returns one Results object per input image
        for box in results[0].boxes:
            if box.conf > 0.5:
                symbol_class = self.model.names[int(box.cls)]
                detected.append(HateSymbol(
                    name=symbol_class,
                    confidence=box.conf.item(),
                    bbox=box.xyxy[0].tolist(),
                ))
return detected
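`OCREngine` is used above but not defined in this section. A minimal sketch assuming pytesseract as the backend -- any OCR engine with a text-extraction call slots in the same way:
import PIL.Image
import pytesseract

class OCREngine:
    """Thin wrapper so the moderation pipeline stays backend-agnostic."""
    def extract_text(self, image: PIL.Image.Image) -> str:
        # Tesseract covers most meme-style overlay text; swap in PaddleOCR
        # or EasyOCR here if stylized fonts dominate the traffic
        return pytesseract.image_to_string(image).strip()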
3. Video Moderation
from concurrent.futures import ThreadPoolExecutor
from typing import List

import cv2
import numpy as np

class VideoModerationService:
"""
Video content moderation with frame sampling
"""
def __init__(self):
self.frame_sampler = FrameSampler()
self.image_moderator = ImageModerationService()
self.audio_moderator = AudioModerationService()
self.scene_detector = SceneDetector()
def moderate(self, video_path: str) -> VideoModerationResult:
"""
Full video moderation pipeline
"""
# Extract key frames
frames = self.frame_sampler.sample(video_path)
# Extract audio
audio = self.extract_audio(video_path)
# Moderate in parallel
with ThreadPoolExecutor() as executor:
# Moderate frames
            # Re-encode frames to bytes, since ImageModerationService.moderate()
            # expects encoded image bytes, not raw arrays
            frame_futures = [
                executor.submit(
                    self.image_moderator.moderate,
                    cv2.imencode(".jpg", frame)[1].tobytes(),
                )
                for frame in frames
            ]
# Moderate audio
audio_future = executor.submit(self.audio_moderator.moderate, audio)
# Collect results
frame_results = [f.result() for f in frame_futures]
audio_result = audio_future.result()
# Aggregate scores
aggregated = self.aggregate_frame_results(frame_results)
return VideoModerationResult(
frame_results=frame_results,
audio_result=audio_result,
aggregated_scores=aggregated,
flagged_timestamps=self.get_flagged_timestamps(frame_results),
)
class FrameSampler:
"""
Smart frame sampling for video analysis
"""
def __init__(self, strategy: str = "scene_change"):
self.strategy = strategy
self.scene_detector = SceneDetector()
def sample(self, video_path: str, max_frames: int = 30) -> List[np.ndarray]:
"""
Sample key frames from video
"""
if self.strategy == "uniform":
return self.uniform_sample(video_path, max_frames)
elif self.strategy == "scene_change":
return self.scene_change_sample(video_path, max_frames)
def scene_change_sample(self, video_path: str, max_frames: int) -> List[np.ndarray]:
"""
Sample frames at scene changes
"""
scenes = self.scene_detector.detect(video_path)
frames = []
cap = cv2.VideoCapture(video_path)
for scene in scenes[:max_frames]:
cap.set(cv2.CAP_PROP_POS_FRAMES, scene.start_frame)
ret, frame = cap.read()
if ret:
frames.append(frame)
cap.release()
return frames
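`uniform_sample` is referenced in `sample()` but not shown. A sketch of the missing `FrameSampler` method, assuming evenly spaced frame indices via OpenCV:
    def uniform_sample(self, video_path: str, max_frames: int) -> List[np.ndarray]:
        """Grab max_frames evenly spaced frames across the whole video."""
        cap = cv2.VideoCapture(video_path)
        total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        step = max(total // max_frames, 1)
        frames = []
        for idx in range(0, total, step):
            cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
            ok, frame = cap.read()
            if ok:
                frames.append(frame)
            if len(frames) >= max_frames:
                break
        cap.release()
        return frames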
class AudioModerationService:
"""
Audio/speech content moderation
"""
def __init__(self):
self.speech_recognizer = SpeechRecognizer()
self.text_moderator = TextModerationService()
self.audio_classifier = AudioClassifier()
def moderate(self, audio_path: str) -> AudioModerationResult:
"""
Moderate audio content
"""
# Speech to text
transcript = self.speech_recognizer.transcribe(audio_path)
# Moderate transcript
text_result = self.text_moderator.moderate(transcript, {})
# Audio-only analysis (screaming, gunshots, etc.)
audio_scores = self.audio_classifier.classify(audio_path)
return AudioModerationResult(
transcript=transcript,
text_moderation=text_result,
audio_event_scores=audio_scores,
)
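`SpeechRecognizer` can be backed by any ASR model. A minimal sketch assuming the open-source whisper package; the model size is illustrative:
import whisper

class SpeechRecognizer:
    def __init__(self, model_size: str = "base"):
        # Larger checkpoints ("small", "medium") trade latency for accuracy
        self.model = whisper.load_model(model_size)
    def transcribe(self, audio_path: str) -> str:
        return self.model.transcribe(audio_path)["text"]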
4. Decision Engine
from collections import defaultdict
from typing import Optional

class DecisionEngine:
"""
Aggregate scores and make final decision
"""
def __init__(self):
self.policies = self.load_policies()
self.thresholds = self.load_thresholds()
def decide(
self,
text_result: Optional[ModerationResult],
image_result: Optional[ImageModerationResult],
video_result: Optional[VideoModerationResult],
context: dict
) -> Decision:
"""
Make final moderation decision
"""
# Aggregate all scores
all_scores = self.aggregate_scores(text_result, image_result, video_result)
# Apply policy rules
for policy in self.policies:
if policy.matches(all_scores, context):
return Decision(
action=policy.action,
reason=policy.name,
scores=all_scores,
confidence=self.calculate_confidence(all_scores)
)
# Default threshold-based decision
max_score = max(all_scores.values())
max_category = max(all_scores, key=all_scores.get)
if max_score > self.thresholds.reject:
action = "reject"
elif max_score > self.thresholds.review:
action = "review"
else:
action = "approve"
return Decision(
action=action,
reason=max_category,
scores=all_scores,
confidence=max_score
)
def aggregate_scores(self, text_result, image_result, video_result) -> dict:
"""
Combine scores from all modalities
"""
scores = defaultdict(float)
if text_result:
for category, score in text_result.raw_scores.items():
scores[category] = max(scores[category], score)
if image_result:
scores["nsfw"] = max(scores["nsfw"], image_result.nsfw_score["porn"])
scores["violence"] = max(scores["violence"], image_result.violence_score)
if video_result:
for category, score in video_result.aggregated_scores.items():
scores[category] = max(scores[category], score)
return dict(scores)
class PolicyEngine:
"""
Rule-based policy application
"""
def __init__(self):
self.policies = [
# Zero tolerance
Policy(
name="child_safety",
condition=lambda s, c: s.get("child_exploitation", 0) > 0.5,
action="reject",
priority=1
),
Policy(
name="terrorism",
condition=lambda s, c: s.get("terrorism", 0) > 0.8,
action="reject",
priority=2
),
# Context-dependent
Policy(
name="news_exception",
condition=lambda s, c: (
s.get("violence", 0) > 0.7 and
c.get("is_news_account", False)
),
action="review", # Don't auto-reject news
priority=3
),
# Regional
Policy(
name="germany_hate_speech",
condition=lambda s, c: (
s.get("hate_speech", 0) > 0.5 and
c.get("region") == "DE"
),
action="reject", # Stricter in Germany
priority=4
),
]
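The `Policy` objects above need `matches()` semantics consistent with `DecisionEngine.decide`. A minimal dataclass sketch -- field names follow the usage above; evaluating in priority order is an assumption:
from dataclasses import dataclass
from typing import Callable

@dataclass
class Policy:
    name: str
    condition: Callable[[dict, dict], bool]  # (scores, context) -> bool
    action: str    # "approve" | "review" | "reject"
    priority: int  # lower value = checked first
    def matches(self, scores: dict, context: dict) -> bool:
        return self.condition(scores, context)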
5. Human Review System
from datetime import datetime

class HumanReviewSystem:
"""
Queue and workflow for human moderators
"""
    def __init__(self):
        # PriorityQueue here is a domain-specific queue that supports
        # specialization-aware dequeueing, not the stdlib queue.PriorityQueue
        self.queue = PriorityQueue()
        self.assignment = ModeratorAssignment()
        self.quality_control = QualityControl()
def enqueue(self, content: Content, decision: Decision):
"""
Add content to review queue
"""
priority = self.calculate_priority(decision)
review_item = ReviewItem(
content_id=content.id,
content_type=content.type,
ml_decision=decision,
priority=priority,
created_at=datetime.now(),
sla_deadline=self.calculate_sla(priority),
)
self.queue.put(review_item)
def calculate_priority(self, decision: Decision) -> int:
"""
Higher priority for more severe violations
"""
severity_weights = {
"child_exploitation": 10,
"terrorism": 9,
"self_harm": 8,
"violence": 7,
"hate_speech": 6,
"nsfw": 5,
"harassment": 4,
"spam": 2,
}
base_priority = severity_weights.get(decision.reason, 1)
        # Boost for high-reach content (reach is assumed to be attached
        # to the decision upstream, e.g. from view-count metadata)
        if decision.content_reach > 10000:
            base_priority += 2
return base_priority
def assign_to_moderator(self, moderator_id: str) -> ReviewItem:
"""
Get next item for moderator
"""
# Consider moderator specialization
moderator = self.get_moderator(moderator_id)
# Get suitable item from queue
item = self.queue.get_for_specialization(moderator.specialization)
# Lock item
item.assigned_to = moderator_id
item.assigned_at = datetime.now()
return item
def submit_decision(
self,
item_id: str,
moderator_id: str,
decision: str,
notes: str
):
"""
Moderator submits their decision
"""
item = self.get_item(item_id)
# Quality control check (random sample)
if self.quality_control.should_audit(moderator_id):
self.quality_control.queue_for_audit(item, decision)
# Record decision
item.human_decision = decision
item.decided_at = datetime.now()
item.notes = notes
# Apply action
self.apply_moderation_action(item, decision)
# Update ML feedback
self.send_feedback_to_ml(item)
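`ReviewItem` is used throughout but never defined. A sketch of the fields referenced above; the `__lt__` ordering is an assumption about how the queue compares items:
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

@dataclass
class ReviewItem:
    content_id: str
    content_type: str
    ml_decision: "Decision"
    priority: int
    created_at: datetime
    sla_deadline: datetime
    assigned_to: Optional[str] = None
    assigned_at: Optional[datetime] = None
    human_decision: Optional[str] = None
    decided_at: Optional[datetime] = None
    notes: str = ""
    def __lt__(self, other: "ReviewItem") -> bool:
        # Higher severity first; earlier SLA deadline breaks ties
        return (-self.priority, self.sla_deadline) < (-other.priority, other.sla_deadline)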
Infrastructure
Service Architecture
apiVersion: apps/v1
kind: Deployment
metadata:
name: content-moderation
spec:
  replicas: 50
  selector:
    matchLabels:
      app: content-moderation
  template:
    metadata:
      labels:
        app: content-moderation
    spec:
containers:
- name: moderation-api
image: moderation-service:v2.1
resources:
requests:
memory: "4Gi"
cpu: "2"
limits:
memory: "8Gi"
cpu: "4"
- name: text-model
image: text-moderation-model:v1.5
        resources:
          requests:
            memory: "8Gi"
            cpu: "4"
          limits:
            nvidia.com/gpu: "1"  # extended resources must be set in limits
- name: image-model
image: image-moderation-model:v1.3
        resources:
          requests:
            memory: "8Gi"
            cpu: "4"
          limits:
            nvidia.com/gpu: "1"
Processing Pipeline
graph TD
UP["Content Upload"] --> KF["Kafka Topic"]
KF --> WP["Worker Pool"]
WP --> MLM["ML Models"]
MLM --> DE["Decision Engine"]
MLM --> MS["Model Serving<br/>(TF Serving + Triton)"]
style UP fill:#e8eaf6,stroke:#3f51b5
style KF fill:#fff3e0,stroke:#ef6c00
style WP fill:#e8eaf6,stroke:#3f51b5
style MLM fill:#f3e5f5,stroke:#9c27b0
style DE fill:#e8f5e9,stroke:#4caf50
style MS fill:#fff3e0,stroke:#ef6c00
Common Misconceptions
Misconception: OCR for image moderation is an optional nice-to-have
Memes with overlaid text (image macros) account for 30-40% of hate speech content on platforms. Without an OCR + text moderation pipeline for the extracted text, the system misses a significant share of violations. The latency impact is minimal: modern OCR engines (PaddleOCR, EasyOCR) process an image in 5-15 ms.
Misconception: a few randomly sampled frames are enough for video
Uniform sampling misses 15-25% of violations, which tend to occur in short scenes. Scene-change sampling (detect scene cuts, then analyze a key frame from each scene) catches about 20% more violations with the same frame budget (~30 frames for a 5-minute video). The audio channel is also mandatory: hate speech in spoken dialogue, gunshots, explicit lyrics.
Misconception: zero-tolerance policies don't need Human Review
Even for child safety (zero tolerance), auto-reject without human review creates legal risk: (1) false accusations of a serious crime, (2) removal of legitimate medical or educational content. The right approach: auto-remove the content from public access immediately, but queue it for human review with an SLA under 1 hour for confirmation and possible law-enforcement reporting.
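A sketch of that two-step flow ("remove now, confirm within the SLA"); `content_store` and the wiring are hypothetical names, not part of the system above:
def handle_zero_tolerance(content, decision, review_system, content_store):
    """Unpublish immediately, but keep a human in the loop."""
    # Step 1: pull the content out of public access right away
    content_store.unpublish(content.id)
    # Step 2: queue for human confirmation; severity 10 puts it at the
    # top of the queue, which maps to the < 1 hour SLA tier
    review_system.enqueue(content, decision)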
Questions with Graded Answers
How would you organize the priority queue for the Human Review System?
"FIFO -- first come, first served" -- ignores severity and reach
"Priority along two axes: severity (child safety=10, terrorism=9, self-harm=8, ..., spam=2) and reach (content with 10K+ views gets +2 priority). SLA: severity 8+ = review within 1 hour, severity 5-7 = within 4 hours, severity < 5 = within 24 hours. Moderator specialization: not every moderator handles child safety -- it requires dedicated training plus rotation for mental health. Quality control: 5% of decisions are audited by a second moderator."
How do you build a feedback loop from Human Review back into the ML models?
"Human decisions are automatically added to the training set" -- without quality control this degrades the model
"A three-stage feedback loop: (1) moderator decisions are logged with a confidence score and notes, (2) quality-filtered: only decisions with inter-annotator agreement > 80% go into training, (3) active learning: the model requests human labels for the cases with the highest uncertainty (confidence 0.4-0.6). Weekly model retraining on the augmented dataset. Monitoring: an appeal success rate above 10% for a category signals that the policy or the model needs revisiting."