Search Ranking: System Components¶
~3 min read
Prerequisite: Problem Definition
The search pipeline consists of five components, each narrowing the candidate space by an order of magnitude: from 10M documents in the index down to ~100 final results in under 200ms. The key architectural principle is the separation of recall (retrieval: fast, many) and precision (ranking: accurate, few). Interviews probe your understanding of the latency-vs-quality trade-off at each stage.
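To make the funnel concrete, here is a rough stage budget. The candidate counts follow the order-of-magnitude narrowing described above; the latency split is an illustrative assumption, not a measurement.
# (stage, candidates in -> out, latency budget) -- illustrative numbers only
PIPELINE_BUDGET = [
    ("query processing",    "1 query",       "~10 ms"),
    ("retrieval (recall)",  "10M -> ~1000",  "~50 ms"),
    ("ranking (precision)", "~1000 -> ~200", "~50 ms"),
    ("re-ranking",          "~200 -> ~100",  "~20 ms"),
]  # leaves headroom under the 200 ms budget for network and serialization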
High-Level Architecture¶
graph TD
A["Search Request<br/>GET /search?q=красные+кроссовки+nike"] --> B["Search Service"]
subgraph Pipeline ["Search Pipeline"]
direction LR
C["Query Processing"] --> D["Retrieval<br/>(Recall)"]
D --> E["Ranking<br/>(Precision)"]
E --> F["Re-ranking<br/>(Business)"]
end
B --> Pipeline
Pipeline --> G["Response"]
B -.-> H[("Search Index<br/>Elasticsearch")]
B -.-> I[("Feature Store<br/>Redis")]
B -.-> J[("Model Server<br/>TF Serving")]
style Pipeline fill:#e8eaf6,stroke:#3f51b5
style C fill:#fff3e0,stroke:#ff9800
style D fill:#fff3e0,stroke:#ff9800
style E fill:#fff3e0,stroke:#ff9800
style F fill:#fff3e0,stroke:#ff9800
Component Details¶
1. Query Processing¶
Goal: Normalize and understand the query
from typing import List, Optional

from nltk import edit_distance  # Levenshtein distance, used by SpellChecker

class QueryProcessor:
"""
Process and understand user query
"""
def __init__(self):
self.tokenizer = Tokenizer("russian")
self.spell_checker = SpellChecker()
self.ner_model = NERModel()
self.intent_classifier = IntentClassifier()
def process(self, raw_query: str) -> ProcessedQuery:
"""
Full query processing pipeline
"""
# Step 1: Normalization
normalized = self.normalize(raw_query)
# "Красные Кроссовки NIKE" → "красные кроссовки nike"
# Step 2: Tokenization
tokens = self.tokenizer.tokenize(normalized)
# ["красные", "кроссовки", "nike"]
# Step 3: Spell correction
corrected = self.spell_checker.correct(tokens)
# ["красные", "кроссовки", "nike"] (no changes)
# Step 4: Entity extraction
entities = self.ner_model.extract(corrected)
# {"color": "красный", "product": "кроссовки", "brand": "nike"}
# Step 5: Intent classification
intent = self.intent_classifier.predict(normalized)
# "product_search" (vs "navigation", "informational")
# Step 6: Query expansion
expanded = self.expand_query(corrected, entities)
# ["красные", "кроссовки", "nike", "sneakers", "обувь"]
return ProcessedQuery(
original=raw_query,
normalized=normalized,
tokens=corrected,
entities=entities,
intent=intent,
expanded_tokens=expanded,
embedding=self.get_embedding(normalized),
)
def expand_query(self, tokens: List[str], entities: dict) -> List[str]:
"""
Add synonyms and related terms
"""
expanded = list(tokens)
# Add synonyms
for token in tokens:
synonyms = self.get_synonyms(token)
expanded.extend(synonyms)
# Add brand alternatives
if "brand" in entities:
alternatives = self.get_brand_alternatives(entities["brand"])
expanded.extend(alternatives)
return expanded
class SpellChecker:
"""
Correct spelling errors in queries
"""
def __init__(self):
self.vocab = self.load_vocabulary()
self.query_log_vocab = self.load_query_log_vocab()
def correct(self, tokens: List[str]) -> List[str]:
"""
Correct each token if needed
"""
corrected = []
for token in tokens:
if token in self.vocab:
corrected.append(token)
else:
# Find closest match
suggestion = self.find_closest(token)
corrected.append(suggestion if suggestion else token)
return corrected
def find_closest(self, token: str) -> Optional[str]:
"""
Find closest word using edit distance
"""
        candidates = []
        # Consider vocabulary words within edit distance 2
        # (a linear scan is illustrative; production spell checkers use
        # a BK-tree or SymSpell-style index instead)
        for word in self.vocab:
            if edit_distance(token, word) <= 2:
                # Weight by query log frequency
                score = self.query_log_vocab.get(word, 0)
                candidates.append((word, score))
if candidates:
return max(candidates, key=lambda x: x[1])[0]
return None
2. Retrieval (Recall)¶
Goal: Select thousands of candidates from millions of documents
from typing import List

from elasticsearch import Elasticsearch

class Retriever:
"""
Multi-stage retrieval for candidate generation
"""
def __init__(self):
self.lexical_retriever = BM25Retriever()
self.semantic_retriever = SemanticRetriever()
self.filter_engine = FilterEngine()
def retrieve(
self,
query: ProcessedQuery,
filters: dict,
limit: int = 1000
) -> List[Candidate]:
"""
Hybrid retrieval combining lexical and semantic
"""
# Apply hard filters first
filter_query = self.filter_engine.build_filter(filters)
# Parallel retrieval
lexical_results = self.lexical_retriever.search(
query.expanded_tokens,
filter_query,
limit=limit
)
semantic_results = self.semantic_retriever.search(
query.embedding,
filter_query,
limit=limit
)
# Merge and deduplicate
merged = self.merge_results(lexical_results, semantic_results)
return merged[:limit]
    def merge_results(self, lexical, semantic, alpha=0.7):
        """
        Combine lexical and semantic scores via weighted linear fusion
        """
        # Normalize each list by its own max so the two score scales
        # (BM25 vs cosine similarity) become comparable; guard against
        # empty lists and all-zero scores
        max_lex = max((d.score for d in lexical), default=0) or 1.0
        max_sem = max((d.score for d in semantic), default=0) or 1.0
        lex_scores = {d.id: d.score / max_lex for d in lexical}
        sem_scores = {d.id: d.score / max_sem for d in semantic}
        merged = [
            Candidate(
                id=doc_id,
                score=alpha * lex_scores.get(doc_id, 0)
                + (1 - alpha) * sem_scores.get(doc_id, 0),
            )
            for doc_id in set(lex_scores) | set(sem_scores)
        ]
        return sorted(merged, key=lambda x: x.score, reverse=True)
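# An alternative to max-normalized linear fusion is Reciprocal Rank Fusion
# (mentioned in the interview answers at the end of this page). RRF uses
# only ranks, so BM25 and cosine scores never need to share a scale.
# Minimal sketch; k=60 is the conventional constant from the RRF paper.
def rrf_merge(lexical: List[Doc], semantic: List[Doc], k: int = 60):
    scores = {}
    for results in (lexical, semantic):
        for rank, doc in enumerate(results, start=1):
            scores[doc.id] = scores.get(doc.id, 0.0) + 1.0 / (k + rank)
    # (doc_id, fused_score) pairs, best first
    return sorted(scores.items(), key=lambda x: x[1], reverse=True)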
class BM25Retriever:
"""
Lexical search using BM25
"""
def __init__(self):
self.es_client = Elasticsearch()
def search(self, tokens: List[str], filters: dict, limit: int) -> List[Doc]:
query = {
"bool": {
"must": [
{
"multi_match": {
"query": " ".join(tokens),
"fields": ["title^3", "description", "brand^2"],
"type": "best_fields"
}
}
],
"filter": filters
}
}
results = self.es_client.search(
index="products",
query=query,
size=limit
)
return [Doc(id=hit["_id"], score=hit["_score"]) for hit in results["hits"]["hits"]]
class SemanticRetriever:
    """
    Semantic search using embeddings.
    Note: script_score over match_all is an exact, brute-force scan;
    at 10M documents use approximate kNN (e.g. Elasticsearch 8's knn
    search over an HNSW index) instead.
    """
def __init__(self):
self.es_client = Elasticsearch()
def search(self, query_embedding: List[float], filters: dict, limit: int) -> List[Doc]:
query = {
"bool": {
"must": [
{
"script_score": {
"query": {"match_all": {}},
"script": {
"source": "cosineSimilarity(params.query_vector, 'title_embedding') + 1.0",
"params": {"query_vector": query_embedding}
}
}
}
],
"filter": filters
}
}
results = self.es_client.search(
index="products",
query=query,
size=limit
)
return [Doc(id=hit["_id"], score=hit["_score"]) for hit in results["hits"]["hits"]]
3. Ranking Model¶
Goal: Sort candidates by relevance
from typing import List

import lightgbm as lgb
import torch
from transformers import AutoModelForSequenceClassification, AutoTokenizer

class RankingModel:
"""
Learning-to-rank model for precise ranking
"""
def __init__(self):
self.feature_extractor = FeatureExtractor()
self.model = self.load_model()
def rank(
self,
query: ProcessedQuery,
candidates: List[Candidate],
user: User,
context: Context
) -> List[RankedResult]:
"""
Score and rank candidates
"""
# Extract features for all candidates
features = self.feature_extractor.extract_batch(
query, candidates, user, context
)
# Score with model
scores = self.model.predict(features)
# Create ranked results
results = [
RankedResult(
doc_id=cand.id,
score=score,
features=feat
)
for cand, score, feat in zip(candidates, scores, features)
]
# Sort by score
return sorted(results, key=lambda x: x.score, reverse=True)
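# FeatureExtractor is referenced above but never defined in this section.
# The sketch below is a hypothetical illustration of typical LTR feature
# groups: query-document match, document quality, and user/context signals.
# All attribute names (ctr_30d, preferred_categories, ...) are assumptions.
class FeatureExtractor:
    def extract_batch(self, query, candidates, user, context):
        return [self.extract(query, c, user, context) for c in candidates]

    def extract(self, query, cand, user, context):
        return [
            cand.score,  # upstream retrieval score (hybrid BM25 + semantic)
            float(query.entities.get("brand") == getattr(cand, "brand", None)),
            getattr(cand, "ctr_30d", 0.0),  # historical click-through rate
            getattr(cand, "price", 0.0),    # business / quality signal
            float(getattr(cand, "category", None)
                  in getattr(user, "preferred_categories", ())),  # personalization
        ]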
class LambdaMARTRanker:
    """
    Gradient boosted ranking model (LambdaMART via LightGBM)
    """
    def __init__(self):
        self.model = lgb.LGBMRanker(
            objective="lambdarank",
            metric="ndcg",
            n_estimators=500,
            num_leaves=31,
            learning_rate=0.05,
            min_child_samples=20,
        )

    def train(self, X, y, groups, X_val, y_val, groups_val):
        """
        Train on listwise data; `groups` holds per-query group sizes
        """
        self.model.fit(
            X, y,
            group=groups,
            eval_set=[(X_val, y_val)],
            eval_group=[groups_val],
            # LightGBM >= 4 expects early stopping as a callback
            callbacks=[lgb.early_stopping(50), lgb.log_evaluation(10)],
        )
def predict(self, X):
return self.model.predict(X)
class CrossEncoderRanker:
"""
BERT-based cross-encoder for precise ranking
"""
    def __init__(self):
        self.model = AutoModelForSequenceClassification.from_pretrained(
            "cross-encoder/ms-marco-MiniLM-L-12-v2"
        )
        self.tokenizer = AutoTokenizer.from_pretrained(
            "cross-encoder/ms-marco-MiniLM-L-12-v2"
        )
        self.model.eval()  # inference only: disable dropout
def score(self, query: str, documents: List[str]) -> List[float]:
"""
Score query-document pairs
"""
pairs = [[query, doc] for doc in documents]
inputs = self.tokenizer(
pairs,
padding=True,
truncation=True,
return_tensors="pt",
max_length=512
)
with torch.no_grad():
scores = self.model(**inputs).logits.squeeze(-1)
return scores.tolist()
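A usage sketch (the query and document strings are made up): the cross-encoder only ever sees the short list produced by the earlier stages.
ranker = CrossEncoderRanker()
docs = ["Nike Air Max 90, красные кроссовки", "Adidas Runfalcon, беговые кроссовки"]
scores = ranker.score("красные кроссовки nike", docs)
# Pair scores with documents, best first
top = [doc for _, doc in sorted(zip(scores, docs), key=lambda p: p[0], reverse=True)]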
4. Re-ranking Layer¶
Goal: Business logic and diversity
class ReRanker:
"""
Apply business rules and diversity
"""
def __init__(self):
self.diversity_weight = 0.2
self.freshness_boost = 1.1
self.promotion_slots = [0, 4, 9] # positions for promoted items
def rerank(
self,
ranked_results: List[RankedResult],
user: User,
context: Context
) -> List[FinalResult]:
"""
Final reranking with business logic
"""
# Apply diversity (MMR)
diverse_results = self.apply_diversity(ranked_results)
# Apply freshness boost
boosted = self.apply_freshness_boost(diverse_results)
# Insert promoted items
final = self.insert_promotions(boosted, user)
return final
def apply_diversity(self, results: List[RankedResult]) -> List[RankedResult]:
"""
Maximal Marginal Relevance for diversity
"""
selected = []
remaining = list(results)
while remaining and len(selected) < len(results):
best_score = -float('inf')
best_item = None
for item in remaining:
relevance = item.score
# Compute similarity to already selected
if selected:
max_sim = max(
self.similarity(item, sel)
for sel in selected
)
else:
max_sim = 0
# MMR score
mmr_score = (
(1 - self.diversity_weight) * relevance -
self.diversity_weight * max_sim
)
if mmr_score > best_score:
best_score = mmr_score
best_item = item
if best_item:
selected.append(best_item)
remaining.remove(best_item)
return selected
def similarity(self, item1, item2):
"""
Category-based similarity
"""
if item1.category == item2.category:
return 0.8
if item1.brand == item2.brand:
return 0.5
return 0.0
5. Autocomplete Service¶
from typing import List, Optional

class AutocompleteService:
"""
Query suggestions as user types
"""
def __init__(self):
self.prefix_index = PrefixTree()
self.query_popularity = QueryPopularity()
self.personalization = PersonalizationEngine()
def suggest(
self,
prefix: str,
user_id: Optional[str] = None,
limit: int = 10
) -> List[Suggestion]:
"""
Get autocomplete suggestions
"""
# Get prefix matches
candidates = self.prefix_index.search(prefix, limit=100)
# Score candidates
scored = []
for query in candidates:
score = self.query_popularity.get_score(query)
# Personalization boost
if user_id:
personal_boost = self.personalization.get_boost(user_id, query)
score *= personal_boost
scored.append(Suggestion(query=query, score=score))
# Sort and return top-N
scored.sort(key=lambda x: x.score, reverse=True)
return scored[:limit]
class TrieNode:
    """
    Single trie node: children keyed by character, plus terminal metadata
    """
    def __init__(self):
        self.children = {}
        self.is_end = False
        self.count = 0

class PrefixTree:
    """
    Trie-based prefix matching
    """
    def __init__(self):
        self.root = TrieNode()

    def insert(self, query: str, count: int):
        node = self.root
        for char in query.lower():
            if char not in node.children:
                node.children[char] = TrieNode()
            node = node.children[char]
        node.is_end = True
        node.count = count

    def search(self, prefix: str, limit: int) -> List[str]:
        node = self.root
        for char in prefix.lower():
            if char not in node.children:
                return []
            node = node.children[char]
        # DFS to find all completions under the prefix node
        results = []
        self._dfs(node, prefix, results, limit)
        return results

    def _dfs(self, node: "TrieNode", prefix: str, results: List[str], limit: int):
        if len(results) >= limit:
            return
        if node.is_end:
            results.append(prefix)
        for char, child in node.children.items():
            self._dfs(child, prefix + char, results, limit)
Infrastructure¶
Search Index Cluster¶
# Elasticsearch cluster configuration
apiVersion: elasticsearch.k8s.elastic.co/v1
kind: Elasticsearch
metadata:
name: search-cluster
spec:
version: 8.10.0
nodeSets:
- name: master
count: 3
config:
node.roles: ["master"]
podTemplate:
spec:
containers:
- name: elasticsearch
resources:
limits:
memory: 4Gi
cpu: 2
- name: data
count: 10
config:
node.roles: ["data", "ingest"]
volumeClaimTemplates:
- metadata:
name: elasticsearch-data
spec:
accessModes: ["ReadWriteOnce"]
resources:
requests:
storage: 500Gi
podTemplate:
spec:
containers:
- name: elasticsearch
resources:
limits:
memory: 32Gi
cpu: 8
Caching Layer¶
import hashlib
import json

from cachetools import LRUCache

class SearchCache:
    """
    Multi-layer caching for search results
    """
    def __init__(self):
        self.l1_cache = LRUCache(maxsize=10000)  # local, per-instance
        self.l2_cache = RedisCache()             # distributed, shared
def get_or_compute(self, cache_key: str, compute_fn) -> SearchResults:
# L1: Local cache
if cache_key in self.l1_cache:
return self.l1_cache[cache_key]
# L2: Redis
cached = self.l2_cache.get(cache_key)
if cached:
self.l1_cache[cache_key] = cached
return cached
# Compute
result = compute_fn()
# Cache
self.l1_cache[cache_key] = result
self.l2_cache.set(cache_key, result, ttl=300) # 5 min
return result
def build_cache_key(self, query: ProcessedQuery, filters: dict, user_id: str) -> str:
"""
Build cache key from query parameters
"""
key_parts = [
query.normalized,
json.dumps(filters, sort_keys=True),
user_id if user_id else "anonymous"
]
return hashlib.md5("|".join(key_parts).encode()).hexdigest()
Misconception: you can rank all 10M documents with a single model
A cross-encoder (BERT) gives the best quality but scores only ~100 query-document pairs per second. Ranking 10M documents would take ~28 hours for a single query. A multi-stage architecture is therefore mandatory: BM25/ANN retrieval (orders of magnitude cheaper per document) -> lightweight ranker -> cross-encoder on the top 200.
Misconception: caching search results is trivial
Personalized search has a cache hit rate of only 5-15% (vs 30-40% without personalization). Caching by user_id + query bloats the cache until it is useless. The solution: cache the retrieval stage (before personalization) and always execute the ranking stage.
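A minimal sketch of that split, reusing the SearchCache from the Infrastructure section (the function names and simplified signatures here are illustrative): user_id is deliberately excluded from the retrieval cache key, so all users share cached candidates, while personalized ranking runs on every request.
import hashlib
import json

def retrieval_cache_key(normalized_query: str, filters: dict) -> str:
    # No user_id in the key: candidates are shared across users
    raw = normalized_query + "|" + json.dumps(filters, sort_keys=True)
    return hashlib.md5(raw.encode()).hexdigest()

def cached_search(query, filters, user, context, cache, retriever, ranker):
    key = retrieval_cache_key(query.normalized, filters)
    candidates = cache.get_or_compute(key, lambda: retriever.retrieve(query, filters))
    # Personalized ranking is recomputed on every request
    return ranker.rank(query, candidates, user, context)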
Misconception: the alpha in hybrid retrieval can be tuned once
The optimal weight of BM25 vs semantic search depends on the query type: for navigational queries (brand names) BM25 dominates (alpha=0.9); for exploratory queries ("подарок маме", "gift for mom") semantic search wins (alpha=0.4). Advanced systems learn a query-dependent alpha.
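A sketch of query-dependent alpha that routes on the intent classifier's output from Query Processing (the intent labels and weights are illustrative assumptions):
# Illustrative mapping; in practice alpha would be learned per query type
ALPHA_BY_INTENT = {
    "navigational": 0.9,    # brand / exact-name queries: trust BM25
    "product_search": 0.7,  # default hybrid weight
    "exploratory": 0.4,     # vague queries: lean on semantic retrieval
}

def pick_alpha(query) -> float:
    return ALPHA_BY_INTENT.get(query.intent, 0.7)

# merged = retriever.merge_results(lexical, semantic, alpha=pick_alpha(query))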
At the Interview¶
Common mistakes:
"Cache all search results" -- with personalization there are far too many unique queries; cache hit rate drops below 5%
"Use only BM25 for retrieval" -- misses semantic matches; "sneakers" will never find "кроссовки"
"Re-ranking is unnecessary, the ranking model is enough" -- without diversity re-ranking (MMR) the top 10 results can all come from a single brand
Strong answers:
"Retrieval: hybrid BM25 + ANN with Reciprocal Rank Fusion. I cache only the retrieval stage (cache key = normalized query + filters) and compute ranking with user features on every request."
"For query processing I run a pipeline: normalization -> tokenization -> spell correction (edit distance + query log frequency) -> NER (brand, size, color) -> query expansion (synonyms). Latency ~10ms."
"LambdaMART for ranking because it optimizes NDCG directly, has fast CPU inference, and yields interpretable feature importances. Cross-encoder BERT only for re-ranking the top 200."