
Search Ranking: System Components

~3 minute read

Prerequisite: Problem Definition

The search pipeline consists of five components, each narrowing the candidate space by roughly an order of magnitude: from 10M documents in the index down to ~100 final results in <200ms. The key architectural principle is the separation of recall (retrieval: fast, many) and precision (ranking: accurate, few). Interviews assess your understanding of the latency/quality trade-off at each stage.
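
The funnel can be sketched as a rough stage budget. A minimal illustration — only the 10M -> ~100 endpoints and the <200ms total come from the text above; the per-stage splits are assumptions:

```python
# Illustrative stage budget for the multi-stage funnel.
# Only the 10M -> ~100 endpoints and the <200ms total are from the text;
# the per-stage splits below are assumptions for illustration.
FUNNEL = [
    # (stage, candidates_in, candidates_out, latency_budget_ms)
    ("query_processing", None,       None,  10),
    ("retrieval",        10_000_000, 1_000, 50),
    ("ranking",          1_000,      200,   100),
    ("re_ranking",       200,        100,   20),
]

total_ms = sum(ms for *_, ms in FUNNEL)
assert total_ms < 200  # fits the <200ms end-to-end budget
```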

High-Level Architecture

graph TD
    A["Search Request<br/>GET /search?q=красные+кроссовки+nike"] --> B["Search Service"]

    subgraph Pipeline ["Search Pipeline"]
        direction LR
        C["Query Processing"] --> D["Retrieval<br/>(Recall)"]
        D --> E["Ranking<br/>(Precision)"]
        E --> F["Re-ranking<br/>(Business)"]
    end

    B --> Pipeline
    Pipeline --> G["Response"]

    B -.-> H[("Search Index<br/>Elasticsearch")]
    B -.-> I[("Feature Store<br/>Redis")]
    B -.-> J[("Model Server<br/>TF Serving")]

    style Pipeline fill:#e8eaf6,stroke:#3f51b5
    style C fill:#fff3e0,stroke:#ff9800
    style D fill:#fff3e0,stroke:#ff9800
    style E fill:#fff3e0,stroke:#ff9800
    style F fill:#fff3e0,stroke:#ff9800

Component Details

1. Query Processing

Goal: Normalize and understand the query

from typing import List, Optional

class QueryProcessor:
    """
    Process and understand user query
    """

    def __init__(self):
        self.tokenizer = Tokenizer("russian")
        self.spell_checker = SpellChecker()
        self.ner_model = NERModel()
        self.intent_classifier = IntentClassifier()

    def process(self, raw_query: str) -> ProcessedQuery:
        """
        Full query processing pipeline
        """
        # Step 1: Normalization
        normalized = self.normalize(raw_query)
        # "Красные Кроссовки NIKE" → "красные кроссовки nike"

        # Step 2: Tokenization
        tokens = self.tokenizer.tokenize(normalized)
        # ["красные", "кроссовки", "nike"]

        # Step 3: Spell correction
        corrected = self.spell_checker.correct(tokens)
        # ["красные", "кроссовки", "nike"] (no changes)

        # Step 4: Entity extraction
        entities = self.ner_model.extract(corrected)
        # {"color": "красный", "product": "кроссовки", "brand": "nike"}

        # Step 5: Intent classification
        intent = self.intent_classifier.predict(normalized)
        # "product_search" (vs "navigation", "informational")

        # Step 6: Query expansion
        expanded = self.expand_query(corrected, entities)
        # ["красные", "кроссовки", "nike", "sneakers", "обувь"]

        return ProcessedQuery(
            original=raw_query,
            normalized=normalized,
            tokens=corrected,
            entities=entities,
            intent=intent,
            expanded_tokens=expanded,
            embedding=self.get_embedding(normalized),
        )

    def expand_query(self, tokens: List[str], entities: dict) -> List[str]:
        """
        Add synonyms and related terms
        """
        expanded = list(tokens)

        # Add synonyms
        for token in tokens:
            synonyms = self.get_synonyms(token)
            expanded.extend(synonyms)

        # Add brand alternatives
        if "brand" in entities:
            alternatives = self.get_brand_alternatives(entities["brand"])
            expanded.extend(alternatives)

        return expanded

class SpellChecker:
    """
    Correct spelling errors in queries
    """

    def __init__(self):
        self.vocab = self.load_vocabulary()
        self.query_log_vocab = self.load_query_log_vocab()

    def correct(self, tokens: List[str]) -> List[str]:
        """
        Correct each token if needed
        """
        corrected = []
        for token in tokens:
            if token in self.vocab:
                corrected.append(token)
            else:
                # Find closest match
                suggestion = self.find_closest(token)
                corrected.append(suggestion if suggestion else token)
        return corrected

    def find_closest(self, token: str) -> Optional[str]:
        """
        Find closest word using edit distance
        """
        candidates = []

        # Collect candidates within edit distance 2
        for word in self.vocab:
            if edit_distance(token, word) <= 2:
                # Weight by query log frequency
                score = self.query_log_vocab.get(word, 0)
                candidates.append((word, score))

        if candidates:
            return max(candidates, key=lambda x: x[1])[0]
        return None
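
The `edit_distance` helper used by `find_closest` is not defined above; a standard dynamic-programming Levenshtein distance is the usual choice. A minimal sketch (production spell checkers typically precompute candidates with SymSpell or a BK-tree rather than scanning the whole vocabulary):

```python
def edit_distance(a: str, b: str) -> int:
    """Levenshtein distance via dynamic programming, O(len(a) * len(b))."""
    if len(a) < len(b):
        a, b = b, a  # iterate over the longer string, keep rows short
    prev = list(range(len(b) + 1))
    for i, ca in enumerate(a, 1):
        curr = [i]
        for j, cb in enumerate(b, 1):
            curr.append(min(
                prev[j] + 1,               # deletion
                curr[j - 1] + 1,           # insertion
                prev[j - 1] + (ca != cb),  # substitution
            ))
        prev = curr
    return prev[-1]

# edit_distance("кросовки", "кроссовки") == 1  (one missing "с")
```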

2. Retrieval (Recall)

Goal: From millions of documents, select thousands of candidates

class Retriever:
    """
    Multi-stage retrieval for candidate generation
    """

    def __init__(self):
        self.lexical_retriever = BM25Retriever()
        self.semantic_retriever = SemanticRetriever()
        self.filter_engine = FilterEngine()

    def retrieve(
        self,
        query: ProcessedQuery,
        filters: dict,
        limit: int = 1000
    ) -> List[Candidate]:
        """
        Hybrid retrieval combining lexical and semantic
        """
        # Apply hard filters first
        filter_query = self.filter_engine.build_filter(filters)

        # Parallel retrieval
        lexical_results = self.lexical_retriever.search(
            query.expanded_tokens,
            filter_query,
            limit=limit
        )

        semantic_results = self.semantic_retriever.search(
            query.embedding,
            filter_query,
            limit=limit
        )

        # Merge and deduplicate
        merged = self.merge_results(lexical_results, semantic_results)

        return merged[:limit]

    def merge_results(self, lexical, semantic, alpha=0.7):
        """
        Combine lexical and semantic scores, normalized by each list's max
        """
        lex_by_id = {d.id: d.score for d in lexical}
        sem_by_id = {d.id: d.score for d in semantic}

        # Normalize once up front instead of rescanning per document
        max_lex = max(lex_by_id.values(), default=1.0)
        max_sem = max(sem_by_id.values(), default=1.0)

        merged = []
        for doc_id in lex_by_id.keys() | sem_by_id.keys():
            lex_score = lex_by_id.get(doc_id, 0.0) / max_lex
            sem_score = sem_by_id.get(doc_id, 0.0) / max_sem

            combined = alpha * lex_score + (1 - alpha) * sem_score
            merged.append(Candidate(id=doc_id, score=combined))

        return sorted(merged, key=lambda x: x.score, reverse=True)
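
Dividing by the per-list maximum is one way to make BM25 and cosine scores comparable; Reciprocal Rank Fusion avoids score normalization entirely by combining ranks. A minimal sketch (k=60 is the constant commonly used with RRF):

```python
from collections import defaultdict

def rrf_merge(result_lists, k=60):
    """Reciprocal Rank Fusion: score(d) = sum over lists of 1 / (k + rank(d)).

    Uses only ranks, so BM25 and cosine scores need no normalization.
    result_lists: iterable of ranked lists of doc ids (best first).
    """
    scores = defaultdict(float)
    for results in result_lists:
        for rank, doc_id in enumerate(results, 1):
            scores[doc_id] += 1.0 / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)

# Docs found by both retrievers float to the top: "b" ranks first here
rrf_merge([["a", "b", "c"], ["b", "d"]])
```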

class BM25Retriever:
    """
    Lexical search using BM25
    """

    def __init__(self):
        self.es_client = Elasticsearch()

    def search(self, tokens: List[str], filters: dict, limit: int) -> List[Doc]:
        query = {
            "bool": {
                "must": [
                    {
                        "multi_match": {
                            "query": " ".join(tokens),
                            "fields": ["title^3", "description", "brand^2"],
                            "type": "best_fields"
                        }
                    }
                ],
                "filter": filters
            }
        }

        results = self.es_client.search(
            index="products",
            query=query,
            size=limit
        )

        return [Doc(id=hit["_id"], score=hit["_score"]) for hit in results["hits"]["hits"]]

class SemanticRetriever:
    """
    Semantic search using embeddings
    """

    def __init__(self):
        self.es_client = Elasticsearch()

    def search(self, query_embedding: List[float], filters: dict, limit: int) -> List[Doc]:
        # Keep the filter inside script_score's inner query so that only
        # filtered documents are scored, instead of scoring the whole index
        query = {
            "script_score": {
                "query": {"bool": {"filter": filters}},
                "script": {
                    "source": "cosineSimilarity(params.query_vector, 'title_embedding') + 1.0",
                    "params": {"query_vector": query_embedding}
                }
            }
        }

        results = self.es_client.search(
            index="products",
            query=query,
            size=limit
        )

        return [Doc(id=hit["_id"], score=hit["_score"]) for hit in results["hits"]["hits"]]

3. Ranking Model

Goal: Sort the candidates by relevance

class RankingModel:
    """
    Learning-to-rank model for precise ranking
    """

    def __init__(self):
        self.feature_extractor = FeatureExtractor()
        self.model = self.load_model()

    def rank(
        self,
        query: ProcessedQuery,
        candidates: List[Candidate],
        user: User,
        context: Context
    ) -> List[RankedResult]:
        """
        Score and rank candidates
        """
        # Extract features for all candidates
        features = self.feature_extractor.extract_batch(
            query, candidates, user, context
        )

        # Score with model
        scores = self.model.predict(features)

        # Create ranked results
        results = [
            RankedResult(
                doc_id=cand.id,
                score=score,
                features=feat
            )
            for cand, score, feat in zip(candidates, scores, features)
        ]

        # Sort by score
        return sorted(results, key=lambda x: x.score, reverse=True)

import lightgbm as lgb

class LambdaMARTRanker:
    """
    Gradient boosted ranking model
    """

    def __init__(self):
        self.model = lgb.LGBMRanker(
            objective="lambdarank",
            metric="ndcg",
            n_estimators=500,
            num_leaves=31,
            learning_rate=0.05,
            min_child_samples=20,
        )

    def train(self, X, y, groups, X_val, y_val, groups_val):
        """
        Train on listwise data; `groups` holds the number of docs per query
        """
        self.model.fit(
            X, y,
            group=groups,
            eval_set=[(X_val, y_val)],
            eval_group=[groups_val],
            # early_stopping_rounds was removed from fit() in LightGBM 4.x;
            # use the early_stopping callback instead
            callbacks=[lgb.early_stopping(50), lgb.log_evaluation(10)],
        )

    def predict(self, X):
        return self.model.predict(X)

import torch
from transformers import AutoModelForSequenceClassification, AutoTokenizer

class CrossEncoderRanker:
    """
    BERT-based cross-encoder for precise ranking
    """

    def __init__(self):
        self.model = AutoModelForSequenceClassification.from_pretrained(
            "cross-encoder/ms-marco-MiniLM-L-12-v2"
        )
        self.tokenizer = AutoTokenizer.from_pretrained(
            "cross-encoder/ms-marco-MiniLM-L-12-v2"
        )
        self.model.eval()  # inference mode: disables dropout

    def score(self, query: str, documents: List[str]) -> List[float]:
        """
        Score query-document pairs
        """
        pairs = [[query, doc] for doc in documents]

        inputs = self.tokenizer(
            pairs,
            padding=True,
            truncation=True,
            return_tensors="pt",
            max_length=512
        )

        with torch.no_grad():
            scores = self.model(**inputs).logits.squeeze(-1)

        return scores.tolist()

4. Re-ranking Layer

Goal: Business logic and diversity

class ReRanker:
    """
    Apply business rules and diversity
    """

    def __init__(self):
        self.diversity_weight = 0.2
        self.freshness_boost = 1.1
        self.promotion_slots = [0, 4, 9]  # positions for promoted items

    def rerank(
        self,
        ranked_results: List[RankedResult],
        user: User,
        context: Context
    ) -> List[FinalResult]:
        """
        Final reranking with business logic
        """
        # Apply diversity (MMR)
        diverse_results = self.apply_diversity(ranked_results)

        # Apply freshness boost
        boosted = self.apply_freshness_boost(diverse_results)

        # Insert promoted items
        final = self.insert_promotions(boosted, user)

        return final

    def apply_diversity(self, results: List[RankedResult]) -> List[RankedResult]:
        """
        Maximal Marginal Relevance for diversity
        """
        selected = []
        remaining = list(results)

        while remaining and len(selected) < len(results):
            best_score = -float('inf')
            best_item = None

            for item in remaining:
                relevance = item.score

                # Compute similarity to already selected
                if selected:
                    max_sim = max(
                        self.similarity(item, sel)
                        for sel in selected
                    )
                else:
                    max_sim = 0

                # MMR score
                mmr_score = (
                    (1 - self.diversity_weight) * relevance -
                    self.diversity_weight * max_sim
                )

                if mmr_score > best_score:
                    best_score = mmr_score
                    best_item = item

            if best_item:
                selected.append(best_item)
                remaining.remove(best_item)

        return selected

    def similarity(self, item1, item2):
        """
        Category-based similarity (assumes results carry category and brand fields)
        """
        if item1.category == item2.category:
            return 0.8
        if item1.brand == item2.brand:
            return 0.5
        return 0.0

5. Autocomplete Service

class AutocompleteService:
    """
    Query suggestions as user types
    """

    def __init__(self):
        self.prefix_index = PrefixTree()
        self.query_popularity = QueryPopularity()
        self.personalization = PersonalizationEngine()

    def suggest(
        self,
        prefix: str,
        user_id: Optional[str] = None,
        limit: int = 10
    ) -> List[Suggestion]:
        """
        Get autocomplete suggestions
        """
        # Get prefix matches
        candidates = self.prefix_index.search(prefix, limit=100)

        # Score candidates
        scored = []
        for query in candidates:
            score = self.query_popularity.get_score(query)

            # Personalization boost
            if user_id:
                personal_boost = self.personalization.get_boost(user_id, query)
                score *= personal_boost

            scored.append(Suggestion(query=query, score=score))

        # Sort and return top-N
        scored.sort(key=lambda x: x.score, reverse=True)
        return scored[:limit]

class TrieNode:
    __slots__ = ("children", "is_end", "count")

    def __init__(self):
        self.children = {}
        self.is_end = False
        self.count = 0

class PrefixTree:
    """
    Trie-based prefix matching
    """

    def __init__(self):
        self.root = TrieNode()

    def insert(self, query: str, count: int):
        node = self.root
        for char in query.lower():
            if char not in node.children:
                node.children[char] = TrieNode()
            node = node.children[char]
        node.is_end = True
        node.count = count

    def search(self, prefix: str, limit: int) -> List[str]:
        node = self.root
        for char in prefix.lower():
            if char not in node.children:
                return []
            node = node.children[char]

        # DFS to find all completions under the prefix node
        results = []
        self._dfs(node, prefix, results, limit)
        return results

    def _dfs(self, node, path: str, results: List[str], limit: int):
        if len(results) >= limit:
            return
        if node.is_end:
            results.append(path)
        for char, child in node.children.items():
            self._dfs(child, path + char, results, limit)

Infrastructure

Search Index Cluster

# Elasticsearch cluster configuration
apiVersion: elasticsearch.k8s.elastic.co/v1
kind: Elasticsearch
metadata:
  name: search-cluster
spec:
  version: 8.10.0
  nodeSets:
    - name: master
      count: 3
      config:
        node.roles: ["master"]
      podTemplate:
        spec:
          containers:
            - name: elasticsearch
              resources:
                limits:
                  memory: 4Gi
                  cpu: 2

    - name: data
      count: 10
      config:
        node.roles: ["data", "ingest"]
      volumeClaimTemplates:
        - metadata:
            name: elasticsearch-data
          spec:
            accessModes: ["ReadWriteOnce"]
            resources:
              requests:
                storage: 500Gi
      podTemplate:
        spec:
          containers:
            - name: elasticsearch
              resources:
                limits:
                  memory: 32Gi
                  cpu: 8

Caching Layer

import hashlib
import json

class SearchCache:
    """
    Multi-layer caching for search results
    """

    def __init__(self):
        self.l1_cache = LRUCache(maxsize=10000)  # Local
        self.l2_cache = RedisCache()              # Distributed

    def get_or_compute(self, cache_key: str, compute_fn) -> SearchResults:
        # L1: Local cache
        if cache_key in self.l1_cache:
            return self.l1_cache[cache_key]

        # L2: Redis
        cached = self.l2_cache.get(cache_key)
        if cached:
            self.l1_cache[cache_key] = cached
            return cached

        # Compute
        result = compute_fn()

        # Cache
        self.l1_cache[cache_key] = result
        self.l2_cache.set(cache_key, result, ttl=300)  # 5 min

        return result

    def build_cache_key(self, query: ProcessedQuery, filters: dict, user_id: str) -> str:
        """
        Build cache key from query parameters
        """
        key_parts = [
            query.normalized,
            json.dumps(filters, sort_keys=True),
            user_id if user_id else "anonymous"
        ]
        return hashlib.md5("|".join(key_parts).encode()).hexdigest()

Misconception: all 10M documents can be ranked with a single model

A cross-encoder (BERT) gives the best quality but processes only ~100 query-document pairs per second. For 10M documents that would take ~28 hours. A multi-stage architecture is therefore mandatory: BM25/ANN retrieval (~1ms per doc) -> lightweight ranker -> cross-encoder on the top-200.
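
The 28-hour figure is simple arithmetic:

```python
# Back-of-envelope check of the claim above
docs = 10_000_000
pairs_per_second = 100            # cross-encoder throughput from the text
hours = docs / pairs_per_second / 3600
assert 27 < hours < 29            # ~27.8 hours for a single query
```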

Misconception: caching search results is trivial

Personalized search has a cache hit rate of only 5-15% (vs 30-40% without personalization). Keying the cache on user_id + query makes it balloon and become useless. The fix: cache the retrieval stage (before personalization) and run the ranking stage on every request.
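
That fix boils down to a cache key that deliberately omits the user. A minimal sketch (the function name is illustrative):

```python
import hashlib
import json

def retrieval_cache_key(normalized_query: str, filters: dict) -> str:
    """Cache key for the retrieval stage only: no user_id on purpose,
    so every user issuing the same query shares one cached candidate set."""
    payload = normalized_query + "|" + json.dumps(filters, sort_keys=True)
    return hashlib.md5(payload.encode()).hexdigest()

# Same query + filters -> same key, no matter which user asks
k1 = retrieval_cache_key("красные кроссовки nike", {"size": 42})
k2 = retrieval_cache_key("красные кроссовки nike", {"size": 42})
assert k1 == k2
```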

Misconception: the alpha in hybrid retrieval can be tuned once

The optimal BM25 vs semantic weight depends on the query type: for navigational queries (brand names) BM25 dominates (alpha=0.9); for exploratory queries ("подарок маме" -- "a gift for mom") semantic dominates (alpha=0.4). Advanced systems learn a query-dependent alpha.
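
A heuristic version of query-dependent alpha can be sketched from the query-understanding output. The thresholds and intent mapping here are assumptions — the text only gives the 0.9/0.4 endpoints:

```python
def choose_alpha(entities: dict, intent: str) -> float:
    """Pick the BM25 weight from query-understanding signals.

    Heuristic sketch: brand/navigational queries lean lexical,
    exploratory ones lean semantic. A learned model would replace
    this table with a regressor over query features.
    """
    if "brand" in entities or intent == "navigation":
        return 0.9   # exact brand tokens: BM25 dominates
    if intent == "informational":
        return 0.4   # exploratory query: semantic dominates
    return 0.7       # default weight, as in merge_results above

assert choose_alpha({"brand": "nike"}, "product_search") == 0.9
```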

In the Interview

Common mistakes:

❌ "We cache all search results" -- with personalization there are too many unique queries; cache hit rate <5%

❌ "We use only BM25 for retrieval" -- misses semantic matches: "sneakers" will not find "кроссовки"

❌ "Re-ranking is unnecessary, the ranking model is enough" -- without diversity re-ranking (MMR) the top 10 results may all come from a single brand

Strong answers:

✅ "Retrieval: hybrid BM25 + ANN with Reciprocal Rank Fusion. I cache only the retrieval stage (cache key = normalized query + filters) and compute ranking with user features on every request."

✅ "For query processing I use a pipeline: normalization -> tokenization -> spell correction (edit distance + query log frequency) -> NER (brand, size, color) -> query expansion (synonyms). Latency is ~10ms."

✅ "LambdaMART for ranking, because it optimizes NDCG directly, runs fast at inference on CPU, and gives interpretable feature importance. A cross-encoder BERT only for re-ranking the top-200."