
Scaling a Recommendation System


Prerequisites: System Components | Metrics

A Netflix-scale recommendation system serves 100-150K RPS, peaking at up to 500K during holidays, under a latency budget of just 100ms (p99). In practice that means 50+ stateless pods, a 10-shard Redis Cluster holding 100GB, a FAISS index over 100M vectors, and GPU servers for deep ranking -- all for ~$30K/month. The key engineering decisions: graceful degradation (4 levels of degradation under overload), multi-level caching (L1 local + L2 Redis + CDN), and request batching for GPU inference. In an interview, scaling is the final section that separates Senior from Staff: not "add more servers", but concrete latency budgets, cost/RPS trade-offs, and disaster recovery with RTO < 15 minutes.

Traffic Patterns

Peak Load Analysis

Peak: 150K RPS (evening prime time)
Off-peak: 30K RPS (night)
Spike events: Black Friday 3x, New Year 5x
| Time of Day | RPS | Notes |
|---|---|---|
| 00:00-06:00 | 30K | Minimum load |
| 06:00-12:00 | 50-100K | Morning ramp-up |
| 12:00-18:00 | 100-150K | Peak traffic |
| 18:00-00:00 | 80-100K | Evening decline |
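
The headroom math follows directly from these numbers. A minimal capacity sketch (Python), assuming each pod sustainably handles ~2K RPS (the figure used in the interview answers below) and an illustrative safety factor:

import math

POD_CAPACITY_RPS = 2_000   # assumed sustainable per-pod throughput
HEADROOM = 1.3             # assumed safety margin for retries and GC pauses

def pods_needed(expected_rps: int, spike_factor: float = 1.0) -> int:
    """Pods required to serve expected_rps under a given spike multiplier."""
    return math.ceil(expected_rps * spike_factor * HEADROOM / POD_CAPACITY_RPS)

print(pods_needed(150_000))                    # regular peak  -> 98 pods
print(pods_needed(150_000, spike_factor=3.0))  # Black Friday  -> 293 pods

Note that a 3x spike already exceeds the maxReplicas=100 ceiling in the HPA config below, which is one reason the graceful degradation levels later in this section matter.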

Horizontal Scaling Strategy

Service Scaling

# HPA configuration
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: recommendation-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: recommendation-service
  minReplicas: 10
  maxReplicas: 100
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    # Per-pod RPS metric; needs a custom metrics adapter (e.g., Prometheus Adapter)
    - type: Pods
      pods:
        metric:
          name: requests_per_second
        target:
          type: AverageValue
          averageValue: "1000"
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
        - type: Percent
          value: 100
          periodSeconds: 60
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
        - type: Percent
          value: 10
          periodSeconds: 60

Data Store Scaling

| Component | Scaling Strategy | Capacity |
|---|---|---|
| Feature Store (Redis) | Cluster mode, 10 shards | 100GB, 1M ops/s |
| Item Index (Elasticsearch) | 5 shards, 2 replicas | 10M docs, 50K qps |
| Vector DB (Milvus) | Distributed mode | 100M vectors, 10K qps |
| Model Cache | Local + distributed | 10GB per node |
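
Scaling the Redis tier also means the client has to route every key to the right shard. A simplified routing sketch (real Redis Cluster hashes keys with CRC16 into 16384 slots; plain CRC32 modulo shard count stands in here, and the shard addresses are hypothetical):

import zlib

SHARDS = [f"redis-shard-{i}:6379" for i in range(10)]  # hypothetical addresses

def shard_for(key: str) -> str:
    """Map a cache key to one of the 10 shards (stand-in for
    Redis Cluster's CRC16-over-16384-slots scheme)."""
    return SHARDS[zlib.crc32(key.encode()) % len(SHARDS)]

# In real Redis Cluster, hash tags like {user:42} colocate related keys
# on one shard, keeping a request single-shard
print(shard_for("user:42:features"))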

Latency Optimization

End-to-End Latency Budget

| Stage | Budget | Actual (p50) | Actual (p99) |
|---|---|---|---|
| Network (client) | 10ms | 3ms | 8ms |
| API Gateway | 5ms | 1ms | 3ms |
| Feature Retrieval | 10ms | 3ms | 8ms |
| Candidate Gen | 20ms | 8ms | 15ms |
| Ranking Model | 40ms | 15ms | 35ms |
| Filtering | 5ms | 2ms | 4ms |
| Blending | 5ms | 2ms | 4ms |
| Response | 5ms | 1ms | 3ms |
| TOTAL | 100ms | 35ms | 80ms |

Optimization Techniques

import asyncio
from typing import Any, List, Optional

# Item, feature_store, candidate_generator, context_service, ranker and
# filter_layer are assumed to be defined elsewhere in the service.

# 1. Parallel execution
async def get_recommendations(user_id: str) -> List[Item]:
    # Run independent operations in parallel
    user_features, item_candidates, context = await asyncio.gather(
        feature_store.get_user_features(user_id),
        candidate_generator.generate(user_id),
        context_service.get_context()
    )

    # Sequential dependent operations
    ranked = await ranker.rank(user_features, item_candidates, context)
    filtered = await filter_layer.filter(ranked)

    return filtered

# 2. Request batching
class BatchedModelServer:
    def __init__(self, model):
        self.model = model  # model exposing predict_batch(features) -> scores
        self.batch_queue = asyncio.Queue()
        self.batch_size = 32
        self.batch_timeout_ms = 5

    async def predict(self, features: Features) -> float:
        future = asyncio.get_running_loop().create_future()
        await self.batch_queue.put((features, future))
        return await future

    async def batch_worker(self):
        while True:
            batch = []
            try:
                while len(batch) < self.batch_size:
                    item = await asyncio.wait_for(
                        self.batch_queue.get(),
                        timeout=self.batch_timeout_ms / 1000
                    )
                    batch.append(item)
            except asyncio.TimeoutError:
                pass

            if batch:
                features = [f for f, _ in batch]
                predictions = self.model.predict_batch(features)
                for (_, future), pred in zip(batch, predictions):
                    future.set_result(pred)

# 3. Caching strategies
class MultiLevelCache:
    def __init__(self):
        # LocalLRUCache / RedisCache are assumed thin wrappers over an
        # in-process LRU dict and an async Redis client, respectively
        self.l1_cache = LocalLRUCache(maxsize=10000)  # in-memory, per-pod
        self.l2_cache = RedisCache()                  # distributed

    async def get(self, key: str) -> Optional[Any]:
        # L1: Local cache (< 1ms)
        if key in self.l1_cache:
            return self.l1_cache[key]

        # L2: Redis (< 5ms)
        value = await self.l2_cache.get(key)
        if value is not None:
            self.l1_cache[key] = value

        return value

Reliability Patterns

Circuit Breaker

from circuitbreaker import circuit, CircuitBreakerError

class RecommendationService:

    @circuit(failure_threshold=5, recovery_timeout=30)
    async def get_personalized_recommendations(self, user_id: str):
        """
        Personalized recommendations with circuit breaker
        """
        return await self._get_recommendations(user_id, personalized=True)

    async def get_recommendations_with_fallback(self, user_id: str):
        try:
            return await self.get_personalized_recommendations(user_id)
        except CircuitBreakerError:
            # Fallback to popular items
            return await self.get_popular_items()

Graceful Degradation

class GracefulDegradation:
    """
    Progressive feature degradation under load
    """

    def __init__(self):
        self.load_levels = {
            "normal": self.full_pipeline,
            "elevated": self.reduced_pipeline,
            "high": self.minimal_pipeline,
            "critical": self.fallback_pipeline,
        }

    def full_pipeline(self, user_id):
        """All features enabled"""
        return Pipeline(
            candidate_sources=["ann", "cf", "content", "popular"],
            ranking_model="deep_ranking",
            num_candidates=1000,
            enable_diversity=True,
            enable_explanations=True,
        )

    def reduced_pipeline(self, user_id):
        """Reduced features"""
        return Pipeline(
            candidate_sources=["ann", "popular"],
            ranking_model="xgboost",
            num_candidates=500,
            enable_diversity=True,
            enable_explanations=False,
        )

    def minimal_pipeline(self, user_id):
        """Minimal processing"""
        return Pipeline(
            candidate_sources=["popular"],
            ranking_model="simple_scorer",
            num_candidates=100,
            enable_diversity=False,
            enable_explanations=False,
        )

    def fallback_pipeline(self, user_id):
        """Static fallback"""
        return Pipeline(
            use_precomputed=True,
        )
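
The class above leaves open how a load level is actually chosen. A hedged sketch: derive it from observed signals such as CPU utilization and p99 latency (the thresholds here are illustrative, not prescribed values):

def current_load_level(cpu_utilization: float, p99_latency_ms: float) -> str:
    """Map observed load signals to a degradation level (illustrative thresholds)."""
    if cpu_utilization > 0.95 or p99_latency_ms > 150:
        return "critical"
    if cpu_utilization > 0.85 or p99_latency_ms > 120:
        return "high"
    if cpu_utilization > 0.75 or p99_latency_ms > 95:
        return "elevated"
    return "normal"

degradation = GracefulDegradation()
level = current_load_level(cpu_utilization=0.88, p99_latency_ms=110)
pipeline = degradation.load_levels[level]("user:42")  # -> minimal_pipeline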

Timeout Management

import asyncio
import time

class TimeoutManager:
    def __init__(self, total_budget_ms: int = 100):
        self.total_budget_ms = total_budget_ms
        self.start_time = None

    def start(self):
        self.start_time = time.monotonic()

    def remaining_ms(self) -> float:
        elapsed = (time.monotonic() - self.start_time) * 1000
        return max(0.0, self.total_budget_ms - elapsed)

    def should_skip(self, stage: str, min_required_ms: int) -> bool:
        """Return True if there is not enough budget left for this stage."""
        return self.remaining_ms() < min_required_ms

    async def with_timeout(self, coro, timeout_ms: int, fallback):
        """Execute with timeout and fallback"""
        try:
            return await asyncio.wait_for(
                coro,
                timeout=min(timeout_ms, self.remaining_ms()) / 1000
            )
        except asyncio.TimeoutError:
            return fallback
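
Putting the manager into the request path might look like the following sketch (stage budgets mirror the latency table above; candidate_generator is the object from the parallel-execution snippet, and rank_simple is a hypothetical cheap ranking call):

async def get_recommendations_with_budget(user_id: str):
    tm = TimeoutManager(total_budget_ms=100)
    tm.start()

    candidates = await tm.with_timeout(
        candidate_generator.generate(user_id), timeout_ms=20, fallback=[]
    )

    # With less than ~20ms left, skip ranking and return candidates as-is
    if tm.should_skip("ranking", min_required_ms=20):
        return candidates

    return await tm.with_timeout(
        ranker.rank_simple(candidates), timeout_ms=40, fallback=candidates
    )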

Cost Optimization

Compute Cost Analysis

| Component | Instance Type | Count | Cost/Month |
|---|---|---|---|
| API Servers | c5.2xlarge | 20 | $4,800 |
| Ranking Servers (GPU) | g4dn.xlarge | 10 | $5,200 |
| Feature Store (Redis) | r5.4xlarge | 10 | $8,000 |
| Vector DB (Milvus) | r5.8xlarge | 5 | $10,000 |
| Load Balancer | ALB | 2 | $400 |
| Data Transfer | - | - | $2,000 |
| TOTAL | | | ~$30,400 |
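
A useful sanity check on this bill is unit cost. Assuming an average of ~70K RPS (a rough midpoint of the traffic table above), $30,400/month works out to about $0.17 per million requests:

avg_rps = 70_000                          # assumed average from the traffic table
monthly_requests = avg_rps * 86_400 * 30  # ~181.4 billion requests/month
cost_per_million = 30_400 / (monthly_requests / 1_000_000)
print(f"${cost_per_million:.2f} per 1M requests")  # -> $0.17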

Cost Reduction Strategies

# 1. Spot instances for batch processing
# 2. Reserved instances for baseline
# 3. Precomputed recommendations for cold users
# 4. Model quantization for faster inference
# 5. Tiered storage (hot/warm/cold)

class CostOptimizedPipeline:
    def __init__(self):
        self.user_tier_cache = {}

    def get_pipeline_tier(self, user_id: str) -> str:
        """
        Pick a pipeline tier based on user value
        (get_user_value() is assumed to exist, e.g., an LTV lookup).
        """
        user_value = self.get_user_value(user_id)

        if user_value > 1000:  # High-value user
            return "premium"   # Full pipeline, GPU ranking
        elif user_value > 100:
            return "standard"  # Standard pipeline
        else:
            return "basic"     # Precomputed + simple ranking

Disaster Recovery

Multi-Region Setup

graph TD
    GLB["Global LB<br/>(Route 53)"] --> USE["US-East<br/>Primary"]
    GLB --> EUW["EU-West<br/>Secondary"]
    GLB --> APS["AP-South<br/>Secondary"]

    USE --> USFS["Regional<br/>Feature Store"]
    EUW --> EUFS["Regional<br/>Feature Store"]
    APS --> APFS["Regional<br/>Feature Store"]

    style GLB fill:#fce4ec,stroke:#c62828
    style USE fill:#e8eaf6,stroke:#3f51b5
    style EUW fill:#e8f5e9,stroke:#4caf50
    style APS fill:#e8f5e9,stroke:#4caf50
    style USFS fill:#fff3e0,stroke:#ef6c00
    style EUFS fill:#fff3e0,stroke:#ef6c00
    style APFS fill:#fff3e0,stroke:#ef6c00

Recovery Procedures

| Scenario | RTO | RPO | Recovery Action |
|---|---|---|---|
| Single pod failure | 30s | 0 | Auto-restart by K8s |
| Node failure | 2min | 0 | Pod rescheduling |
| AZ failure | 5min | 0 | Traffic shift to other AZs |
| Region failure | 15min | 1min | DNS failover to secondary |
| Data corruption | 1hour | 1hour | Restore from backup |

Misconception: Horizontal scaling solves every problem

Adding servers helps only the stateless components (API, ranking). The Feature Store (Redis), Vector DB (FAISS/Milvus), and Model Store are stateful: scaling them requires sharding, replication, and consistency guarantees. With 10 shards, Redis Cluster loses ~15% of throughput on cross-shard operations. A FAISS index over 100M vectors takes 50GB of RAM and has to be sharded across machines. Without proper data partitioning, horizontal scaling only increases latency. A minimal fan-out sketch over sharded indexes follows.
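
The sketch below queries every shard and merges the results (index.search is the standard FAISS API; the shard list and the ~20M-vectors-per-shard split are assumptions):

import heapq
import numpy as np
import faiss  # each shard would hold ~20M of the 100M vectors

def search_sharded(shards: list, query: np.ndarray, k: int = 100):
    """Query every shard, then merge per-shard top-k into a global top-k."""
    merged = []
    for shard_id, index in enumerate(shards):
        # query must be float32 with shape (1, dim)
        distances, ids = index.search(query.reshape(1, -1), k)
        for dist, idx in zip(distances[0], ids[0]):
            merged.append((float(dist), shard_id, int(idx)))
    # Smallest L2 distance first; for inner-product indexes take the largest
    return heapq.nsmallest(k, merged)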

Misconception: Caching just means adding Redis

Multi-level caching (L1 local LRU + L2 Redis + L3 CDN) delivers a 90%+ hit rate but creates a cache invalidation problem. When an item changes (price, stock), all three levels must be invalidated. TTL-based invalidation means serving stale data (e.g., an item that is out of stock). Event-driven invalidation via Kafka + pub/sub solves this but adds 10-50ms of latency on the write path. The trade-off: freshness vs latency. A sketch of the event-driven path is shown below.
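
A hedged sketch of that path with kafka-python (the topic name, message format, and cache handles are assumptions; each pod runs its own consumer with a unique group_id so every local L1 copy sees every event):

import json
import os
from kafka import KafkaConsumer  # pip install kafka-python

consumer = KafkaConsumer(
    "item-updates",                  # hypothetical topic of item change events
    bootstrap_servers="kafka:9092",
    # unique group per pod -> Kafka delivers each event to every pod
    group_id=f"cache-invalidator-{os.environ.get('HOSTNAME', 'local')}",
    value_deserializer=lambda v: json.loads(v.decode()),
)

for message in consumer:
    cache_key = f"item:{message.value['item_id']}"
    l1_cache.pop(cache_key, None)    # this pod's local LRU
    l2_cache.delete(cache_key)       # shared Redis entry (redis-py .delete)
    # CDN purge would be an async call to the edge provider's API (not shown)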

Misconception: GPUs are needed for the whole pipeline

GPUs are needed only for Fine Ranking (the deep model over 200 candidates). Candidate Generation (ANN search) is more efficient on CPU with SIMD (FAISS). Coarse Ranking (XGBoost) runs on CPU. The Feature Store (Redis) is CPU-only. GPU servers cost 3-5x more than CPU ones. At 100K RPS you need 10 GPU servers ($5,200/month) just for ranking; everything else runs on CPU ($4,800/month). Optimization: model quantization (FP32 -> INT8) cuts latency 2-3x and can make CPU inference viable where a GPU was previously required. A quantization sketch follows.
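
The quantization claim can be sketched with PyTorch dynamic quantization, which stores Linear weights as INT8 and quantizes activations on the fly (the two-layer model here is a stand-in for the real ranking net; actual speedups depend on the architecture):

import torch

# Stand-in for the deep ranking model: an MLP over concatenated features
model = torch.nn.Sequential(
    torch.nn.Linear(256, 128), torch.nn.ReLU(),
    torch.nn.Linear(128, 1),
).eval()

# FP32 -> INT8 on the Linear layers only
quantized = torch.ao.quantization.quantize_dynamic(
    model, {torch.nn.Linear}, dtype=torch.qint8
)

features = torch.randn(32, 256)  # one batch of 32 candidates (cf. batch_size above)
scores = quantized(features)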

Interview

How do you scale a recommendation system to 100K RPS?

❌ "Add more servers behind a load balancer."

✅ "Stateless components (API, ranking service): HPA with min=10, max=100 pods, target CPU 70%. Each pod handles ~2K RPS. Feature Store: Redis Cluster with 10 shards of 10GB each, 1M ops/s combined. Vector DB: a FAISS index sharded across 5 machines (~20M embeddings each). GPU ranking: 10 g4dn.xlarge servers with request batching (batch=32, timeout=5ms). Multi-level cache: L1 local (10K entries, < 1ms) + L2 Redis (< 5ms). Total: ~50 pods + 10 Redis shards + 5 FAISS shards + 10 GPU servers = ~$30K/month."

How do you keep latency under 100ms at this scale?

❌ "Use faster servers and SSDs."

✅ "Latency budget: Network 10ms + Gateway 5ms + Feature Retrieval 10ms + Candidate Gen 20ms + Ranking 40ms + Filtering 5ms + Blending 5ms + Response 5ms = 100ms. Key optimizations: (1) parallel execution -- feature retrieval, candidate generation, and context fetched concurrently via asyncio.gather; (2) request batching -- GPU inference with batch=32 and a 5ms timeout; (3) multi-level cache -- L1 local < 1ms, L2 Redis < 5ms, hit rate > 90%; (4) timeout manager -- if a stage would blow the budget, skip it and use a fallback. P50 = 35ms, P99 = 80ms."

What do you do when the system is overloaded?

❌ "Return a 503 error and wait."

✅ "Four levels of graceful degradation: Normal -- full pipeline (ANN + CF + content, deep ranking, diversity, explanations). Elevated -- reduced pipeline (ANN + popular, XGBoost, no explanations). High -- minimal pipeline (popular only, simple scorer). Critical -- precomputed static recommendations. Circuit breaker: 5 failures -> fallback for 30s. Timeout manager: if < 20ms remain, skip ranking and return candidates as-is. Never a 503 -- always a fallback, even if less personalized."