ML System Design: Study Materials¶
~37 min read
Prerequisites: MLSD Interview Preparation | Deep Learning Materials
ML System Design covers 23 topics: from model serving and A/B testing to AI agents in production and ML security. Each topic includes the best sources (books, papers, blogs), key formulas and concepts, gotchas, and interview-ready Q&A. According to levels.fyi data for 2025, an MLSD round appears in 85% of interviews for Senior+ ML Engineer positions.
Sources for 23 ML System Design problems. Updated: 2026-02-11
Contents¶
- Task Overview
- 1. Model Serving & Latency Optimization
- 2. A/B Testing
- 3. Drift Detection
- 4. Model Calibration
- 5. Ranking Metrics Advanced
- 6. Recommendation Systems (RecSys)
- 7. ML Trade-offs Quiz
- 8. LLM Production
- 9. Feature Stores
- 10. ML Infrastructure (Model Registry & Experiment Tracking)
- 11. Multi-Armed Bandits (Exploration vs Exploitation)
- 12. Online Learning (Streaming ML)
- 13. Multi-Stage Recommender Systems
- 14. Causal Inference for ML
- 15. Vector Databases for ML
- 16. Cost Optimization for ML Inference
- 17. Multi-Model Serving and Model Routing
- 18. Data Quality for ML in Production
- 19. Foundation Models in Production
- 20. AI Agents in Production
- 21. Model Compression (Knowledge Distillation, Pruning, Edge Deployment)
- 22. Monitoring & Observability for ML
- 23. Security for ML (Adversarial, Model Extraction, Privacy)
- Video Resources
- Related Topics
Task Overview¶
| ID | Task | Focus |
|---|---|---|
| mlsd_005 | Model Serving / Latency Optimization | Inference, batching, quantization |
| mlsd_002 | A/B Testing | Statistical significance, sample size |
| mlsd_004 | Drift Detection | PSI, KS-test, monitoring |
| mlsd_008 | Model Calibration | Platt, Isotonic, Brier score |
| mlsd_001 | Ranking Metrics Advanced | NCE, CTR Lift, NDCG |
| mlsd_003 | Trade-offs Quiz | 15 production scenarios |
| mlsd_006 | RecSys | Two-Tower, Cold Start |
| mlsd_007 | LLM Production | Guardrails, Prompt Injection |
1. Model Serving & Latency Optimization¶
Books¶
Designing Machine Learning Systems (Chip Huyen) - Chapter 7: Model Deployment - Chapter 8: Monitoring - URL: https://www.oreilly.com/library/view/designing-machine-learning/9781098107956/
Machine Learning Engineering (Andriy Burkov) - Chapter 5: Model Serving - URL: http://www.mlebook.com/
Articles¶
ML Inference Latency Optimization - System Architecture Guide - URL: https://gist.github.com/epappas/bdbf6f4d0f333fdb1230e20af8d7540d - Covers: model optimization, serving infrastructure, hardware utilization
Model Serving: Architectures for High-Performance Inference - URL: https://www.whaleflux.com/blog/efficient-model-serving-architectures-for-high-performance-inference/ - Patterns: dynamic batching, quantization, multi-model serving
MLOps: Model serving (Raghu Hemadri) - URL: https://raghuhemadri.github.io/blog/2025/mlops-model-serving/ - Balancing speed, cost, and reliability
Key Concepts¶
Latency Budget:
- P50 < 50ms (interactive)
- P99 < 200ms (acceptable)
- P99.9 < 500ms (edge cases)
Optimization Techniques:
1. Model-level: quantization, pruning, distillation
2. Serving-level: batching, caching, async
3. Hardware-level: GPU, TPU, inference chips
Code¶
# Dynamic Batching (simplified sketch: a production batcher also needs locking and per-request result routing)
import asyncio
class DynamicBatcher:
def __init__(self, model, max_batch_size=32, max_wait_ms=10):
self.model = model
self.max_batch_size = max_batch_size
self.max_wait_ms = max_wait_ms
self.queue = []
async def predict(self, input_data):
self.queue.append(input_data)
if len(self.queue) >= self.max_batch_size:
return await self._process_batch()
await asyncio.sleep(self.max_wait_ms / 1000)
return await self._process_batch()
async def _process_batch(self):
batch = self.queue[:self.max_batch_size]
self.queue = self.queue[self.max_batch_size:]
return self.model.predict_batch(batch)
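Of the model-level techniques listed above, quantization is the quickest to demonstrate. A minimal sketch of post-training dynamic quantization in PyTorch (the toy model and shapes are illustrative, not part of the original example):

import torch
import torch.nn as nn

# Toy float32 model standing in for a trained network (illustrative)
model_fp32 = nn.Sequential(nn.Linear(256, 128), nn.ReLU(), nn.Linear(128, 1))

# Post-training dynamic quantization: Linear weights stored as int8,
# activations quantized on the fly. Typically ~4x smaller and faster on CPU.
model_int8 = torch.quantization.quantize_dynamic(
    model_fp32,
    {nn.Linear},        # layer types to quantize
    dtype=torch.qint8
)

x = torch.randn(1, 256)
print(model_int8(x).shape)  # same call interface as the original model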
2. A/B Testing¶
Books¶
Trustworthy Online Controlled Experiments (Kohavi, Tang, Xu) - Chapters: Sample Size, Statistical Significance, Common Pitfalls - URL: https://www.wiley.com/en-us/Trustworthy+Online+Controlled+Experiments-p-9781119726649
Articles¶
How to Crack Machine Learning System-Design Interviews - URL: https://towardsdatascience.com/cracking-machine-learning-system-design-interviews/ - A/B testing framework
ML System Design Interview Guide (Exponent) - URL: https://www.tryexponent.com/blog/machine-learning-system-design-interview-guide - Structured approach
Key Formulas¶
Sample Size (Two-proportion z-test):
n = 16 * sigma^2 / delta^2
where:
- sigma^2 = p(1-p) for binary outcomes
- delta = minimum detectable effect
- 16 ≈ 2 * (1.96 + 0.84)^2 for 95% confidence and 80% power (the factor 2 accounts for the two groups)
Statistical Significance:
z = (p_A - p_B) / sqrt(p_pool * (1-p_pool) * (1/n_A + 1/n_B))
where p_pool = (x_A + x_B) / (n_A + n_B)
Code¶
import numpy as np
from scipy import stats
def calculate_sample_size(p_baseline, mde, alpha=0.05, power=0.8):
"""Calculate required sample size per variant"""
z_alpha = stats.norm.ppf(1 - alpha/2) # 1.96
z_beta = stats.norm.ppf(power) # 0.84
p_pooled = p_baseline + mde/2
variance = 2 * p_pooled * (1 - p_pooled)
n = variance * (z_alpha + z_beta)**2 / mde**2
return int(np.ceil(n))
def ab_test_significance(control, treatment, alpha=0.05):
"""Run chi-square test for A/B test"""
contingency = np.array([[control['success'], control['total'] - control['success']],
[treatment['success'], treatment['total'] - treatment['success']]])
chi2, p_value, dof, expected = stats.chi2_contingency(contingency)
return {
'p_value': p_value,
'significant': p_value < alpha,
'lift': (treatment['success']/treatment['total'] - control['success']/control['total']) / (control['success']/control['total'])
}
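The chi-square test above is one option; a minimal sketch of the two-proportion z-test that matches the pooled-variance formula in this section (reuses the numpy/scipy imports from the block above):

def two_proportion_z_test(x_a, n_a, x_b, n_b):
    """Two-proportion z-test using the pooled variance from the formula above."""
    p_a, p_b = x_a / n_a, x_b / n_b
    p_pool = (x_a + x_b) / (n_a + n_b)
    se = np.sqrt(p_pool * (1 - p_pool) * (1 / n_a + 1 / n_b))
    z = (p_a - p_b) / se
    p_value = 2 * (1 - stats.norm.cdf(abs(z)))  # two-sided
    return {'z': z, 'p_value': p_value, 'significant': p_value < 0.05}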
3. Drift Detection¶
Articles¶
Model Drift in Production (2026): Detection, Monitoring & Response - URL: https://alldaystech.com/guides/artificial-intelligence/model-drift-detection-monitoring-response - Drift types: data, concept, label - Metrics: PSI, KS, Wasserstein, JS/KL
Monitoring Model Drift in Production: A Step-by-Step Playbook - URL: https://srikanthdevarajan.substack.com/p/monitoring-model-drift-in-production - KL divergence, PSI, KS-test explanations
Data Drift: Key Detection and Monitoring Techniques in 2026 - URL: https://labelyourdata.com/articles/machine-learning/data-drift
Tools¶
- Evidently AI - open-source drift detection
- NannyML - post-deployment monitoring
- WhyLabs - observability platform
- Fiddler AI - model performance monitoring
Key Metrics¶
Population Stability Index (PSI):
PSI = sum((actual_i - expected_i) * ln(actual_i / expected_i))
Interpretation:
- PSI < 0.1: No significant change
- 0.1 <= PSI < 0.25: Moderate change
- PSI >= 0.25: Significant change (ACTION REQUIRED)
Kolmogorov-Smirnov Test:
KS = max|F_baseline(x) - F_production(x)|
Wasserstein Distance (Earth Mover's):
W = integral |F_baseline(x) - F_production(x)| dx
Code¶
import numpy as np
from scipy import stats
def calculate_psi(expected, actual, buckets=10):
    """Calculate Population Stability Index"""
    # Bucket edges are derived from the expected (baseline) distribution
    breakpoints = np.arange(0, buckets + 1) / buckets * 100
    if len(np.unique(expected)) < buckets:
        breakpoints = np.unique(expected)
    else:
        breakpoints = np.percentile(expected, breakpoints)
expected_percents = np.histogram(expected, breakpoints)[0] / len(expected)
actual_percents = np.histogram(actual, breakpoints)[0] / len(actual)
# Handle zero divisions
expected_percents = np.where(expected_percents == 0, 0.0001, expected_percents)
actual_percents = np.where(actual_percents == 0, 0.0001, actual_percents)
psi_value = np.sum((actual_percents - expected_percents) * np.log(actual_percents / expected_percents))
return psi_value
def ks_test_drift(baseline, production):
"""Kolmogorov-Smirnov test for drift"""
statistic, p_value = stats.ks_2samp(baseline, production)
return {
'ks_statistic': statistic,
'p_value': p_value,
'drift_detected': p_value < 0.05
}
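Wasserstein distance from the metrics above is also available off the shelf; a minimal sketch using scipy (the threshold is illustrative and depends on the feature's scale):

def wasserstein_drift(baseline, production, threshold=0.1):
    """Earth Mover's distance between baseline and production samples (reuses scipy.stats imported above)."""
    distance = stats.wasserstein_distance(baseline, production)
    return {
        'wasserstein_distance': distance,
        'drift_detected': distance > threshold  # threshold must be tuned per feature scale
    }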
4. Model Calibration¶
Articles¶
The Complete Guide to Platt Scaling - URL: https://www.blog.trainindata.com/complete-guide-to-platt-scaling/ - Platt scaling deep dive
Model Calibration and Isotonic Regression - URL: https://omnetaplconline.com/model-calibration-and-isotonic-regression-adjusting-confidence-scores-in-machine-learning/
Calibration Techniques for Machine Learning - URL: https://www.numberanalytics.com/blog/calibration-techniques-for-machine-learning-models
Papers¶
Probabilistic Outputs for Support Vector Machines (Platt, 1999) - Original Platt scaling paper (Advances in Large Margin Classifiers, MIT Press) - URL: https://home.cs.colorado.edu/~mozer/Teaching/syllabi/6622/papers/Platt1999.pdf
Key Concepts¶
Platt Scaling (Parametric):
- Fits logistic regression: P(y=1|f(x)) = 1 / (1 + exp(A*f(x) + B))
- Good for sigmoid-shaped calibration curves
- Fast, 2 parameters to learn
Isotonic Regression (Non-parametric):
- Fits piecewise constant function
- More flexible, can handle any shape
- Requires more data (risk of overfitting)
Brier Score (Evaluation):
BS = (1/N) * sum((p_i - y_i)^2)
Lower = better calibrated
Code¶
import numpy as np
from sklearn.calibration import CalibratedClassifierCV, calibration_curve
from sklearn.isotonic import IsotonicRegression
import matplotlib.pyplot as plt
def calibrate_model(model, X_val, y_val, method='isotonic'):
"""Calibrate a trained model"""
calibrated = CalibratedClassifierCV(model, method=method, cv='prefit')
calibrated.fit(X_val, y_val)
return calibrated
def plot_calibration_curve(y_true, y_prob, n_bins=10):
"""Plot calibration curve (reliability diagram)"""
prob_true, prob_pred = calibration_curve(y_true, y_prob, n_bins=n_bins)
plt.figure(figsize=(8, 6))
plt.plot([0, 1], [0, 1], 'k--', label='Perfectly calibrated')
plt.plot(prob_pred, prob_true, 's-', label='Model')
plt.xlabel('Mean predicted probability')
plt.ylabel('Fraction of positives')
plt.title('Calibration Curve')
plt.legend()
return plt
def brier_score_loss(y_true, y_prob):
"""Calculate Brier score"""
return np.mean((y_prob - y_true) ** 2)
5. Ranking Metrics Advanced¶
Books¶
Recommender Systems Handbook - Chapter on Evaluation Metrics - URL: https://www.springer.com/gp/book/9781489976355
Articles¶
Learning to Rank - URL: https://en.wikipedia.org/wiki/Learning_to_rank - Pointwise, pairwise, listwise approaches
Key Metrics¶
Normalized Discounted Cumulative Gain (NDCG):
DCG@k = sum((2^rel_i - 1) / log2(i + 1)) for i in 1..k
NDCG@k = DCG@k / IDCG@k
Normalized Cross-Entropy (NCE):
NCE = CE_model / CE_baseline
CE_model = -(1/N) * sum(y_i * log(p_i) + (1 - y_i) * log(1 - p_i))
CE_baseline = -(p * log(p) + (1 - p) * log(1 - p)), where p = average CTR
Lower is better; NCE < 1 means the model beats a constant average-CTR predictor.
CTR Lift:
Lift = (CTR_treatment - CTR_control) / CTR_control
Mean Reciprocal Rank (MRR):
MRR = 1/|Q| * sum(1/rank_i)
Expected Reciprocal Rank (ERR):
ERR = sum over positions r of (1/r) * R_r * prod_{j<r}(1 - R_j)
where R_i is the probability that the item at position i is relevant (e.g., (2^grade_i - 1) / 2^grade_max)
Code¶
import numpy as np
def dcg_at_k(relevances, k):
    """Discounted Cumulative Gain at k (exponential gain 2^rel - 1, matching the formula above)."""
    relevances = np.array(relevances, dtype=float)[:k]
    if relevances.size == 0:
        return 0.0
    gains = 2.0 ** relevances - 1.0  # use `relevances` directly for the linear-gain DCG variant
    discounts = np.log2(np.arange(2, relevances.size + 2))
    return np.sum(gains / discounts)
def ndcg_at_k(relevances, k):
"""Normalized DCG at k"""
dcg = dcg_at_k(relevances, k)
ideal_relevances = sorted(relevances, reverse=True)
idcg = dcg_at_k(ideal_relevances, k)
if idcg == 0:
return 0.0
return dcg / idcg
def mean_reciprocal_rank(ranks):
"""Mean Reciprocal Rank"""
ranks = np.array(ranks)
return np.mean(1.0 / ranks[ranks > 0]) if np.any(ranks > 0) else 0.0
def ctr_lift(treatment_ctr, control_ctr):
"""Calculate CTR lift"""
return (treatment_ctr - control_ctr) / control_ctr
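NCE and ERR from the formulas above have no snippet yet; a minimal sketch, mapping graded relevance to relevance probabilities with the usual (2^g - 1)/2^g_max transform (g_max is an assumed maximum grade):

def normalized_cross_entropy(y_true, y_prob, eps=1e-15):
    """NCE = model log loss / log loss of a constant average-CTR predictor."""
    y_true = np.asarray(y_true, dtype=float)
    y_prob = np.clip(np.asarray(y_prob, dtype=float), eps, 1 - eps)
    ce_model = -np.mean(y_true * np.log(y_prob) + (1 - y_true) * np.log(1 - y_prob))
    p = np.clip(y_true.mean(), eps, 1 - eps)
    ce_baseline = -(p * np.log(p) + (1 - p) * np.log(1 - p))
    return ce_model / ce_baseline

def err_at_k(grades, k, g_max=4):
    """Expected Reciprocal Rank at k for graded relevance labels in [0, g_max]."""
    probs = (2.0 ** np.asarray(grades[:k], dtype=float) - 1.0) / (2.0 ** g_max)
    err, p_continue = 0.0, 1.0
    for r, p_rel in enumerate(probs, start=1):
        err += p_continue * p_rel / r
        p_continue *= (1.0 - p_rel)
    return err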
6. Recommendation Systems (RecSys)¶
Articles¶
The Two-Tower Model for Recommendation Systems: A Deep Dive - URL: https://www.shaped.ai/blog/the-two-tower-model-for-recommendation-systems-a-deep-dive - Two-Tower architecture explained
Two Towers RecSys: Python Embedding Towers Serving 2025 - URL: https://www.johal.in/two-towers-recsys-python-embedding-towers-serving-2025/ - Production implementation
Two Tower Models in Industry - URL: https://machinelearningatscale.substack.com/p/recsys-part-2-two-tower-models-in - Industry examples
Papers¶
Suggest, Complement, Inspire: Story of Two-Tower Recommendations at Allegro (2025) - URL: https://arxiv.org/abs/2508.03702 - Production RecSys at Allegro.com
Two-Tower Architecture¶
graph TD
    UF["User Features"] --> QT["Query Tower<br/>(Dense Layers)"]
    IF["Item Features"] --> IT["Item Tower<br/>(Dense Layers)"]
    QT --> UE["User Embedding"]
    IT --> IE["Item Embedding"]
    UE --> DOT["Dot Product"]
    IE --> DOT
    DOT --> SIM["Similarity Score"]
    SIM --> TOPK["Top-K Items"]
Cold Start Solutions¶
- Content-based: Use item features directly
- Popularity: Default to popular items
- Exploration: Bandit algorithms (epsilon-greedy, UCB)
- Cross-domain: Transfer from related domain
- Side information: Use metadata, context
Code¶
import torch
import torch.nn as nn
class L2Normalize(nn.Module):
"""L2 normalize along dim; nn.L2Normalize does NOT exist in PyTorch."""
def __init__(self, dim=1):
super().__init__()
self.dim = dim
def forward(self, x):
return nn.functional.normalize(x, p=2, dim=self.dim)
class TwoTowerModel(nn.Module):
def __init__(self, user_dim, item_dim, embedding_dim=64):
super().__init__()
self.user_tower = nn.Sequential(
nn.Linear(user_dim, 128),
nn.ReLU(),
nn.Linear(128, embedding_dim),
L2Normalize(dim=1)
)
self.item_tower = nn.Sequential(
nn.Linear(item_dim, 128),
nn.ReLU(),
nn.Linear(128, embedding_dim),
L2Normalize(dim=1)
)
def forward(self, user_features, item_features):
user_emb = self.user_tower(user_features)
item_emb = self.item_tower(item_features)
return torch.sum(user_emb * item_emb, dim=1, keepdim=True)
def get_user_embedding(self, user_features):
return self.user_tower(user_features)
def get_item_embedding(self, item_features):
return self.item_tower(item_features)
7. ML Trade-offs Quiz¶
Common Trade-offs¶
| Scenario | Trade-off | Recommendation |
|---|---|---|
| Low latency vs High accuracy | Latency budget | Quantize model, reduce ensemble size |
| Interpretability vs Performance | Black-box models | Use SHAP/LIME for interpretability |
| Cold start vs Popularity bias | New users/items | Hybrid approach, exploration |
| Batch vs Real-time | Freshness vs Cost | Lambda architecture (batch + streaming) |
| Feature complexity vs Maintenance | Feature bloat | Regular feature pruning, feature stores |
| Model complexity vs Inference cost | Deep models | Distillation, pruning, efficient architectures |
| Data freshness vs Stability | Feature drift | Version features, gradual rollout |
| Personalization vs Privacy | User data | Federated learning, differential privacy |
| Exploration vs Exploitation | Bandits | epsilon-greedy, Thompson sampling |
| Precision vs Recall | Threshold | Business metric optimization |
| Online vs Offline evaluation | A/B test cost | Proxy metrics, gradual rollout |
| Model size vs Latency | Mobile deployment | Quantization, knowledge distillation |
| Training time vs Model quality | Resources | Early stopping, hyperparameter search |
| Feature engineering vs Deep learning | Data vs Compute | Start simple, add complexity |
| Monolith vs Microservices | Architecture | Domain-driven design, service boundaries |
8. LLM Production¶
Guardrails¶
OWASP Top 10 for LLM Applications (2025)
- LLM01: Prompt Injection
- LLM02: Insecure Output Handling
- LLM03: Training Data Poisoning
- LLM04: Model Denial of Service
- LLM05: Supply Chain Vulnerabilities
- LLM06: Sensitive Information Disclosure
- LLM07: Insecure Plugin Design
- LLM08: Excessive Agency
- LLM09: Overreliance
- LLM10: Model Theft
Tools¶
- NeMo Guardrails - NVIDIA
- Guardrails AI - Open source
- Lakera - Enterprise security
Prompt Injection Defense¶
import re
import json
from jsonschema import validate, ValidationError

def sanitize_input(user_input: str) -> str:
"""Basic input sanitization"""
# Remove potential injection patterns
patterns = [
r'ignore (all )?(previous|above) instructions',
r'disregard (all )?(previous|above)',
r'you are now',
r'system:',
r'<\|.*?\|>',
]
for pattern in patterns:
user_input = re.sub(pattern, '', user_input, flags=re.IGNORECASE)
return user_input.strip()
def structured_output_guard(output: str, schema: dict) -> bool:
"""Validate output against schema"""
try:
parsed = json.loads(output)
validate(parsed, schema)
return True
except (json.JSONDecodeError, ValidationError):
return False
9. Feature Stores¶
Sources: Aerospike Blog (July 2025), Reintech.io Feature Store Comparison (Jan 2026)
What is a Feature Store?¶
A Feature Store is a centralized repository for ML features that provides: - A single source of truth for features - Consistency between training and inference - Feature reusability across teams - Low-latency serving for real-time predictions
Why Feature Stores Matter¶
The problem without a Feature Store: - Data scientists spend ~80% of their time on feature engineering - Duplicated transformation logic - Training-serving skew (features differ between training and inference) - No feature versioning
Architecture Components¶
graph TD
RAW["Raw Data Sources"] --> ING["Ingestion Pipelines"]
ING --> REG["Feature Registry<br/>(Metadata)"]
REG --> OFF["Offline Store<br/>(Data Lake/Warehouse)<br/>For Training"]
REG --> ON["Online Store<br/>(Redis/DynamoDB)<br/>For Real-time"]
OFF --> TRAIN["Model Training<br/>(Batch)"]
ON --> INF["Real-time Inference"]
style RAW fill:#e8eaf6,stroke:#3f51b5
style ING fill:#fff3e0,stroke:#ef6c00
style REG fill:#f3e5f5,stroke:#9c27b0
style OFF fill:#e8eaf6,stroke:#3f51b5
style ON fill:#e8f5e9,stroke:#4caf50
style TRAIN fill:#e8eaf6,stroke:#3f51b5
style INF fill:#e8f5e9,stroke:#4caf50
Offline vs Online Store¶
| Store | Purpose | Latency | Storage | Use Case |
|---|---|---|---|---|
| Offline | Training data | Seconds-minutes | Data Lake (S3, BigQuery) | Model training, analytics |
| Online | Real-time serving | <10ms | KV Store (Redis, DynamoDB) | Real-time predictions |
Point-in-Time Correctness¶
Problem: Training data leakage when features are computed using future data.
Solution: Point-in-time joins ensure features reflect state at prediction time.
-- Point-in-time correct feature retrieval
SELECT
e.entity_id,
e.event_time,
f.feature_value
FROM events e
JOIN feature_history f
ON e.entity_id = f.entity_id
AND f.feature_time <= e.event_time
AND f.feature_time > e.event_time - INTERVAL '7 days'
Feature Store Comparison¶
| Feature | Feast | Tecton | SageMaker |
|---|---|---|---|
| Type | Open-source | Managed | AWS-native |
| Cost | Infra only | Usage-based | Pay-per-use |
| Streaming | Limited | Excellent | Via Kinesis |
| Best For | Full control | Enterprise scale | AWS ecosystem |
Python: Feast Example¶
from feast import Entity, FeatureView, Field
from feast.types import Float32, Int64
from datetime import timedelta
# Define entity
user = Entity(
name="user_id",
join_keys=["user_id"],
)
# Define feature view
user_features = FeatureView(
name="user_transaction_features",
entities=[user],
ttl=timedelta(days=365),
schema=[
Field(name="total_transactions", dtype=Int64),
Field(name="avg_transaction_value", dtype=Float32),
],
online=True,
source=batch_source, # BigQuery, Snowflake, etc.
)
# Retrieve features for inference
from feast import FeatureStore
store = FeatureStore(repo_path=".")
# Online serving (real-time)
features = store.get_online_features(
features=[
"user_transaction_features:total_transactions",
"user_transaction_features:avg_transaction_value",
],
entity_rows=[{"user_id": 12345}]
).to_dict()
# Offline training (batch)
training_df = store.get_historical_features(
entity_df=entity_df, # Contains user_id and event_time
features=[
"user_transaction_features:total_transactions",
"user_transaction_features:avg_transaction_value",
],
).to_df()
Key Concepts¶
| Concept | Description |
|---|---|
| Feature View | Logical grouping of related features |
| Entity | Primary key for feature lookup (user_id, item_id) |
| TTL | Time-to-live for feature freshness |
| Materialization | Process of writing features to online store |
| Feature Service | Collection of features for a specific model |
Interview Questions¶
Q: Why do you need a Feature Store?
A: A Feature Store solves three problems: (1) Training-serving skew: the same features are used for training and inference; (2) Reusability: features are reused across teams; (3) Governance: versioning, lineage, and access control for features.
Q: What's the difference between the Online and Offline Store?
A: The Offline Store is for training: it holds historical data in a data lake/warehouse and is optimized for throughput. The Online Store is for real-time inference: it holds only the latest features in a low-latency KV store (Redis) and is optimized for latency (<10ms).
Q: What is Point-in-Time Correctness?
A: During training you must use features as they were at the time of the event (prediction time), not their current values. Otherwise you get data leakage: the model sees "future" information. A Feature Store performs point-in-time joins automatically.
Q: Feast vs Tecton vs SageMaker: when to choose which?
A: Feast: for teams with a limited budget that need full control over infrastructure. Tecton: for enterprises with heavy streaming needs that want a managed service. SageMaker: if you are already in the AWS ecosystem and need tight integration with SageMaker pipelines.
10. ML Infrastructure (Model Registry & Experiment Tracking)¶
Sources: ML Journey "Model Versioning Strategies" (Sep 2025), Conduktor "Real-Time ML Pipelines" (Feb 2026)
Why ML Infrastructure Matters¶
The problem: ML development is inherently experimental, with constant changes to data, hyperparameters, and architectures. Without proper infrastructure: - Results cannot be reproduced - No audit trail for compliance - Training-serving mismatch - Teams duplicate each other's work
Core Components¶
| Component | Purpose | Popular Tools |
|---|---|---|
| Experiment Tracking | Log parameters, metrics, artifacts | MLflow, W&B, Neptune, Comet |
| Model Registry | Version models, stage transitions | MLflow Registry, W&B Artifacts |
| Data Versioning | Track datasets, lineage | DVC, Delta Lake, LakeFS |
| Pipeline Orchestration | Automate ML workflows | Kubeflow, Airflow, Prefect, ZenML |
Tool Comparison¶
| Feature | MLflow | Weights & Biases | DVC |
|---|---|---|---|
| Type | Open-source | SaaS | Open-source |
| Experiment Tracking | Excellent | Excellent | Basic |
| Model Registry | Built-in | Built-in | Via Git |
| Visualization | Good | Excellent | Basic |
| Pipeline Management | Projects | Limited | Excellent |
| Cost | Self-hosted | Usage-based | Free |
| Best For | End-to-end lifecycle | Team collaboration | Git-based workflows |
MLflow Architecture¶
graph TD
ML["MLflow"]
ML --> TR["Tracking<br/>(experiments)"]
ML --> PR["Projects<br/>(packaging)"]
ML --> MO["Models<br/>(deployment)"]
ML --> RE["Registry<br/>(lifecycle)"]
TR --> BS["Backend Store<br/>(PostgreSQL)"]
PR --> ENV["Conda/Docker<br/>Environments"]
MO --> SRV["Serving Tools<br/>(REST, Batch)"]
RE --> STG["Staging<br/>(Prod/Archived)"]
style ML fill:#f3e5f5,stroke:#9c27b0
style TR fill:#e8eaf6,stroke:#3f51b5
style PR fill:#e8eaf6,stroke:#3f51b5
style MO fill:#e8eaf6,stroke:#3f51b5
style RE fill:#e8eaf6,stroke:#3f51b5
style BS fill:#e8f5e9,stroke:#4caf50
style ENV fill:#e8f5e9,stroke:#4caf50
style SRV fill:#e8f5e9,stroke:#4caf50
style STG fill:#e8f5e9,stroke:#4caf50
Python: MLflow Example¶
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import f1_score
# X_train, X_test, y_train, y_test are assumed to be prepared elsewhere
# Start experiment
mlflow.set_experiment("fraud_detection_v2")
with mlflow.start_run():
# Log parameters
mlflow.log_param("n_estimators", 100)
mlflow.log_param("max_depth", 10)
# Train model
model = RandomForestClassifier(n_estimators=100, max_depth=10)
model.fit(X_train, y_train)
# Log metrics
accuracy = model.score(X_test, y_test)
mlflow.log_metric("accuracy", accuracy)
mlflow.log_metric("f1_score", f1_score(y_test, model.predict(X_test)))
# Log model
mlflow.sklearn.log_model(model, "model")
# Register model
mlflow.register_model(
f"runs:/{mlflow.active_run().info.run_id}/model",
"fraud_detection_model"
)
Model Registry Workflow¶
1. Experiment → 2. Staging → 3. Production → 4. Archived
- Experiment: many runs tracked
- Staging: manual/auto approval
- Production: auto-scaled serving
- Archived: kept for rollback
Model Stages:
- None/Archived: initial or retired models
- Staging: ready for testing/validation
- Production: live serving
- Challenger: A/B testing against the champion
Python: Model Registry Operations¶
from mlflow.tracking import MlflowClient
client = MlflowClient()
# List all versions of a model
versions = client.search_model_versions("name='fraud_detection_model'")
# Transition model to production
client.transition_model_version_stage(
name="fraud_detection_model",
version=3,
stage="Production"
)
# Get production model
prod_version = client.get_latest_versions(
"fraud_detection_model",
stages=["Production"]
)[0]
# Load production model
model = mlflow.sklearn.load_model(f"models:/fraud_detection_model/Production")
Decision Framework¶
| Scenario | Choose |
|---|---|
| Small team, budget-constrained | MLflow (self-hosted) |
| Heavy visualization needs | W&B |
| Git-centric workflow | DVC + MLflow |
| Enterprise compliance | MLflow + Databricks |
| Cloud-native, AWS | SageMaker + Model Registry |
Interview Questions¶
Q: Why do you need a Model Registry?
A: A Model Registry provides: (1) Versioning: every model version is preserved; (2) Staging: a workflow from testing to production; (3) Lineage: links to the experiment, data, and code; (4) Rollback: the ability to return to a previous version; (5) Approval workflow: a controlled promotion process to production.
Q: MLflow vs Weights & Biases: when to use which?
A: MLflow: open-source and self-hosted, for teams that want full control and an end-to-end lifecycle (tracking + registry + deployment). W&B: SaaS with superior visualization, team collaboration, and quick setup, but an ongoing cost. For enterprise compliance choose MLflow; for speed and collaboration choose W&B.
Q: What is experiment tracking?
A: Experiment tracking is the systematic logging of every aspect of an ML experiment: hyperparameters, metrics, artifacts (models, datasets), code version, and environment. It lets you compare experiments, reproduce results, and find the best model. Tools: MLflow Tracking, W&B, Neptune.
Q: How would you organize an ML platform for a team of 20 ML engineers?
A: Key components: (1) Experiment Tracking (MLflow/W&B): a central server for all teams; (2) Model Registry: a single place for production models; (3) Feature Store: shared features; (4) CI/CD: automated training and deployment pipelines; (5) Monitoring: model performance in production; (6) Access control: RBAC for compliance.
11. Multi-Armed Bandits (Exploration vs Exploitation)¶
Best Sources¶
Practical Guides: - Philipp Dubach: Bandits and Agents - Netflix/Spotify 2026 (Jan 2026) - Statsig: Thompson Sampling (June 2025)
Academic: - Russo et al.: Tutorial on Thompson Sampling
The Exploration-Exploitation Dilemma¶
Classic problem: Casino slot machines with unknown payouts. Do you keep playing the machine that's been paying out, or try others?
Regret formula: $\(R(T) = \mu^* \cdot T - \sum_{t=1}^{T} \mu(a_t)\)$
Where: - \(\mu^*\) = best possible average reward - \(\mu(a_t)\) = reward from action at time \(t\) - Goal: Make regret sublinear in \(T\) (learn fast enough)
Algorithm Comparison¶
| Algorithm | Strategy | Pros | Cons |
|---|---|---|---|
| ε-greedy | Random exploration (ε% of time) | Simple, easy to implement | Explores blindly |
| UCB | Prefer uncertain actions | Guided exploration | Can be too optimistic |
| Thompson Sampling | Sample from posterior | Bayesian, adaptive | Computational overhead |
ε-Greedy¶
Algorithm: 1. With probability ε: choose random action (explore) 2. With probability 1-ε: choose best known action (exploit)
import numpy as np
def epsilon_greedy(n_arms, n_rounds, true_rewards, epsilon=0.1):
rewards = np.zeros(n_arms)
counts = np.zeros(n_arms)
for t in range(n_rounds):
if np.random.random() < epsilon:
arm = np.random.randint(n_arms) # Explore
else:
arm = np.argmax(rewards / (counts + 1e-10)) # Exploit
reward = np.random.binomial(1, true_rewards[arm])
rewards[arm] += reward
counts[arm] += 1
return rewards / (counts + 1e-10)
Upper Confidence Bound (UCB)¶
Formula: $\(UCB_i = \bar{X}_i + \sqrt{\frac{2\ln n}{n_i}}\)$
Where: - \(\bar{X}_i\) = average reward for arm \(i\) - \(n\) = total rounds - \(n_i\) = times arm \(i\) was pulled
Intuition: "Optimism in the face of uncertainty" — prefer actions with high uncertainty.
def ucb1(n_arms, n_rounds, true_rewards):
rewards = np.zeros(n_arms)
counts = np.zeros(n_arms)
# Initial exploration: pull each arm once
for arm in range(n_arms):
rewards[arm] = np.random.binomial(1, true_rewards[arm])
counts[arm] = 1
for t in range(n_arms, n_rounds):
ucb_values = rewards / counts + np.sqrt(2 * np.log(t) / counts)
arm = np.argmax(ucb_values)
reward = np.random.binomial(1, true_rewards[arm])
rewards[arm] += reward
counts[arm] += 1
return rewards / counts
Thompson Sampling (Bayesian)¶
Key idea: Maintain probability distributions over rewards, sample and pick the best.
For Bernoulli rewards (click/no-click): Use Beta distribution.
from scipy.stats import beta
def thompson_sampling(n_arms, n_rounds, true_rewards):
# Beta distribution parameters: α (successes), β (failures)
alphas = np.ones(n_arms)
betas = np.ones(n_arms)
for t in range(n_rounds):
# Sample from each arm's posterior
samples = beta.rvs(alphas, betas)
arm = np.argmax(samples)
reward = np.random.binomial(1, true_rewards[arm])
# Update posterior
alphas[arm] += reward
betas[arm] += 1 - reward
return alphas / (alphas + betas)
Why Thompson Sampling wins: - Explores more when uncertain, exploits more when confident - Adapts naturally to changing conditions - No manual tuning of exploration rate
Contextual Bandits (LinUCB)¶
When: Actions depend on context (user features, time of day, etc.)
LinUCB formula: $\(UCB_i = x_i^T \hat{\theta}_i + \alpha \sqrt{x_i^T A_i^{-1} x_i}\)$
Where: - \(x_i\) = context vector - \(\hat{\theta}_i\) = learned parameters - \(A_i\) = design matrix for arm \(i\)
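LinUCB above has no snippet; a minimal sketch of the disjoint variant (one ridge-regression model per arm), with illustrative class and parameter names:

import numpy as np

class LinUCB:
    def __init__(self, n_arms, context_dim, alpha=1.0):
        self.alpha = alpha
        # Per-arm ridge-regression statistics: A = design matrix, b = response vector
        self.A = [np.eye(context_dim) for _ in range(n_arms)]
        self.b = [np.zeros(context_dim) for _ in range(n_arms)]

    def select_arm(self, x):
        scores = []
        for A, b in zip(self.A, self.b):
            A_inv = np.linalg.inv(A)
            theta = A_inv @ b
            # Mean reward estimate + exploration bonus from the confidence ellipsoid
            scores.append(x @ theta + self.alpha * np.sqrt(x @ A_inv @ x))
        return int(np.argmax(scores))

    def update(self, arm, x, reward):
        self.A[arm] += np.outer(x, x)
        self.b[arm] += reward * x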
When to Use Bandits vs A/B Tests¶
| A/B Test | Bandit | |
|---|---|---|
| Goal | Statistical significance | Maximize reward |
| Duration | Fixed period | Continuous |
| Exploration | Equal allocation | Adaptive |
| Best for | Learning, papers | Production optimization |
| Regret | Linear in T | Sublinear in T |
Use bandits when: - Continuous optimization (ads, recommendations) - Delayed feedback acceptable - Can't afford equal allocation to bad options
Use A/B tests when: - Need statistical rigor (publishing, compliance) - Comparing few variants - Need to understand why something works
Production Use Cases¶
Netflix: Multi-armed bandit for homepage recommendations - Offline: Train collaborative filtering models - Nearline: Update user embeddings after clicks - Online: Real-time personalization with bandits
Spotify AI DJ: Hybrid approach - "Agentic router" decides: LLM or classical bandit - Complex queries → expensive reasoning - Simple queries → fast bandit path
Interview Questions¶
Q: When would you use Thompson Sampling over ε-greedy?
Thompson Sampling adapts exploration based on uncertainty. It explores more when uncertain, exploits more when confident. ε-greedy explores blindly regardless of confidence. Use Thompson for non-stationary environments, ε-greedy for simplicity.
Q: What's the difference between bandits and A/B testing?
A/B tests allocate traffic equally for statistical rigor. Bandits adapt allocation based on performance. Bandits minimize regret, A/B tests find significance. Use bandits for continuous optimization, A/B tests for learning.
Q: What are contextual bandits?
Bandits where the reward depends on context (user features, time, device). LinUCB and Thompson Sampling with linear payoff are common algorithms. Netflix uses contextual bandits: recommendations depend on user history, time of day, device.
Q: How does Netflix use bandits in production?
Three-tier architecture: Offline (train deep models), Nearline (update embeddings after clicks), Online (real-time bandit decisions). The goal is "incrementality" — not what users will watch, but what they wouldn't have found without the recommendation.
12. Online Learning (Streaming ML)¶
Best Sources¶
Practical Guides: - ML Journey: Online Learning Algorithms for Streaming Data (Nov 2025) - Conduktor: Real-Time ML Pipelines (Feb 2026) - Medium: Real-Time ML with partial_fit (July 2025)
Libraries: - River: ML for Streaming Data — Python library for online/incremental learning - scikit-learn partial_fit estimators — incremental learning support
Apache Flink: - Confluent: Flink for Model Inference (Oct 2025) - Apache Flink 2.2.0 ML Release — online learning capabilities
Online vs Batch Learning¶
| Batch Learning | Online Learning | |
|---|---|---|
| Training | Retrain on all data | Update with each sample |
| Memory | O(dataset size) | O(model size) |
| Adaptation | Periodic | Real-time |
| Best for | Static distributions | Streaming, concept drift |
Online Gradient Descent¶
Key difference: Update weights after each sample (or mini-batch), not after epoch.
import numpy as np
class OnlineLinearRegression:
def __init__(self, n_features, learning_rate=0.01):
self.weights = np.zeros(n_features)
self.lr = learning_rate
def partial_fit(self, X, y):
"""Update model with one or more samples."""
for xi, yi in zip(X, y):
prediction = np.dot(xi, self.weights)
error = yi - prediction
self.weights += self.lr * error * xi
return self
def predict(self, X):
return X @ self.weights
# Usage in streaming context
model = OnlineLinearRegression(n_features=10)
for batch in data_stream:
X_batch, y_batch = preprocess(batch)
model.partial_fit(X_batch, y_batch)
predictions = model.predict(new_data)
Regret Framework¶
Goal: Minimize regret compared to the best fixed decision.
Good online learners have sublinear regret: \(\text{Regret}(T) = O(\sqrt{T})\)
FTRL-Proximal (Follow-The-Regularized-Leader)¶
Used by: Google for large-scale click-through rate prediction.
Update rule: $\(w_{t+1} = \arg\min_w \left( \sum_{s=1}^{t} g_s \cdot w + \lambda_1 \|w\|_1 + \frac{\lambda_2}{2} \|w\|_2^2 \right)\)$
- L1 regularization → sparsity (critical for high-dimensional features)
- L2 regularization → stability
import numpy as np
class FTRLProximal:
def __init__(self, n_features, alpha=0.1, beta=1.0, l1=1.0, l2=1.0):
self.z = np.zeros(n_features) # accumulated gradients
self.n = np.zeros(n_features) # accumulated squared gradients
self.alpha = alpha
self.beta = beta
self.l1 = l1
self.l2 = l2
def predict(self, x):
w = self._get_weights()
return np.dot(x, w)
def _get_weights(self):
"""Compute sparse weights from accumulated statistics."""
w = np.zeros_like(self.z)
mask = np.abs(self.z) > self.l1
w[mask] = -(self.z[mask] - np.sign(self.z[mask]) * self.l1) / \
((self.beta + np.sqrt(self.n[mask])) / self.alpha + self.l2)
return w
def update(self, x, y):
"""Update with one sample (y ∈ {0, 1} for logistic)."""
w = self._get_weights()
p = 1 / (1 + np.exp(-np.dot(x, w))) # sigmoid
g = (p - y) * x # gradient
sigma = (np.sqrt(self.n + g ** 2) - np.sqrt(self.n)) / self.alpha
self.z += g - sigma * w
self.n += g ** 2
Concept Drift Detection¶
Types of drift: 1. Sudden: Distribution changes abruptly (system update, bug) 2. Gradual: Slow shift over time (user preferences, seasonality) 3. Incremental: Continuous small changes 4. Recurring: Patterns repeat (seasonal)
Detection methods:
| Method | Type | Idea |
|---|---|---|
| ADWIN | Window-based | Adaptive window size, detects change when cutpoint found |
| DDM | Error-based | Monitors error rate, alerts when error + std dev increase |
| EDDM | Distance-based | Tracks distance between classification errors |
| Page-Hinkley | Statistical | Detects mean change in Gaussian stream |
from river import drift
# ADWIN for concept drift detection
adwin = drift.ADWIN(delta=0.002)
for i, (x, y) in enumerate(data_stream):
# Make prediction
y_pred = model.predict_one(x)
# Update model
model.learn_one(x, y)
# Check for drift (using accuracy as metric)
correct = 1 if y_pred == y else 0
adwin.update(correct)
if adwin.drift_detected:
print(f"Drift detected at sample {i}")
# Option 1: Reset model
# model = reset_model()
# Option 2: Incremental adaptation
# model.adapt_to_drift()
Hoeffding Trees (VFDT)¶
For streaming data: Build decision trees that are "probably approximately correct" with high probability.
Hoeffding bound: $\(\epsilon = \sqrt{\frac{R^2 \ln(1/\delta)}{2n}}\)$
Where: - \(R\) = range of the attribute (e.g., 1 for Bernoulli) - \(\delta\) = allowed failure probability (the split decision holds with confidence \(1-\delta\)) - \(n\) = number of samples seen
Idea: If the best split attribute is \(\epsilon\) better than the second-best, split with confidence \(1-\delta\).
from river import tree
# Hoeffding Adaptive Tree (HAT)
model = tree.HoeffdingAdaptiveTreeClassifier(
grace_period=100, # Min samples before split evaluation
max_depth=10, # Tree depth limit
split_confidence=1e-7, # Hoeffding bound δ
tie_threshold=0.05 # Minimum gain difference
)
# Online training
for x, y in data_stream:
y_pred = model.predict_one(x)
model.learn_one(x, y)
Flink ML Pipeline¶
Architecture:
Kafka → Flink DataStream → Feature Engineering → Model Inference → Kafka/DB
(model inference can optionally feed an online-learning update step)
Flink ML example:
// Flink ML pipeline for streaming
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Read from Kafka
DataStream<Record> stream = env
.addSource(new FlinkKafkaConsumer<>("topic", schema, props));
// Feature extraction
DataStream<Features> features = stream
.process(new FeatureExtractor());
// Online model inference
DataStream<Prediction> predictions = features
.map(new ModelInference(model));
// Write predictions
predictions.addSink(new FlinkKafkaProducer<>("predictions", schema, props));
Production Considerations¶
Challenges: 1. Data quality: Missing values, outliers in real-time 2. Feature engineering: Must be computed on-the-fly 3. Model versioning: A/B between model versions 4. Backpressure: Handle bursty traffic
Best practices: - Buffer accumulation: Accumulate features for mini-batch updates - Shadow mode: Deploy new model alongside old, compare - Canary deployment: Gradual rollout to subset of traffic - Rollback plan: Quick way to revert to previous model
Interview Questions¶
Q: When would you use online learning vs batch learning?
Online learning when: (1) Data arrives continuously (logs, sensors), (2) Concept drift is expected (user preferences), (3) Memory constraints (can't store all data), (4) Low latency requirements. Batch learning when: (1) Static distribution, (2) Need optimal accuracy, (3) Can afford periodic retraining, (4) Want to use all data.
Q: How do you detect concept drift in production?
Monitor model performance metrics over time. ADWIN (Adaptive Windowing) automatically adjusts window size when distribution changes. DDM (Drift Detection Method) tracks error rate and alerts when error significantly increases. EDDM (Early Drift Detection Method) monitors distance between errors. Set up alerts for: accuracy drops, feature distribution shifts (PSI), prediction confidence changes.
Q: What's the difference between online learning and incremental learning?
Often used interchangeably, but: Online learning = update after each sample (or small batch). Incremental learning = ability to learn from new data without catastrophic forgetting. All online learners are incremental, but not all incremental methods are online (e.g., fine-tuning on new data batch).
Q: How does River library compare to scikit-learn's partial_fit?
River is built specifically for streaming/online learning: (1) All algorithms support online learning, (2) Built-in drift detection (ADWIN, DDM), (3) Streaming metrics that update incrementally, (4) Better memory efficiency. scikit-learn's partial_fit: (1) Only some estimators support it, (2) No drift detection, (3) Same API but limited functionality. Use River for streaming, scikit-learn when batch training with occasional incremental updates.
Q: How would you implement real-time fraud detection with online learning?
(1) Stream: Kafka for transaction events, (2) Features: Real-time feature store for user history, (3) Model: FTRL-Proximal for sparse high-dimensional features, (4) Drift detection: ADWIN on prediction accuracy, (5) Serving: Flink for feature extraction + model inference, (6) Fallback: Rule-based system if model confidence low, (7) Monitoring: Track precision/recall on labeled feedback, (8) Update: Online model updates from delayed labels.
13. Multi-Stage Recommender Systems¶
Best Sources¶
Architecture Deep Dives: - Shaped.ai: Two-Tower Model Deep Dive (May 2025) - Fan Luo: Design a Modern Recommendation System (Oct 2025) - Shaped.ai: Anatomy of Modern Ranking Architectures (Oct 2025)
Production Case Studies: - YouTube Recommendation Paper (2016) — foundational multi-stage architecture - Allegro.com: Two Tower Recommendations (Jul 2025) — real-world production story - Netflix Research: Recommendations (2025)
Interview Prep: - Hello Interview: Design Recommender System - IGotAnOffer: ML System Design
The Funnel Architecture¶
Why multi-stage? Scale. YouTube has billions of videos, Netflix has millions. Can't rank all candidates in <200ms.
graph TD
ALL["1B+ items"] --> S1["Stage 1: RETRIEVAL<br/>Two-Tower, MF, ANN<br/>Output: ~1000, ~10ms"]
S1 --> S2["Stage 2: PRE-RANKING<br/>Light model (small DNN)<br/>Output: ~500, ~20ms"]
S2 --> S3["Stage 3: RANKING<br/>Deep model (DIN, DCN, DeepFM)<br/>Output: ~100, ~100ms"]
S3 --> S4["Stage 4: RE-RANKING<br/>Diversity, freshness, business rules<br/>Output: Final ~20, ~20ms"]
style ALL fill:#e8eaf6,stroke:#3f51b5
style S1 fill:#e8f5e9,stroke:#4caf50
style S2 fill:#fff3e0,stroke:#ef6c00
style S3 fill:#f3e5f5,stroke:#9c27b0
style S4 fill:#fce4ec,stroke:#c62828
Total Latency: ~150ms (within 200ms budget)
Stage 1: Retrieval (Two-Tower Architecture)¶
Core idea: Learn embeddings for users and items, find nearest neighbors.
graph TD
UF["User Features"] --> UT["User Tower (DNN)"]
IF["Item Features"] --> IT["Item Tower (DNN)"]
UT --> UE["User Embedding"]
IT --> IE["Item Embedding"]
UE --> DOT["Dot Product<br/>Similarity Score"]
IE --> DOT
style UF fill:#e8eaf6,stroke:#3f51b5
style IF fill:#e8eaf6,stroke:#3f51b5
style UT fill:#f3e5f5,stroke:#9c27b0
style IT fill:#f3e5f5,stroke:#9c27b0
style UE fill:#e8f5e9,stroke:#4caf50
style IE fill:#e8f5e9,stroke:#4caf50
style DOT fill:#fff3e0,stroke:#ef6c00
Training with in-batch negatives:
import torch
import torch.nn as nn
class TwoTowerModel(nn.Module):
def __init__(self, user_dim, item_dim, embedding_dim=64):
super().__init__()
self.user_tower = nn.Sequential(
nn.Linear(user_dim, 256),
nn.ReLU(),
nn.Linear(256, embedding_dim)
)
self.item_tower = nn.Sequential(
nn.Linear(item_dim, 256),
nn.ReLU(),
nn.Linear(256, embedding_dim)
)
self.temperature = 0.1
def encode_user(self, user_features):
return self.user_tower(user_features)
def encode_item(self, item_features):
return self.item_tower(item_features)
def forward(self, user_features, item_features):
user_emb = self.encode_user(user_features) # [batch, dim]
item_emb = self.encode_item(item_features) # [batch, dim]
# Normalize for cosine similarity
user_emb = nn.functional.normalize(user_emb, dim=-1)
item_emb = nn.functional.normalize(item_emb, dim=-1)
# In-batch negatives: [batch, batch]
logits = torch.matmul(user_emb, item_emb.T) / self.temperature
# Labels are diagonal (positive pairs)
labels = torch.arange(user_features.size(0), device=user_features.device)
return nn.CrossEntropyLoss()(logits, labels)
ANN Indexes (FAISS, ScaNN)¶
Why ANN? Exact nearest neighbor is O(N). ANN is O(log N) with ~95% recall.
FAISS (Facebook AI Similarity Search):
import faiss
import numpy as np
# Build index (item embeddings from Two-Tower)
n_items = 1_000_000
embedding_dim = 64
item_embeddings = np.random.randn(n_items, embedding_dim).astype('float32')
# IVF + HNSW index (good balance of speed/recall)
nlist = 100 # number of clusters
quantizer = faiss.IndexHNSWFlat(embedding_dim, 32)
index = faiss.IndexIVFFlat(quantizer, embedding_dim, nlist, faiss.METRIC_INNER_PRODUCT)
# Train on sample
index.train(item_embeddings[:10000])
index.add(item_embeddings)
# Search
user_emb = np.random.randn(1, embedding_dim).astype('float32')
k = 100 # retrieve top 100
distances, indices = index.search(user_emb, k)
ScaNN (Google):
# ScaNN often faster for inner product search
import scann
# Build searchable index
searcher = scann.scann_ops_pybind.builder(item_embeddings, 100, "dot_product") \
.tree(num_leaves=100, num_leaves_to_search=10) \
.score_ah(2, anisotropic_quantization_threshold=0.2) \
.reorder(100) \
.build()
# Query
neighbors, distances = searcher.search(user_emb[0]) # single vector
Index comparison:
| Index | Speed | Recall | Memory | Best for |
|---|---|---|---|---|
| IVF | Fast | ~95% | Low | General purpose |
| HNSW | Very fast | ~97% | High | Low latency |
| ScaNN | Fastest | ~96% | Medium | Inner product |
| Flat | Slow | 100% | Low | Small scale |
Stage 3: Ranking (Deep Models)¶
Input: ~500 candidates with rich features.
Architecture options:
| Model | Key Idea | When to Use |
|---|---|---|
| DIN (Deep Interest Network) | Attention over user history | Long user history |
| DCN (Deep & Cross) | Explicit feature crossing | High-order interactions |
| DeepFM | Factorization + Deep | Sparse categorical |
| DCNv2 | Low-rank crossing | Large scale |
Ranking model structure:
class RankingModel(nn.Module):
def __init__(self, feature_dim):
super().__init__()
# Feature interactions
self.cross_layer = CrossLayer(feature_dim, num_layers=3)
# Deep part
self.deep = nn.Sequential(
nn.Linear(feature_dim, 512),
nn.ReLU(),
nn.Linear(512, 256),
nn.ReLU(),
nn.Linear(256, 1)
)
def forward(self, features):
cross_out = self.cross_layer(features)
deep_out = self.deep(features)
return torch.sigmoid(cross_out + deep_out)
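CrossLayer in the snippet above is not defined; one plausible minimal implementation, assuming the DCN-style explicit crossing x_{l+1} = x_0 * (x_l^T w_l) + b_l + x_l with a final projection to a single logit:

import torch
import torch.nn as nn

class CrossLayer(nn.Module):
    """DCN-style stacked feature crossing (assumed implementation for the sketch above)."""
    def __init__(self, feature_dim, num_layers=3):
        super().__init__()
        self.weights = nn.ParameterList(
            [nn.Parameter(torch.randn(feature_dim) * 0.01) for _ in range(num_layers)]
        )
        self.biases = nn.ParameterList(
            [nn.Parameter(torch.zeros(feature_dim)) for _ in range(num_layers)]
        )
        self.out = nn.Linear(feature_dim, 1)  # project crossed features to one logit

    def forward(self, x0):
        x = x0
        for w, b in zip(self.weights, self.biases):
            x = x0 * (x @ w).unsqueeze(-1) + b + x
        return self.out(x)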
Stage 4: Re-ranking¶
Goals beyond relevance: 1. Diversity: Don't show 10 similar items 2. Freshness: Promote new content 3. Business rules: Remove watched, banned content 4. Fairness: Don't dominate from one source
Diversity techniques: - MMR (Maximal Marginal Relevance): Balance relevance and diversity - DPP (Determinantal Point Process): Mathematically principled diversity
import numpy as np

def mmr_rerank(scores, embeddings, lambda_param=0.5, k=20):
"""Maximal Marginal Relevance re-ranking."""
selected = []
remaining = list(range(len(scores)))
for _ in range(k):
if not remaining:
break
best_score = -np.inf
best_idx = None
for idx in remaining:
# Relevance score
relevance = scores[idx]
# Diversity penalty (max similarity to already selected)
if selected:
similarities = [
np.dot(embeddings[idx], embeddings[s])
for s in selected
]
diversity_penalty = max(similarities)
else:
diversity_penalty = 0
# MMR score
mmr_score = lambda_param * relevance - (1 - lambda_param) * diversity_penalty
if mmr_score > best_score:
best_score = mmr_score
best_idx = idx
selected.append(best_idx)
remaining.remove(best_idx)
return selected
Production Architecture (YouTube-scale)¶
graph TD
FS["Feature Store<br/>User history, item features"] --> CG["Candidate Gen<br/>Two-Tower ANN, Collaborative, Trending"]
CG -->|~1000 candidates| RM["Ranking Model<br/>(Deep model on GPU)"]
RM -->|~100 ranked| RR["Re-ranking<br/>Diversity, freshness, filters"]
RR -->|~20 final| RESP["Response to User"]
style FS fill:#e8eaf6,stroke:#3f51b5
style CG fill:#e8f5e9,stroke:#4caf50
style RM fill:#f3e5f5,stroke:#9c27b0
style RR fill:#fff3e0,stroke:#ef6c00
style RESP fill:#e8f5e9,stroke:#4caf50
Interview Questions¶
Q: Why do we need multi-stage recommendation instead of a single model?
Scale and latency. If you have 1B items and need <200ms response, you can't score all items with a deep model. Multi-stage is a funnel: Retrieval quickly narrows to ~1000 candidates using simple models (Two-Tower + ANN). Ranking applies expensive deep models to ~500. Re-ranking enforces business rules. This is the only viable architecture at scale — used by YouTube, Netflix, TikTok.
Q: Explain Two-Tower architecture.
Two separate neural networks: User Tower encodes user features (history, demographics) into embedding. Item Tower encodes item features (ID, metadata) into embedding. Both towers share the same embedding dimension. Similarity = dot product of embeddings. Train with in-batch negatives (each user's positive item is contrasted with all other items in batch). At serving time, pre-compute item embeddings, build ANN index, query with user embedding.
Q: What's the difference between FAISS and ScaNN?
Both are ANN libraries for fast nearest neighbor search. FAISS (Meta) supports many index types (IVF, HNSW, PQ), more flexible, larger community. ScaNN (Google) optimized for inner product search, often faster for recommendation use cases. FAISS better for L2/cosine, ScaNN for dot product. Production choice often comes down to: (1) similarity metric, (2) latency requirements, (3) memory constraints.
Q: How do you handle cold start in recommendations?
For new users: (1) Ask for preferences on signup, (2) Use demographic/content-based features instead of history, (3) Show trending/popular items, (4) Bootstrap from similar users. For new items: (1) Use content features (title, category, creator), (2) Explore-exploit: show to small audience first, (3) Bandit algorithms for balancing new item exposure vs known good items.
Q: What is MMR and why is it used in re-ranking?
MMR (Maximal Marginal Relevance) balances relevance and diversity. Standard ranking gives similar items (e.g., 10 cat videos). MMR selects items that are both relevant AND different from already-selected items. Formula: MMR = λ × relevance - (1-λ) × max_similarity_to_selected. Higher λ = more relevance, lower = more diversity. Essential for user engagement (boredom from similar content) and discovery.
Q: How would you design a recommendation system for a new platform with no user history?
Start simple: (1) Trending/popular items for everyone, (2) Content-based filtering using item metadata, (3) Collaborative filtering as soon as you have interactions. Gradually add: (4) Two-Tower for retrieval when scale justifies, (5) Deep ranking models. Key insight: Don't over-engineer early. A simple popularity baseline + content-based often beats complex models with sparse data.
14. Causal Inference for ML¶
Best Sources¶
Tutorials: - ML Journey: Causal Inference with DoWhy and EconML (July 2025) - Medium: Building Causal Inference Project in Python (Nov 2025) - GitHub: Official DoWhy + EconML Tutorial
Interview Prep: - GrabNGoInfo: Top 10 Causal Inference Interview Questions - Medium: ATE vs CATE vs ATT vs ATC
The Fundamental Problem¶
Correlation ≠ Causation. We can never observe both potential outcomes for the same unit.
Treatment Effect Definitions¶
| Metric | Definition | Formula |
|---|---|---|
| ATE | Average Treatment Effect | \(E[Y(1) - Y(0)]\) |
| ATT | Average Treatment Effect on Treated | \(E[Y(1) - Y(0) \| T=1]\) |
| ATC | Average Treatment Effect on Control | \(E[Y(1) - Y(0) \| T=0]\) |
| CATE | Conditional ATE (heterogeneous) | \(E[Y(1) - Y(0) \| X=x]\) |
Key Assumptions¶
1. SUTVA: No interference between units, one version of treatment 2. Unconfoundedness: \(Y(1), Y(0) \perp T \mid X\) — all confounders observed 3. Overlap: \(0 < P(T=1 \mid X) < 1\) — every unit can receive treatment 4. Consistency: Observed outcome = potential outcome under assigned treatment
Method 1: Propensity Score Matching¶
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import NearestNeighbors
import numpy as np
def propensity_score_matching(X, treatment, outcome):
# Step 1: Estimate propensity scores
ps_model = LogisticRegression(max_iter=1000)
ps_model.fit(X, treatment)
propensity_scores = ps_model.predict_proba(X)[:, 1]
# Step 2: Match treated to control units
treated_idx = np.where(treatment == 1)[0]
control_idx = np.where(treatment == 0)[0]
nn = NearestNeighbors(n_neighbors=1)
nn.fit(propensity_scores[control_idx].reshape(-1, 1))
distances, matched_control_idx = nn.kneighbors(
propensity_scores[treated_idx].reshape(-1, 1)
)
# Step 3: Calculate ATT
treated_outcomes = outcome[treated_idx]
matched_control_outcomes = outcome[control_idx[matched_control_idx.flatten()]]
return np.mean(treated_outcomes - matched_control_outcomes)
Method 2: Difference-in-Differences (DiD)¶
Formula: $\(\text{DiD} = (Y_{treat, post} - Y_{treat, pre}) - (Y_{control, post} - Y_{control, pre})\)$
Assumption: Parallel trends — without treatment, both groups would follow same trend.
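A minimal sketch of the DiD point estimate from the formula above, computed on the four group means (numbers are illustrative):

def diff_in_diff(y_treat_pre, y_treat_post, y_control_pre, y_control_post):
    """DiD estimate: change in the treated group minus change in the control group."""
    return (y_treat_post - y_treat_pre) - (y_control_post - y_control_pre)

# Treated group: 5.0 -> 7.0; control group: 4.0 -> 5.0 over the same period
# DiD = (7.0 - 5.0) - (5.0 - 4.0) = 1.0 attributed to treatment (under parallel trends)
print(diff_in_diff(5.0, 7.0, 4.0, 5.0))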
Method 3: Instrumental Variables (IV)¶
Requirements for instrument Z: 1. Relevance: \(Z\) affects \(T\) 2. Exclusion: \(Z\) affects \(Y\) only through \(T\) 3. Exogeneity: \(Z\) independent of confounders
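A minimal two-stage least squares (2SLS) sketch for IV estimation, using sklearn's LinearRegression (Z, T, Y are 1-D arrays; names are illustrative):

import numpy as np
from sklearn.linear_model import LinearRegression

def two_stage_least_squares(Z, T, Y):
    """Stage 1: predict treatment from the instrument. Stage 2: regress outcome on predicted treatment."""
    Z = np.asarray(Z, dtype=float).reshape(-1, 1)
    stage1 = LinearRegression().fit(Z, T)
    T_hat = stage1.predict(Z).reshape(-1, 1)
    stage2 = LinearRegression().fit(T_hat, Y)
    return stage2.coef_[0]  # causal effect of T on Y (LATE, under the IV assumptions above)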
Method 4: Uplift Modeling¶
Meta-learners:

| Method | Description | Best for |
|---|---|---|
| S-Learner | Single model with treatment as feature | Simple cases |
| T-Learner | Two models for T=0/1 | Heterogeneous effects |
| X-Learner | T-Learner + correction | Imbalanced treatment |
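A minimal T-Learner sketch (two outcome models, CATE as the difference of their predictions; the base model is an illustrative choice):

import numpy as np
from sklearn.ensemble import GradientBoostingRegressor

def t_learner_cate(X, treatment, outcome, X_new):
    """Fit separate outcome models for treated and control units; CATE(x) = mu1(x) - mu0(x)."""
    X, treatment, outcome = np.asarray(X), np.asarray(treatment), np.asarray(outcome)
    model_treated = GradientBoostingRegressor().fit(X[treatment == 1], outcome[treatment == 1])
    model_control = GradientBoostingRegressor().fit(X[treatment == 0], outcome[treatment == 0])
    return model_treated.predict(X_new) - model_control.predict(X_new)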
When to Use Which Method¶
| Situation | Method |
|---|---|
| RCT data | Direct comparison |
| Observational with confounders | Propensity Score, DiD |
| Policy change over time | Difference-in-Differences |
| Non-compliance in RCT | Instrumental Variables |
| Personalization | Uplift Modeling |
Interview Questions¶
Q: What's the difference between ATE and ATT?
ATE = effect for entire population. ATT = effect for those who received treatment. They differ when there's selection bias. In RCTs, ATE = ATT. In observational data, ATT is often what we can credibly estimate.
Q: What is the parallel trends assumption in DiD?
Without treatment, treatment and control groups would follow same trend over time. Untestable but can assess pre-treatment trends. If groups were diverging before treatment, assumption violated.
Q: When would you use instrumental variables?
When there's unmeasured confounding but you have a valid instrument: (1) Random assignment with non-compliance, (2) Geographic variation, (3) Policy timing changes across states. Instrument must affect treatment (relevance) and affect outcome ONLY through treatment (exclusion).
Q: What is uplift modeling?
Predicts individual treatment effects to identify who to target. Goal: Find "persuadables" who respond positively, avoid "sleeping dogs" who respond negatively. Used in marketing, medicine, pricing.
15. Vector Databases for ML¶
Best Sources¶
Comparisons: - DataCamp: The 7 Best Vector Databases in 2026 - Markaicode: Pinecone vs Weaviate vs Milvus 2025 - Firecrawl: Best Vector Databases 2025
Index Deep Dives: - Medium: IVF vs HNSW vs PQ Index Guide (Jan 2026) - The Data Guy: Production Video Search Infrastructure (Dec 2025)
Hybrid Search: - TowardsAI: Hybrid Search RAG That Works (Jan 2026) - AILog: Hybrid Search for RAG Tutorial
What is a Vector Database?¶
Purpose: Store and search high-dimensional embeddings efficiently using Approximate Nearest Neighbor (ANN) algorithms.
Key operations: 1. Upsert: Add/update vectors with metadata 2. Query: Find k nearest neighbors to query vector 3. Delete: Remove vectors by ID or filter
ANN Index Types¶
| Index | Description | Memory | Recall | Latency | Best For |
|---|---|---|---|---|---|
| HNSW | Hierarchical Navigable Small World graph | High | ~98% | Very Low | Real-time search, high recall |
| IVF | Inverted File (cluster-based) | Medium | ~95% | Low | Large scale, batch workloads |
| IVF-PQ | IVF + Product Quantization | Low | ~90% | Low | Memory-constrained, huge datasets |
| Flat | Brute force | High | 100% | High | Small datasets, exact search |
HNSW (Hierarchical Navigable Small World): - Graph-based: nodes = vectors, edges = nearest neighbors - Multi-layer: top layers are sparse, bottom layers are dense - Search: start from top, greedy descent to nearest neighbor
# HNSW parameters
hnsw_config = {
"M": 16, # Connections per node (higher = better recall, more memory)
"ef_construction": 200, # Build-time neighbor list size
"ef_search": 100, # Search-time neighbor list size
# Level multiplier ml = 1/ln(M), default for M=16: 1/ln(16) ~ 0.36
}
# Trade-offs:
# - Higher M: Better recall, more memory, slower build
# - Higher ef_construction: Better index quality, slower build
# - Higher ef_search: Better recall, slower queries
IVF (Inverted File Index):
- Partition vectors into clusters using k-means
- At query time: find nearest clusters, search only within them
- nlist = number of clusters
- nprobe = clusters to search (higher = better recall, slower)
# IVF parameters
ivf_config = {
"nlist": 1000, # Number of clusters (sqrt(N) is good starting point)
"nprobe": 10, # Clusters to search at query time
}
# Trade-offs:
# - Higher nlist: Faster search, more memory for centroids
# - Higher nprobe: Better recall, slower queries
Vector Database Comparison¶
| Database | Type | Index Types | Strengths | Weaknesses |
|---|---|---|---|---|
| Pinecone | Cloud-only | Proprietary | Zero ops, auto-scaling | Expensive, vendor lock-in |
| Milvus | Self-hosted/Cloud | HNSW, IVF, PQ, GPU | Open source, flexible | Complex setup |
| Weaviate | Self-hosted/Cloud | HNSW | GraphQL API, modules | Higher memory |
| Qdrant | Self-hosted/Cloud | HNSW | Rust-based, fast | Smaller community |
| pgvector | Postgres extension | IVF, HNSW | No new infra needed | Limited features |
| Chroma | Embedded | HNSW | Simple, Python-native | Not for production scale |
Python Examples¶
Qdrant (self-hosted/cloud):
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
client = QdrantClient(":memory:") # or host="localhost", port=6333
# Create collection
client.create_collection(
collection_name="documents",
vectors_config=VectorParams(size=384, distance=Distance.COSINE)
)
# Upsert vectors
points = [
PointStruct(id=i, vector=embeddings[i], payload={"text": texts[i]})
for i in range(len(embeddings))
]
client.upsert(collection_name="documents", points=points)
# Query
results = client.search(
collection_name="documents",
query_vector=query_embedding,
limit=10,
score_threshold=0.7
)
Milvus (production scale):
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
connections.connect("default", host="localhost", port="19530")
# Define schema
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=768)
]
schema = CollectionSchema(fields, "document collection")
collection = Collection("documents", schema)
# Create HNSW index
index_params = {
"metric_type": "COSINE",
"index_type": "HNSW",
"params": {"M": 16, "efConstruction": 200}
}
collection.create_index("embedding", index_params)
# Search
collection.load()
results = collection.search(
data=[query_embedding],
anns_field="embedding",
param={"metric_type": "COSINE", "params": {"ef": 100}},
limit=10
)
Hybrid Search (Vector + Keyword)¶
Why hybrid? Pure vector search misses exact matches. BM25 misses semantics. Combine both.
Approach: 1. BM25 (keyword): Find documents with exact term matches 2. Vector (semantic): Find documents with similar meaning 3. RRF (Reciprocal Rank Fusion): Combine rankings
import numpy as np
from rank_bm25 import BM25Okapi
def hybrid_search(query, vector_db, documents, k=10, rrf_k=60):
"""
Uses Reciprocal Rank Fusion (RRF) to combine vector and BM25 results.
rrf_k: RRF constant (default 60), higher = more uniform weighting
"""
# 1. Vector search
query_embedding = embed(query)
vector_results = vector_db.search(query_embedding, limit=k*2)
vector_scores = {r.id: r.score for r in vector_results}
# 2. BM25 keyword search
tokenized_docs = [doc.split() for doc in documents]
bm25 = BM25Okapi(tokenized_docs)
bm25_scores = bm25.get_scores(query.split())
# 3. Combine with RRF
def rrf_score(rank):
return 1 / (rrf_k + rank)
# Get rankings
vector_ranking = sorted(vector_scores.keys(), key=lambda x: vector_scores[x], reverse=True)
bm25_ranking = sorted(range(len(documents)), key=lambda x: bm25_scores[x], reverse=True)
# RRF fusion
combined_scores = {}
for rank, doc_id in enumerate(vector_ranking):
combined_scores[doc_id] = combined_scores.get(doc_id, 0) + rrf_score(rank)
for rank, doc_id in enumerate(bm25_ranking):
combined_scores[doc_id] = combined_scores.get(doc_id, 0) + rrf_score(rank)
# Return top k
return sorted(combined_scores.keys(), key=lambda x: combined_scores[x], reverse=True)[:k]
Index Refresh Strategies¶
Challenge: How to update index when new documents arrive?
| Strategy | Description | Use Case |
|---|---|---|
| Full rebuild | Rebuild entire index periodically | Small datasets, batch updates |
| Incremental | Add new vectors to existing index | Real-time, small updates |
| Dual index | Maintain old + new index, switch atomically | Zero-downtime updates |
Best practices: - HNSW: Supports incremental updates but quality degrades. Rebuild periodically. - IVF: Requires full rebuild. Use dual-index strategy for zero downtime. - Batch updates: Collect updates, apply in bulk every N minutes/hours.
# Dual index pattern
class DualIndexManager:
def __init__(self):
self.current_index = create_index()
self.next_index = None
def search(self, query_vector):
return self.current_index.search(query_vector)
def start_rebuild(self, all_vectors):
self.next_index = create_index()
self.next_index.add(all_vectors)
def swap_index(self):
if self.next_index:
old_index = self.current_index
self.current_index = self.next_index
self.next_index = None
del old_index # Cleanup
Interview Questions¶
Q: When would you use HNSW vs IVF?
HNSW for real-time search with high recall requirements (recommendations, chatbots). Lower latency (~1-5ms) but higher memory (~2-4x vector size). IVF for large-scale batch workloads or memory-constrained environments. IVF-PQ for billion-scale vectors where memory is the bottleneck. Trade-off: HNSW buys speed and recall at the cost of memory; IVF-PQ buys memory efficiency at the cost of recall.
Q: What is hybrid search and why is it important for RAG?
Hybrid search combines vector similarity (semantic) with keyword matching (BM25). Pure vector search can miss exact matches (product names, acronyms). Pure keyword search misses semantic similarity. Hybrid gives best of both: vector finds "laptop" when querying "computer", BM25 finds "GPT-4" exactly. Production RAG systems almost always use hybrid + reranking.
Q: How do you handle index updates in production?
Three strategies: (1) Incremental: Add vectors to existing HNSW index in real-time (quality degrades over time), (2) Periodic rebuild: Rebuild entire index every N hours (brief downtime or use dual index), (3) Dual index: Build new index in background, atomic swap when ready. Choice depends on update frequency, latency requirements, and acceptable staleness.
Q: Pinecone vs Milvus vs pgvector — how do you choose?
Pinecone: Zero ops, fastest to production, expensive at scale, vendor lock-in. Milvus: Full control, complex setup, open source, best for large scale. pgvector: No new infrastructure, limited features, good for adding vector search to existing Postgres app. Start with pgvector if already using Postgres, Pinecone for quick prototype, Milvus/Qdrant for production scale with cost control.
Q: What is RRF (Reciprocal Rank Fusion) in hybrid search?
RRF combines rankings from multiple retrievers by scoring each document as sum of 1/(k + rank). Formula: \(RRF(d) = \sum_{r \in R} \frac{1}{k + rank_r(d)}\). Intuition: Being ranked #1 by any retriever gives significant boost, but multiple retrievers agreeing gives even more. k=60 is common default. Simple, parameter-free, often beats weighted score combination.
16. Cost Optimization for ML Inference¶
Sources: Lambda Labs GPU Pricing (2026), Neptune.ai "Inference Cost Optimization" (Jan 2026), Runhouse "LLM Cost Optimization" (2025)
The Problem: Inference Dominates Costs¶
Key Insight: - Inference costs = 15-20x training costs for production models - LLM inference costs declined 10x annually (2023-2025) but remain significant - GPU utilization in most orgs: 10-30% (70-90% waste)
Cost Breakdown¶
| Component | % of Total | Optimization Potential |
|---|---|---|
| GPU compute | 60-70% | High (spot, batching, quantization) |
| Memory/Storage | 15-20% | Medium (model compression, caching) |
| Network | 5-10% | Low (region selection) |
| Orchestration | 5-10% | Low (right-sizing) |
Strategy 1: GPU Utilization¶
Problem: GPUs idle most of the time.
Solutions:
# 1. Request Batching (critical for GPUs)
from transformers import pipeline
# BAD: Sequential requests
for text in texts:
result = pipe(text) # GPU sits idle between requests
# GOOD: Batched requests
results = pipe(texts, batch_size=32) # Full GPU utilization
# 2. Dynamic Batching (Triton Inference Server)
# Accumulate requests until batch_size or timeout
config = {
"max_batch_size": 32,
"batch_timeout_micros": 5000, # 5ms max wait
}
GPU Utilization Metrics:
import pynvml
def get_gpu_utilization():
pynvml.nvmlInit()
handle = pynvml.nvmlDeviceGetHandleByIndex(0)
util = pynvml.nvmlDeviceGetUtilizationRates(handle)
return {
"gpu": util.gpu, # GPU compute %
"memory": util.memory # GPU memory %
}
# Target: GPU utilization > 70%, Memory > 80%
Strategy 2: Spot Instances¶
What: Spare cloud capacity at 60-90% discount.
| Provider | Spot Discount | Use Case |
|---|---|---|
| AWS Spot | 70-90% | Training, batch inference |
| GCP Preemptible | 60-80% | Training jobs |
| Lambda Labs | 50-70% | GPU-intensive workloads |
| Azure Spot | 60-80% | Stateful + stateless |
Spot Instance Strategy:
# Spot instance with checkpointing
import torch
class SpotAwareTrainer:
def __init__(self, checkpoint_interval=100):
self.checkpoint_interval = checkpoint_interval
def train_step(self, batch_idx):
# Check for preemption signal
if self.check_preemption():
self.save_checkpoint("emergency")
return False
if batch_idx % self.checkpoint_interval == 0:
self.save_checkpoint(f"step_{batch_idx}")
# ... training logic
return True
def check_preemption(self):
# AWS: http://169.254.169.254/latest/meta-data/spot/termination-time
# GCP: http://metadata.google.internal/computeMetadata/v1/instance/preempted
import requests
try:
resp = requests.get(
"http://169.254.169.254/latest/meta-data/spot/termination-time",
timeout=1
)
return resp.status_code == 200
        except requests.RequestException:
            return False
When to Use Spot: - ✅ Training jobs (checkpointable) - ✅ Batch inference (resumable) - ✅ Data preprocessing - ❌ Real-time inference (need guaranteed capacity) - ❌ Distributed training with tight coupling
Strategy 3: Model Right-Sizing¶
Match Model to Task:
| Task Type | Recommended Model | Cost/1M tokens |
|---|---|---|
| Simple classification | DistilBERT, RoBERTa-tiny | $0.01-0.05 |
| Summarization | T5-small, BART-base | $0.05-0.10 |
| RAG embeddings | all-MiniLM-L6-v2 | $0.01 |
| Complex reasoning | Llama-3-8B, Mistral-7B | $0.20-0.50 |
| Enterprise grade | GPT-4, Claude Opus | $15-30 |
Decision Framework:
def select_model(task_complexity, latency_requirement, budget):
"""
Select optimal model based on constraints.
task_complexity: 1-5 (1=simple, 5=very complex)
latency_requirement: latency in ms
budget: cost per 1M tokens
"""
if task_complexity <= 2 and latency_requirement < 50:
return "distilbert-base" # $0.01/1M
elif task_complexity <= 3 and budget < 1:
return "llama-3-8b" # $0.20/1M
elif task_complexity >= 4:
return "gpt-4" if budget > 10 else "llama-3-70b"
else:
return "mistral-7b" # Default middle ground
Strategy 4: Inference Cost Per Prediction¶
Metrics to Track:
# Cost per prediction formula
cost_per_prediction = (
gpu_hourly_cost * inference_time_seconds / 3600
) / batch_size
# Example calculation
# A100 GPU: $2.50/hour
# Inference time: 100ms
# Batch size: 1
cost_per_prediction = 2.50 * 0.1 / 3600 / 1 # = $0.000069 ≈ $0.07/1000
# With batching (batch_size=32)
cost_per_prediction = 2.50 * 0.1 / 3600 / 32 # = $0.000002 ≈ $0.002/1000
LLM Token Cost Calculator:
def calculate_llm_cost(
input_tokens: int,
output_tokens: int,
model: str
) -> float:
"""Calculate cost for LLM API call."""
pricing = {
"gpt-4-turbo": {"input": 0.01, "output": 0.03}, # per 1K tokens
"gpt-3.5-turbo": {"input": 0.0005, "output": 0.0015},
"claude-3-opus": {"input": 0.015, "output": 0.075},
"claude-3-sonnet": {"input": 0.003, "output": 0.015},
"llama-3-8b-self-hosted": {"input": 0.0002, "output": 0.0002},
}
rates = pricing.get(model, pricing["llama-3-8b-self-hosted"])
return (
input_tokens * rates["input"] / 1000 +
output_tokens * rates["output"] / 1000
)
# Example: 500 input + 200 output tokens with GPT-4
cost = calculate_llm_cost(500, 200, "gpt-4-turbo") # = $0.011
Strategy 5: Caching¶
Semantic Caching for LLMs:
import hashlib
from sentence_transformers import SentenceTransformer
import numpy as np
class SemanticCache:
def __init__(self, similarity_threshold=0.95):
self.encoder = SentenceTransformer('all-MiniLM-L6-v2')
self.cache = {} # embedding -> response
self.threshold = similarity_threshold
def get(self, query: str):
query_emb = self.encoder.encode(query)
for cached_emb, (response, count) in self.cache.items():
similarity = np.dot(query_emb, cached_emb) / (
np.linalg.norm(query_emb) * np.linalg.norm(cached_emb)
)
if similarity > self.threshold:
                # Persist the hit count (a bare `count += 1` on the local variable would be lost)
                self.cache[cached_emb] = (response, count + 1)
                return response, True
return None, False
def set(self, query: str, response: str):
query_emb = self.encoder.encode(query)
self.cache[tuple(query_emb)] = (response, 1)
# Usage
cache = SemanticCache()
def query_llm(prompt: str) -> str:
cached, hit = cache.get(prompt)
if hit:
return cached # Skip expensive API call
response = call_llm_api(prompt)
cache.set(prompt, response)
return response
Strategy 6: Auto-Scaling¶
# Kubernetes HPA for inference
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: llm-inference-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: llm-inference
minReplicas: 1
maxReplicas: 10
metrics:
  # GPU utilization is not a native HPA Resource metric (only cpu/memory are);
  # expose it as a pod metric via DCGM exporter + Prometheus adapter
  - type: Pods
    pods:
      metric:
        name: DCGM_FI_DEV_GPU_UTIL
      target:
        type: AverageValue
        averageValue: "70"   # Scale up at ~70% GPU util
- type: External
external:
metric:
name: queue_length
target:
type: AverageValue
averageValue: 100 # Scale up when queue > 100
Cost Optimization Decision Matrix¶
| Technique | Effort | Savings | When to Use |
|---|---|---|---|
| Batching | Low | 2-5x | Always |
| Spot instances | Low | 60-80% | Non-critical workloads |
| Model quantization | Medium | 2-4x | Latency-sensitive |
| Model distillation | High | 5-10x | High volume, fixed task |
| Semantic caching | Medium | 10-30% | Repetitive queries |
| Right-sizing | Low | 2-5x | Over-provisioned |
Interview Questions¶
Q: How do you reduce inference costs by 70%?
(1) Batching: Dynamic batching increases GPU utilization from 20% to 80% (4x improvement). (2) Quantization: INT8 quantization halves memory and compute (2x). (3) Model selection: Use smaller models when possible (2-5x). Combined: 4 * 2 * 2 = 16x cost reduction (~94% savings), comfortably exceeding the 70% target.
Q: When should you use spot instances for ML?
Use spot for fault-tolerant workloads: training jobs (with checkpointing), batch inference, data preprocessing, hyperparameter tuning. Avoid for real-time inference where latency SLA matters, or tightly coupled distributed training without checkpointing. Always implement preemption detection and graceful shutdown.
Q: How do you calculate cost per prediction?
Cost/prediction = (GPU_hourly_cost * inference_time / 3600) / batch_size. For A100 ($2.50/hr), 100ms inference, batch=1: $0.000069/prediction. With batch=32: $0.000002/prediction. Track this metric over time to catch cost regressions.
Q: What is semantic caching and when is it useful?
Semantic caching stores LLM responses keyed by meaning rather than exact string match. Uses embedding similarity (>0.95) to identify semantically equivalent queries. Useful when users ask similar questions differently ("What is Python?" vs "Explain Python"). Typically achieves 10-30% cache hit rate in production, directly reducing API costs.
17. Multi-Model Serving and Model Routing¶
Sources: TrueFoundry "LLM Load Balancing" (2025), LogRocket "LLM Routing in Production" (2026), arXiv "Universal Model Routing" (2025)
The Multi-Model Problem¶
Why Multi-Model? - Different tasks need different capabilities - Cost optimization: small models for simple tasks - Redundancy: avoid single point of failure - A/B testing: compare model versions
Key Insight: Most production systems use 5-10 models simultaneously, with intelligent routing between them.
Routing Strategies Comparison¶
| Strategy | Description | Use Case | Complexity |
|---|---|---|---|
| Weighted Round-Robin | Static % split per model | A/B testing, canary | Low |
| Latency-Based | Route to fastest responding | Real-time apps | Medium |
| Cost-Aware | Cheap models for simple tasks | Cost optimization | Medium |
| Confidence-Based | Try cheap, escalate if uncertain | Classification | High |
| Cascade | Sequential fallback chain | Resilience | Low |
| ML-Based Router | Learned routing policy | Production scale | High |
Strategy 1: Weighted Round-Robin¶
import random
from dataclasses import dataclass
from typing import List
@dataclass
class ModelEndpoint:
name: str
url: str
weight: float # 0.0 to 1.0
class WeightedRouter:
def __init__(self, endpoints: List[ModelEndpoint]):
self.endpoints = endpoints
# Normalize weights
total = sum(e.weight for e in endpoints)
for e in endpoints:
e.weight /= total
def route(self) -> ModelEndpoint:
"""Select endpoint based on weight distribution."""
r = random.random()
cumulative = 0.0
for endpoint in self.endpoints:
cumulative += endpoint.weight
if r <= cumulative:
return endpoint
return self.endpoints[-1] # Fallback
# Usage: 80% to GPT-4, 20% to Claude
router = WeightedRouter([
ModelEndpoint("gpt-4", "https://api.openai.com/v1/chat", 0.8),
ModelEndpoint("claude", "https://api.anthropic.com/v1/messages", 0.2),
])
endpoint = router.route()
Strategy 2: Confidence-Based Routing (Cascade)¶
from dataclasses import dataclass
@dataclass
class ModelResponse:
content: str
confidence: float
model: str
class CascadeRouter:
"""Try cheap model first, escalate to expensive if uncertain."""
def __init__(self, cheap_model, expensive_model, threshold=0.85):
self.cheap = cheap_model
self.expensive = expensive_model
self.threshold = threshold
async def route(self, prompt: str) -> ModelResponse:
# Try cheap model first
response = await self.cheap.generate(prompt)
if response.confidence >= self.threshold:
# Good enough, return it
return response
# Low confidence, escalate to expensive model
return await self.expensive.generate(prompt)
# Example setup
cascade = CascadeRouter(
cheap_model=LLMClient("llama-3-8b"),
expensive_model=LLMClient("gpt-4"),
threshold=0.85
)
# If 70% of requests are confident, saves 70% of expensive calls
Strategy 3: Latency-Based Routing¶
import random
import time
from collections import deque
from dataclasses import dataclass
from typing import Dict, Deque
@dataclass
class LatencyStats:
samples: Deque[float]
avg_latency: float
class LatencyAwareRouter:
"""Route to model with best recent latency."""
def __init__(self, models: Dict[str, str], window_size=100):
self.models = models
self.stats = {
name: LatencyStats(deque(maxlen=window_size), 0.0)
for name in models
}
def record_latency(self, model: str, latency_ms: float):
"""Update rolling average latency."""
stats = self.stats[model]
stats.samples.append(latency_ms)
stats.avg_latency = sum(stats.samples) / len(stats.samples)
def route(self, fairness_factor: float = 1.2) -> str:
"""
Select model with best latency, with fairness band.
fairness_factor: choose any model within X% of fastest
"""
best_model = None
best_latency = float('inf')
for name, stats in self.stats.items():
if stats.avg_latency < best_latency:
best_latency = stats.avg_latency
best_model = name
# Find all models within fairness band
threshold = best_latency * fairness_factor
candidates = [
name for name, stats in self.stats.items()
if stats.avg_latency <= threshold
]
        # Random choice among candidates (load balancing)
        return random.choice(candidates)
Strategy 4: Fallback Chain (Circuit Breaker)¶
from enum import Enum
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional
class CircuitState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Failing, reject traffic
HALF_OPEN = "half_open" # Testing recovery
@dataclass
class CircuitBreaker:
model: str
state: CircuitState = CircuitState.CLOSED
failure_count: int = 0
last_failure: Optional[datetime] = None
threshold: int = 5
cooldown_minutes: int = 5
def record_success(self):
self.failure_count = 0
self.state = CircuitState.CLOSED
def record_failure(self):
self.failure_count += 1
self.last_failure = datetime.now()
if self.failure_count >= self.threshold:
self.state = CircuitState.OPEN
def should_try(self) -> bool:
"""Check if we should attempt this model."""
if self.state == CircuitState.CLOSED:
return True
if self.state == CircuitState.OPEN:
# Check if cooldown passed
if datetime.now() - self.last_failure > timedelta(minutes=self.cooldown_minutes):
self.state = CircuitState.HALF_OPEN
return True
return False
# HALF_OPEN: allow one test request
return True
class FallbackRouter:
"""Chain of models with circuit breakers."""
def __init__(self, models: List[str]):
self.circuits = {m: CircuitBreaker(m) for m in models}
self.order = models
async def route(self, prompt: str) -> str:
"""Try models in order, skip failing ones."""
for model in self.order:
circuit = self.circuits[model]
if not circuit.should_try():
continue # Model is in cooldown
try:
response = await self.call_model(model, prompt)
circuit.record_success()
return response
except Exception as e:
circuit.record_failure()
continue
raise Exception("All models failed")
async def call_model(self, model: str, prompt: str) -> str:
# Implementation depends on your model client
pass
Strategy 5: A/B Testing Between Models¶
import hashlib
from dataclasses import dataclass
from typing import Dict
@dataclass
class ABTest:
experiment_id: str
variants: Dict[str, float] # variant_name -> traffic %
class ABTestRouter:
"""A/B testing with consistent user assignment."""
def __init__(self, experiments: Dict[str, ABTest]):
self.experiments = experiments
def get_variant(self, experiment_id: str, user_id: str) -> str:
"""
Deterministically assign user to variant.
Same user always gets same variant.
"""
exp = self.experiments[experiment_id]
# Hash user_id for consistent assignment
hash_input = f"{experiment_id}:{user_id}"
hash_value = int(hashlib.md5(hash_input.encode()).hexdigest(), 16)
bucket = (hash_value % 100) / 100
# Assign to variant based on bucket
cumulative = 0.0
for variant, traffic in sorted(exp.variants.items()):
cumulative += traffic
if bucket < cumulative:
return variant
return list(exp.variants.keys())[0]
def route(self, experiment_id: str, user_id: str) -> str:
variant = self.get_variant(experiment_id, user_id)
# Map variant to model
variant_model_map = {
"control": "gpt-4-turbo",
"treatment_a": "gpt-4o",
"treatment_b": "claude-3-opus",
}
return variant_model_map[variant]
# Usage
ab_router = ABTestRouter({
"chatbot_v2": ABTest(
experiment_id="chatbot_v2",
variants={"control": 0.5, "treatment_a": 0.25, "treatment_b": 0.25}
)
})
# User 123 always gets same variant
model = ab_router.route("chatbot_v2", user_id="123")
Model Router Decision Matrix¶
| Scenario | Recommended Strategy | Why |
|---|---|---|
| A/B testing new model | Weighted Round-Robin | Simple, deterministic split |
| Cost optimization | Confidence-Based | Cheap first, expensive only if needed |
| Latency SLA | Latency-Based | Always fastest available |
| High availability | Fallback Chain + Circuit Breaker | Graceful degradation |
| Multi-tenant | Metadata-based routing | Per-tenant model pools |
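The last row (metadata-based routing) has no example above; a minimal per-tenant sketch, with hypothetical tenant names and model pools:
# Hypothetical metadata-based routing: pick a model pool from request metadata
TENANT_MODEL_POOLS = {
    "acme": ["gpt-4-turbo", "claude-3-sonnet"],
    "beta": ["llama-3-8b"],
    "default": ["mistral-7b"],
}
def route_by_metadata(request_metadata: dict) -> str:
    """Route to the first model in the tenant's pool (combine with a fallback chain in practice)."""
    pool = TENANT_MODEL_POOLS.get(request_metadata.get("tenant"), TENANT_MODEL_POOLS["default"])
    return pool[0]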
Interview Questions¶
Q: What is cascade routing and when would you use it?
Cascade routing tries a cheap model first, then escalates to an expensive model if the cheap model's confidence is below threshold. Use it for classification tasks where models can return confidence scores. If 70% of requests are handled confidently by a cheap model, you reduce expensive model usage by 70% while maintaining quality. The overhead is the initial cheap request for the remaining 30%.
Q: How do you implement A/B testing between ML models?
Use deterministic hashing based on user_id to assign users to variants. Hash(user_id + experiment_id) mod 100 gives consistent bucket assignment — same user always sees same variant. Track metrics per variant (latency, quality, cost). Statistical significance requires enough samples per variant (typically 1000+ conversions).
Q: What is a circuit breaker and why is it important for multi-model serving?
Circuit breaker tracks failures per model and temporarily stops sending traffic to failing models. After N consecutive failures, circuit opens and rejects all traffic for a cooldown period (typically 5 minutes). Then transitions to half-open state for a test request. This prevents cascading failures and allows graceful degradation when providers have outages.
Q: How do you choose between latency-based vs cost-aware routing?
Latency-based for interactive applications (chat, autocomplete) where user experience depends on response time. Route to fastest responding endpoint. Cost-aware for batch processing or when margins matter. Classify request complexity and route simple requests to cheap models. Can combine both: cost-aware for classification, latency-based as tiebreaker.
18. Data Quality for ML in Production¶
Sources: Uplatz "Data Validation and Quality in MLOps" (Nov 2025), ML Journey "Data Lineage Tracking" (Sep 2025), Gartner Data Quality Report (2025)
The Data Quality Problem¶
Key Statistics: - 60% of ML project failures attributed to poor data quality - $12.9M average annual cost of poor data quality per organization - 80% of ML project time spent on data preparation
GIGO principle (garbage in, garbage out): a model faithfully learns whatever noise and errors are present in its training data.
Data Quality Dimensions for ML¶
| Dimension | Definition | ML Impact |
|---|---|---|
| Accuracy | Data conforms to reality | Incorrect patterns learned |
| Completeness | Absence of missing values | Biased models, discarded records |
| Consistency | Standard format/structure | Fragmented categories, diluted signals |
| Timeliness | Data is up-to-date | Stale models, irrelevant predictions |
| Relevance | Data applies to the problem | Noise, computational waste |
| Uniqueness | No duplicate records | Inflated importance of certain instances |
| Validity | Conforms to business rules | Type errors, out-of-range values |
Validation Types by Pipeline Stage¶
| Stage | Validation Focus | Key Checks |
|---|---|---|
| Ingestion | Raw data integrity | Schema, freshness, row counts, source integrity |
| Preparation | Cleanliness | Missing values, duplicates, outliers, format |
| Training | Fitness for model | Train-test split, feature validation, distribution |
| Production | Ongoing quality | Drift detection, skew detection, schema evolution |
Schema Validation with Great Expectations¶
# Legacy Great Expectations dataset API (pre-1.0)
from great_expectations.dataset import PandasDataset
# Define expectations (data contract)
def create_expectations(df: PandasDataset) -> PandasDataset:
# Schema checks
df.expect_column_to_exist("user_id")
df.expect_column_to_exist("timestamp")
df.expect_column_to_exist("features")
# Type checks
df.expect_column_values_to_be_of_type("user_id", "int")
df.expect_column_values_to_be_of_type("timestamp", "datetime64[ns]")
# Range checks
df.expect_column_values_to_be_between("age", min_value=0, max_value=120)
df.expect_column_values_to_be_between("score", min_value=0.0, max_value=1.0)
# Completeness checks
df.expect_column_values_to_not_be_null("user_id")
df.expect_column_values_to_not_be_null("timestamp")
# Uniqueness checks
df.expect_column_values_to_be_unique("user_id")
# Categorical checks
df.expect_column_values_to_be_in_set("status", ["active", "inactive", "pending"])
return df
# Validate data (df must be a PandasDataset with the expectations above attached)
df = create_expectations(df)
validation_result = df.validate()
if not validation_result.success:
    print(f"Validation failed: {validation_result.statistics}")
    # Halt pipeline or raise alert
TensorFlow Data Validation (TFDV)¶
import tensorflow_data_validation as tfdv
# Generate statistics from training data
train_stats = tfdv.generate_statistics_from_csv(train_data_path)
# Infer schema from statistics
schema = tfdv.infer_schema(train_stats)
# Customize schema constraints
tfdv.set_domain(schema, 'age', min=0, max=120)
tfdv.set_domain(schema, 'category', ['A', 'B', 'C'])
# Validate new data against schema
validation_result = tfdv.validate_statistics(
statistics=new_data_stats,
schema=schema
)
# Check for anomalies
if validation_result.anomaly_info:
for feature, anomaly in validation_result.anomaly_info.items():
print(f"Anomaly in {feature}: {anomaly.description}")
# Detect drift between training and serving
tfdv.visualize_statistics(
lhs_statistics=train_stats,
rhs_statistics=serving_stats,
lhs_name='Training',
rhs_name='Serving'
)
Schema Evolution Strategies¶
| Strategy | Description | When to Use |
|---|---|---|
| Additive | Add new optional columns | Non-breaking changes |
| Backward Compatible | New code reads old data | Gradual rollout |
| Forward Compatible | Old code reads new data | Multi-version serving |
| Dual-Write | Write to old + new schemas | Migration period |
class SchemaEvolution:
"""Handle schema versioning in ML pipelines."""
def __init__(self, current_version: str):
self.version = current_version
self.schemas = {
"v1": {"features": ["age", "income", "location"]},
"v2": {"features": ["age", "income", "location", "education", "tenure"]},
"v3": {"features": ["age", "income", "region", "education", "tenure", "segment"]},
}
def migrate(self, df, from_version: str, to_version: str):
"""Migrate data between schema versions."""
source_features = self.schemas[from_version]["features"]
target_features = self.schemas[to_version]["features"]
# Add missing columns with defaults
for feature in target_features:
if feature not in df.columns:
df[feature] = self.get_default(feature)
# Rename columns (e.g., location -> region)
if from_version == "v2" and to_version == "v3":
df = df.rename(columns={"location": "region"})
return df[target_features]
def get_default(self, feature: str):
defaults = {
"education": "unknown",
"tenure": 0,
"segment": "default"
}
return defaults.get(feature, None)
Data Lineage Tracking¶
from dataclasses import dataclass
from datetime import datetime
from typing import List, Dict, Optional
import hashlib
@dataclass
class LineageNode:
"""A node in the data lineage graph."""
node_id: str
node_type: str # "source", "transform", "model", "prediction"
name: str
timestamp: datetime
metadata: Dict
inputs: List[str]
outputs: List[str]
class DataLineageTracker:
"""Track data lineage through ML pipeline."""
def __init__(self):
self.nodes: Dict[str, LineageNode] = {}
self.edges: List[tuple] = []
def track_source(self, name: str, source_path: str, **metadata):
"""Track data source ingestion."""
node_id = self._generate_id(f"source:{name}")
node = LineageNode(
node_id=node_id,
node_type="source",
name=name,
timestamp=datetime.now(),
metadata={"path": source_path, **metadata},
inputs=[],
outputs=[]
)
self.nodes[node_id] = node
return node_id
def track_transform(
self,
name: str,
input_ids: List[str],
transform_code: str,
**metadata
):
"""Track data transformation."""
node_id = self._generate_id(f"transform:{name}")
node = LineageNode(
node_id=node_id,
node_type="transform",
name=name,
timestamp=datetime.now(),
metadata={"code_hash": hashlib.md5(transform_code.encode()).hexdigest(), **metadata},
inputs=input_ids,
outputs=[]
)
self.nodes[node_id] = node
# Create edges
for input_id in input_ids:
self.edges.append((input_id, node_id))
return node_id
def track_model(self, name: str, training_data_id: str, model_params: Dict):
"""Track model training."""
node_id = self._generate_id(f"model:{name}")
node = LineageNode(
node_id=node_id,
node_type="model",
name=name,
timestamp=datetime.now(),
metadata=model_params,
inputs=[training_data_id],
outputs=[]
)
self.nodes[node_id] = node
self.edges.append((training_data_id, node_id))
return node_id
def get_lineage(self, node_id: str) -> Dict:
"""Get full lineage for a node (upstream and downstream)."""
upstream = self._trace_upstream(node_id)
downstream = self._trace_downstream(node_id)
return {
"node": self.nodes[node_id],
"upstream": [self.nodes[n] for n in upstream],
"downstream": [self.nodes[n] for n in downstream]
}
def _generate_id(self, base: str) -> str:
return hashlib.md5(f"{base}:{datetime.now().isoformat()}".encode()).hexdigest()[:12]
def _trace_upstream(self, node_id: str, visited: set = None) -> List[str]:
if visited is None:
visited = set()
if node_id in visited:
return []
visited.add(node_id)
node = self.nodes[node_id]
result = list(node.inputs)
for input_id in node.inputs:
result.extend(self._trace_upstream(input_id, visited))
return result
def _trace_downstream(self, node_id: str, visited: set = None) -> List[str]:
if visited is None:
visited = set()
if node_id in visited:
return []
visited.add(node_id)
result = []
for src, dst in self.edges:
if src == node_id:
result.append(dst)
result.extend(self._trace_downstream(dst, visited))
return result
# Usage
lineage = DataLineageTracker()
# Track pipeline
source_id = lineage.track_source("user_data", "/data/users.parquet", rows=100000)
features_id = lineage.track_transform("feature_engineering", [source_id], "features.py")
model_id = lineage.track_model("churn_model", features_id, {"algorithm": "xgboost", "auc": 0.85})
# Query lineage
print(lineage.get_lineage(model_id))
Data Quality Tools Comparison¶
| Tool | Strengths | Use Case |
|---|---|---|
| Great Expectations | Rich expectations, docs, integrations | General validation |
| TFDV | TensorFlow ecosystem, drift detection | TF/TFX pipelines |
| Deepchecks | ML-specific checks, model validation | Model quality |
| Pandera | Type-safe schemas, DataFrame contracts | Python validation |
| Evidently AI | Drift dashboards, reports | Monitoring |
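For comparison with the Great Expectations example above, a minimal Pandera contract might look like this (column names are illustrative):
import pandera as pa
# Type-safe DataFrame contract: dtypes + value ranges, validated at pipeline entry
schema = pa.DataFrameSchema({
    "user_id": pa.Column(int, unique=True, nullable=False),
    "age": pa.Column(int, pa.Check.in_range(0, 120)),
    "score": pa.Column(float, pa.Check.in_range(0.0, 1.0)),
    "status": pa.Column(str, pa.Check.isin(["active", "inactive", "pending"])),
})
validated_df = schema.validate(df)  # raises SchemaError on violations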
Interview Questions¶
Q: What are the key dimensions of data quality for ML?
Accuracy (no factual errors), Completeness (no missing values), Consistency (standard formats), Timeliness (fresh data), Relevance (applicable features), Uniqueness (no duplicates), Validity (conforms to rules). Each dimension directly impacts model behavior - incomplete data causes bias, stale data causes drift, inconsistent data fragments categories.
Q: How do you handle schema evolution in production ML?
Three strategies: (1) Additive: Only add optional columns (backward compatible). (2) Dual-write: Write to both old and new schemas during migration. (3) Version-based routing: Route requests based on schema version metadata. Always maintain schema registry and validation at pipeline entry points. Test with both schemas before deprecating old.
Q: What is data lineage and why is it important?
Data lineage tracks the complete path of data through ML pipelines - from source ingestion through transformations to model outputs. Critical for: (1) Debugging - trace root cause of failures, (2) Compliance - GDPR/CCPA data subject requests, (3) Reproducibility - recreate experiments with exact data versions, (4) Impact analysis - understand downstream effects of upstream changes. Implement with graph-based lineage models and metadata catalogs.
Q: How do you detect data drift vs training-serving skew?
Data drift: Statistical distribution of production data shifts from training baseline (e.g., user demographics change). Detect with KS test, PSI, or divergence metrics. Training-serving skew: Same data processed differently in training vs inference (e.g., different preprocessing code). Detect by comparing feature statistics between training pipeline and serving endpoint. Drift requires retraining; skew requires code fix.
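A minimal sketch of the skew check described in the answer - comparing per-feature statistics between the training set and a sample of serving traffic (the threshold is illustrative):
import pandas as pd
def detect_training_serving_skew(train_df: pd.DataFrame, serving_df: pd.DataFrame,
                                 rel_tolerance: float = 0.10) -> dict:
    """Flag numeric features whose mean shifts by more than rel_tolerance between pipelines."""
    skewed = {}
    for col in train_df.select_dtypes("number").columns:
        train_mean, serving_mean = train_df[col].mean(), serving_df[col].mean()
        denom = abs(train_mean) if train_mean != 0 else 1.0
        rel_diff = abs(train_mean - serving_mean) / denom
        if rel_diff > rel_tolerance:
            skewed[col] = {"train_mean": train_mean, "serving_mean": serving_mean, "rel_diff": rel_diff}
    return skewed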
19. Foundation Models in Production¶
Sources: Collabnix "Multi-Tenant LLM Platform" (2025), Lilian Li "Prompt Caching" (2026), Zylos Research "LLM Caching Strategies" (2025)
Multi-Tenant LLM Serving¶
Key Challenges: - Resource Isolation: GPU-intensive inference requires strict quotas to prevent noisy neighbors - Data Privacy: Tenant requests/responses must remain isolated - Cost Attribution: Track GPU usage, tokens, API calls per tenant - Performance SLAs: Different tenants need different latency guarantees - Model Versioning: Support different model versions per tenant
Multi-Tenant Architecture on Kubernetes¶
┌─────────────────────────────────────────────────────────────┐
│ API Gateway / Istio │
│ (Auth, Rate Limiting, Routing) │
└─────────────────────┬───────────────────────────────────────┘
│
┌──────────────┼──────────────┐
│ │ │
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ tenant-acme │ │ tenant-beta │ │ tenant-gamma │
│ Namespace │ │ Namespace │ │ Namespace │
│ │ │ │ │ │
│ ┌──────────┐ │ │ ┌──────────┐ │ │ ┌──────────┐ │
│ │ vLLM │ │ │ │ vLLM │ │ │ │ vLLM │ │
│ │ Llama-2 │ │ │ │ Mistral │ │ │ │ GPT-4 │ │
│ └──────────┘ │ │ └──────────┘ │ │ └──────────┘ │
│ │ │ │ │ │
│ Quota: 2GPU │ │ Quota: 4GPU │ │ Quota: 1GPU │
└──────────────┘ └──────────────┘ └──────────────┘
│
▼
┌──────────────────┐
│ Shared GPU Pool │
│ (A100/H100) │
└──────────────────┘
Tenant Resource Quotas (Kubernetes)¶
apiVersion: v1
kind: Namespace
metadata:
name: tenant-acme
labels:
tenant: acme
istio-injection: enabled
---
apiVersion: v1
kind: ResourceQuota
metadata:
name: tenant-acme-quota
namespace: tenant-acme
spec:
hard:
requests.cpu: "32"
requests.memory: 128Gi
requests.nvidia.com/gpu: "2"
limits.nvidia.com/gpu: "2"
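The quota above covers resource isolation; cost attribution can be as simple as aggregating token usage per tenant at the API gateway. A minimal sketch with hypothetical prices and field names:
from collections import defaultdict
PRICE_PER_1K = {"input": 0.01, "output": 0.03}  # illustrative rates
class TenantUsageTracker:
    """Per-tenant token ledger, filled in by the gateway after each request."""
    def __init__(self):
        self.usage = defaultdict(lambda: {"input_tokens": 0, "output_tokens": 0})
    def record(self, tenant: str, input_tokens: int, output_tokens: int):
        self.usage[tenant]["input_tokens"] += input_tokens
        self.usage[tenant]["output_tokens"] += output_tokens
    def bill(self, tenant: str) -> float:
        u = self.usage[tenant]
        return (u["input_tokens"] * PRICE_PER_1K["input"] +
                u["output_tokens"] * PRICE_PER_1K["output"]) / 1000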
Prompt Caching Economics¶
How It Works: Reuse Key-Value (KV) Cache from previous requests. When a model processes text, it generates KV tensors. Caching skips expensive computation for identical prefix text.
Pricing Comparison (2026):
| Provider | Discount | Duration | Strategy |
|---|---|---|---|
| OpenAI | 50% | Auto (24h) | Zero config, same price |
| Anthropic | 90% | 5min-1h | Explicit cache_control |
| Google | 75% | Configurable | Vertex AI caching |
Decision Framework:
Step 1: Is prompt >= 1024 tokens?
NO → Skip caching (won't activate)
YES → Continue
Step 2: Expected reuses >= 3 times?
NO → Skip caching (not cost-effective)
YES → Continue
Step 3: Match strategy to time span:
Minutes → Anthropic ephemeral
Hours → OpenAI auto / Anthropic 1h
Days → Persistent cache + TTL
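The framework can be expressed as a small helper; the thresholds mirror the steps above and are otherwise illustrative:
def should_cache_prompt(prompt_tokens: int, expected_reuses: int) -> bool:
    """Steps 1-2: minimum prefix length and expected reuse count."""
    if prompt_tokens < 1024:
        return False  # cache will not activate for short prefixes
    if expected_reuses < 3:
        return False  # not cost-effective
    return True
def caching_strategy(time_span: str) -> str:
    """Step 3: match strategy to how long the prefix stays stable."""
    return {"minutes": "anthropic_ephemeral",
            "hours": "openai_auto_or_anthropic_1h",
            "days": "persistent_cache_plus_ttl"}.get(time_span, "no_cache")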
Prompt Caching Implementation¶
# OpenAI: Automatic caching (same price)
response = openai.chat.completions.create(
model="gpt-4-turbo",
messages=[
{"role": "system", "content": "Your 10k token system prompt..."},
{"role": "user", "content": "User query"}
]
# Caching happens automatically for identical prefixes
)
# Anthropic: Explicit caching (90% discount)
response = client.messages.create(
model="claude-3-sonnet",
system=[{
"type": "text",
"text": "Your 10k token system prompt...",
"cache_control": {"type": "ephemeral"} # 5min cache
}],
messages=[{"role": "user", "content": "User query"}]
)
# Multi-layer caching architecture
import hashlib
import redis
from qdrant_client import QdrantClient
class MultiLayerCache:
    """L1: Exact match (Redis) → L2: Semantic (Vector) → L3: Provider cache"""
    def __init__(self):
        self.redis = redis.Redis()            # L1: <10ms
        self.vector_store = QdrantClient()    # L2: 50-150ms
        # L3: Provider's built-in caching
async def get_or_generate(self, prompt: str, generate_fn):
# L1: Exact match
cache_key = hashlib.sha256(prompt.encode()).hexdigest()
if cached := self.redis.get(cache_key):
return cached
# L2: Semantic similarity
similar = await self.find_similar(prompt, threshold=0.95)
if similar:
return similar.response
# L3: Generate with provider caching
response = await generate_fn(prompt)
# Cache for future
self.redis.setex(cache_key, 3600, response) # 1 hour TTL
return response
Structuring Prompts for Caching¶
# GOOD: Static prefix enables caching
SYSTEM_PROMPT = """
You are a helpful assistant with access to the following knowledge base:
[50k token knowledge base content here]
Always follow these rules:
1. Be concise
2. Cite sources
3. Ask clarifying questions if needed
"""
def query(user_question: str):
return client.messages.create(
model="claude-3-sonnet",
system=[{
"type": "text",
"text": SYSTEM_PROMPT, # Cached!
"cache_control": {"type": "ephemeral"}
}],
messages=[{"role": "user", "content": user_question}]
)
# BAD: Dynamic prefix breaks cache
def query_bad(user_id: str, timestamp: str, question: str):
return client.messages.create(
model="claude-3-sonnet",
messages=[{
"role": "user",
"content": f"[{timestamp}] User {user_id} asks: {question}"
# Cache misses every time!
}]
)
Token Economics and Cost Optimization¶
| Optimization | Savings | Effort | Use Case |
|---|---|---|---|
| Prompt caching | 50-90% | Low | Repeated prefixes |
| Model routing | 30-60% | Medium | Variable complexity |
| Semantic caching | 10-30% | Medium | Similar queries |
| Prompt compression | 20-50% | Low | Long contexts |
| Response streaming | Latency only | Low | User experience |
Fallback Strategies¶
class FallbackChain:
"""Graceful degradation when primary fails."""
def __init__(self):
self.providers = [
{"name": "openai", "model": "gpt-4", "priority": 1},
{"name": "anthropic", "model": "claude-3", "priority": 2},
{"name": "local", "model": "llama-3-70b", "priority": 3},
]
        self.circuit_breakers = {p["name"]: CircuitBreaker(p["name"]) for p in self.providers}
async def generate(self, prompt: str) -> str:
for provider in sorted(self.providers, key=lambda x: x["priority"]):
cb = self.circuit_breakers[provider["name"]]
if not cb.should_try():
continue
try:
response = await self.call_provider(provider, prompt)
cb.record_success()
return response
except Exception as e:
cb.record_failure()
logging.warning(f"{provider['name']} failed: {e}")
continue
raise Exception("All providers failed")
Interview Questions¶
Q: What are the key challenges in multi-tenant LLM serving?
(1) Resource isolation: GPU-intensive inference needs strict quotas to prevent noisy neighbors. (2) Data privacy: Tenant requests/responses must stay isolated with zero cross-contamination. (3) Cost attribution: Track GPU usage, tokens, API calls per tenant for billing. (4) Performance SLAs: Different tenants need different latency guarantees. (5) Model versioning: Support different model versions or fine-tuned variants per tenant. Solution: Namespace-based isolation with Kubernetes ResourceQuotas, Istio for network policies, separate vLLM deployments per tenant.
Q: How does prompt caching work and when should you use it?
Prompt caching reuses Key-Value (KV) Cache from previous requests. When a model processes text, it generates KV tensors; caching skips this computation for identical prefixes. When to use: (1) Prompt >= 1024 tokens, (2) Expected reuses >= 3 times, (3) Static prefix structure. Pricing: OpenAI 50% auto, Anthropic 90% explicit. Critical limitation: Prefix-based - changing first character invalidates entire cache. Structure prompts with static prefix at top, dynamic content at bottom.
Q: How do you structure prompts for optimal caching?
Put static content at the beginning, dynamic content at the end. The cache is prefix-based, so [Static 50k knowledge base] [Examples] [User query] works, but [Timestamp] [User ID] [Static content] breaks the cache on every request. Use system prompts for static content (knowledge bases, instructions), user messages for dynamic queries. Monitor hit rate: cache_read / (cache_read + cache_creation), target >70%.
Q: What's the fallback strategy when LLM providers fail?
Implement circuit breaker pattern with provider chain: (1) Primary provider (GPT-4/Claude) with circuit breaker, (2) Secondary provider as backup, (3) Local model as final fallback. Circuit breaker opens after N failures, closes after cooldown. Track error rates per provider, route around degraded services. Always have local inference as final fallback for zero-downtime SLAs.
20. AI Agents in Production¶
Sources: Iain Harper "Security for Production AI Agents in 2026", UiPath "10 Best Practices for Building Reliable AI Agents 2025", n8n "Best Practices for Deploying AI Agents", OWASP Top 10 for LLM Applications 2025
Understanding AI Agents¶
Agent Definition: Software that uses an LLM to autonomously perform tasks on behalf of users. Unlike chatbots (respond to questions), agents can take actions: browsing web, sending emails, querying databases, executing code, interacting with other systems.
Production Criteria (when enterprise-grade guardrails needed): - Autonomy: Execute actions with real-world consequences, operate with delegated authority, chain multiple tool calls - Data: Process untrusted external content, access sensitive systems/PII, operate across trust boundaries - Consequences: Errors are costly/difficult to reverse, regulatory/reputational risk, customer-facing
Threat Landscape (OWASP 2025)¶
| Risk | Description | Mitigation |
|---|---|---|
| LLM01: Prompt Injection | Manipulating model through malicious inputs | Defence-in-depth, input validation |
| LLM02: Sensitive Data Leakage | Exposing PII, financial details | Output filtering, PII detection |
| LLM05: Improper Output Handling | Insufficient validation | Schema validation with Pydantic |
| LLM06: Excessive Agency | Too much capability without controls | Tool allowlists, permission gating |
| LLM10: Unbounded Consumption | Resource exhaustion | Rate limits, budget controls |
Prompt Injection Stats: - Appears in 73%+ of production deployments - 5 carefully crafted documents can manipulate an AI system 90% of the time (RAG poisoning) - CVE-2025-53773: GitHub Copilot RCE (CVSS 9.6)
Defence-in-Depth Architecture¶
Six Layers: 1. Input Sanitization - Clean/validate data before AI 2. Injection Detection - Identify manipulation attempts 3. Agent Execution - Control decisions/actions 4. Tool Call Interception - Review/approve actions before execution 5. Output Validation - Check responses before users 6. Observability & Audit - Monitor everything
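Layers 1-2 have no example below, so here is a deliberately naive heuristic sketch (the pattern list is illustrative; real deployments pair it with classifier-based detection and never rely on it alone):
import re
INJECTION_PATTERNS = [
    r"ignore (all )?previous instructions",
    r"reveal your (instructions|system prompt)",
    r"you are now",
]
def sanitize_input(text: str, max_len: int = 8000) -> str:
    """Layer 1: strip control characters and truncate before the text reaches the agent."""
    cleaned = re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f]", "", text)
    return cleaned[:max_len]
def looks_like_injection(text: str) -> bool:
    """Layer 2: cheap first-pass filter for obvious manipulation attempts."""
    lowered = text.lower()
    return any(re.search(p, lowered) for p in INJECTION_PATTERNS)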
# Tool Allowlist with Permission Gating
class ToolGatekeeper:
"""Principle of least privilege for agent tools."""
ALLOWED_TOOLS = {
"customer_service": ["get_order_status", "search_faq", "create_ticket"],
"research": ["web_search", "read_document", "summarize"],
# NO: "delete_data", "send_email", "make_payment"
}
def validate_tool_call(self, agent_role: str, tool_name: str, params: dict) -> bool:
if tool_name not in self.ALLOWED_TOOLS.get(agent_role, []):
logging.warning(f"Blocked unauthorized tool: {tool_name}")
return False
# Validate parameters against schema
return self._validate_params(tool_name, params)
def _validate_params(self, tool_name: str, params: dict) -> bool:
# Schema-based validation
schema = TOOL_SCHEMAS.get(tool_name, {})
# ... validate params match schema
return True
Human-in-the-Loop Patterns¶
When to Require Human Approval: - Irreversible operations (sending emails, payments, deleting data) - High-cost actions (API calls exceeding threshold, bulk operations) - Novel situations (significantly different from training) - Regulated domains (healthcare, financial, legal)
# LangGraph HITL with interrupt()
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph
def build_hitl_agent():
workflow = StateGraph(AgentState)
# ... add nodes ...
# Interrupt before high-risk action
workflow.add_node("request_approval", request_human_approval)
workflow.add_edge("agent_decides", "request_approval")
# After interrupt, resume with human input
checkpointer = MemorySaver()
app = workflow.compile(checkpointer=checkpointer, interrupt_before=["execute_action"])
return app
# Human approval workflow
async def human_approval_handler(thread_id: str, action: dict):
"""Human reviews and approves/rejects action."""
# Send to Slack/Email with action details
notification = format_action_request(action)
await send_to_approval_channel(notification)
# Wait for human response
response = await wait_for_approval(thread_id, timeout_minutes=30)
if response.approved:
# Resume workflow with approval
app.update_state(thread_id, {"approved": True, "approver": response.user})
else:
# Handle rejection
app.update_state(thread_id, {"rejected": True, "reason": response.reason})
LLM-as-Judge Evaluation¶
# Judge model evaluates agent outputs
from pydantic import BaseModel
class JudgeVerdict(BaseModel):
score: float # 0.0 - 1.0
reasoning: str
issues: list[str]
passed: bool
async def evaluate_with_judge(output: str, criteria: list[str]) -> JudgeVerdict:
"""Use separate model to evaluate agent output."""
judge_prompt = f"""
Evaluate this agent output against criteria.
Criteria: {criteria}
Agent Output: {output}
Provide:
1. Score (0.0-1.0)
2. Reasoning for score
3. List of specific issues found
4. Whether output passes quality threshold
"""
response = await judge_model.generate(judge_prompt, response_format=JudgeVerdict)
return JudgeVerdict(**response)
# Best practices from research:
# - Few-shot prompting with good/bad examples
# - Chain-of-thought reasoning before scoring
# - Separate judge model from generation model
# - Calibrate against human labels
Production Best Practices (10 Commandments)¶
1. Design Agents That Fail Safe - Start small with single-responsibility agents - Modularize into specialized agents for complex workflows - For deterministic tasks, use tools (not LLM reasoning)
2. Configure Context Properly - Index enterprise context (knowledge bases, docs) - Choose right model for task (GPT-5 for complex, Flash for simple) - Maintain clarity in tool definitions
3. Treat Every Capability as a Tool - Tools should have tight input/output contracts - Schema-driven prompts with validation - Build tools for deterministic operations (math, date comparison)
4. Write Prompts Like Product Specs - Define: Role, Instructions, Goal, Success metrics, Guardrails - Use structured, multi-step reasoning - Describe what SHOULD happen, not what shouldn't
5. Evaluate for Real World - Build robust evaluation datasets (30+ cases per agent) - Test breadth AND depth (accuracy, reasoning, tool-use) - End-to-end testing in full automation context
6. Build-in Safety, Governance, Compliance - Deploy via Orchestrator/Maestro for lifecycle management - Apply AI Trust Layer (PII redaction, audit logs, throttling) - Human-in-the-loop for high-risk decisions
7. Version on Purpose and Gate Releases - Version everything: prompts, tools, datasets, evaluations - Gate production release on evaluation thresholds - Attach evaluations to version tags
8. Design Trust-Building Conversations - Set clear expectations about capabilities - Confirm irreversible actions deterministically - Show context/reasoning where appropriate
9. Control Cost and Performance - Right-size model choice per task - Limit token use, cache stable responses - Batch low-risk calls, escalate only when necessary
10. Continuous Improvement - Trace and learn from execution logs - Human feedback loop into design updates - Scale incrementally after proving stability
Agent Observability with OpenTelemetry¶
# OpenTelemetry GenAI semantic conventions
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
tracer = trace.get_tracer(__name__)
@tracer.start_as_current_span("agent.execution")
async def execute_agent_with_telemetry(agent, input_data):
span = trace.get_current_span()
# GenAI semantic conventions
span.set_attribute("gen_ai.system", "openai")
span.set_attribute("gen_ai.request.model", "gpt-4")
span.set_attribute("gen_ai.request.max_tokens", 4096)
try:
response = await agent.run(input_data)
# Track token usage
span.set_attribute("gen_ai.usage.input_tokens", response.usage.prompt_tokens)
span.set_attribute("gen_ai.usage.output_tokens", response.usage.completion_tokens)
span.set_attribute("gen_ai.response.finish_reason", response.finish_reason)
span.set_status(Status(StatusCode.OK))
return response
except Exception as e:
span.set_status(Status(StatusCode.ERROR, str(e)))
span.record_exception(e)
raise
Key Observability Metrics: - Token consumption per request/session - Guardrail trigger rates - Human escalation rates - Error rates by type - Response latency percentiles (p50, p95, p99) - Cost per task
Interview Questions¶
Q: What makes AI agents different from chatbots, and when do you need production guardrails?
Agents autonomously execute tasks (browse web, send emails, modify databases) vs chatbots just respond. Production guardrails needed when: (1) Autonomy: Real-world consequences, delegated authority, multiple tool chains; (2) Data: Untrusted external content, sensitive/PII access, cross-boundary operations; (3) Consequences: Costly errors, regulatory risk, customer-facing. Key insight: The gap between "works in demo" and "production at scale" is where projects die.
Q: What is the OWASP Top 10 for LLM Applications and which are most critical?
2025 Top 10: LLM01 (Prompt Injection - 73% of deployments), LLM02 (Data Leakage), LLM05 (Output Handling), LLM06 (Excessive Agency), LLM10 (Unbounded Consumption). Critical: Prompt injection (no universal solution, use defence-in-depth), Excessive Agency (limit tools with allowlists). Unlike SQL injection (solved by parameterized queries), prompt injection may be inherent to LLMs - assume some attacks will succeed, limit blast radius.
Q: Explain defence-in-depth for AI agents.
Six layers: (1) Input Sanitization - clean data before AI; (2) Injection Detection - identify manipulation; (3) Agent Execution - control decisions; (4) Tool Interception - approve actions before execution; (5) Output Validation - check responses; (6) Observability - monitor everything. No single guardrail is sufficient - deterministic validators + LLM evaluation + human oversight + comprehensive logging. Each layer catches different failure categories.
Q: When should you use human-in-the-loop for agents?
Require human approval for: (1) Irreversible operations - sending emails, payments, deleting data; (2) High-cost actions - API calls exceeding threshold, bulk operations; (3) Novel situations - significantly different from training scenarios; (4) Regulated domains - healthcare, financial, legal decisions. Implementation: LangGraph interrupt() for pause/resume, notification channels (Slack/Email), timeout handling, escalation workflows. Monitor escalation rates - high rates indicate the agent needs improvement.
21. Model Compression (Knowledge Distillation, Pruning, Edge Deployment)¶
How to compress a model 10x without losing quality for edge/mobile deployment
Why Model Compression Matters¶
Problem: LLMs are growing toward trillions of parameters, while edge devices remain constrained. Solution: compression techniques - pruning, distillation, quantization.
2025 figures: - 65% of mobile ML models use pruning (MLPerf 2025) - DistilBERT: 40% smaller, 60% faster, 97% accuracy retention - Pruning + quantization stack: 2x compression vs distillation alone
Knowledge Distillation¶
Concept: Teacher-student framework - a compact student model learns from a large teacher.
Temperature Scaling¶
Problem: Softmax outputs are too "peaked" - [0.95, 0.02, 0.01, 0.01, 0.01]
Solution: Temperature T > 1 softens the distribution: \(p_i = \exp(z_i/T) / \sum_j \exp(z_j/T)\)
With T=5: [0.4, 0.2, 0.15, 0.15, 0.1] - the student learns class relationships!
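A minimal numeric sketch of temperature scaling (the logits are chosen for illustration):
import torch
import torch.nn.functional as F
logits = torch.tensor([6.0, 2.2, 1.9, 1.9, 1.5])  # illustrative teacher logits
print(F.softmax(logits, dim=0))        # peaked: roughly [0.94, 0.02, 0.02, 0.02, 0.01]
print(F.softmax(logits / 5.0, dim=0))  # T=5: softened, exposes class relationships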
Distillation Loss¶
import torch
import torch.nn.functional as F
def distillation_loss(student_logits, teacher_logits, labels,
temperature=4.0, alpha=0.5):
"""
Combined loss for knowledge distillation.
    L_total = α * CE(y_true, y_student) + (1 - α) * T² * KL(p_teacher^T || p_student^T)
"""
# Hard loss: Cross-entropy with ground truth
hard_loss = F.cross_entropy(student_logits, labels)
# Soft loss: KL divergence between teacher and student
soft_teacher = F.softmax(teacher_logits / temperature, dim=1)
soft_student = F.log_softmax(student_logits / temperature, dim=1)
# Scale by T^2 to match gradient magnitudes
soft_loss = F.kl_div(soft_student, soft_teacher, reduction='batchmean')
soft_loss *= (temperature ** 2)
# Combined
return alpha * hard_loss + (1 - alpha) * soft_loss
Types of Distillation¶
| Type | What's Transferred | Best For |
|---|---|---|
| Response-Based | Output logits | Classification |
| Feature-Based | Intermediate activations | CNNs, Vision |
| Attention-Based | Attention maps | Transformers (DistilBERT) |
| Multi-Teacher | Ensemble outputs | Critical accuracy |
Complete Training Loop¶
def train_with_distillation(teacher, student, train_loader, epochs=10,
temperature=4.0, alpha=0.5, lr=1e-4):
"""
Train student model with knowledge distillation.
"""
teacher.eval() # Freeze teacher
optimizer = torch.optim.Adam(student.parameters(), lr=lr)
for epoch in range(epochs):
student.train()
total_loss = 0
for inputs, labels in train_loader:
inputs, labels = inputs.to(device), labels.to(device)
with torch.no_grad():
teacher_logits = teacher(inputs)
student_logits = student(inputs)
loss = distillation_loss(
student_logits, teacher_logits, labels,
temperature=temperature, alpha=alpha
)
optimizer.zero_grad()
loss.backward()
optimizer.step()
total_loss += loss.item()
print(f"Epoch {epoch+1}: Loss = {total_loss/len(train_loader):.4f}")
return student
Distilled Models (Production-Ready)¶
| Model | Size Reduction | Speedup | Accuracy |
|---|---|---|---|
| DistilBERT | 40% | 60% | 97% of BERT |
| TinyBERT | 72% | 80% | 96% of BERT |
| MobileBERT | 4x | 5.5x | Optimized for mobile |
| DistilWhisper | 50% | 60% | Speech recognition |
Neural Network Pruning¶
Concept: Remove redundant weights while preserving accuracy.
Lottery Ticket Hypothesis (Frankle & Carbin 2019):
Dense networks contain sparse subnetworks ("winning tickets") that achieve full accuracy when trained in isolation.
Pruning Types¶
| Type | What's Removed | Hardware Impact |
|---|---|---|
| Unstructured | Individual weights | Needs sparse kernels |
| Structured | Entire filters/channels | GPU-friendly speedup |
| Global | Cross-layer ranking | Balanced compression |
| Iterative | Gradual increase | Better accuracy |
Pruning Criteria¶
import torch
import torch.nn as nn
import torch.nn.utils.prune as prune
# 1. Magnitude Pruning (L1 norm)
# Remove weights where |w_i| < θ
prune.l1_unstructured(model.conv1, name='weight', amount=0.5)
# 2. Global Pruning (cross-layer)
prune.global_unstructured(
[(module, 'weight') for module in model.modules()
if isinstance(module, (nn.Conv2d, nn.Linear))],
pruning_method=prune.L1Unstructured,
amount=0.8 # 80% sparsity
)
# 3. Structured Pruning (channel removal)
prune.ln_structured(model.conv1, name='weight', amount=0.3, n=2, dim=0)
# Make permanent
prune.remove(model.conv1, 'weight')
Iterative Pruning Pipeline¶
def iterative_prune(model, train_loader, sparsity_schedule=[0.2, 0.5, 0.8],
fine_tune_epochs=5):
"""
Gradually increase sparsity with retraining between steps.
Key insight: 90% sparsity with iterative approach retains 99% accuracy
on Vision Transformers (NeurIPS 2024 findings).
"""
for target_sparsity in sparsity_schedule:
# Prune
params_to_prune = [
(m, 'weight') for m in model.modules()
if isinstance(m, (nn.Conv2d, nn.Linear))
]
prune.global_unstructured(
params_to_prune,
pruning_method=prune.L1Unstructured,
amount=target_sparsity
)
# Fine-tune to recover accuracy
model.train()
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
for epoch in range(fine_tune_epochs):
for data, target in train_loader:
optimizer.zero_grad()
output = model(data)
loss = F.cross_entropy(output, target)
loss.backward()
optimizer.step()
# Permanentize pruning
for m, _ in params_to_prune:
if hasattr(m, 'weight_mask'):
prune.remove(m, 'weight')
return model
Pruning Benchmarks (ResNet-50, ImageNet, RTX 4090)¶
| Model | Inference (ms) | Top-1 Acc | Size (MB) | FLOPs (G) |
|---|---|---|---|---|
| Dense Baseline | 35.4 | 76.1% | 98 | 4.1 |
| Prune 90% | 8.2 | 75.8% | 12 | 2.1 |
| INT8 Quant | 12.5 | 74.2% | 25 | 4.1 |
| Distilled | 15.1 | 75.3% | 49 | 2.8 |
Hybrid Compression Pipeline¶
Best practice 2025: Pruning → Quantization → Distillation stack
Original Model (7B, 28GB)
↓ Pruning (90% sparsity)
Compressed (700M params, 2.8GB)
↓ Quantization (INT8)
Smaller (350MB)
↓ Distillation (student model)
Final (100MB, 10x faster)
def hybrid_compress(teacher_model, train_loader, val_loader):
"""
P-Q-D pipeline: Pruning → Quantization → Distillation
Achieves 100x compression with <5% accuracy loss.
"""
# Step 1: Prune teacher
pruned = iterative_prune(
teacher_model, train_loader,
sparsity_schedule=[0.3, 0.6, 0.9]
)
# Step 2: Quantize (INT8)
quantized = torch.quantization.quantize_dynamic(
pruned, {nn.Linear}, dtype=torch.qint8
)
# Step 3: Distill to student
student = SmallModel() # e.g., 10x smaller
distilled = train_with_distillation(
teacher=quantized,
student=student,
train_loader=train_loader
)
return distilled
Edge Deployment Optimization¶
Target Devices:
- Smartphones (5.4 billion users, 2025)
- IoT sensors (TinyML)
- Edge servers (Jetson Orin)
Model Size Targets¶
| Device | RAM Limit | Model Size Target | Techniques |
|---|---|---|---|
| Mobile CPU | 2-4 GB | <100 MB | Pruning + INT8 |
| IoT/TinyML | 256 KB | <1 MB | Quantization + Pruning |
| Edge GPU | 8-16 GB | <500 MB | Structured pruning |
Mobile Deployment (TensorFlow Lite)¶
import tensorflow as tf
def optimize_for_mobile(keras_model):
"""Optimize model for mobile deployment."""
# Convert to TFLite
converter = tf.lite.TFLiteConverter.from_keras_model(keras_model)
# Enable optimizations
converter.optimizations = [tf.lite.Optimize.DEFAULT]
# INT8 quantization with representative dataset
def representative_dataset():
for data in train_dataset.take(100):
yield [tf.cast(data, tf.float32)]
converter.representative_dataset = representative_dataset
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter.inference_input_type = tf.uint8
converter.inference_output_type = tf.uint8
tflite_model = converter.convert()
# Save
with open('model_int8.tflite', 'wb') as f:
f.write(tflite_model)
return tflite_model
ONNX Export for Cross-Platform¶
import torch.onnx
def export_to_onnx(model, input_shape=(1, 3, 224, 224), output_path="model.onnx"):
"""Export PyTorch model to ONNX for TensorRT/CoreML deployment."""
model.eval()
dummy_input = torch.randn(input_shape)
torch.onnx.export(
model,
dummy_input,
output_path,
export_params=True,
opset_version=14,
do_constant_folding=True,
input_names=['input'],
output_names=['output'],
dynamic_axes={
'input': {0: 'batch_size'},
'output': {0: 'batch_size'}
}
)
print(f"Exported to {output_path}")
Cost Savings (2025 Benchmarks)¶
| Metric | Dense | Pruned 90% | Savings |
|---|---|---|---|
| GPU Hours (Fine-tune) | $4,000 | $1,500 | 62% |
| Storage/Month | $300 | $50 | 83% |
| Edge Inference | $500 | $100 | 80% |
| Total | $4,800 | $1,650 | 66% |
Interview Questions¶
Q: Explain knowledge distillation and why temperature matters.
Teacher-student framework where compact model learns from large model's outputs. Temperature T > 1 softens softmax outputs:
softmax(z_i/T) spreads probability mass across classes, revealing class relationships. T=2-5 typical; higher T = softer targets = more "dark knowledge" transferred. Without temperature, the student only learns the top prediction, not the nuanced relationships between classes.
Q: What is the Lottery Ticket Hypothesis and how does it relate to pruning?
Frankle & Carbin (2019): Dense networks contain sparse subnetworks ("winning tickets") that match full accuracy when trained in isolation. Implication: Most parameters are redundant. Key insight - must reset remaining weights to initialization before retraining. Iterative magnitude pruning (IMP) achieves 90% sparsity with <2% accuracy loss by gradually pruning and retraining.
Q: When would you use structured vs unstructured pruning?
Structured (remove entire filters/channels) when: GPU inference needed, no sparse kernel support, want actual speedup on stock PyTorch. Unstructured (remove individual weights) when: Maximum compression needed, sparse tensor cores available (Ampere+ GPUs), storage-constrained (model size). Structured gives direct speedup but may lose more accuracy; unstructured achieves higher sparsity but needs hardware support.
Q: How do you deploy a 7B parameter LLM on a smartphone?
Multi-stage pipeline: (1) Pruning - 90% unstructured sparsity → 700M params; (2) Quantization - INT8/INT4 → 350-175MB; (3) Distillation - Student model 10x smaller → ~100MB. Target: <100MB for mobile. Use TFLite/ONNX Mobile for deployment. Consider model sharding for larger models. Test latency targets: <50ms for interactive, <200ms for background tasks.
Sources¶
- LabelYourData "Knowledge Distillation" (2025)
- Arik Poz "Knowledge Distillation in PyTorch" (Apr 2025)
- Johal.in "Neural Network Pruning with Torch-Prune" (Nov 2025)
- Frontiers in Robotics and AI "Survey of Model Compression" (2025)
- Hinton et al. "Distilling the Knowledge in a Neural Network" (2015, canonical)
22. Monitoring & Observability for ML¶
Prometheus + Grafana + structured logging for production ML
Why ML Monitoring Is Critical¶
Problem: 70% of AI deployments fail because of undetected performance drift (Gartner 2025).
Unique ML challenges: data drift, concept drift, and model degradation silently erode performance.
Four Pillars of Observability:
1. Metrics - Prometheus (latency, accuracy, throughput)
2. Logs - Structured JSON (errors, predictions, audit)
3. Traces - Distributed tracing (Jaeger)
4. Dashboards - Grafana visualizations
Key Metrics to Monitor¶
Model Performance Metrics¶
| Metric | Type | Prometheus Metric | Purpose |
|---|---|---|---|
| Accuracy | Gauge | ml_accuracy | Track model quality |
| Prediction latency | Histogram | ml_latency_seconds | P50/P95/P99 |
| Prediction count | Counter | ml_predictions_total | Throughput |
| Confidence scores | Histogram | ml_confidence_scores | Distribution |
| Error rate | Counter | ml_errors_total | By error type |
Data Quality Metrics¶
| Metric | Detection Method |
|---|---|
| Feature Drift | KS test, PSI |
| Missing Values | % per feature |
| Outliers | Isolation Forest |
| Data Freshness | Timestamp check |
System Metrics¶
| Metric | Threshold |
|---|---|
| API Latency | P99 < 500ms |
| CPU/GPU Utilization | < 80% |
| Memory | < 85% |
| Request Rate | Track trends |
Prometheus Instrumentation¶
from prometheus_client import Counter, Gauge, Histogram, generate_latest, REGISTRY
from flask import Flask, Response, request
import numpy as np
# Define ML-specific metrics
predictions_total = Counter(
'ml_predictions_total',
'Total predictions',
['model_name', 'model_version', 'status']
)
prediction_accuracy = Gauge(
'ml_prediction_accuracy',
'Rolling accuracy',
['model_name']
)
inference_latency = Histogram(
'ml_inference_latency_seconds',
'Inference latency',
buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0]
)
feature_drift_score = Gauge(
'ml_feature_drift_score',
'Drift score per feature',
['feature_name']
)
class MLModelMonitor:
def __init__(self, model, model_name, model_version):
self.model = model
self.model_name = model_name
self.model_version = model_version
self.app = Flask(__name__)
self._setup_routes()
def _setup_routes(self):
@self.app.route('/metrics')
def metrics():
return Response(generate_latest(REGISTRY), mimetype='text/plain')
@self.app.route('/predict', methods=['POST'])
def predict():
            data = request.json
with inference_latency.time():
try:
prediction = self.model.predict(data['features'])
predictions_total.labels(
model_name=self.model_name,
model_version=self.model_version,
status='success'
).inc()
return {'prediction': prediction.tolist()}
except Exception as e:
predictions_total.labels(
model_name=self.model_name,
model_version=self.model_version,
status='error'
).inc()
raise
def run(self, port=5000):
self.app.run(host='0.0.0.0', port=port)
Prometheus Configuration¶
# prometheus.yml
global:
scrape_interval: 15s
evaluation_interval: 15s
scrape_configs:
- job_name: 'ml-models'
static_configs:
- targets: ['model-server:5000']
metrics_path: '/metrics'
rule_files:
- 'alerts.yml'
alerting:
alertmanagers:
- static_configs:
- targets: ['alertmanager:9093']
Alerting Rules¶
# alerts.yml
groups:
- name: ml-model-alerts
rules:
# Accuracy degradation
- alert: ModelAccuracyDropped
expr: ml_prediction_accuracy < 0.85
for: 5m
labels:
severity: warning
annotations:
summary: "Model accuracy dropped below 85%"
description: "{{ $labels.model_name }} accuracy is {{ $value }}"
# Latency spike
- alert: HighInferenceLatency
expr: histogram_quantile(0.99, rate(ml_inference_latency_seconds_bucket[5m])) > 0.5
for: 2m
labels:
severity: critical
annotations:
summary: "P99 latency exceeds 500ms"
# Data drift detected
- alert: DataDriftDetected
expr: ml_feature_drift_score > 0.1
for: 10m
labels:
severity: warning
annotations:
summary: "Feature {{ $labels.feature_name }} showing drift"
# Error rate spike
- alert: HighErrorRate
expr: rate(ml_predictions_total{status="error"}[5m]) / rate(ml_predictions_total[5m]) > 0.05
for: 2m
labels:
severity: critical
annotations:
summary: "Error rate exceeds 5%"
Grafana Dashboard Design¶
Dashboard Hierarchy¶
Overview Dashboard
├── Model Health Panel (status indicators)
├── Key Metrics Row (accuracy, latency, throughput)
└── Alert Summary
Model Detail Dashboard
├── Accuracy Trend (time series)
├── Prediction Distribution (histogram)
├── Feature Drift Heatmap
└── Confusion Matrix
Troubleshooting Dashboard
├── Error Logs
├── Resource Utilization
└── Request Traces
PromQL Queries¶
# Prediction rate (requests/second)
rate(ml_predictions_total[5m])
# P95 latency
histogram_quantile(0.95, rate(ml_inference_latency_seconds_bucket[5m]))
# Accuracy trend (if logged)
avg_over_time(ml_prediction_accuracy[1h])
# Error rate percentage
100 * sum(rate(ml_predictions_total{status="error"}[5m])) / sum(rate(ml_predictions_total[5m]))
# Feature drift score
ml_feature_drift_score > 0.1
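The same queries can be issued programmatically through the Prometheus HTTP API; a minimal sketch (the Prometheus address is an assumption):
import requests

PROMETHEUS_URL = "http://prometheus:9090"  # assumed address of the Prometheus server

def prom_query(expr: str):
    """Run an instant PromQL query and return the result vector."""
    resp = requests.get(f"{PROMETHEUS_URL}/api/v1/query", params={"query": expr})
    resp.raise_for_status()
    return resp.json()["data"]["result"]

# Example: current P95 inference latency
p95 = prom_query("histogram_quantile(0.95, rate(ml_inference_latency_seconds_bucket[5m]))")
print(p95)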
Structured Logging Best Practices¶
import structlog
import json
from datetime import datetime
# Configure structured logging
structlog.configure(
processors=[
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.JSONRenderer()
],
wrapper_class=structlog.stdlib.BoundLogger,
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
)
logger = structlog.get_logger()
def log_prediction(model_name: str, model_version: str,
input_features: dict, prediction: any,
latency_ms: float, confidence: float):
"""Structured log for each prediction."""
logger.info(
"prediction",
model_name=model_name,
model_version=model_version,
input_hash=hash(frozenset(input_features.items())) % 10000, # Anonymized
prediction=str(prediction),
latency_ms=latency_ms,
confidence=confidence,
timestamp=datetime.utcnow().isoformat()
)
def log_error(model_name: str, error_type: str,
error_message: str, stack_trace: str = None):
"""Structured error logging."""
logger.error(
"model_error",
model_name=model_name,
error_type=error_type,
error_message=error_message,
stack_trace=stack_trace,
timestamp=datetime.utcnow().isoformat()
)
Log Schema Example¶
{
"timestamp": "2025-09-14T14:30:00.123Z",
"level": "INFO",
"event": "prediction",
"model_name": "fraud_detector_v2",
"model_version": "2.3.1",
"input_hash": 4521,
"prediction": "not_fraud",
"latency_ms": 23.5,
"confidence": 0.92,
"request_id": "req-abc123",
"user_id": "user-456"
}
Drift Detection Implementation¶
from scipy import stats
import numpy as np
class DriftMonitor:
"""Statistical drift detection for feature monitoring."""
def __init__(self, reference_data: np.ndarray, feature_names: list):
self.reference = reference_data
self.feature_names = feature_names
self.baseline_stats = self._compute_baseline()
def _compute_baseline(self):
"""Compute reference statistics."""
return {
'mean': np.mean(self.reference, axis=0),
'std': np.std(self.reference, axis=0),
'percentiles': np.percentile(self.reference, [5, 50, 95], axis=0)
}
def ks_test_drift(self, current_data: np.ndarray) -> dict:
"""
Kolmogorov-Smirnov test for drift detection.
Returns p-value and statistic per feature.
"""
drift_scores = {}
for i, name in enumerate(self.feature_names):
stat, p_value = stats.ks_2samp(
self.reference[:, i],
current_data[:, i]
)
drift_scores[name] = {
'statistic': stat,
'p_value': p_value,
'drift_detected': p_value < 0.05
}
# Update Prometheus gauge
feature_drift_score.labels(feature_name=name).set(stat)
return drift_scores
def psi_score(self, current_data: np.ndarray, bins=10) -> dict:
"""
Population Stability Index (PSI) for drift.
PSI < 0.1: No drift
PSI 0.1-0.25: Moderate drift
PSI > 0.25: Significant drift
"""
psi_scores = {}
for i, name in enumerate(self.feature_names):
ref_hist, bin_edges = np.histogram(self.reference[:, i], bins=bins)
cur_hist, _ = np.histogram(current_data[:, i], bins=bin_edges)
# Normalize
ref_pct = ref_hist / len(self.reference)
cur_pct = cur_hist / len(current_data)
# PSI formula
            psi = np.sum((cur_pct - ref_pct) * np.log((cur_pct + 1e-10) / (ref_pct + 1e-10)))
psi_scores[name] = psi
return psi_scores
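A usage sketch with synthetic data (feature names and the injected shift are illustrative; it reuses the DriftMonitor class and the feature_drift_score gauge defined above):
import numpy as np

rng = np.random.default_rng(42)
reference = rng.normal(0, 1, size=(10_000, 2))        # distribution seen at training time
current = np.column_stack([
    rng.normal(0.5, 1, size=10_000),                   # "age" shifted by +0.5 std
    rng.normal(0.0, 1, size=10_000),                   # "amount" unchanged
])

monitor = DriftMonitor(reference, feature_names=["age", "amount"])
print(monitor.ks_test_drift(current))   # "age" should report drift_detected=True
print(monitor.psi_score(current))       # PSI noticeably higher for "age"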
Multi-Level Alerting Strategy¶
| Level | Severity | Response Time | Example |
|---|---|---|---|
| P0 | Critical | <5 min | Service down, accuracy <70% |
| P1 | High | <30 min | Latency P99 > 1s, error rate >10% |
| P2 | Warning | <4 hours | Drift detected, accuracy <85% |
| P3 | Info | Next business day | Minor degradation trends |
Alert Fatigue Prevention:
1. Group related alerts - single notification for related issues
2. Set appropriate `for:` durations - avoid transient spikes
3. Use composite alerts - multiple conditions must be true
4. Implement acknowledgments - track response
5. Regular threshold review - tune based on SLOs
Monitoring Stack Comparison¶
| Tool | Purpose | Strengths |
|---|---|---|
| Prometheus | Metrics collection | Pull model, PromQL, alerting |
| Grafana | Visualization | Multi-source, dashboards, alerts |
| MLflow | Experiment tracking | Model registry, versioning |
| Evidently AI | ML-specific monitoring | Drift dashboards, reports |
| Datadog | Full-stack observability | Logs+metrics+traces |
Interview Questions¶
Q: What metrics would you monitor for a production ML model?
Three categories: Model Performance (accuracy, F1, confidence distribution, prediction latency histogram), Data Quality (feature drift via KS test/PSI, missing value rates, outlier frequency, data freshness), System Health (API latency P50/P95/P99, CPU/GPU utilization, memory, request throughput). Key insight: Monitor prediction confidence distributions - shifts indicate input changes even before accuracy drops.
Q: How do you detect data drift in production?
Two main methods: KS Test (Kolmogorov-Smirnov) - compares CDFs of reference vs current data, D = sup_x |F_1(x) - F_2(x)|, alert if p < 0.05. PSI (Population Stability Index) - bin-based comparison, PSI < 0.1 no drift, 0.1-0.25 moderate, >0.25 significant. Implementation: Store reference distributions from training, compute rolling statistics on production data, update Prometheus gauges, trigger alerts when thresholds exceeded.
Q: What's the difference between monitoring and observability?
Monitoring = "Is it working?" (dashboards, alerts, known unknowns). Observability = "Why isn't it working?" (logs + metrics + traces, unknown unknowns). Monitoring answers predefined questions; observability enables ad-hoc debugging. For ML: monitoring tracks accuracy, latency; observability helps diagnose WHY accuracy dropped through correlated logs, traces, and metric drill-downs.
Q: How do you design alerts to avoid alert fatigue?
(1) Multi-level severity - P0 critical <5min, P3 info next day. (2) Composite alerts - require multiple conditions (accuracy low AND latency high). (3) Appropriate `for:` durations - avoid transient spikes. (4) Group related alerts - single notification for cascading failures. (5) Regular tuning - review false positive rates weekly. Target: <5 actionable alerts/day, <1 false positive/week.
Sources¶
- ML Journey "Monitoring ML Models with Prometheus and Grafana" (Sep 2025)
- Johal.in "MLOps Monitoring with Prometheus and Grafana" (Sep 2025)
- Diousoft "Model Monitoring & Logging" (2025)
- Grafana Labs "Observability Survey" (Mar 2025)
- UpCloud "Observability with Prometheus" (2025)
23. Security for ML (Adversarial, Model Extraction, Privacy)¶
Protecting ML models from attacks: adversarial examples, model stealing, privacy breaches
Why ML Security Is Critical¶
Problem: 41% of enterprises reported AI security incidents by late 2024 (Gartner).
First agentic AI attack: September 2025 - AI performed 80-90% of the attack work autonomously.
ML-specific attack surface:
- Models learn from data → attackers manipulate the data or the learning process
- Black-box access via API → models can be probed and extracted
- Confidence scores leak information → enable inversion attacks
Attack Taxonomy¶
| Attack Type | Phase | Target | Access Required |
|---|---|---|---|
| Evasion | Inference | Model predictions | Query access |
| Data Poisoning | Training | Training data | Data pipeline |
| Model Extraction | Inference | Model IP | Query access |
| Model Inversion | Inference | Training data | Query access |
| Membership Inference | Inference | Training set membership | Query access |
1. Adversarial Attacks (Evasion)¶
Concept: Imperceptible input modifications that fool ML models.
Original Image: [cat] → Model: "cat" (99%)
+ Adversarial Noise (invisible to human)
Adversarial Image: [cat+ε] → Model: "dog" (99%)
Attack Methods¶
FGSM (Fast Gradient Sign Method): \(x_{adv} = x + \epsilon \cdot \text{sign}(\nabla_x J(\theta, x, y))\)
Where: - \(x\) = original input - \(\epsilon\) = perturbation magnitude - \(J\) = loss function - \(\nabla_x J\) = gradient of loss w.r.t. input
PGD (Projected Gradient Descent): Iterative FGSM with projection back to ε-ball.
import torch
import torch.nn.functional as F
def fgsm_attack(model, x, y, epsilon=0.1):
"""Fast Gradient Sign Method attack."""
x_adv = x.clone().detach().requires_grad_(True)
output = model(x_adv)
loss = F.cross_entropy(output, y)
loss.backward()
# Add perturbation in gradient direction
x_adv = x_adv + epsilon * x_adv.grad.sign()
# Clip to valid range
x_adv = torch.clamp(x_adv, 0, 1)
return x_adv.detach()
def pgd_attack(model, x, y, epsilon=0.1, alpha=0.01, iterations=40):
"""Projected Gradient Descent attack (stronger)."""
x_adv = x.clone().detach()
for _ in range(iterations):
x_adv.requires_grad_(True)
output = model(x_adv)
loss = F.cross_entropy(output, y)
loss.backward()
# Step in gradient direction
x_adv = x_adv + alpha * x_adv.grad.sign()
# Project back to epsilon ball
delta = torch.clamp(x_adv - x, -epsilon, epsilon)
x_adv = torch.clamp(x + delta, 0, 1)
x_adv = x_adv.detach()
return x_adv
White-box vs Black-box¶
| Setting | Attacker Knowledge | Attack Strength |
|---|---|---|
| White-box | Architecture, weights, data | FGSM, PGD, CW |
| Black-box | Query access only | Query-based, transfer attacks |
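A minimal illustration of a transfer attack: craft adversarial examples on a local surrogate model and measure how often they also fool the black-box target (both models are stand-ins; `fgsm_attack` is the function defined above):
def transfer_attack_success_rate(surrogate, target, x, y, epsilon=0.1):
    """Craft FGSM examples on a white-box surrogate, evaluate them on the black-box target."""
    x_adv = fgsm_attack(surrogate, x, y, epsilon=epsilon)
    with torch.no_grad():
        preds = target(x_adv).argmax(dim=1)    # the target is only queried, never differentiated
    return (preds != y).float().mean().item()  # fraction of examples that transfer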
2. Model Extraction Attacks¶
Concept: Recreate proprietary model through systematic queries.
Real case: DeepSeek used GPT-3/4 API outputs for distillation (Dec 2024); OpenAI revoked access.
class ModelExtractionDefense:
"""Defenses against model stealing."""
def __init__(self, rate_limit=1000, noise_scale=0.01):
self.rate_limit = rate_limit
self.noise_scale = noise_scale
self.query_counts = {} # Track per-user queries
def check_rate_limit(self, user_id):
"""Prevent mass querying."""
count = self.query_counts.get(user_id, 0)
if count > self.rate_limit:
raise Exception("Rate limit exceeded")
self.query_counts[user_id] = count + 1
def perturb_output(self, probs):
"""Add noise to obscure exact model behavior."""
noise = torch.randn_like(probs) * self.noise_scale
perturbed = probs + noise
return F.softmax(perturbed, dim=-1)
def limit_output_precision(self, probs, top_k=5):
"""Only return top-k probabilities, not full distribution."""
top_probs, top_indices = torch.topk(probs, k=top_k)
return {idx.item(): prob.item() for idx, prob in zip(top_indices, top_probs)}
3. Model Inversion Attacks¶
Concept: Reconstruct training data from model outputs.
Example: Medical imaging model → reconstruct patient SSNs, names from confidence scores.
NIST Classification (Mar 2025):
- Confidence score attacks: analyze probability distributions
- Label-only attacks: use only hard labels (more queries needed)
- Attribute inference: infer specific sensitive features
class ModelInversionDefense:
"""Defenses against training data reconstruction."""
def __init__(self, model, epsilon=1.0):
self.model = model
self.epsilon = epsilon # Differential privacy parameter
def apply_differential_privacy(self, gradients):
"""
DP-SGD: Clip gradients + add Gaussian noise.
Prevents any single training sample from dominating.
"""
# Clip per-sample gradients
max_norm = 1.0
total_norm = torch.norm(gradients)
if total_norm > max_norm:
gradients = gradients * (max_norm / total_norm)
        # Add calibrated noise (scale is inversely proportional to ε: smaller ε = stronger privacy = more noise)
        noise = torch.randn_like(gradients) * (max_norm / self.epsilon)
return gradients + noise
def reduce_output_precision(self, logits, decimals=2):
"""Round logits to reduce information leakage."""
return torch.round(logits * (10 ** decimals)) / (10 ** decimals)
4. Membership Inference Attacks (MIA)¶
Concept: Determine if a specific sample was in training data.
Risk: HIPAA/GDPR violations - reveals if patient data was used.
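For intuition, the simplest membership-inference heuristic just thresholds the model's confidence on the true label - models tend to be more confident on samples they were trained on (the threshold is illustrative):
import torch
import torch.nn.functional as F

def confidence_mia(model, x, y, threshold=0.95):
    """Guess 'member' when confidence on the true label exceeds the threshold."""
    with torch.no_grad():
        probs = F.softmax(model(x), dim=1)
        true_class_conf = probs[torch.arange(len(y)), y]
    return true_class_conf > threshold   # True = predicted member of the training set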
class MembershipInferenceDefense:
"""Prevent determination of training set membership."""
def __init__(self, threshold=0.5):
self.threshold = threshold
def regularize_confidence(self, probs):
"""
Reduce overconfidence on training samples.
Models often show higher confidence on training data.
"""
# Label smoothing effect
smoothed = probs * 0.9 + 0.1 / probs.size(-1)
return smoothed
def detect_attack_pattern(self, query_history):
"""Detect MIA attack patterns: many similar queries."""
# MIA attackers query similar inputs to probe membership
# Monitor for suspicious query patterns
pass
5. Differential Privacy¶
Definition: A mechanism M satisfies (ε, δ)-DP if for all datasets D, D' differing by one record and all output sets S: \(P[M(D) \in S] \le e^{\epsilon} \cdot P[M(D') \in S] + \delta\).
Key idea: Output doesn't change significantly if any one record is removed.
import torch
from torch import nn
class DPSGDTrainer:
"""Differentially Private Stochastic Gradient Descent."""
def __init__(self, model, epsilon=1.0, delta=1e-5, max_grad_norm=1.0):
self.model = model
self.epsilon = epsilon
self.delta = delta
self.max_grad_norm = max_grad_norm
# Calculate noise scale
self.noise_scale = self._compute_noise_scale()
def _compute_noise_scale(self):
"""Compute Gaussian noise scale for (ε, δ)-DP."""
# Simplified: sigma = sqrt(2 * ln(1.25/delta)) / epsilon
import math
return math.sqrt(2 * math.log(1.25 / self.delta)) / self.epsilon
def step(self, loss, optimizer):
"""Perform one DP-SGD step."""
        # Compute gradients (true DP-SGD clips per-sample gradients; simplified to batch-level here)
loss.backward()
# Clip gradients
torch.nn.utils.clip_grad_norm_(
self.model.parameters(),
self.max_grad_norm
)
# Add Gaussian noise to gradients
for param in self.model.parameters():
if param.grad is not None:
noise = torch.randn_like(param.grad) * self.noise_scale * self.max_grad_norm
param.grad += noise
optimizer.step()
optimizer.zero_grad()
Defense Summary Table¶
| Attack | Primary Defense | Secondary Defense |
|---|---|---|
| Evasion | Adversarial training | Input preprocessing, detection |
| Poisoning | Data validation, provenance | Robust aggregation, DP-SGD |
| Extraction | Rate limiting, output perturbation | Watermarking, query monitoring |
| Inversion | Differential privacy | Limit output precision |
| MIA | Regularization, DP | Confidence calibration |
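Adversarial training, listed above as the primary evasion defense, mixes adversarial examples into every training batch; a minimal sketch reusing the `pgd_attack` function from earlier:
def adversarial_training_step(model, optimizer, x, y, epsilon=0.1):
    """One training step on a 50/50 mix of clean and PGD-adversarial examples."""
    model.eval()
    x_adv = pgd_attack(model, x, y, epsilon=epsilon)  # attack the current weights
    model.train()
    optimizer.zero_grad()                             # clear grads accumulated by the attack
    loss = 0.5 * F.cross_entropy(model(x), y) + 0.5 * F.cross_entropy(model(x_adv), y)
    loss.backward()
    optimizer.step()
    return loss.item()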
Multi-Layer Defense Architecture¶
Layer 1: Data Security
├── Validate all inputs
├── Track data provenance
├── Apply differential privacy to sensitive data
Layer 2: Training Security
├── Adversarial training
├── DP-SGD for privacy
├── Model versioning with attestations
Layer 3: Access Control
├── Zero trust for model APIs
├── Rate limiting per user
├── Authentication required
Layer 4: Output Protection
├── Limit confidence score precision
├── Add calibrated noise to outputs
├── Return top-k only, not full distributions
Layer 5: Monitoring
├── Detect unusual query patterns
├── Alert on extraction attempts
├── Audit trail for compliance
Interview Questions¶
Q: What is an adversarial attack and how does FGSM work?
Adversarial attacks add imperceptible perturbations to inputs that fool ML models. FGSM (Fast Gradient Sign Method) computes: x_adv = x + ε·sign(∇_x J(θ, x, y)) - adds perturbation in the direction that maximizes loss. ε controls perturbation magnitude (typically 0.01-0.1). Works in white-box setting where attacker has model access. Defenses include adversarial training (include adversarial examples in training data), input preprocessing (feature squeezing), and detection methods.
Q: How do model extraction attacks work and how do you prevent them?
Attackers query the model API systematically with diverse inputs, collect (input, output) pairs, then train a "copycat" model to mimic the original. Defenses: (1) Rate limiting - prevent mass queries; (2) Output perturbation - add noise to confidence scores; (3) Limit information - return only top-k predictions, not full distributions; (4) Watermarking - embed unique patterns in predictions to prove theft; (5) Query monitoring - detect suspicious patterns (high volume, diverse inputs probing decision boundaries).
Q: What is differential privacy and why does it help against model inversion?
DP ensures model output doesn't change significantly if any single training record is removed - formally: P[M(D)∈S] ≤ e^ε · P[M(D')∈S] + δ. Implementation: DP-SGD clips per-sample gradients and adds Gaussian noise calibrated to (ε, δ). This prevents attackers from reconstructing training data because no individual record can significantly influence the model. Trade-off: higher ε = less privacy, better utility; typical ε = 1-10.
Q: How would you secure a model deployed as a public API?
Multi-layer defense: (1) Authentication - API keys, JWT tokens; (2) Rate limiting - prevent extraction attacks, e.g., 1000 queries/day per key; (3) Output limiting - return predictions only, not confidence distributions; (4) Monitoring - alert on unusual patterns (many similar queries, probing behavior); (5) Logging - audit trail for compliance; (6) Versioning - ability to rollback if compromised; (7) Input validation - reject malformed or suspicious inputs; (8) Consider differential privacy if training data is sensitive.
Sources¶
- SentinelOne "Model Inversion Attacks" (Jan 2026)
- YASH Technologies "Adversarial Attack Defenses" (Jan 2026)
- ByteJournal "Model Security" (Feb 2025)
- NIST Adversarial ML Taxonomy (Mar 2025)
- ResearchGate "Security and Privacy-Preserving for ML" (Jul 2025)
Видео-ресурсы¶
YouTube¶
Stanford CS329S: Machine Learning Systems Design - URL: https://stanford-cs329s.github.io/ - Lectures on ML deployment, monitoring
Full Stack LLM Bootcamp - Production LLM applications - URL: https://fullstackdeeplearning.com/llm-bootcamp/
Common Misconceptions¶
Misconception: a feature store is only needed by large companies
Training-serving skew is the #1 cause of silent model degradation in production. Even on a team of 3 ML engineers, without a feature store the same features get computed differently in training (Spark/Pandas) and serving (Python/SQL), costing 5-15% accuracy in production. Feast (open-source) can be deployed in a day.
Misconception: an A/B test of an ML model is just a random split of users
With network effects (social networks, marketplaces) a random split violates SUTVA - user A's behavior affects user B. For RecSys: a user in treatment sees the new recommendations and shares them with a user in control. Solution: cluster-based randomization, geo-based splits, or switchback experiments.
Misconception: drift detected = the model must be retrained
PSI > 0.25 on features means the distribution has shifted, but NOT necessarily that quality has dropped. Seasonality in e-commerce (Black Friday) causes a PSI spike while the model may still perform well. The correct order: (1) record the drift, (2) check model quality on fresh data, (3) retrain only if quality actually dropped (see the sketch below). Automatic retraining on PSI alone, without a quality check, is an expensive mistake.
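A minimal sketch of that ordering (thresholds and the `evaluate_quality` / `retrain` callbacks are placeholders):
def handle_drift(psi_scores: dict, evaluate_quality, retrain, quality_threshold=0.85):
    """Retrain only when drift is accompanied by an actual quality drop."""
    drifted = [f for f, psi in psi_scores.items() if psi > 0.25]
    if not drifted:
        return "no action"
    quality = evaluate_quality()                      # e.g. accuracy on freshly labeled data
    if quality >= quality_threshold:
        return f"drift on {drifted}, quality OK - keep monitoring"
    retrain()
    return f"quality dropped to {quality:.2f} - retraining triggered"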
Questions with Graded Answers¶
Why use a multi-stage recommender instead of a single model?
"To improve accuracy through a cascade of models" - does not explain the main reason
"Scale and latency. With 1B items and an SLA < 200ms you cannot score every item with a heavy model. Multi-stage funnel: Retrieval (Two-Tower + ANN) narrows to ~1000 items in ~10ms, Ranking (deep model) scores ~500 in ~100ms, Re-ranking applies diversity and business rules in ~20ms. Total ~150ms. This is the only viable architecture at YouTube/Netflix scale."
How do you choose between Feast (OSS) and Tecton (managed) feature stores?
"Feast is free, so it's always better" - ignores total cost of ownership
"It depends on scale and team. Feast (OSS) - free but self-managed: fits startups, learning, tight budgets; needs DevOps for Redis + Spark. Tecton (managed) - expensive but enterprise-grade: rich real-time transformations, advanced monitoring, full support; pays off with 5+ ML engineers, when time-to-production matters more than cost. Hopsworks is a middle ground: open-core with good real-time support."
Related Topics¶
- dl_004_optimizers - Training optimization
- llm_001_rag_pipeline - RAG architecture
- llm_012_llm_prod - LLM production (in more detail)
Section 11-13 cross-refs:
Section 14-15 cross-refs:
Section 16 cross-refs:
Section 17-19 cross-refs:
Section 20 cross-refs: