Компоненты рекомендательной системы¶
~3 минуты чтения
Предварительно: Определение задачи | Embedding-модели
Рекомендательный пайплайн в production -- это воронка из 4-6 компонентов, каждый с собственным latency budget. Candidate Generation отсекает 99.99% каталога за 5-10ms, Ranking Model точно скорит оставшиеся 1000 кандидатов за 30-40ms, а Filtering и Blending добавляют бизнес-логику за 5ms. В Netflix этот пайплайн обрабатывает 100K+ RPS с p99 < 100ms, обслуживая 230M подписчиков. Понимание каждого компонента и его latency budget -- ключ к успешному прохождению System Design интервью.
High-Level Architecture¶
graph TD
A["Client Request<br/>GET /recommendations?user_id=123"] --> B["API Gateway<br/>Rate Limiting, Auth, Routing"]
B --> C["Recommendation Service"]
C --> D["Candidate Generation"]
D --> E["Ranking Model"]
E --> F["Filtering Layer"]
F --> G["Blending Layer"]
G --> H["Response"]
C -.-> I[("Feature Store<br/>Redis")]
C -.-> J[("Model Store<br/>MLflow")]
C -.-> K[("Item Index<br/>Elasticsearch")]
style C fill:#e8eaf6,stroke:#3f51b5
style D fill:#fff3e0,stroke:#ff9800
style E fill:#fff3e0,stroke:#ff9800
style F fill:#fff3e0,stroke:#ff9800
style G fill:#fff3e0,stroke:#ff9800
Component Details¶
1. Candidate Generation (Retrieval)¶
Цель: Из миллионов items выбрать ~1000 кандидатов за < 10ms
class CandidateGenerator:
"""
Multi-source candidate retrieval
"""
def __init__(self):
self.sources = [
AnnRetriever(), # Approximate Nearest Neighbors
CollaborativeRetriever(), # User-based CF
ContentRetriever(), # Content-based
PopularityRetriever(), # Trending items
PersonalHistoryRetriever(), # Recently viewed
]
def generate(self, user_id: str, context: dict) -> List[Candidate]:
"""
Retrieve candidates from multiple sources
"""
all_candidates = []
# Parallel retrieval from all sources
with ThreadPoolExecutor() as executor:
futures = [
executor.submit(source.retrieve, user_id, context)
for source in self.sources
]
for future in futures:
all_candidates.extend(future.result())
# Deduplicate and merge scores
return self.merge_candidates(all_candidates)
Методы retrieval:
| Метод | Latency | Recall | Описание |
|---|---|---|---|
| ANN (FAISS/ScaNN) | 1-5ms | High | Vector similarity search |
| Two-Tower | 5-10ms | High | User-item embeddings |
| Graph-based | 10-20ms | Medium | Random walks on user-item graph |
| Rule-based | 1ms | Low | Popularity, recency |
2. Ranking Model¶
Цель: Из ~1000 кандидатов выбрать топ-100 с точным скорингом
class RankingModel:
"""
Two-stage ranking: coarse + fine
"""
def __init__(self):
self.coarse_ranker = XGBoostRanker() # Fast, less features
self.fine_ranker = DeepRankingModel() # Slow, all features
def rank(
self,
user: UserFeatures,
candidates: List[Candidate],
context: ContextFeatures
) -> List[ScoredItem]:
# Stage 1: Coarse ranking (1000 -> 200)
coarse_features = self.extract_coarse_features(user, candidates)
coarse_scores = self.coarse_ranker.predict(coarse_features)
top_200 = self.get_top_k(candidates, coarse_scores, k=200)
# Stage 2: Fine ranking (200 -> 100)
fine_features = self.extract_fine_features(user, top_200, context)
fine_scores = self.fine_ranker.predict(fine_features)
top_100 = self.get_top_k(top_200, fine_scores, k=100)
return top_100
Архитектура Deep Ranking Model:
graph TD
UF["User Features"] --> UE["User Embedding Layer"]
IF["Item Features"] --> IE["Item Embedding Layer"]
CF["Context Features"] --> CE["Context Embedding Layer"]
UE --> CONCAT["Concatenation"]
IE --> CONCAT
CE --> CONCAT
CONCAT --> MLP["MLP Layers"]
MLP --> SCORE["Score Output<br/>P(click), P(purchase), etc."]
style UF fill:#e8eaf6,stroke:#3f51b5
style IF fill:#e8f5e9,stroke:#4caf50
style CF fill:#fff3e0,stroke:#ef6c00
style UE fill:#e8eaf6,stroke:#3f51b5
style IE fill:#e8f5e9,stroke:#4caf50
style CE fill:#fff3e0,stroke:#ef6c00
style CONCAT fill:#f3e5f5,stroke:#9c27b0
style MLP fill:#f3e5f5,stroke:#9c27b0
style SCORE fill:#fce4ec,stroke:#c62828
3. Filtering Layer¶
Цель: Убрать неподходящие items
class FilteringLayer:
"""
Business rules and quality filters
"""
def __init__(self):
self.filters = [
AlreadySeenFilter(), # Уже видел
OutOfStockFilter(), # Нет в наличии
GeoRestrictionFilter(), # Географические ограничения
AgeRestrictionFilter(), # Возрастные ограничения
BlockedItemFilter(), # Заблокированные items
PriceRangeFilter(), # Вне ценового диапазона
QualityFilter(), # Низкое качество
]
def filter(
self,
user: User,
items: List[ScoredItem],
context: Context
) -> List[ScoredItem]:
filtered_items = items
for f in self.filters:
filtered_items = f.apply(user, filtered_items, context)
return filtered_items
4. Blending Layer¶
Цель: Финальное формирование списка с diversity и exploration
class BlendingLayer:
"""
Final list composition with diversity and business rules
"""
def __init__(self):
self.diversity_weight = 0.3
self.exploration_rate = 0.1
self.slot_rules = self.load_slot_rules()
def blend(
self,
ranked_items: List[ScoredItem],
user: User,
context: Context
) -> List[RecommendedItem]:
final_list = []
# Slot-based blending
for slot_idx in range(self.num_slots):
slot_rule = self.slot_rules.get(slot_idx)
if slot_rule:
# Fixed slot (e.g., sponsored, new arrivals)
item = self.get_slot_item(slot_rule, user, context)
elif random.random() < self.exploration_rate:
# Exploration slot
item = self.sample_explore_item(ranked_items)
else:
# Regular ranking slot
item = self.select_diverse_item(
ranked_items,
final_list,
self.diversity_weight
)
final_list.append(item)
return self.add_explanations(final_list)
def select_diverse_item(self, candidates, selected, diversity_weight):
"""
MMR-style selection for diversity
"""
best_score = -inf
best_item = None
for item in candidates:
if item in selected:
continue
relevance = item.score
diversity = self.compute_diversity(item, selected)
combined_score = (
(1 - diversity_weight) * relevance +
diversity_weight * diversity
)
if combined_score > best_score:
best_score = combined_score
best_item = item
return best_item
5. Feature Store¶
Цель: Быстрый доступ к features для inference
class FeatureStore:
"""
Unified feature access layer
"""
def __init__(self):
self.online_store = RedisClient() # < 5ms latency
self.offline_store = BigQueryClient() # Batch features
self.feature_registry = FeatureRegistry()
def get_user_features(self, user_id: str) -> UserFeatures:
"""
Get user features from online store
"""
cache_key = f"user:{user_id}"
# Try online store first
features = self.online_store.get(cache_key)
if not features:
# Fallback to default features (cold start)
features = self.get_default_user_features()
return self.feature_registry.parse_user_features(features)
def get_item_features_batch(self, item_ids: List[str]) -> List[ItemFeatures]:
"""
Batch get item features (for ranking)
"""
cache_keys = [f"item:{item_id}" for item_id in item_ids]
features = self.online_store.mget(cache_keys)
return [
self.feature_registry.parse_item_features(f)
for f in features
]
6. Model Serving¶
Цель: Эффективный inference моделей
class ModelServer:
"""
Model serving with batching and caching
"""
def __init__(self):
self.model = self.load_model()
self.batch_size = 100
self.cache = LRUCache(maxsize=10000)
def predict_batch(
self,
features: List[FeatureVector]
) -> List[float]:
"""
Batched prediction for efficiency
"""
# Check cache
cached_results = {}
uncached_features = []
for i, f in enumerate(features):
cache_key = self.compute_cache_key(f)
if cache_key in self.cache:
cached_results[i] = self.cache[cache_key]
else:
uncached_features.append((i, f, cache_key))
# Batch predict uncached
if uncached_features:
indices, feats, keys = zip(*uncached_features)
predictions = self.model.predict(feats)
for idx, pred, key in zip(indices, predictions, keys):
cached_results[idx] = pred
self.cache[key] = pred
# Reconstruct results
return [cached_results[i] for i in range(len(features))]
Infrastructure Components¶
Service Mesh¶
# Kubernetes service configuration
apiVersion: v1
kind: Service
metadata:
name: recommendation-service
spec:
selector:
app: recommendation
ports:
- port: 8080
targetPort: 8080
type: ClusterIP
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: recommendation-deployment
spec:
replicas: 10
selector:
matchLabels:
app: recommendation
template:
spec:
containers:
- name: recommendation
image: recommendation-service:v1.2.3
resources:
requests:
memory: "4Gi"
cpu: "2"
limits:
memory: "8Gi"
cpu: "4"
env:
- name: FEATURE_STORE_URL
value: "redis://feature-store:6379"
- name: MODEL_PATH
value: "s3://models/ranking/v1.2.3"
Monitoring Stack¶
# Prometheus metrics
from prometheus_client import Counter, Histogram
REQUEST_COUNT = Counter(
'recommendation_requests_total',
'Total recommendation requests',
['endpoint', 'status']
)
REQUEST_LATENCY = Histogram(
'recommendation_latency_seconds',
'Request latency',
['stage'], # candidate_gen, ranking, filtering, blending
buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0]
)
MODEL_PREDICTION_LATENCY = Histogram(
'model_prediction_latency_seconds',
'Model inference latency',
['model_name']
)
Заблуждение: Candidate Generation можно пропустить и сразу ранжировать
При 10M items deep ranking model тратит ~1ms на один item. Ранжирование всего каталога = 10 000 секунд. Даже batch inference на GPU обработает максимум 10K items за 100ms. Candidate Generation -- обязательный этап, сокращающий множество с 10M до 1000 за 5-10ms через ANN (FAISS/ScaNN). Без него пайплайн нереализуем.
Заблуждение: Feature Store -- это просто Redis
Feature Store -- это инфраструктурный компонент с двумя слоями: online (Redis/DynamoDB, < 5ms) для serving и offline (S3/BigQuery) для training. Ключевая сложность -- обеспечить consistency между training и serving features (training-serving skew). Feast/Tecton решают эту проблему, а голый Redis -- нет. Training-serving skew -- одна из топ-3 причин деградации ML-моделей в production.
Заблуждение: Diversity -- это просто убрать дубликаты
Diversity -- это алгоритмическая задача (MMR, DPP). Без diversity control система создаёт filter bubbles: пользователь кликнул 2 раза на кроссовки -- и весь фид заполнен кроссовками. Spotify обнаружил, что добавление 30% diversity weight к relevance score увеличило долгосрочный retention на 5%, хотя CTR краткосрочно упал на 2%.
Собеседование¶
Как устроен Candidate Generation?¶
"Берём все items и фильтруем по категории пользователя."
"Candidate Generation -- это параллельный retrieval из нескольких источников: Two-Tower ANN (user/item embeddings в FAISS, 5ms, высокий recall), Collaborative Filtering (item-item similarity matrix, precomputed), Content-based (по атрибутам items), Popularity (trending за 24h для cold start). Результаты мержатся, дедуплицируются и подаются на Ranking. Каждый источник покрывает разные сценарии: ANN -- персонализация, CF -- discovery, Popularity -- cold start."
Зачем нужен двухэтапный Ranking?¶
"Одна модель ранжирует всех кандидатов."
"Coarse Ranking (XGBoost, 100 features) быстро отсекает 80% слабых кандидатов: 1000 -> 200 за 5ms. Fine Ranking (Deep Model, 500+ features, cross-features) точно скорит 200 кандидатов за 30ms. Это trade-off между latency и quality. Одна тяжёлая модель на 1000 candidates потребует 150ms -- больше всего latency budget. Двухэтапная система укладывается в 35ms."
Какую роль играет Blending Layer?¶
"Просто берём top-N из Ranking."
"Blending Layer -- финальная сборка списка: slot-based rules (позиция 3 = sponsored, позиция 7 = new arrival), exploration slots (10% для Thompson Sampling), MMR reranking для diversity (lambda=0.3). Без Blending система скатывается в exploitation -- показывает одно и то же. Blending обеспечивает баланс revenue (sponsored), discovery (exploration) и user satisfaction (diversity)."