Scaling a Recommendation System¶
~4 minute read
Prerequisites: System Components | Metrics
A Netflix-scale recommendation system handles 100-150K RPS with holiday peaks up to 500K, within a latency budget of just 100ms (p99). In practice that means 50+ stateless pods, a 10-shard Redis Cluster holding 100GB, a FAISS index over 100M vectors, and GPU servers for deep ranking -- all for roughly $30K/month. The key engineering decisions: graceful degradation (4 degradation levels under overload), multi-level caching (L1 local + L2 Redis + CDN), and request batching for GPU inference. In interviews, scaling is the final part and the one that separates Senior from Staff: not "let's add more servers", but concrete latency budgets, cost/RPS trade-offs, and disaster recovery with RTO < 15 minutes.
Traffic Patterns¶
Peak Load Analysis¶
- Peak: 150K RPS (evening prime time)
- Off-peak: 30K RPS (night)
- Spike events: Black Friday 3x, New Year 5x
| Time of day | RPS | Note |
|---|---|---|
| 00:00-06:00 | 30K | Minimum load |
| 06:00-12:00 | 50-100K | Morning ramp-up |
| 12:00-18:00 | 80-100K | Daytime traffic |
| 18:00-00:00 | 100-150K | Evening prime time (peak) |
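Before the autoscaling config, a quick capacity sanity check helps. A back-of-the-envelope sketch; the ~2K RPS per pod figure and the 30% headroom factor are assumptions taken from numbers used elsewhere in this section:

```python
import math

# Back-of-the-envelope capacity planning for the stateless tier.
# Assumption: ~2K RPS per pod at full load (figure from the interview section).
RPS_PER_POD = 2_000

def pods_needed(rps: int, headroom: float = 1.3) -> int:
    """Pods required to serve `rps` with ~30% headroom (keeps CPU near the 70% HPA target)."""
    return math.ceil(rps * headroom / RPS_PER_POD)

print(pods_needed(150_000))      # evening peak: ~98 pods
print(pods_needed(100_000 * 5))  # New Year spike (~500K RPS): ~325 pods -> needs pre-scaling beyond the HPA max
```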
Horizontal Scaling Strategy¶
Service Scaling¶
# HPA configuration
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: recommendation-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: recommendation-service
  minReplicas: 10
  maxReplicas: 100
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Pods
    pods:
      metric:
        name: requests_per_second   # custom metric, requires a metrics adapter
      target:
        type: AverageValue
        averageValue: "1000"        # target ~1K RPS per pod
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 60    # react to spikes within a minute
      policies:
      - type: Percent
        value: 100                      # allow doubling pod count per period
        periodSeconds: 60
    scaleDown:
      stabilizationWindowSeconds: 300   # scale down slowly to avoid flapping
      policies:
      - type: Percent
        value: 10                       # shed at most 10% of pods per minute
        periodSeconds: 60
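The `requests_per_second` Pods metric is not built into Kubernetes; it has to come from a custom metrics adapter (e.g. prometheus-adapter). A minimal sketch of the application side, assuming prometheus_client and an adapter rule (not shown) that converts the counter rate into `requests_per_second`:

```python
from prometheus_client import Counter, start_http_server

# Counter scraped by Prometheus; an assumed prometheus-adapter rule turns
# rate(http_requests_total[1m]) into the per-pod requests_per_second metric.
HTTP_REQUESTS = Counter("http_requests_total", "Total HTTP requests served")

def handle_request() -> None:
    HTTP_REQUESTS.inc()
    # ... actual request handling ...

if __name__ == "__main__":
    start_http_server(9100)  # exposes /metrics for the Prometheus scraper
```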
Data Store Scaling¶
| Component | Scaling Strategy | Capacity |
|---|---|---|
| Feature Store (Redis) | Cluster mode, 10 shards | 100GB, 1M ops/s |
| Item Index (Elasticsearch) | 5 shards, 2 replicas | 10M docs, 50K qps |
| Vector DB (Milvus) | Distributed mode | 100M vectors, 10K qps |
| Model Cache | Local + distributed | 10GB per node |
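With Redis in cluster mode, keys are spread across shards by hash slot, and multi-key operations only work when the keys land on the same slot. A sketch using redis-py's cluster client and hash tags; the key naming scheme and host are hypothetical:

```python
from redis.cluster import RedisCluster

rc = RedisCluster(host="feature-store.internal", port=6379)

# The {user:123} hash tag forces both keys onto the same slot/shard,
# avoiding the cross-shard overhead discussed in the misconceptions below.
rc.hset("{user:123}:profile_features", mapping={"age_bucket": "25-34"})
rc.hset("{user:123}:behavior_features", mapping={"watch_time_7d": "12.5"})

profile = rc.hgetall("{user:123}:profile_features")
behavior = rc.hgetall("{user:123}:behavior_features")
```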
Latency Optimization¶
End-to-End Latency Budget¶
| Stage | Budget | Actual (p50) | Actual (p99) |
|---|---|---|---|
| Network (client) | 10ms | 3ms | 8ms |
| API Gateway | 5ms | 1ms | 3ms |
| Feature Retrieval | 10ms | 3ms | 8ms |
| Candidate Gen | 20ms | 8ms | 15ms |
| Ranking Model | 40ms | 15ms | 35ms |
| Filtering | 5ms | 2ms | 4ms |
| Blending | 5ms | 2ms | 4ms |
| Response | 5ms | 1ms | 3ms |
| TOTAL | 100ms | 35ms | 80ms |
Optimization Techniques¶
import asyncio
from typing import Any, List, Optional

# 1. Parallel execution
async def get_recommendations(user_id: str) -> List[Item]:
    # Run independent operations in parallel: feature lookup, candidate
    # generation and context do not depend on each other
    user_features, item_candidates, context = await asyncio.gather(
        feature_store.get_user_features(user_id),
        candidate_generator.generate(user_id),
        context_service.get_context(),
    )
    # Sequential dependent operations: ranking needs all three inputs
    ranked = await ranker.rank(user_features, item_candidates, context)
    filtered = await filter_layer.filter(ranked)
    return filtered
# 2. Request batching
class BatchedModelServer:
    def __init__(self, model):
        self.model = model
        self.batch_queue: asyncio.Queue = asyncio.Queue()
        self.batch_size = 32
        self.batch_timeout_ms = 5

    async def predict(self, features: Features) -> float:
        # Park the request on the queue; the worker resolves the future
        future = asyncio.get_running_loop().create_future()
        await self.batch_queue.put((features, future))
        return await future

    async def batch_worker(self):
        # Run as a background task: asyncio.create_task(server.batch_worker())
        while True:
            batch = [await self.batch_queue.get()]  # block until the first request
            try:
                while len(batch) < self.batch_size:
                    item = await asyncio.wait_for(
                        self.batch_queue.get(),
                        timeout=self.batch_timeout_ms / 1000,
                    )
                    batch.append(item)
            except asyncio.TimeoutError:
                pass  # flush a partial batch once the 5ms window closes
            features = [f for f, _ in batch]
            predictions = self.model.predict_batch(features)
            for (_, future), pred in zip(batch, predictions):
                future.set_result(pred)
# 3. Caching strategies
class MultiLevelCache:
    def __init__(self):
        self.l1_cache = LocalLRUCache(maxsize=10000)  # in-process, per pod
        self.l2_cache = RedisCache()                  # distributed, shared

    async def get(self, key: str) -> Optional[Any]:
        # L1: local cache (< 1ms, no network hop)
        if key in self.l1_cache:
            return self.l1_cache[key]
        # L2: Redis (< 5ms); promote hits into L1
        value = await self.l2_cache.get(key)
        if value is not None:
            self.l1_cache[key] = value
        return value
Reliability Patterns¶
Circuit Breaker¶
from circuitbreaker import circuit, CircuitBreakerError

class RecommendationService:
    @circuit(failure_threshold=5, recovery_timeout=30)
    async def get_personalized_recommendations(self, user_id: str):
        """
        Personalized recommendations, guarded by a circuit breaker:
        after 5 consecutive failures the circuit opens for 30s
        """
        return await self._get_recommendations(user_id, personalized=True)

    async def get_recommendations_with_fallback(self, user_id: str):
        try:
            return await self.get_personalized_recommendations(user_id)
        except CircuitBreakerError:
            # Circuit is open: fall back to non-personalized popular items
            return await self.get_popular_items()
Graceful Degradation¶
class GracefulDegradation:
    """
    Progressive feature degradation under load
    """
    def __init__(self):
        # Each load level maps to a progressively cheaper pipeline
        self.load_levels = {
            "normal": self.full_pipeline,
            "elevated": self.reduced_pipeline,
            "high": self.minimal_pipeline,
            "critical": self.fallback_pipeline,
        }

    def get_pipeline(self, load_level: str, user_id):
        # Dispatch to the pipeline builder for the current load level
        return self.load_levels[load_level](user_id)

    def full_pipeline(self, user_id):
        """All features enabled"""
        return Pipeline(
            candidate_sources=["ann", "cf", "content", "popular"],
            ranking_model="deep_ranking",
            num_candidates=1000,
            enable_diversity=True,
            enable_explanations=True,
        )

    def reduced_pipeline(self, user_id):
        """Reduced feature set: cheaper ranking, fewer candidates"""
        return Pipeline(
            candidate_sources=["ann", "popular"],
            ranking_model="xgboost",
            num_candidates=500,
            enable_diversity=True,
            enable_explanations=False,
        )

    def minimal_pipeline(self, user_id):
        """Minimal processing: popularity only"""
        return Pipeline(
            candidate_sources=["popular"],
            ranking_model="simple_scorer",
            num_candidates=100,
            enable_diversity=False,
            enable_explanations=False,
        )

    def fallback_pipeline(self, user_id):
        """Static fallback: serve precomputed recommendations"""
        return Pipeline(
            use_precomputed=True,
        )
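How the current load level is chosen is not shown above; a plausible sketch, assuming the signals (CPU utilization and p99 latency) and the thresholds are tuned per deployment:

```python
def current_load_level(cpu_utilization: float, p99_latency_ms: float) -> str:
    """Map live signals to a degradation level (thresholds are illustrative)."""
    if cpu_utilization > 0.95 or p99_latency_ms > 200:
        return "critical"
    if cpu_utilization > 0.85 or p99_latency_ms > 150:
        return "high"
    if cpu_utilization > 0.75 or p99_latency_ms > 100:
        return "elevated"
    return "normal"

# Usage with the class above:
# pipeline = GracefulDegradation().get_pipeline(current_load_level(0.88, 120), user_id)
```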
Timeout Management¶
import asyncio
import time

class TimeoutManager:
    def __init__(self, total_budget_ms: int = 100):
        self.total_budget_ms = total_budget_ms
        self.start_time = None

    def start(self):
        self.start_time = time.monotonic()

    def remaining_ms(self) -> float:
        elapsed = (time.monotonic() - self.start_time) * 1000
        return max(0.0, self.total_budget_ms - elapsed)

    def should_skip(self, stage: str, min_required_ms: int) -> bool:
        """True if there is not enough budget left for this stage"""
        return self.remaining_ms() < min_required_ms

    async def with_timeout(self, coro, timeout_ms: int, fallback):
        """Execute with a timeout capped by the remaining budget; return the fallback on expiry"""
        try:
            return await asyncio.wait_for(
                coro,
                timeout=min(timeout_ms, self.remaining_ms()) / 1000,
            )
        except asyncio.TimeoutError:
            return fallback
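A usage sketch tying the manager to the pipeline; the 20ms floor and 40ms stage budget come from this section, while `ranker` and `candidates` are the hypothetical objects from the earlier snippets:

```python
async def rank_within_budget(tm: TimeoutManager, candidates):
    # If fewer than 20ms remain, skip ranking and return candidates as-is
    if tm.should_skip("ranking", min_required_ms=20):
        return candidates
    # Otherwise rank, but never exceed the 40ms ranking-stage budget
    return await tm.with_timeout(
        ranker.rank(candidates), timeout_ms=40, fallback=candidates
    )
```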
Cost Optimization¶
Compute Cost Analysis¶
| Component | Instance Type | Count | Cost/Month |
|---|---|---|---|
| API Servers | c5.2xlarge | 20 | $4,800 |
| Ranking Servers (GPU) | g4dn.xlarge | 10 | $5,200 |
| Feature Store (Redis) | r5.4xlarge | 10 | $8,000 |
| Vector DB (Milvus) | r5.8xlarge | 5 | $10,000 |
| Load Balancer | ALB | 2 | $400 |
| Data Transfer | - | - | $2,000 |
| TOTAL | | | ~$30,400 |
Cost Reduction Strategies¶
# 1. Spot instances for batch processing
# 2. Reserved instances for baseline
# 3. Precomputed recommendations for cold users
# 4. Model quantization for faster inference
# 5. Tiered storage (hot/warm/cold)
class CostOptimizedPipeline:
def __init__(self):
self.user_tier_cache = {}
def get_pipeline_tier(self, user_id: str) -> str:
"""
Different pipelines based on user value
"""
user_value = self.get_user_value(user_id)
if user_value > 1000: # High-value user
return "premium" # Full pipeline, GPU ranking
elif user_value > 100:
return "standard" # Standard pipeline
else:
return "basic" # Precomputed + simple ranking
Disaster Recovery¶
Multi-Region Setup¶
graph TD
GLB["Global LB<br/>(Route 53)"] --> USE["US-East<br/>Primary"]
GLB --> EUW["EU-West<br/>Secondary"]
GLB --> APS["AP-South<br/>Secondary"]
USE --> USFS["Regional<br/>Feature Store"]
EUW --> EUFS["Regional<br/>Feature Store"]
APS --> APFS["Regional<br/>Feature Store"]
style GLB fill:#fce4ec,stroke:#c62828
style USE fill:#e8eaf6,stroke:#3f51b5
style EUW fill:#e8f5e9,stroke:#4caf50
style APS fill:#e8f5e9,stroke:#4caf50
style USFS fill:#fff3e0,stroke:#ef6c00
style EUFS fill:#fff3e0,stroke:#ef6c00
style APFS fill:#fff3e0,stroke:#ef6c00
Recovery Procedures¶
| Scenario | RTO | RPO | Recovery Action |
|---|---|---|---|
| Single pod failure | 30s | 0 | Auto-restart by K8s |
| Node failure | 2min | 0 | Pod rescheduling |
| AZ failure | 5min | 0 | Traffic shift to other AZs |
| Region failure | 15min | 1min | DNS failover to secondary |
| Data corruption | 1hour | 1hour | Restore from backup |
Misconception: Horizontal scaling solves every problem
Adding servers only helps stateless components (API, ranking). The Feature Store (Redis), Vector DB (FAISS/Milvus) and Model Store are stateful: scaling them requires sharding, replication and consistency guarantees. A 10-shard Redis Cluster loses ~15% of throughput on cross-shard operations. A FAISS index over 100M vectors takes ~50GB of RAM and must be sharded across machines. Without correct data partitioning, horizontal scaling increases latency instead of reducing it.
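A scatter-gather sketch for a sharded ANN index. The `shard_clients` objects, each wrapping a remote FAISS shard with an async `search`, are hypothetical; the parallel fan-out plus top-k merge is the essential part:

```python
import asyncio
import heapq

async def ann_search(query_vector, k: int, shard_clients):
    """Query every shard in parallel, then merge the per-shard top-k lists."""
    per_shard = await asyncio.gather(
        *(client.search(query_vector, k) for client in shard_clients)
    )
    # Each shard returns (score, item_id) pairs; keep the global top-k
    return heapq.nlargest(k, (hit for hits in per_shard for hit in hits))
```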
Misconception: Caching is just "add Redis"
Multi-level caching (L1 local LRU + L2 Redis + L3 CDN) yields a 90%+ hit rate, but creates a cache invalidation problem. When an item changes (price, stock), all three levels must be invalidated. TTL-based invalidation leads to serving stale data (showing an item that is out of stock). Event-driven invalidation via Kafka + pub/sub solves this, but adds 10-50ms of latency on the write path. The trade-off: freshness vs latency.
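An event-driven invalidation sketch, assuming an `item_updated` Kafka topic and a Redis pub/sub channel that peer pods subscribe to for evicting their L1 caches (topic, channel and host names are hypothetical):

```python
import json
from kafka import KafkaConsumer
import redis

r = redis.Redis(host="feature-store.internal", port=6379)
consumer = KafkaConsumer("item_updated", bootstrap_servers="kafka.internal:9092")

for message in consumer:
    item_id = json.loads(message.value)["item_id"]
    r.delete(f"item:{item_id}")          # L2: evict from Redis
    r.publish("l1_invalidate", item_id)  # L1: each pod subscribes and evicts locally
    # L3 (CDN) invalidation would go through the CDN's purge API (not shown)
```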
Misconception: GPUs are needed for the whole pipeline
GPUs are only needed for Fine Ranking (the deep model over ~200 candidates). Candidate Generation (ANN search) is more efficient on CPU with SIMD (FAISS). Coarse Ranking (XGBoost) runs on CPU. The Feature Store (Redis) is CPU-only. GPU servers cost 3-5x more than CPU. At 100K RPS you need 10 GPU servers ($5,200/month) for ranking alone; everything else runs on CPU ($4,800/month). Optimization: model quantization (FP32 -> INT8) cuts latency 2-3x and can move inference from GPU to CPU.
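A minimal dynamic-quantization sketch with PyTorch, assuming the ranking model's heavy layers are `nn.Linear`; dynamic quantization is the simplest of several quantization modes and targets CPU inference:

```python
import torch

def quantize_ranker(model: torch.nn.Module) -> torch.nn.Module:
    """Convert Linear layer weights from FP32 to INT8 for CPU inference."""
    return torch.quantization.quantize_dynamic(
        model, {torch.nn.Linear}, dtype=torch.qint8
    )
```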
Interview¶
How would you scale a recommendation system to 100K RPS?¶
"We'll just add more servers behind the load balancer."
"Stateless components (API, ranking service): HPA with min=10, max=100 pods, target CPU 70%. Each pod handles ~2K RPS. Feature Store: Redis Cluster, 10 shards of 10GB each, 1M ops/s in total. Vector DB: FAISS index sharded across 5 machines (20M embeddings each). GPU ranking: 10 g4dn.xlarge servers with request batching (batch=32, timeout=5ms). Multi-level cache: L1 local (10K entries, < 1ms) + L2 Redis (< 5ms). Total: ~50 pods + 10 Redis shards + 5 FAISS shards + 10 GPU servers = ~$30K/month."
How do you keep latency under 100ms at this scale?¶
"Use fast servers and SSDs."
"Latency budget: Network 10ms + Gateway 5ms + Feature Retrieval 10ms + Candidate Gen 20ms + Ranking 40ms + Filtering 5ms + Blending 5ms + Response 5ms = 100ms. Key optimizations: (1) Parallel execution -- feature retrieval, candidate gen and context fetched concurrently via asyncio.gather; (2) Request batching -- GPU inference with batch=32 and a 5ms timeout; (3) Multi-level cache -- L1 local < 1ms, L2 Redis < 5ms, hit rate > 90%; (4) Timeout manager -- if a stage exceeds its budget, skip it and use a fallback. P50 = 35ms, P99 = 80ms."
What do you do when the system is overloaded?¶
"Return a 503 error and wait."
"Four levels of graceful degradation: Normal -- full pipeline (ANN + CF + content, deep ranking, diversity, explanations). Elevated -- reduced pipeline (ANN + popular, XGBoost, no explanations). High -- minimal pipeline (popular only, simple scorer). Critical -- precomputed static recommendations. Circuit breaker: 5 failures -> fallback for 30s. Timeout manager: with < 20ms remaining, skip ranking and return candidates as-is. Never a 503 -- always a fallback, even if it is less personalized."