Перейти к содержанию

Компоненты рекомендательной системы

~3 минуты чтения

Предварительно: Определение задачи | Embedding-модели

Рекомендательный пайплайн в production -- это воронка из 4-6 компонентов, каждый с собственным latency budget. Candidate Generation отсекает 99.99% каталога за 5-10ms, Ranking Model точно скорит оставшиеся 1000 кандидатов за 30-40ms, а Filtering и Blending добавляют бизнес-логику за 5ms. В Netflix этот пайплайн обрабатывает 100K+ RPS с p99 < 100ms, обслуживая 230M подписчиков. Понимание каждого компонента и его latency budget -- ключ к успешному прохождению System Design интервью.

High-Level Architecture

graph TD
    A["Client Request<br/>GET /recommendations?user_id=123"] --> B["API Gateway<br/>Rate Limiting, Auth, Routing"]
    B --> C["Recommendation Service"]
    C --> D["Candidate Generation"]
    D --> E["Ranking Model"]
    E --> F["Filtering Layer"]
    F --> G["Blending Layer"]
    G --> H["Response"]

    C -.-> I[("Feature Store<br/>Redis")]
    C -.-> J[("Model Store<br/>MLflow")]
    C -.-> K[("Item Index<br/>Elasticsearch")]

    style C fill:#e8eaf6,stroke:#3f51b5
    style D fill:#fff3e0,stroke:#ff9800
    style E fill:#fff3e0,stroke:#ff9800
    style F fill:#fff3e0,stroke:#ff9800
    style G fill:#fff3e0,stroke:#ff9800

Component Details

1. Candidate Generation (Retrieval)

Цель: Из миллионов items выбрать ~1000 кандидатов за < 10ms

class CandidateGenerator:
    """
    Multi-source candidate retrieval
    """

    def __init__(self):
        self.sources = [
            AnnRetriever(),           # Approximate Nearest Neighbors
            CollaborativeRetriever(), # User-based CF
            ContentRetriever(),       # Content-based
            PopularityRetriever(),    # Trending items
            PersonalHistoryRetriever(), # Recently viewed
        ]

    def generate(self, user_id: str, context: dict) -> List[Candidate]:
        """
        Retrieve candidates from multiple sources
        """
        all_candidates = []

        # Parallel retrieval from all sources
        with ThreadPoolExecutor() as executor:
            futures = [
                executor.submit(source.retrieve, user_id, context)
                for source in self.sources
            ]
            for future in futures:
                all_candidates.extend(future.result())

        # Deduplicate and merge scores
        return self.merge_candidates(all_candidates)

Методы retrieval:

Метод Latency Recall Описание
ANN (FAISS/ScaNN) 1-5ms High Vector similarity search
Two-Tower 5-10ms High User-item embeddings
Graph-based 10-20ms Medium Random walks on user-item graph
Rule-based 1ms Low Popularity, recency

2. Ranking Model

Цель: Из ~1000 кандидатов выбрать топ-100 с точным скорингом

class RankingModel:
    """
    Two-stage ranking: coarse + fine
    """

    def __init__(self):
        self.coarse_ranker = XGBoostRanker()  # Fast, less features
        self.fine_ranker = DeepRankingModel()  # Slow, all features

    def rank(
        self,
        user: UserFeatures,
        candidates: List[Candidate],
        context: ContextFeatures
    ) -> List[ScoredItem]:

        # Stage 1: Coarse ranking (1000 -> 200)
        coarse_features = self.extract_coarse_features(user, candidates)
        coarse_scores = self.coarse_ranker.predict(coarse_features)
        top_200 = self.get_top_k(candidates, coarse_scores, k=200)

        # Stage 2: Fine ranking (200 -> 100)
        fine_features = self.extract_fine_features(user, top_200, context)
        fine_scores = self.fine_ranker.predict(fine_features)
        top_100 = self.get_top_k(top_200, fine_scores, k=100)

        return top_100

Архитектура Deep Ranking Model:

graph TD
    UF["User Features"] --> UE["User Embedding Layer"]
    IF["Item Features"] --> IE["Item Embedding Layer"]
    CF["Context Features"] --> CE["Context Embedding Layer"]

    UE --> CONCAT["Concatenation"]
    IE --> CONCAT
    CE --> CONCAT

    CONCAT --> MLP["MLP Layers"]
    MLP --> SCORE["Score Output<br/>P(click), P(purchase), etc."]

    style UF fill:#e8eaf6,stroke:#3f51b5
    style IF fill:#e8f5e9,stroke:#4caf50
    style CF fill:#fff3e0,stroke:#ef6c00
    style UE fill:#e8eaf6,stroke:#3f51b5
    style IE fill:#e8f5e9,stroke:#4caf50
    style CE fill:#fff3e0,stroke:#ef6c00
    style CONCAT fill:#f3e5f5,stroke:#9c27b0
    style MLP fill:#f3e5f5,stroke:#9c27b0
    style SCORE fill:#fce4ec,stroke:#c62828

3. Filtering Layer

Цель: Убрать неподходящие items

class FilteringLayer:
    """
    Business rules and quality filters
    """

    def __init__(self):
        self.filters = [
            AlreadySeenFilter(),      # Уже видел
            OutOfStockFilter(),       # Нет в наличии
            GeoRestrictionFilter(),   # Географические ограничения
            AgeRestrictionFilter(),   # Возрастные ограничения
            BlockedItemFilter(),      # Заблокированные items
            PriceRangeFilter(),       # Вне ценового диапазона
            QualityFilter(),          # Низкое качество
        ]

    def filter(
        self,
        user: User,
        items: List[ScoredItem],
        context: Context
    ) -> List[ScoredItem]:

        filtered_items = items
        for f in self.filters:
            filtered_items = f.apply(user, filtered_items, context)

        return filtered_items

4. Blending Layer

Цель: Финальное формирование списка с diversity и exploration

class BlendingLayer:
    """
    Final list composition with diversity and business rules
    """

    def __init__(self):
        self.diversity_weight = 0.3
        self.exploration_rate = 0.1
        self.slot_rules = self.load_slot_rules()

    def blend(
        self,
        ranked_items: List[ScoredItem],
        user: User,
        context: Context
    ) -> List[RecommendedItem]:

        final_list = []

        # Slot-based blending
        for slot_idx in range(self.num_slots):
            slot_rule = self.slot_rules.get(slot_idx)

            if slot_rule:
                # Fixed slot (e.g., sponsored, new arrivals)
                item = self.get_slot_item(slot_rule, user, context)
            elif random.random() < self.exploration_rate:
                # Exploration slot
                item = self.sample_explore_item(ranked_items)
            else:
                # Regular ranking slot
                item = self.select_diverse_item(
                    ranked_items,
                    final_list,
                    self.diversity_weight
                )

            final_list.append(item)

        return self.add_explanations(final_list)

    def select_diverse_item(self, candidates, selected, diversity_weight):
        """
        MMR-style selection for diversity
        """
        best_score = -inf
        best_item = None

        for item in candidates:
            if item in selected:
                continue

            relevance = item.score
            diversity = self.compute_diversity(item, selected)

            combined_score = (
                (1 - diversity_weight) * relevance +
                diversity_weight * diversity
            )

            if combined_score > best_score:
                best_score = combined_score
                best_item = item

        return best_item

5. Feature Store

Цель: Быстрый доступ к features для inference

class FeatureStore:
    """
    Unified feature access layer
    """

    def __init__(self):
        self.online_store = RedisClient()      # < 5ms latency
        self.offline_store = BigQueryClient()  # Batch features
        self.feature_registry = FeatureRegistry()

    def get_user_features(self, user_id: str) -> UserFeatures:
        """
        Get user features from online store
        """
        cache_key = f"user:{user_id}"

        # Try online store first
        features = self.online_store.get(cache_key)

        if not features:
            # Fallback to default features (cold start)
            features = self.get_default_user_features()

        return self.feature_registry.parse_user_features(features)

    def get_item_features_batch(self, item_ids: List[str]) -> List[ItemFeatures]:
        """
        Batch get item features (for ranking)
        """
        cache_keys = [f"item:{item_id}" for item_id in item_ids]
        features = self.online_store.mget(cache_keys)

        return [
            self.feature_registry.parse_item_features(f)
            for f in features
        ]

6. Model Serving

Цель: Эффективный inference моделей

class ModelServer:
    """
    Model serving with batching and caching
    """

    def __init__(self):
        self.model = self.load_model()
        self.batch_size = 100
        self.cache = LRUCache(maxsize=10000)

    def predict_batch(
        self,
        features: List[FeatureVector]
    ) -> List[float]:
        """
        Batched prediction for efficiency
        """
        # Check cache
        cached_results = {}
        uncached_features = []

        for i, f in enumerate(features):
            cache_key = self.compute_cache_key(f)
            if cache_key in self.cache:
                cached_results[i] = self.cache[cache_key]
            else:
                uncached_features.append((i, f, cache_key))

        # Batch predict uncached
        if uncached_features:
            indices, feats, keys = zip(*uncached_features)
            predictions = self.model.predict(feats)

            for idx, pred, key in zip(indices, predictions, keys):
                cached_results[idx] = pred
                self.cache[key] = pred

        # Reconstruct results
        return [cached_results[i] for i in range(len(features))]

Infrastructure Components

Service Mesh

# Kubernetes service configuration
apiVersion: v1
kind: Service
metadata:
  name: recommendation-service
spec:
  selector:
    app: recommendation
  ports:
    - port: 8080
      targetPort: 8080
  type: ClusterIP
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: recommendation-deployment
spec:
  replicas: 10
  selector:
    matchLabels:
      app: recommendation
  template:
    spec:
      containers:
        - name: recommendation
          image: recommendation-service:v1.2.3
          resources:
            requests:
              memory: "4Gi"
              cpu: "2"
            limits:
              memory: "8Gi"
              cpu: "4"
          env:
            - name: FEATURE_STORE_URL
              value: "redis://feature-store:6379"
            - name: MODEL_PATH
              value: "s3://models/ranking/v1.2.3"

Monitoring Stack

# Prometheus metrics
from prometheus_client import Counter, Histogram

REQUEST_COUNT = Counter(
    'recommendation_requests_total',
    'Total recommendation requests',
    ['endpoint', 'status']
)

REQUEST_LATENCY = Histogram(
    'recommendation_latency_seconds',
    'Request latency',
    ['stage'],  # candidate_gen, ranking, filtering, blending
    buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0]
)

MODEL_PREDICTION_LATENCY = Histogram(
    'model_prediction_latency_seconds',
    'Model inference latency',
    ['model_name']
)

Заблуждение: Candidate Generation можно пропустить и сразу ранжировать

При 10M items deep ranking model тратит ~1ms на один item. Ранжирование всего каталога = 10 000 секунд. Даже batch inference на GPU обработает максимум 10K items за 100ms. Candidate Generation -- обязательный этап, сокращающий множество с 10M до 1000 за 5-10ms через ANN (FAISS/ScaNN). Без него пайплайн нереализуем.

Заблуждение: Feature Store -- это просто Redis

Feature Store -- это инфраструктурный компонент с двумя слоями: online (Redis/DynamoDB, < 5ms) для serving и offline (S3/BigQuery) для training. Ключевая сложность -- обеспечить consistency между training и serving features (training-serving skew). Feast/Tecton решают эту проблему, а голый Redis -- нет. Training-serving skew -- одна из топ-3 причин деградации ML-моделей в production.

Заблуждение: Diversity -- это просто убрать дубликаты

Diversity -- это алгоритмическая задача (MMR, DPP). Без diversity control система создаёт filter bubbles: пользователь кликнул 2 раза на кроссовки -- и весь фид заполнен кроссовками. Spotify обнаружил, что добавление 30% diversity weight к relevance score увеличило долгосрочный retention на 5%, хотя CTR краткосрочно упал на 2%.

Собеседование

Как устроен Candidate Generation?

❌ "Берём все items и фильтруем по категории пользователя."

✅ "Candidate Generation -- это параллельный retrieval из нескольких источников: Two-Tower ANN (user/item embeddings в FAISS, 5ms, высокий recall), Collaborative Filtering (item-item similarity matrix, precomputed), Content-based (по атрибутам items), Popularity (trending за 24h для cold start). Результаты мержатся, дедуплицируются и подаются на Ranking. Каждый источник покрывает разные сценарии: ANN -- персонализация, CF -- discovery, Popularity -- cold start."

Зачем нужен двухэтапный Ranking?

❌ "Одна модель ранжирует всех кандидатов."

✅ "Coarse Ranking (XGBoost, 100 features) быстро отсекает 80% слабых кандидатов: 1000 -> 200 за 5ms. Fine Ranking (Deep Model, 500+ features, cross-features) точно скорит 200 кандидатов за 30ms. Это trade-off между latency и quality. Одна тяжёлая модель на 1000 candidates потребует 150ms -- больше всего latency budget. Двухэтапная система укладывается в 35ms."

Какую роль играет Blending Layer?

❌ "Просто берём top-N из Ranking."

✅ "Blending Layer -- финальная сборка списка: slot-based rules (позиция 3 = sponsored, позиция 7 = new arrival), exploration slots (10% для Thompson Sampling), MMR reranking для diversity (lambda=0.3). Без Blending система скатывается в exploitation -- показывает одно и то же. Blending обеспечивает баланс revenue (sponsored), discovery (exploration) и user satisfaction (diversity)."