
ML System Design: Study Materials


Prerequisites: MLSD Interview Prep | Deep Learning Materials

ML System Design covers 23 topics, from model serving and A/B testing to AI agents in production and ML security. Each topic includes the best sources (books, papers, blog posts), key formulas and concepts, gotchas, and interview-ready Q&A. According to 2025 levels.fyi data, an MLSD round appears in 85% of interviews for Senior+ ML Engineer positions.

Sources for the 23 ML System Design problems. Updated: 2026-02-11


Contents

  1. Task Overview
  2. 1. Model Serving & Latency Optimization
  3. 2. A/B Testing
  4. 3. Drift Detection
  5. 4. Model Calibration
  6. 5. Ranking Metrics Advanced
  7. 6. Recommendation Systems (RecSys)
  8. 7. ML Trade-offs Quiz
  9. 8. LLM Production
  10. 9. Feature Stores
  11. 10. ML Infrastructure (Model Registry & Experiment Tracking)
  12. 11. Multi-Armed Bandits (Exploration vs Exploitation)
  13. 12. Online Learning (Streaming ML)
  14. 13. Multi-Stage Recommender Systems
  15. 14. Causal Inference for ML
  16. 15. Vector Databases for ML
  17. 16. Cost Optimization for ML Inference
  18. 17. Multi-Model Serving and Model Routing
  19. 18. Data Quality for ML in Production
  20. 19. Foundation Models in Production
  21. 20. AI Agents in Production
  22. 21. Model Compression (Knowledge Distillation, Pruning, Edge Deployment)
  23. 22. Monitoring & Observability for ML
  24. 23. Security for ML (Adversarial, Model Extraction, Privacy)
  25. Video Resources
  26. Related Topics

Task Overview

| ID | Task | Focus |
|---|---|---|
| mlsd_005 | Model Serving / Latency Optimization | Inference, batching, quantization |
| mlsd_002 | A/B Testing | Statistical significance, sample size |
| mlsd_004 | Drift Detection | PSI, KS-test, monitoring |
| mlsd_008 | Model Calibration | Platt, Isotonic, Brier score |
| mlsd_001 | Ranking Metrics Advanced | NCE, CTR Lift, NDCG |
| mlsd_003 | Trade-offs Quiz | 15 production scenarios |
| mlsd_006 | RecSys | Two-Tower, Cold Start |
| mlsd_007 | LLM Production | Guardrails, Prompt Injection |

1. Model Serving & Latency Optimization

Books

Designing Machine Learning Systems (Chip Huyen) - Chapter 7: Model Deployment - Chapter 8: Monitoring - URL: https://www.oreilly.com/library/view/designing-machine-learning/9781098107956/

Machine Learning Engineering (Andriy Burkov) - Chapter 5: Model Serving - URL: http://www.mlebook.com/

Articles

ML Inference Latency Optimization - System Architecture Guide - URL: https://gist.github.com/epappas/bdbf6f4d0f333fdb1230e20af8d7540d - Covers: model optimization, serving infrastructure, hardware utilization

Model Serving: Architectures for High-Performance Inference - URL: https://www.whaleflux.com/blog/efficient-model-serving-architectures-for-high-performance-inference/ - Patterns: dynamic batching, quantization, multi-model serving

MLOps: Model serving (Raghu Hemadri) - URL: https://raghuhemadri.github.io/blog/2025/mlops-model-serving/ - Balancing speed, cost, and reliability

Key Concepts

Latency Budget:
- P50 < 50ms (interactive)
- P99 < 200ms (acceptable)
- P99.9 < 500ms (edge cases)

Optimization Techniques:
1. Model-level: quantization, pruning, distillation
2. Serving-level: batching, caching, async
3. Hardware-level: GPU, TPU, inference chips
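To make the model-level techniques concrete, here is a minimal sketch of post-training dynamic quantization in PyTorch; the toy two-layer model is a stand-in, not something from this guide:

import torch
import torch.nn as nn

# Toy model (stand-in for a real serving model)
model = nn.Sequential(nn.Linear(256, 128), nn.ReLU(), nn.Linear(128, 10))

# Store Linear weights as int8 and quantize activations on the fly;
# typically shrinks weights ~4x and speeds up CPU inference
quantized = torch.quantization.quantize_dynamic(model, {nn.Linear}, dtype=torch.qint8)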

Code

import asyncio

# Dynamic Batching (simplified sketch: a production implementation would hand
# each caller its own result via futures instead of returning the whole batch)
class DynamicBatcher:
    def __init__(self, model, max_batch_size=32, max_wait_ms=10):
        self.model = model
        self.max_batch_size = max_batch_size
        self.max_wait_ms = max_wait_ms
        self.queue = []

    async def predict(self, input_data):
        self.queue.append(input_data)
        # Flush immediately when the batch is full
        if len(self.queue) >= self.max_batch_size:
            return await self._process_batch()
        # Otherwise wait up to max_wait_ms for more requests to accumulate
        await asyncio.sleep(self.max_wait_ms / 1000)
        return await self._process_batch()

    async def _process_batch(self):
        if not self.queue:
            return None  # batch was already flushed by a concurrent caller
        batch = self.queue[:self.max_batch_size]
        self.queue = self.queue[self.max_batch_size:]
        return self.model.predict_batch(batch)

2. A/B Testing

Books

Trustworthy Online Controlled Experiments (Kohavi, Tang, Xu) - Chapters: Sample Size, Statistical Significance, Common Pitfalls - URL: https://www.wiley.com/en-us/Trustworthy+Online+Controlled+Experiments-p-9781119726649

Articles

How to Crack Machine Learning System-Design Interviews - URL: https://towardsdatascience.com/cracking-machine-learning-system-design-interviews/ - A/B testing framework

ML System Design Interview Guide (Exponent) - URL: https://www.tryexponent.com/blog/machine-learning-system-design-interview-guide - Structured approach

Key Formulas

Sample Size (Two-proportion z-test):
n = 16 * sigma^2 / delta^2

where:
- sigma^2 = p(1-p) for binary outcomes
- delta = minimum detectable effect
- 16 ≈ 2 * (1.96 + 0.84)^2 for 95% confidence, 80% power (the factor 2 accounts for two groups)

Statistical Significance:
z = (p_A - p_B) / sqrt(p_pool * (1-p_pool) * (1/n_A + 1/n_B))

where p_pool = (x_A + x_B) / (n_A + n_B)

Code

import numpy as np
from scipy import stats

def calculate_sample_size(p_baseline, mde, alpha=0.05, power=0.8):
    """Calculate required sample size per variant"""
    z_alpha = stats.norm.ppf(1 - alpha/2)  # 1.96
    z_beta = stats.norm.ppf(power)          # 0.84

    p_pooled = p_baseline + mde/2
    variance = 2 * p_pooled * (1 - p_pooled)

    n = variance * (z_alpha + z_beta)**2 / mde**2
    return int(np.ceil(n))

def ab_test_significance(control, treatment, alpha=0.05):
    """Run chi-square test for A/B test"""
    contingency = np.array([[control['success'], control['total'] - control['success']],
                           [treatment['success'], treatment['total'] - treatment['success']]])
    chi2, p_value, dof, expected = stats.chi2_contingency(contingency)
    return {
        'p_value': p_value,
        'significant': p_value < alpha,
        'lift': (treatment['success']/treatment['total'] - control['success']/control['total']) / (control['success']/control['total'])
    }
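A chi-square test on a 2x2 table is equivalent to the pooled two-proportion z-test (chi2 = z^2). For completeness, a minimal sketch of the z-test exactly as written in the formula above, reusing the numpy/scipy imports from this block:

def two_proportion_z_test(x_a, n_a, x_b, n_b):
    """Pooled two-proportion z-test (matches the formula above)"""
    p_a, p_b = x_a / n_a, x_b / n_b
    p_pool = (x_a + x_b) / (n_a + n_b)
    se = np.sqrt(p_pool * (1 - p_pool) * (1/n_a + 1/n_b))
    z = (p_a - p_b) / se
    p_value = 2 * (1 - stats.norm.cdf(abs(z)))  # two-sided
    return {'z': z, 'p_value': p_value}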

3. Drift Detection

Articles

Model Drift in Production (2026): Detection, Monitoring & Response - URL: https://alldaystech.com/guides/artificial-intelligence/model-drift-detection-monitoring-response - Drift types: data, concept, label - Metrics: PSI, KS, Wasserstein, JS/KL

Monitoring Model Drift in Production: A Step-by-Step Playbook - URL: https://srikanthdevarajan.substack.com/p/monitoring-model-drift-in-production - KL divergence, PSI, KS-test explanations

Data Drift: Key Detection and Monitoring Techniques in 2026 - URL: https://labelyourdata.com/articles/machine-learning/data-drift

Tools

  • Evidently AI - open-source drift detection
  • NannyML - post-deployment monitoring
  • WhyLabs - observability platform
  • Fiddler AI - model performance monitoring

Key Metrics

Population Stability Index (PSI):
PSI = sum((actual_i - expected_i) * ln(actual_i / expected_i))

Interpretation:
- PSI < 0.1: No significant change
- 0.1 <= PSI < 0.25: Moderate change
- PSI >= 0.25: Significant change (ACTION REQUIRED)

Kolmogorov-Smirnov Test:
KS = max|F_baseline(x) - F_production(x)|

Wasserstein Distance (Earth Mover's):
W = integral |F_baseline(x) - F_production(x)| dx

Code

import numpy as np
from scipy import stats

def calculate_psi(expected, actual, buckets=10):
    """Calculate Population Stability Index"""
    # Bucket edges are derived from the expected (baseline) distribution
    percentiles = np.arange(0, buckets + 1) / buckets * 100

    if len(np.unique(expected)) < buckets:
        breakpoints = np.unique(expected)
    else:
        breakpoints = np.percentile(expected, percentiles)

    expected_percents = np.histogram(expected, breakpoints)[0] / len(expected)
    actual_percents = np.histogram(actual, breakpoints)[0] / len(actual)

    # Handle zero divisions
    expected_percents = np.where(expected_percents == 0, 0.0001, expected_percents)
    actual_percents = np.where(actual_percents == 0, 0.0001, actual_percents)

    psi_value = np.sum((actual_percents - expected_percents) * np.log(actual_percents / expected_percents))
    return psi_value

def ks_test_drift(baseline, production):
    """Kolmogorov-Smirnov test for drift"""
    statistic, p_value = stats.ks_2samp(baseline, production)
    return {
        'ks_statistic': statistic,
        'p_value': p_value,
        'drift_detected': p_value < 0.05
    }
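The Wasserstein distance from the metrics above is available directly in SciPy; a minimal sketch (the 0.1 threshold is illustrative, not a standard cutoff):

def wasserstein_drift(baseline, production, threshold=0.1):
    """Earth mover's distance between baseline and production samples"""
    distance = stats.wasserstein_distance(baseline, production)
    return {'wasserstein': distance, 'drift_detected': distance > threshold}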

4. Model Calibration

Статьи

The Complete Guide to Platt Scaling - URL: https://www.blog.trainindata.com/complete-guide-to-platt-scaling/ - Platt scaling deep dive

Model Calibration and Isotonic Regression - URL: https://omnetaplconline.com/model-calibration-and-isotonic-regression-adjusting-confidence-scores-in-machine-learning/

Calibration Techniques for Machine Learning - URL: https://www.numberanalytics.com/blog/calibration-techniques-for-machine-learning-models

Papers

Probabilistic Outputs for Support Vector Machines (Platt, 1999) - Original Platt scaling paper (Advances in Large Margin Classifiers, MIT Press) - URL: https://home.cs.colorado.edu/~mozer/Teaching/syllabi/6622/papers/Platt1999.pdf

Key Concepts

Platt Scaling (Parametric):
- Fits logistic regression: P(y=1|f(x)) = 1 / (1 + exp(A*f(x) + B))
- Good for sigmoid-shaped calibration curves
- Fast, 2 parameters to learn

Isotonic Regression (Non-parametric):
- Fits piecewise constant function
- More flexible, can handle any shape
- Requires more data (risk of overfitting)

Brier Score (Evaluation):
BS = (1/N) * sum((p_i - y_i)^2)

Lower = better calibrated

Code

import numpy as np
from sklearn.calibration import CalibratedClassifierCV, calibration_curve
from sklearn.isotonic import IsotonicRegression
import matplotlib.pyplot as plt

def calibrate_model(model, X_val, y_val, method='isotonic'):
    """Calibrate a trained model"""
    calibrated = CalibratedClassifierCV(model, method=method, cv='prefit')
    calibrated.fit(X_val, y_val)
    return calibrated

def plot_calibration_curve(y_true, y_prob, n_bins=10):
    """Plot calibration curve (reliability diagram)"""
    prob_true, prob_pred = calibration_curve(y_true, y_prob, n_bins=n_bins)

    plt.figure(figsize=(8, 6))
    plt.plot([0, 1], [0, 1], 'k--', label='Perfectly calibrated')
    plt.plot(prob_pred, prob_true, 's-', label='Model')
    plt.xlabel('Mean predicted probability')
    plt.ylabel('Fraction of positives')
    plt.title('Calibration Curve')
    plt.legend()
    return plt

def brier_score_loss(y_true, y_prob):
    """Calculate Brier score"""
    return np.mean((y_prob - y_true) ** 2)

5. Ranking Metrics Advanced

Books

Recommender Systems Handbook - Chapter on Evaluation Metrics - URL: https://www.springer.com/gp/book/9781489976355

Articles

Learning to Rank - URL: https://en.wikipedia.org/wiki/Learning_to_rank - Pointwise, pairwise, listwise approaches

Key Metrics

Normalized Discounted Cumulative Gain (NDCG):
DCG@k = sum((2^rel_i - 1) / log2(i + 1)) for i in 1..k
NDCG@k = DCG@k / IDCG@k

Normalized Cross-Entropy (NCE):
NCE = [-1/N * sum(y_i * log(p_i) + (1-y_i) * log(1-p_i))] / CE_baseline

where CE_baseline is the log loss of a baseline that always predicts the average CTR. Lower is better; NCE < 1 means the model beats the baseline.

CTR Lift:
Lift = (CTR_treatment - CTR_control) / CTR_control

Mean Reciprocal Rank (MRR):
MRR = 1/|Q| * sum(1/rank_i)

Expected Reciprocal Rank (ERR):
ERR = sum over positions r of (1/r) * R_r * prod_{i<r}(1 - R_i), where R_i = (2^rel_i - 1) / 2^rel_max
(see the sketch after the code below)

Code

import numpy as np

def dcg_at_k(relevances, k):
    """Discounted Cumulative Gain at k (linear-gain variant;
    the formula above uses the exponential gain 2^rel - 1)"""
    relevances = np.array(relevances)[:k]
    if relevances.size == 0:
        return 0.0
    discounts = np.log2(np.arange(2, relevances.size + 2))
    return np.sum(relevances / discounts)

def ndcg_at_k(relevances, k):
    """Normalized DCG at k"""
    dcg = dcg_at_k(relevances, k)
    ideal_relevances = sorted(relevances, reverse=True)
    idcg = dcg_at_k(ideal_relevances, k)
    if idcg == 0:
        return 0.0
    return dcg / idcg

def mean_reciprocal_rank(ranks):
    """Mean Reciprocal Rank"""
    ranks = np.array(ranks)
    return np.mean(1.0 / ranks[ranks > 0]) if np.any(ranks > 0) else 0.0

def ctr_lift(treatment_ctr, control_ctr):
    """Calculate CTR lift"""
    return (treatment_ctr - control_ctr) / control_ctr
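ERR from the formula above is not covered by this block; a minimal sketch, assuming graded relevances and the standard gain mapping R_i = (2^rel_i - 1) / 2^rel_max:

def err_at_k(relevances, k, max_grade=None):
    """Expected Reciprocal Rank at k"""
    rels = np.array(relevances, dtype=float)[:k]
    if rels.size == 0:
        return 0.0
    if max_grade is None:
        max_grade = rels.max() if rels.max() > 0 else 1.0
    # Probability the user is satisfied at each position
    probs = (2 ** rels - 1) / (2 ** max_grade)
    err, p_continue = 0.0, 1.0
    for rank, p in enumerate(probs, start=1):
        err += p_continue * p / rank
        p_continue *= 1 - p
    return err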

6. Recommendation Systems (RecSys)

Articles

The Two-Tower Model for Recommendation Systems: A Deep Dive - URL: https://www.shaped.ai/blog/the-two-tower-model-for-recommendation-systems-a-deep-dive - Two-Tower architecture explained

Two Towers RecSys: Python Embedding Towers Serving 2025 - URL: https://www.johal.in/two-towers-recsys-python-embedding-towers-serving-2025/ - Production implementation

Two Tower Models in Industry - URL: https://machinelearningatscale.substack.com/p/recsys-part-2-two-tower-models-in - Industry examples

Papers

Suggest, Complement, Inspire: Story of Two-Tower Recommendations at Allegro (2025) - URL: https://arxiv.org/abs/2508.03702 - Production RecSys at Allegro.com

Two-Tower Architecture

Query Tower              Item Tower
    |                        |
[User Features]        [Item Features]
    |                        |
[Dense Layers]        [Dense Layers]
    |                        |
[Embedding]           [Embedding]
    \                        /
     \                      /
      ------ Dot Product ---
                |
          [Similarity Score]
                |
            [Top-K Items]

Cold Start Solutions

  1. Content-based: Use item features directly
  2. Popularity: Default to popular items
  3. Exploration: Bandit algorithms (epsilon-greedy, UCB)
  4. Cross-domain: Transfer from related domain
  5. Side information: Use metadata, context

Code

import torch
import torch.nn as nn

class L2Normalize(nn.Module):
    """L2 normalize along dim; nn.L2Normalize does NOT exist in PyTorch."""
    def __init__(self, dim=1):
        super().__init__()
        self.dim = dim
    def forward(self, x):
        return nn.functional.normalize(x, p=2, dim=self.dim)

class TwoTowerModel(nn.Module):
    def __init__(self, user_dim, item_dim, embedding_dim=64):
        super().__init__()
        self.user_tower = nn.Sequential(
            nn.Linear(user_dim, 128),
            nn.ReLU(),
            nn.Linear(128, embedding_dim),
            L2Normalize(dim=1)
        )
        self.item_tower = nn.Sequential(
            nn.Linear(item_dim, 128),
            nn.ReLU(),
            nn.Linear(128, embedding_dim),
            L2Normalize(dim=1)
        )

    def forward(self, user_features, item_features):
        user_emb = self.user_tower(user_features)
        item_emb = self.item_tower(item_features)
        return torch.sum(user_emb * item_emb, dim=1, keepdim=True)

    def get_user_embedding(self, user_features):
        return self.user_tower(user_features)

    def get_item_embedding(self, item_features):
        return self.item_tower(item_features)

7. ML Trade-offs Quiz

Typical Trade-offs

| Scenario | Trade-off | Recommendation |
|---|---|---|
| Low latency vs High accuracy | Latency budget | Quantize model, reduce ensemble size |
| Interpretability vs Performance | Black-box models | Use SHAP/LIME for interpretability |
| Cold start vs Popularity bias | New users/items | Hybrid approach, exploration |
| Batch vs Real-time | Freshness vs Cost | Lambda architecture (batch + streaming) |
| Feature complexity vs Maintenance | Feature bloat | Regular feature pruning, feature stores |
| Model complexity vs Inference cost | Deep models | Distillation, pruning, efficient architectures |
| Data freshness vs Stability | Feature drift | Version features, gradual rollout |
| Personalization vs Privacy | User data | Federated learning, differential privacy |
| Exploration vs Exploitation | Bandits | epsilon-greedy, Thompson sampling |
| Precision vs Recall | Threshold | Business metric optimization |
| Online vs Offline evaluation | A/B test cost | Proxy metrics, gradual rollout |
| Model size vs Latency | Mobile deployment | Quantization, knowledge distillation |
| Training time vs Model quality | Resources | Early stopping, hyperparameter search |
| Feature engineering vs Deep learning | Data vs Compute | Start simple, add complexity |
| Monolith vs Microservices | Architecture | Domain-driven design, service boundaries |

8. LLM Production

Guardrails

OWASP Top 10 for LLM Applications (2025):
  • LLM01: Prompt Injection
  • LLM02: Insecure Output Handling
  • LLM03: Training Data Poisoning
  • LLM04: Model Denial of Service
  • LLM05: Supply Chain Vulnerabilities
  • LLM06: Sensitive Information Disclosure
  • LLM07: Insecure Plugin Design
  • LLM08: Excessive Agency
  • LLM09: Overreliance
  • LLM10: Model Theft

Tools

  • NeMo Guardrails - NVIDIA
  • Guardrails AI - Open source
  • Lakera - Enterprise security

Prompt Injection Defense

import re
import json
from jsonschema import validate, ValidationError

def sanitize_input(user_input: str) -> str:
    """Basic input sanitization"""
    # Remove potential injection patterns
    patterns = [
        r'ignore (all )?(previous|above) instructions',
        r'disregard (all )?(previous|above)',
        r'you are now',
        r'system:',
        r'<\|.*?\|>',
    ]
    for pattern in patterns:
        user_input = re.sub(pattern, '', user_input, flags=re.IGNORECASE)
    return user_input.strip()

def structured_output_guard(output: str, schema: dict) -> bool:
    """Validate output against schema"""
    try:
        parsed = json.loads(output)
        validate(parsed, schema)
        return True
    except (json.JSONDecodeError, ValidationError):
        return False

9. Feature Stores

Sources: Aerospike Blog (July 2025), Reintech.io Feature Store Comparison (Jan 2026)

What is a Feature Store?

A Feature Store is a centralized repository for ML features that provides:
  • A single source of truth for features
  • Consistency between training and inference
  • Feature reusability across teams
  • Low-latency serving for real-time predictions

Why Feature Stores Matter

Problems without a Feature Store:
  • Data scientists spend ~80% of their time on feature engineering
  • Duplicated transformation logic
  • Training-serving skew (features differ between training and inference)
  • No feature versioning

Architecture Components

graph TD
    RAW["Raw Data Sources"] --> ING["Ingestion Pipelines"]
    ING --> REG["Feature Registry<br/>(Metadata)"]
    REG --> OFF["Offline Store<br/>(Data Lake/Warehouse)<br/>For Training"]
    REG --> ON["Online Store<br/>(Redis/DynamoDB)<br/>For Real-time"]
    OFF --> TRAIN["Model Training<br/>(Batch)"]
    ON --> INF["Real-time Inference"]

    style RAW fill:#e8eaf6,stroke:#3f51b5
    style ING fill:#fff3e0,stroke:#ef6c00
    style REG fill:#f3e5f5,stroke:#9c27b0
    style OFF fill:#e8eaf6,stroke:#3f51b5
    style ON fill:#e8f5e9,stroke:#4caf50
    style TRAIN fill:#e8eaf6,stroke:#3f51b5
    style INF fill:#e8f5e9,stroke:#4caf50

Offline vs Online Store

| Store | Purpose | Latency | Storage | Use Case |
|---|---|---|---|---|
| Offline | Training data | Seconds-minutes | Data Lake (S3, BigQuery) | Model training, analytics |
| Online | Real-time serving | <10ms | KV Store (Redis, DynamoDB) | Real-time predictions |

Point-in-Time Correctness

Problem: Training data leakage when features are computed using future data.

Solution: Point-in-time joins ensure features reflect state at prediction time.

-- Point-in-time correct feature retrieval
SELECT
    e.entity_id,
    e.event_time,
    f.feature_value
FROM events e
JOIN feature_history f
    ON e.entity_id = f.entity_id
    AND f.feature_time <= e.event_time
    AND f.feature_time > e.event_time - INTERVAL '7 days'

Feature Store Comparison

| Feature | Feast | Tecton | SageMaker |
|---|---|---|---|
| Type | Open-source | Managed | AWS-native |
| Cost | Infra only | Usage-based | Pay-per-use |
| Streaming | Limited | Excellent | Via Kinesis |
| Best For | Full control | Enterprise scale | AWS ecosystem |

Python: Feast Example

from feast import Entity, FeatureView, Field
from feast.types import Float32, Int64
from datetime import timedelta

# Define entity
user = Entity(
    name="user_id",
    join_keys=["user_id"],
)

# Define feature view
user_features = FeatureView(
    name="user_transaction_features",
    entities=[user],
    ttl=timedelta(days=365),
    schema=[
        Field(name="total_transactions", dtype=Int64),
        Field(name="avg_transaction_value", dtype=Float32),
    ],
    online=True,
    source=batch_source,  # BigQuery, Snowflake, etc.
)

# Retrieve features for inference
from feast import FeatureStore

store = FeatureStore(repo_path=".")

# Online serving (real-time)
features = store.get_online_features(
    features=[
        "user_transaction_features:total_transactions",
        "user_transaction_features:avg_transaction_value",
    ],
    entity_rows=[{"user_id": 12345}]
).to_dict()

# Offline training (batch)
training_df = store.get_historical_features(
    entity_df=entity_df,  # Contains user_id and event_time
    features=[
        "user_transaction_features:total_transactions",
        "user_transaction_features:avg_transaction_value",
    ],
).to_df()

Key Concepts

| Concept | Description |
|---|---|
| Feature View | Logical grouping of related features |
| Entity | Primary key for feature lookup (user_id, item_id) |
| TTL | Time-to-live for feature freshness |
| Materialization | Process of writing features to online store |
| Feature Service | Collection of features for a specific model |
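Materialization from the table above is a single call in Feast; a minimal sketch, assuming the repo from the example earlier in this section:

from datetime import datetime
from feast import FeatureStore

store = FeatureStore(repo_path=".")
# Copy the latest offline feature values into the online store
store.materialize_incremental(end_date=datetime.utcnow())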

Interview Questions

Q: Why do you need a Feature Store?

A: A Feature Store solves three problems: (1) Training-serving skew: the same features are used for training and inference; (2) Reusability: features are shared across teams; (3) Governance: versioning, lineage, and access control for features.

Q: What is the difference between the Online and Offline Store?

A: The Offline Store is for training: it keeps historical data in a data lake/warehouse and is optimized for throughput. The Online Store is for real-time inference: it keeps only the latest features in a low-latency KV store (Redis) and is optimized for latency (<10ms).

Q: What is Point-in-Time Correctness?

A: Training must use features as they were at the moment of the event (prediction time), not their current values. Otherwise you get data leakage: the model sees "future" information. A Feature Store performs point-in-time joins automatically.

Q: Feast vs Tecton vs SageMaker: when to choose which?

A: Feast: for teams with a limited budget that need full control over infrastructure. Tecton: for enterprises with heavy streaming needs that want a managed service. SageMaker: if you are already in the AWS ecosystem and need tight integration with SageMaker pipelines.


10. ML Infrastructure (Model Registry & Experiment Tracking)

Sources: ML Journey "Model Versioning Strategies" (Sep 2025), Conduktor "Real-Time ML Pipelines" (Feb 2026)

Why ML Infrastructure Matters

Problem: ML development is inherently experimental, with constant changes to data, hyperparameters, and architectures. Without proper infrastructure:
  • Results cannot be reproduced
  • No audit trail for compliance
  • Training-serving mismatch
  • Teams duplicate work

Core Components

| Component | Purpose | Popular Tools |
|---|---|---|
| Experiment Tracking | Log parameters, metrics, artifacts | MLflow, W&B, Neptune, Comet |
| Model Registry | Version models, stage transitions | MLflow Registry, W&B Artifacts |
| Data Versioning | Track datasets, lineage | DVC, Delta Lake, LakeFS |
| Pipeline Orchestration | Automate ML workflows | Kubeflow, Airflow, Prefect, ZenML |

Tool Comparison

| Feature | MLflow | Weights & Biases | DVC |
|---|---|---|---|
| Type | Open-source | SaaS | Open-source |
| Experiment Tracking | Excellent | Excellent | Basic |
| Model Registry | Built-in | Built-in | Via Git |
| Visualization | Good | Excellent | Basic |
| Pipeline Management | Projects | Limited | Excellent |
| Cost | Self-hosted | Usage-based | Free |
| Best For | End-to-end lifecycle | Team collaboration | Git-based workflows |

MLflow Architecture

MLflow Components (see the diagram below)
graph TD
    ML["MLflow"]
    ML --> TR["Tracking<br/>(experiments)"]
    ML --> PR["Projects<br/>(packaging)"]
    ML --> MO["Models<br/>(deployment)"]
    ML --> RE["Registry<br/>(lifecycle)"]
    TR --> BS["Backend Store<br/>(PostgreSQL)"]
    PR --> ENV["Conda/Docker<br/>Environments"]
    MO --> SRV["Serving Tools<br/>(REST, Batch)"]
    RE --> STG["Staging<br/>(Prod/Archived)"]

    style ML fill:#f3e5f5,stroke:#9c27b0
    style TR fill:#e8eaf6,stroke:#3f51b5
    style PR fill:#e8eaf6,stroke:#3f51b5
    style MO fill:#e8eaf6,stroke:#3f51b5
    style RE fill:#e8eaf6,stroke:#3f51b5
    style BS fill:#e8f5e9,stroke:#4caf50
    style ENV fill:#e8f5e9,stroke:#4caf50
    style SRV fill:#e8f5e9,stroke:#4caf50
    style STG fill:#e8f5e9,stroke:#4caf50

Python: MLflow Example

import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import f1_score

# Start experiment
mlflow.set_experiment("fraud_detection_v2")

with mlflow.start_run():
    # Log parameters
    mlflow.log_param("n_estimators", 100)
    mlflow.log_param("max_depth", 10)

    # Train model
    model = RandomForestClassifier(n_estimators=100, max_depth=10)
    model.fit(X_train, y_train)

    # Log metrics
    accuracy = model.score(X_test, y_test)
    mlflow.log_metric("accuracy", accuracy)
    mlflow.log_metric("f1_score", f1_score(y_test, model.predict(X_test)))

    # Log model
    mlflow.sklearn.log_model(model, "model")

    # Register model
    mlflow.register_model(
        f"runs:/{mlflow.active_run().info.run_id}/model",
        "fraud_detection_model"
    )

Model Registry Workflow

1. Experiment → 2. Staging → 3. Production → 4. Archived
        │              │              │              │
   Many runs      Manual/       Auto-scale    Keep for
   tracked        Auto approve   serving      rollback

Model Stages:
  • None/Archived: initial or retired models
  • Staging: ready for testing/validation
  • Production: live serving
  • Challenger: A/B testing against the champion

Python: Model Registry Operations

from mlflow.tracking import MlflowClient

client = MlflowClient()

# List all versions of a model
versions = client.search_model_versions("name='fraud_detection_model'")

# Transition model to production
client.transition_model_version_stage(
    name="fraud_detection_model",
    version=3,
    stage="Production"
)

# Get production model
prod_version = client.get_latest_versions(
    "fraud_detection_model",
    stages=["Production"]
)[0]

# Load production model
model = mlflow.sklearn.load_model("models:/fraud_detection_model/Production")

Decision Framework

| Scenario | Choose |
|---|---|
| Small team, budget-constrained | MLflow (self-hosted) |
| Heavy visualization needs | W&B |
| Git-centric workflow | DVC + MLflow |
| Enterprise compliance | MLflow + Databricks |
| Cloud-native, AWS | SageMaker + Model Registry |

Interview Questions

Q: Why do you need a Model Registry?

A: A Model Registry provides: (1) Versioning: every model version is preserved; (2) Staging: a workflow from testing to production; (3) Lineage: links to the experiment, data, and code; (4) Rollback: the ability to return to a previous version; (5) Approval workflow: a process for promotion to production.

Q: MLflow vs Weights & Biases: when to use which?

A: MLflow is open-source and self-hosted, for teams that want full control and an end-to-end lifecycle (tracking + registry + deployment). W&B is SaaS, with superior visualization, team collaboration, and quick setup, but an ongoing cost. For enterprise compliance, MLflow; for speed and collaboration, W&B.

Q: What is experiment tracking?

A: Experiment tracking is the systematic logging of every aspect of an ML experiment: hyperparameters, metrics, artifacts (models, datasets), code version, environment. It lets you compare experiments, reproduce results, and find the best model. Tools: MLflow Tracking, W&B, Neptune.

Q: How would you organize an ML platform for a team of 20 ML engineers?

A: Key components: (1) Experiment Tracking (MLflow/W&B): a central server for all teams; (2) Model Registry: a single home for production models; (3) Feature Store: shared features; (4) CI/CD: automated training and deployment pipelines; (5) Monitoring: model performance in production; (6) Access control: RBAC for compliance.


11. Multi-Armed Bandits (Exploration vs Exploitation)

Best Sources

Practical Guides: - Philipp Dubach: Bandits and Agents - Netflix/Spotify 2026 (Jan 2026) - Statsig: Thompson Sampling (June 2025)

Academic: - Russo et al.: Tutorial on Thompson Sampling

The Exploration-Exploitation Dilemma

Classic problem: Casino slot machines with unknown payouts. Do you keep playing the machine that's been paying out, or try others?

Regret formula:
\[R(T) = \mu^* \cdot T - \sum_{t=1}^{T} \mu(a_t)\]

Where:
  • \(\mu^*\) = best possible average reward
  • \(\mu(a_t)\) = reward of the action taken at time \(t\)
  • Goal: make regret sublinear in \(T\) (learn fast enough)

Algorithm Comparison

| Algorithm | Strategy | Pros | Cons |
|---|---|---|---|
| ε-greedy | Random exploration (ε% of time) | Simple, easy to implement | Explores blindly |
| UCB | Prefer uncertain actions | Guided exploration | Can be too optimistic |
| Thompson Sampling | Sample from posterior | Bayesian, adaptive | Computational overhead |

ε-Greedy

Algorithm:
  1. With probability ε: choose a random action (explore)
  2. With probability 1-ε: choose the best known action (exploit)

import numpy as np

def epsilon_greedy(n_arms, n_rounds, true_rewards, epsilon=0.1):
    rewards = np.zeros(n_arms)
    counts = np.zeros(n_arms)

    for t in range(n_rounds):
        if np.random.random() < epsilon:
            arm = np.random.randint(n_arms)  # Explore
        else:
            arm = np.argmax(rewards / (counts + 1e-10))  # Exploit

        reward = np.random.binomial(1, true_rewards[arm])
        rewards[arm] += reward
        counts[arm] += 1

    return rewards / (counts + 1e-10)

Upper Confidence Bound (UCB)

Formula:
\[UCB_i = \bar{X}_i + \sqrt{\frac{2\ln n}{n_i}}\]

Where:
  • \(\bar{X}_i\) = average reward for arm \(i\)
  • \(n\) = total rounds
  • \(n_i\) = times arm \(i\) was pulled

Intuition: "Optimism in the face of uncertainty" — prefer actions with high uncertainty.

import numpy as np

def ucb1(n_arms, n_rounds, true_rewards):
    rewards = np.zeros(n_arms)
    counts = np.zeros(n_arms)

    # Initial exploration: pull each arm once
    for arm in range(n_arms):
        rewards[arm] = np.random.binomial(1, true_rewards[arm])
        counts[arm] = 1

    for t in range(n_arms, n_rounds):
        ucb_values = rewards / counts + np.sqrt(2 * np.log(t) / counts)
        arm = np.argmax(ucb_values)
        reward = np.random.binomial(1, true_rewards[arm])
        rewards[arm] += reward
        counts[arm] += 1

    return rewards / counts

Thompson Sampling (Bayesian)

Key idea: Maintain probability distributions over rewards, sample and pick the best.

For Bernoulli rewards (click/no-click): Use Beta distribution.

import numpy as np
from scipy.stats import beta

def thompson_sampling(n_arms, n_rounds, true_rewards):
    # Beta distribution parameters: α (successes), β (failures)
    alphas = np.ones(n_arms)
    betas = np.ones(n_arms)

    for t in range(n_rounds):
        # Sample from each arm's posterior
        samples = beta.rvs(alphas, betas)
        arm = np.argmax(samples)

        reward = np.random.binomial(1, true_rewards[arm])

        # Update posterior
        alphas[arm] += reward
        betas[arm] += 1 - reward

    return alphas / (alphas + betas)

Why Thompson Sampling wins:
  • Explores more when uncertain, exploits more when confident
  • Adapts naturally to changing conditions
  • No manual tuning of the exploration rate

Contextual Bandits (LinUCB)

When: Actions depend on context (user features, time of day, etc.)

LinUCB formula:
\[UCB_i = x_i^T \hat{\theta}_i + \alpha \sqrt{x_i^T A_i^{-1} x_i}\]

Where:
  • \(x_i\) = context vector
  • \(\hat{\theta}_i\) = learned parameters
  • \(A_i\) = design matrix for arm \(i\)
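No implementation of LinUCB is given above; a minimal sketch of the disjoint variant (one ridge-regression model per arm), following the formula:

import numpy as np

class LinUCB:
    """Disjoint LinUCB: one linear model per arm"""
    def __init__(self, n_arms, context_dim, alpha=1.0):
        self.alpha = alpha
        self.A = [np.eye(context_dim) for _ in range(n_arms)]    # design matrices
        self.b = [np.zeros(context_dim) for _ in range(n_arms)]  # reward vectors

    def select_arm(self, x):
        scores = []
        for A, b in zip(self.A, self.b):
            A_inv = np.linalg.inv(A)
            theta = A_inv @ b  # ridge-regression estimate
            scores.append(theta @ x + self.alpha * np.sqrt(x @ A_inv @ x))
        return int(np.argmax(scores))

    def update(self, arm, x, reward):
        self.A[arm] += np.outer(x, x)
        self.b[arm] += reward * x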

When to Use Bandits vs A/B Tests

| | A/B Test | Bandit |
|---|---|---|
| Goal | Statistical significance | Maximize reward |
| Duration | Fixed period | Continuous |
| Exploration | Equal allocation | Adaptive |
| Best for | Learning, papers | Production optimization |
| Regret | Linear in T | Sublinear in T |

Use bandits when:
  • Continuous optimization (ads, recommendations)
  • Delayed feedback is acceptable
  • You can't afford equal allocation to bad options

Use A/B tests when:
  • You need statistical rigor (publishing, compliance)
  • Comparing few variants
  • You need to understand why something works

Production Use Cases

Netflix: Multi-armed bandit for homepage recommendations
  • Offline: train collaborative filtering models
  • Nearline: update user embeddings after clicks
  • Online: real-time personalization with bandits

Spotify AI DJ: Hybrid approach
  • An "agentic router" decides: LLM or classical bandit
  • Complex queries → expensive reasoning
  • Simple queries → fast bandit path

Interview Questions

Q: When would you use Thompson Sampling over ε-greedy?

Thompson Sampling adapts exploration based on uncertainty. It explores more when uncertain, exploits more when confident. ε-greedy explores blindly regardless of confidence. Use Thompson for non-stationary environments, ε-greedy for simplicity.

Q: What's the difference between bandits and A/B testing?

A/B tests allocate traffic equally for statistical rigor. Bandits adapt allocation based on performance. Bandits minimize regret, A/B tests find significance. Use bandits for continuous optimization, A/B tests for learning.

Q: What are contextual bandits?

Bandits where the reward depends on context (user features, time, device). LinUCB and Thompson Sampling with linear payoff are common algorithms. Netflix uses contextual bandits: recommendations depend on user history, time of day, device.

Q: How does Netflix use bandits in production?

Three-tier architecture: Offline (train deep models), Nearline (update embeddings after clicks), Online (real-time bandit decisions). The goal is "incrementality" — not what users will watch, but what they wouldn't have found without the recommendation.


12. Online Learning (Streaming ML)

Best Sources

Practical Guides: - ML Journey: Online Learning Algorithms for Streaming Data (Nov 2025) - Conduktor: Real-Time ML Pipelines (Feb 2026) - Medium: Real-Time ML with partial_fit (July 2025)

Libraries: - River: ML for Streaming Data — Python library for online/incremental learning - scikit-learn partial_fit estimators — incremental learning support

Apache Flink: - Confluent: Flink for Model Inference (Oct 2025) - Apache Flink 2.2.0 ML Release — online learning capabilities

Online vs Batch Learning

| | Batch Learning | Online Learning |
|---|---|---|
| Training | Retrain on all data | Update with each sample |
| Memory | O(dataset size) | O(model size) |
| Adaptation | Periodic | Real-time |
| Best for | Static distributions | Streaming, concept drift |

Online Gradient Descent

Key difference: Update weights after each sample (or mini-batch), not after epoch.

import numpy as np

class OnlineLinearRegression:
    def __init__(self, n_features, learning_rate=0.01):
        self.weights = np.zeros(n_features)
        self.lr = learning_rate

    def partial_fit(self, X, y):
        """Update model with one or more samples."""
        for xi, yi in zip(X, y):
            prediction = np.dot(xi, self.weights)
            error = yi - prediction
            self.weights += self.lr * error * xi
        return self

    def predict(self, X):
        return X @ self.weights

# Usage in streaming context
model = OnlineLinearRegression(n_features=10)
for batch in data_stream:
    X_batch, y_batch = preprocess(batch)
    model.partial_fit(X_batch, y_batch)
    predictions = model.predict(new_data)

Regret Framework

Goal: Minimize regret compared to the best fixed decision.

\[\text{Regret}(T) = \sum_{t=1}^{T} \ell_t(w_t) - \min_{w} \sum_{t=1}^{T} \ell_t(w)\]

Good online learners have sublinear regret: \(\text{Regret}(T) = O(\sqrt{T})\)

FTRL-Proximal (Follow-The-Regularized-Leader)

Used by: Google for large-scale click-through rate prediction.

Update rule:
\[w_{t+1} = \arg\min_w \left( \sum_{s=1}^{t} g_s \cdot w + \lambda_1 \|w\|_1 + \frac{\lambda_2}{2} \|w\|_2^2 \right)\]

  • L1 regularization → sparsity (critical for high-dimensional features)
  • L2 regularization → stability
import numpy as np

class FTRLProximal:
    def __init__(self, n_features, alpha=0.1, beta=1.0, l1=1.0, l2=1.0):
        self.z = np.zeros(n_features)  # accumulated gradients
        self.n = np.zeros(n_features)  # accumulated squared gradients
        self.alpha = alpha
        self.beta = beta
        self.l1 = l1
        self.l2 = l2

    def predict(self, x):
        w = self._get_weights()
        return np.dot(x, w)

    def _get_weights(self):
        """Compute sparse weights from accumulated statistics."""
        w = np.zeros_like(self.z)
        mask = np.abs(self.z) > self.l1
        w[mask] = -(self.z[mask] - np.sign(self.z[mask]) * self.l1) / \
                   ((self.beta + np.sqrt(self.n[mask])) / self.alpha + self.l2)
        return w

    def update(self, x, y):
        """Update with one sample (y ∈ {0, 1} for logistic)."""
        w = self._get_weights()
        p = 1 / (1 + np.exp(-np.dot(x, w)))  # sigmoid
        g = (p - y) * x  # gradient
        sigma = (np.sqrt(self.n + g ** 2) - np.sqrt(self.n)) / self.alpha
        self.z += g - sigma * w
        self.n += g ** 2

Concept Drift Detection

Types of drift:
  1. Sudden: distribution changes abruptly (system update, bug)
  2. Gradual: slow shift over time (user preferences, seasonality)
  3. Incremental: continuous small changes
  4. Recurring: patterns repeat (seasonal)

Detection methods:

| Method | Type | Idea |
|---|---|---|
| ADWIN | Window-based | Adaptive window size, detects change when cutpoint found |
| DDM | Error-based | Monitors error rate, alerts when error + std dev increase |
| EDDM | Distance-based | Tracks distance between classification errors |
| Page-Hinkley | Statistical | Detects mean change in Gaussian stream |

from river import drift

# ADWIN for concept drift detection
adwin = drift.ADWIN(delta=0.002)

for i, (x, y) in enumerate(data_stream):
    # Make prediction
    y_pred = model.predict_one(x)

    # Update model
    model.learn_one(x, y)

    # Check for drift (using accuracy as metric)
    correct = 1 if y_pred == y else 0
    adwin.update(correct)

    if adwin.drift_detected:
        print(f"Drift detected at sample {i}")
        # Option 1: Reset model
        # model = reset_model()
        # Option 2: Incremental adaptation
        # model.adapt_to_drift()

Hoeffding Trees (VFDT)

For streaming data: Build decision trees that are "probably approximately correct" with high probability.

Hoeffding bound:
\[\epsilon = \sqrt{\frac{R^2 \ln(1/\delta)}{2n}}\]

Where:
  • \(R\) = range of the attribute (e.g., 1 for Bernoulli)
  • \(\delta\) = confidence (1 - probability)
  • \(n\) = number of samples seen

Idea: If the best split attribute is \(\epsilon\) better than the second-best, split with confidence \(1-\delta\).

from river import tree

# Hoeffding Adaptive Tree (HAT)
model = tree.HoeffdingAdaptiveTreeClassifier(
    grace_period=100,      # Min samples before split evaluation
    max_depth=10,          # Tree depth limit
    split_confidence=1e-7, # Hoeffding bound δ
    tie_threshold=0.05     # Minimum gain difference
)

# Online training
for x, y in data_stream:
    y_pred = model.predict_one(x)
    model.learn_one(x, y)

Architecture:

Kafka → Flink DataStream → Feature Engineering → Model Inference → Kafka/DB
                         Online Learning (optional)

Flink ML example:

// Flink ML pipeline for streaming
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Read from Kafka
DataStream<Record> stream = env
    .addSource(new FlinkKafkaConsumer<>("topic", schema, props));

// Feature extraction
DataStream<Features> features = stream
    .process(new FeatureExtractor());

// Online model inference
DataStream<Prediction> predictions = features
    .map(new ModelInference(model));

// Write predictions
predictions.addSink(new FlinkKafkaProducer<>("predictions", schema, props));

Production Considerations

Challenges:
  1. Data quality: missing values and outliers arriving in real time
  2. Feature engineering: must be computed on the fly
  3. Model versioning: A/B between model versions
  4. Backpressure: handle bursty traffic

Best practices:
  • Buffer accumulation: accumulate features for mini-batch updates
  • Shadow mode: deploy the new model alongside the old one and compare (see the sketch below)
  • Canary deployment: gradual rollout to a subset of traffic
  • Rollback plan: a quick way to revert to the previous model
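A minimal sketch of shadow mode, assuming both models expose predict() and that preprocess and log_comparison are placeholder helpers:

def serve_with_shadow(request, champion, shadow, log_comparison):
    features = preprocess(request)           # placeholder helper
    prediction = champion.predict(features)  # this is what the user receives
    try:
        # Shadow output is logged for offline comparison, never served
        log_comparison(champion=prediction, shadow=shadow.predict(features))
    except Exception:
        pass  # shadow failures must never affect serving
    return prediction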

Interview Questions

Q: When would you use online learning vs batch learning?

Online learning when: (1) Data arrives continuously (logs, sensors), (2) Concept drift is expected (user preferences), (3) Memory constraints (can't store all data), (4) Low latency requirements. Batch learning when: (1) Static distribution, (2) Need optimal accuracy, (3) Can afford periodic retraining, (4) Want to use all data.

Q: How do you detect concept drift in production?

Monitor model performance metrics over time. ADWIN (Adaptive Windowing) automatically adjusts window size when distribution changes. DDM (Drift Detection Method) tracks error rate and alerts when error significantly increases. EDDM (Early Drift Detection Method) monitors distance between errors. Set up alerts for: accuracy drops, feature distribution shifts (PSI), prediction confidence changes.

Q: What's the difference between online learning and incremental learning?

Often used interchangeably, but: Online learning = update after each sample (or small batch). Incremental learning = ability to learn from new data without catastrophic forgetting. All online learners are incremental, but not all incremental methods are online (e.g., fine-tuning on new data batch).

Q: How does River library compare to scikit-learn's partial_fit?

River is built specifically for streaming/online learning: (1) All algorithms support online learning, (2) Built-in drift detection (ADWIN, DDM), (3) Streaming metrics that update incrementally, (4) Better memory efficiency. scikit-learn's partial_fit: (1) Only some estimators support it, (2) No drift detection, (3) Same API but limited functionality. Use River for streaming, scikit-learn when batch training with occasional incremental updates.

Q: How would you implement real-time fraud detection with online learning?

(1) Stream: Kafka for transaction events, (2) Features: Real-time feature store for user history, (3) Model: FTRL-Proximal for sparse high-dimensional features, (4) Drift detection: ADWIN on prediction accuracy, (5) Serving: Flink for feature extraction + model inference, (6) Fallback: Rule-based system if model confidence low, (7) Monitoring: Track precision/recall on labeled feedback, (8) Update: Online model updates from delayed labels.


13. Multi-Stage Recommender Systems

Best Sources

Architecture Deep Dives: - Shaped.ai: Two-Tower Model Deep Dive (May 2025) - Fan Luo: Design a Modern Recommendation System (Oct 2025) - Shaped.ai: Anatomy of Modern Ranking Architectures (Oct 2025)

Production Case Studies: - YouTube Recommendation Paper (2016) — foundational multi-stage architecture - Allegro.com: Two Tower Recommendations (Jul 2025) — real-world production story - Netflix Research: Recommendations (2025)

Interview Prep: - Hello Interview: Design Recommender System - IGotAnOffer: ML System Design

The Funnel Architecture

Why multi-stage? Scale. YouTube has billions of videos, Netflix has millions. Can't rank all candidates in <200ms.

graph TD
    ALL["1B+ items"] --> S1["Stage 1: RETRIEVAL<br/>Two-Tower, MF, ANN<br/>Output: ~1000, ~10ms"]
    S1 --> S2["Stage 2: PRE-RANKING<br/>Light model (small DNN)<br/>Output: ~500, ~20ms"]
    S2 --> S3["Stage 3: RANKING<br/>Deep model (DIN, DCN, DeepFM)<br/>Output: ~100, ~100ms"]
    S3 --> S4["Stage 4: RE-RANKING<br/>Diversity, freshness, business rules<br/>Output: Final ~20, ~20ms"]

    style ALL fill:#e8eaf6,stroke:#3f51b5
    style S1 fill:#e8f5e9,stroke:#4caf50
    style S2 fill:#fff3e0,stroke:#ef6c00
    style S3 fill:#f3e5f5,stroke:#9c27b0
    style S4 fill:#fce4ec,stroke:#c62828

Total Latency: ~150ms (within 200ms budget)

Stage 1: Retrieval (Two-Tower Architecture)

Core idea: Learn embeddings for users and items, find nearest neighbors.

graph TD
    UF["User Features"] --> UT["User Tower (DNN)"]
    IF["Item Features"] --> IT["Item Tower (DNN)"]
    UT --> UE["User Embedding"]
    IT --> IE["Item Embedding"]
    UE --> DOT["Dot Product<br/>Similarity Score"]
    IE --> DOT

    style UF fill:#e8eaf6,stroke:#3f51b5
    style IF fill:#e8eaf6,stroke:#3f51b5
    style UT fill:#f3e5f5,stroke:#9c27b0
    style IT fill:#f3e5f5,stroke:#9c27b0
    style UE fill:#e8f5e9,stroke:#4caf50
    style IE fill:#e8f5e9,stroke:#4caf50
    style DOT fill:#fff3e0,stroke:#ef6c00

Training with in-batch negatives:

import torch
import torch.nn as nn

class TwoTowerModel(nn.Module):
    def __init__(self, user_dim, item_dim, embedding_dim=64):
        super().__init__()
        self.user_tower = nn.Sequential(
            nn.Linear(user_dim, 256),
            nn.ReLU(),
            nn.Linear(256, embedding_dim)
        )
        self.item_tower = nn.Sequential(
            nn.Linear(item_dim, 256),
            nn.ReLU(),
            nn.Linear(256, embedding_dim)
        )
        self.temperature = 0.1

    def encode_user(self, user_features):
        return self.user_tower(user_features)

    def encode_item(self, item_features):
        return self.item_tower(item_features)

    def forward(self, user_features, item_features):
        user_emb = self.encode_user(user_features)  # [batch, dim]
        item_emb = self.encode_item(item_features)  # [batch, dim]

        # Normalize for cosine similarity
        user_emb = nn.functional.normalize(user_emb, dim=-1)
        item_emb = nn.functional.normalize(item_emb, dim=-1)

        # In-batch negatives: [batch, batch]
        logits = torch.matmul(user_emb, item_emb.T) / self.temperature

        # Labels are diagonal (positive pairs)
        labels = torch.arange(user_features.size(0), device=user_features.device)

        return nn.CrossEntropyLoss()(logits, labels)

ANN Indexes (FAISS, ScaNN)

Why ANN? Exact nearest neighbor is O(N). ANN is O(log N) with ~95% recall.

FAISS (Facebook AI Similarity Search):

import faiss
import numpy as np

# Build index (item embeddings from Two-Tower)
n_items = 1_000_000
embedding_dim = 64
item_embeddings = np.random.randn(n_items, embedding_dim).astype('float32')

# IVF + HNSW index (good balance of speed/recall)
nlist = 100  # number of clusters
quantizer = faiss.IndexHNSWFlat(embedding_dim, 32)
index = faiss.IndexIVFFlat(quantizer, embedding_dim, nlist, faiss.METRIC_INNER_PRODUCT)

# Train on sample
index.train(item_embeddings[:10000])
index.add(item_embeddings)

# Search
user_emb = np.random.randn(1, embedding_dim).astype('float32')
k = 100  # retrieve top 100
distances, indices = index.search(user_emb, k)

ScaNN (Google):

# ScaNN often faster for inner product search
import scann

# Build searchable index
searcher = scann.scann_ops_pybind.builder(item_embeddings, 100, "dot_product") \
    .tree(num_leaves=100, num_leaves_to_search=10) \
    .score_ah(2, anisotropic_quantization_threshold=0.2) \
    .reorder(100) \
    .build()

# Query
neighbors, distances = searcher.search(user_emb[0])  # single vector

Index comparison:

| Index | Speed | Recall | Memory | Best for |
|---|---|---|---|---|
| IVF | Fast | ~95% | Low | General purpose |
| HNSW | Very fast | ~97% | High | Low latency |
| ScaNN | Fastest | ~96% | Medium | Inner product |
| Flat | Slow | 100% | Low | Small scale |

Stage 3: Ranking (Deep Models)

Input: ~500 candidates with rich features.

Architecture options:

| Model | Key Idea | When to Use |
|---|---|---|
| DIN (Deep Interest Network) | Attention over user history | Long user history |
| DCN (Deep & Cross) | Explicit feature crossing | High-order interactions |
| DeepFM | Factorization + Deep | Sparse categorical |
| DCNv2 | Low-rank crossing | Large scale |

Ranking model structure:

import torch
import torch.nn as nn

class CrossLayer(nn.Module):
    """DCN-style cross network: x_{l+1} = x_0 * (W x_l + b) + x_l
    (defined here because it is not a library class)"""
    def __init__(self, input_dim, num_layers=3):
        super().__init__()
        self.layers = nn.ModuleList([nn.Linear(input_dim, input_dim) for _ in range(num_layers)])

    def forward(self, x0):
        x = x0
        for layer in self.layers:
            x = x0 * layer(x) + x
        return x

class RankingModel(nn.Module):
    def __init__(self, feature_dim):
        super().__init__()
        # Explicit feature interactions (cross network)
        self.cross_layer = CrossLayer(feature_dim, num_layers=3)
        self.cross_proj = nn.Linear(feature_dim, 1)  # project crossed features to a score
        # Deep part
        self.deep = nn.Sequential(
            nn.Linear(feature_dim, 512),
            nn.ReLU(),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, 1)
        )

    def forward(self, features):
        cross_out = self.cross_proj(self.cross_layer(features))
        deep_out = self.deep(features)
        return torch.sigmoid(cross_out + deep_out)

Stage 4: Re-ranking

Goals beyond relevance:
  1. Diversity: don't show 10 similar items
  2. Freshness: promote new content
  3. Business rules: remove watched or banned content
  4. Fairness: don't let a single source dominate

Diversity techniques:
  • MMR (Maximal Marginal Relevance): balances relevance and diversity
  • DPP (Determinantal Point Process): mathematically principled diversity

import numpy as np

def mmr_rerank(scores, embeddings, lambda_param=0.5, k=20):
    """Maximal Marginal Relevance re-ranking."""
    selected = []
    remaining = list(range(len(scores)))

    for _ in range(k):
        if not remaining:
            break

        best_score = -np.inf
        best_idx = None

        for idx in remaining:
            # Relevance score
            relevance = scores[idx]

            # Diversity penalty (max similarity to already selected)
            if selected:
                similarities = [
                    np.dot(embeddings[idx], embeddings[s])
                    for s in selected
                ]
                diversity_penalty = max(similarities)
            else:
                diversity_penalty = 0

            # MMR score
            mmr_score = lambda_param * relevance - (1 - lambda_param) * diversity_penalty

            if mmr_score > best_score:
                best_score = mmr_score
                best_idx = idx

        selected.append(best_idx)
        remaining.remove(best_idx)

    return selected

Production Architecture (YouTube-scale)

graph TD
    FS["Feature Store<br/>User history, item features"] --> CG["Candidate Gen<br/>Two-Tower ANN, Collaborative, Trending"]
    CG -->|~1000 candidates| RM["Ranking Model<br/>(Deep model on GPU)"]
    RM -->|~100 ranked| RR["Re-ranking<br/>Diversity, freshness, filters"]
    RR -->|~20 final| RESP["Response to User"]

    style FS fill:#e8eaf6,stroke:#3f51b5
    style CG fill:#e8f5e9,stroke:#4caf50
    style RM fill:#f3e5f5,stroke:#9c27b0
    style RR fill:#fff3e0,stroke:#ef6c00
    style RESP fill:#e8f5e9,stroke:#4caf50

Interview Questions

Q: Why do we need multi-stage recommendation instead of a single model?

Scale and latency. If you have 1B items and need <200ms response, you can't score all items with a deep model. Multi-stage is a funnel: Retrieval quickly narrows to ~1000 candidates using simple models (Two-Tower + ANN). Ranking applies expensive deep models to ~500. Re-ranking enforces business rules. This is the only viable architecture at scale — used by YouTube, Netflix, TikTok.

Q: Explain Two-Tower architecture.

Two separate neural networks: User Tower encodes user features (history, demographics) into embedding. Item Tower encodes item features (ID, metadata) into embedding. Both towers share the same embedding dimension. Similarity = dot product of embeddings. Train with in-batch negatives (each user's positive item is contrasted with all other items in batch). At serving time, pre-compute item embeddings, build ANN index, query with user embedding.

Q: What's the difference between FAISS and ScaNN?

Both are ANN libraries for fast nearest neighbor search. FAISS (Meta) supports many index types (IVF, HNSW, PQ), more flexible, larger community. ScaNN (Google) optimized for inner product search, often faster for recommendation use cases. FAISS better for L2/cosine, ScaNN for dot product. Production choice often comes down to: (1) similarity metric, (2) latency requirements, (3) memory constraints.

Q: How do you handle cold start in recommendations?

For new users: (1) Ask for preferences on signup, (2) Use demographic/content-based features instead of history, (3) Show trending/popular items, (4) Bootstrap from similar users. For new items: (1) Use content features (title, category, creator), (2) Explore-exploit: show to small audience first, (3) Bandit algorithms for balancing new item exposure vs known good items.

Q: What is MMR and why is it used in re-ranking?

MMR (Maximal Marginal Relevance) balances relevance and diversity. Standard ranking gives similar items (e.g., 10 cat videos). MMR selects items that are both relevant AND different from already-selected items. Formula: MMR = λ × relevance - (1-λ) × max_similarity_to_selected. Higher λ = more relevance, lower = more diversity. Essential for user engagement (boredom from similar content) and discovery.

Q: How would you design a recommendation system for a new platform with no user history?

Start simple: (1) Trending/popular items for everyone, (2) Content-based filtering using item metadata, (3) Collaborative filtering as soon as you have interactions. Gradually add: (4) Two-Tower for retrieval when scale justifies, (5) Deep ranking models. Key insight: Don't over-engineer early. A simple popularity baseline + content-based often beats complex models with sparse data.


14. Causal Inference for ML

Best Sources

Tutorials: - ML Journey: Causal Inference with DoWhy and EconML (July 2025) - Medium: Building Causal Inference Project in Python (Nov 2025) - GitHub: Official DoWhy + EconML Tutorial

Interview Prep: - GrabNGoInfo: Top 10 Causal Inference Interview Questions - Medium: ATE vs CATE vs ATT vs ATC

The Fundamental Problem

Correlation ≠ Causation. We can never observe both potential outcomes for the same unit.

\[Y_i(1) - Y_i(0) = \text{Individual Treatment Effect (unobservable)}\]

Treatment Effect Definitions

| Metric | Definition | Formula |
|---|---|---|
| ATE | Average Treatment Effect | \(E[Y(1) - Y(0)]\) |
| ATT | Average Treatment Effect on Treated | \(E[Y(1) - Y(0) \mid T=1]\) |
| ATC | Average Treatment Effect on Control | \(E[Y(1) - Y(0) \mid T=0]\) |
| CATE | Conditional ATE (heterogeneous) | \(E[Y(1) - Y(0) \mid X=x]\) |

Key Assumptions

1. SUTVA: no interference between units, one version of treatment
2. Unconfoundedness: \(Y(1), Y(0) \perp T \mid X\) (all confounders observed)
3. Overlap: \(0 < P(T=1 \mid X) < 1\) (every unit can receive treatment)
4. Consistency: observed outcome = potential outcome under assigned treatment

Method 1: Propensity Score Matching

from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import NearestNeighbors
import numpy as np

def propensity_score_matching(X, treatment, outcome):
    # Step 1: Estimate propensity scores
    ps_model = LogisticRegression(max_iter=1000)
    ps_model.fit(X, treatment)
    propensity_scores = ps_model.predict_proba(X)[:, 1]

    # Step 2: Match treated to control units
    treated_idx = np.where(treatment == 1)[0]
    control_idx = np.where(treatment == 0)[0]

    nn = NearestNeighbors(n_neighbors=1)
    nn.fit(propensity_scores[control_idx].reshape(-1, 1))
    distances, matched_control_idx = nn.kneighbors(
        propensity_scores[treated_idx].reshape(-1, 1)
    )

    # Step 3: Calculate ATT
    treated_outcomes = outcome[treated_idx]
    matched_control_outcomes = outcome[control_idx[matched_control_idx.flatten()]]
    return np.mean(treated_outcomes - matched_control_outcomes)

Method 2: Difference-in-Differences (DiD)

Formula:
\[\text{DiD} = (Y_{treat, post} - Y_{treat, pre}) - (Y_{control, post} - Y_{control, pre})\]

Assumption: Parallel trends — without treatment, both groups would follow same trend.
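A minimal DiD sketch in pandas, assuming a long-format dataframe with illustrative columns group ('treat'/'control'), period ('pre'/'post'), and outcome:

import pandas as pd

def did_estimate(df):
    """Difference-in-differences from group/period means"""
    means = df.groupby(['group', 'period'])['outcome'].mean()
    return ((means['treat', 'post'] - means['treat', 'pre'])
            - (means['control', 'post'] - means['control', 'pre']))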

Method 3: Instrumental Variables (IV)

Requirements for instrument Z:
  1. Relevance: \(Z\) affects \(T\)
  2. Exclusion: \(Z\) affects \(Y\) only through \(T\)
  3. Exogeneity: \(Z\) independent of confounders
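A minimal two-stage least squares (2SLS) sketch in plain numpy, for a single instrument z, treatment t, and outcome y (1-D arrays):

import numpy as np

def two_stage_least_squares(z, t, y):
    """2SLS: instrument the treatment, then regress the outcome on it"""
    Z = np.column_stack([np.ones_like(z), z])
    t_hat = Z @ np.linalg.lstsq(Z, t, rcond=None)[0]   # stage 1: T ~ Z
    X = np.column_stack([np.ones_like(t_hat), t_hat])
    beta = np.linalg.lstsq(X, y, rcond=None)[0]        # stage 2: Y ~ T_hat
    return beta[1]  # estimated causal effect of t on y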

Method 4: Uplift Modeling

Meta-learners:

| Method | Description | Best for |
|--------|-------------|----------|
| S-Learner | Single model with treatment as feature | Simple cases |
| T-Learner | Two models for T=0/1 | Heterogeneous effects |
| X-Learner | T-Learner + correction | Imbalanced treatment |
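A minimal T-Learner sketch with scikit-learn; the choice of GradientBoostingRegressor is illustrative, and inputs are assumed to be numpy arrays:

import numpy as np
from sklearn.ensemble import GradientBoostingRegressor

def t_learner_cate(X, treatment, y, X_new):
    """T-Learner: separate outcome models for treated and control"""
    m1 = GradientBoostingRegressor().fit(X[treatment == 1], y[treatment == 1])
    m0 = GradientBoostingRegressor().fit(X[treatment == 0], y[treatment == 0])
    return m1.predict(X_new) - m0.predict(X_new)  # estimated CATE per row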

When to Use Which Method

| Situation | Method |
|---|---|
| RCT data | Direct comparison |
| Observational with confounders | Propensity Score, DiD |
| Policy change over time | Difference-in-Differences |
| Non-compliance in RCT | Instrumental Variables |
| Personalization | Uplift Modeling |

Interview Questions

Q: What's the difference between ATE and ATT?

ATE = effect for entire population. ATT = effect for those who received treatment. They differ when there's selection bias. In RCTs, ATE = ATT. In observational data, ATT is often what we can credibly estimate.

Q: What is the parallel trends assumption in DiD?

Without treatment, treatment and control groups would follow same trend over time. Untestable but can assess pre-treatment trends. If groups were diverging before treatment, assumption violated.

Q: When would you use instrumental variables?

When there's unmeasured confounding but you have a valid instrument: (1) Random assignment with non-compliance, (2) Geographic variation, (3) Policy timing changes across states. Instrument must affect treatment (relevance) and affect outcome ONLY through treatment (exclusion).

Q: What is uplift modeling?

Predicts individual treatment effects to identify who to target. Goal: Find "persuadables" who respond positively, avoid "sleeping dogs" who respond negatively. Used in marketing, medicine, pricing.


15. Vector Databases for ML

Best Sources

Comparisons:
- DataCamp: The 7 Best Vector Databases in 2026
- Markaicode: Pinecone vs Weaviate vs Milvus 2025
- Firecrawl: Best Vector Databases 2025

Index Deep Dives:
- Medium: IVF vs HNSW vs PQ Index Guide (Jan 2026)
- The Data Guy: Production Video Search Infrastructure (Dec 2025)

Hybrid Search:
- TowardsAI: Hybrid Search RAG That Works (Jan 2026)
- AILog: Hybrid Search for RAG Tutorial

What is a Vector Database?

Purpose: Store and search high-dimensional embeddings efficiently using Approximate Nearest Neighbor (ANN) algorithms.

Key operations:
1. Upsert: Add/update vectors with metadata
2. Query: Find k nearest neighbors to query vector
3. Delete: Remove vectors by ID or filter

ANN Index Types

Index Description Memory Recall Latency Best For
HNSW Hierarchical Navigable Small World graph High ~98% Very Low Real-time search, high recall
IVF Inverted File (cluster-based) Medium ~95% Low Large scale, batch workloads
IVF-PQ IVF + Product Quantization Low ~90% Low Memory-constrained, huge datasets
Flat Brute force High 100% High Small datasets, exact search

HNSW (Hierarchical Navigable Small World):
- Graph-based: nodes = vectors, edges = nearest neighbors
- Multi-layer: top layers are sparse, bottom layers are dense
- Search: start from top, greedy descent to nearest neighbor

# HNSW parameters
hnsw_config = {
    "M": 16,              # Connections per node (higher = better recall, more memory)
    "ef_construction": 200, # Build-time neighbor list size
    "ef_search": 100,     # Search-time neighbor list size
    # Level multiplier ml = 1/ln(M), default for M=16: 1/ln(16) ~ 0.36
}

# Trade-offs:
# - Higher M: Better recall, more memory, slower build
# - Higher ef_construction: Better index quality, slower build
# - Higher ef_search: Better recall, slower queries

IVF (Inverted File Index):
- Partition vectors into clusters using k-means
- At query time: find nearest clusters, search only within them
- nlist = number of clusters
- nprobe = clusters to search (higher = better recall, slower)

# IVF parameters
ivf_config = {
    "nlist": 1000,    # Number of clusters (sqrt(N) is good starting point)
    "nprobe": 10,     # Clusters to search at query time
}

# Trade-offs:
# - Higher nlist: Faster search, more memory for centroids
# - Higher nprobe: Better recall, slower queries

Vector Database Comparison

Database Type Index Types Strengths Weaknesses
Pinecone Cloud-only Proprietary Zero ops, auto-scaling Expensive, vendor lock-in
Milvus Self-hosted/Cloud HNSW, IVF, PQ, GPU Open source, flexible Complex setup
Weaviate Self-hosted/Cloud HNSW GraphQL API, modules Higher memory
Qdrant Self-hosted/Cloud HNSW Rust-based, fast Smaller community
pgvector Postgres extension IVF, HNSW No new infra needed Limited features
Chroma Embedded HNSW Simple, Python-native Not for production scale

Python Examples

Qdrant (self-hosted/cloud):

from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct

client = QdrantClient(":memory:")  # or host="localhost", port=6333

# Create collection
client.create_collection(
    collection_name="documents",
    vectors_config=VectorParams(size=384, distance=Distance.COSINE)
)

# Upsert vectors
points = [
    PointStruct(id=i, vector=embeddings[i], payload={"text": texts[i]})
    for i in range(len(embeddings))
]
client.upsert(collection_name="documents", points=points)

# Query
results = client.search(
    collection_name="documents",
    query_vector=query_embedding,
    limit=10,
    score_threshold=0.7
)

Milvus (production scale):

from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType

connections.connect("default", host="localhost", port="19530")

# Define schema
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=768)
]
schema = CollectionSchema(fields, "document collection")
collection = Collection("documents", schema)

# Create HNSW index
index_params = {
    "metric_type": "COSINE",
    "index_type": "HNSW",
    "params": {"M": 16, "efConstruction": 200}
}
collection.create_index("embedding", index_params)

# Search
collection.load()
results = collection.search(
    data=[query_embedding],
    anns_field="embedding",
    param={"metric_type": "COSINE", "params": {"ef": 100}},
    limit=10
)

Hybrid Search (Vector + Keyword)

Why hybrid? Pure vector search misses exact matches. BM25 misses semantics. Combine both.

Approach:
1. BM25 (keyword): Find documents with exact term matches
2. Vector (semantic): Find documents with similar meaning
3. RRF (Reciprocal Rank Fusion): Combine rankings

import numpy as np
from rank_bm25 import BM25Okapi

def hybrid_search(query, vector_db, documents, k=10, rrf_k=60):
    """
    Uses Reciprocal Rank Fusion (RRF) to combine vector and BM25 results.
    rrf_k: RRF constant (default 60), higher = more uniform weighting
    """
    # 1. Vector search (embed() = your embedding model; result IDs are
    #    assumed to be positions in `documents`)
    query_embedding = embed(query)
    vector_results = vector_db.search(query_embedding, limit=k*2)
    vector_scores = {r.id: r.score for r in vector_results}

    # 2. BM25 keyword search
    tokenized_docs = [doc.split() for doc in documents]
    bm25 = BM25Okapi(tokenized_docs)
    bm25_scores = bm25.get_scores(query.split())

    # 3. Combine with RRF
    def rrf_score(rank):
        return 1 / (rrf_k + rank)

    # Get rankings
    vector_ranking = sorted(vector_scores.keys(), key=lambda x: vector_scores[x], reverse=True)
    bm25_ranking = sorted(range(len(documents)), key=lambda x: bm25_scores[x], reverse=True)

    # RRF fusion
    combined_scores = {}
    for rank, doc_id in enumerate(vector_ranking):
        combined_scores[doc_id] = combined_scores.get(doc_id, 0) + rrf_score(rank)
    for rank, doc_id in enumerate(bm25_ranking):
        combined_scores[doc_id] = combined_scores.get(doc_id, 0) + rrf_score(rank)

    # Return top k
    return sorted(combined_scores.keys(), key=lambda x: combined_scores[x], reverse=True)[:k]

Index Refresh Strategies

Challenge: How to update index when new documents arrive?

Strategy Description Use Case
Full rebuild Rebuild entire index periodically Small datasets, batch updates
Incremental Add new vectors to existing index Real-time, small updates
Dual index Maintain old + new index, switch atomically Zero-downtime updates

Best practices:
- HNSW: Supports incremental updates but quality degrades. Rebuild periodically.
- IVF: Requires full rebuild. Use dual-index strategy for zero downtime.
- Batch updates: Collect updates, apply in bulk every N minutes/hours.

# Dual index pattern
class DualIndexManager:
    def __init__(self):
        self.current_index = create_index()
        self.next_index = None

    def search(self, query_vector):
        return self.current_index.search(query_vector)

    def start_rebuild(self, all_vectors):
        self.next_index = create_index()
        self.next_index.add(all_vectors)

    def swap_index(self):
        if self.next_index:
            old_index = self.current_index
            self.current_index = self.next_index
            self.next_index = None
            del old_index  # Cleanup

Interview Questions

Q: When would you use HNSW vs IVF?

HNSW for real-time search with high recall requirements (recommendations, chatbots). Lower latency (~1-5ms) but higher memory (~2-4x vector size). IVF for large-scale batch workloads or memory-constrained environments. IVF-PQ for billion-scale vectors where memory is the bottleneck. Trade-off: HNSW = speed + recall + memory, IVF-PQ = memory efficiency + lower recall.

Q: What is hybrid search and why is it important for RAG?

Hybrid search combines vector similarity (semantic) with keyword matching (BM25). Pure vector search can miss exact matches (product names, acronyms). Pure keyword search misses semantic similarity. Hybrid gives best of both: vector finds "laptop" when querying "computer", BM25 finds "GPT-4" exactly. Production RAG systems almost always use hybrid + reranking.

Q: How do you handle index updates in production?

Three strategies: (1) Incremental: Add vectors to existing HNSW index in real-time (quality degrades over time), (2) Periodic rebuild: Rebuild entire index every N hours (brief downtime or use dual index), (3) Dual index: Build new index in background, atomic swap when ready. Choice depends on update frequency, latency requirements, and acceptable staleness.

Q: Pinecone vs Milvus vs pgvector — how do you choose?

Pinecone: Zero ops, fastest to production, expensive at scale, vendor lock-in. Milvus: Full control, complex setup, open source, best for large scale. pgvector: No new infrastructure, limited features, good for adding vector search to existing Postgres app. Start with pgvector if already using Postgres, Pinecone for quick prototype, Milvus/Qdrant for production scale with cost control.

Q: What is RRF (Reciprocal Rank Fusion) in hybrid search?

RRF combines rankings from multiple retrievers by scoring each document as sum of 1/(k + rank). Formula: \(RRF(d) = \sum_{r \in R} \frac{1}{k + rank_r(d)}\). Intuition: Being ranked #1 by any retriever gives significant boost, but multiple retrievers agreeing gives even more. k=60 is common default. Simple, parameter-free, often beats weighted score combination.


16. Cost Optimization for ML Inference

Sources: Lambda Labs GPU Pricing (2026), Neptune.ai "Inference Cost Optimization" (Jan 2026), Runhouse "LLM Cost Optimization" (2025)

The Problem: Inference Dominates Costs

Key Insight:
- Inference costs = 15-20x training costs for production models
- LLM inference costs declined 10x annually (2023-2025) but remain significant
- GPU utilization in most orgs: 10-30% (70-90% waste)

Cost Breakdown

Component % of Total Optimization Potential
GPU compute 60-70% High (spot, batching, quantization)
Memory/Storage 15-20% Medium (model compression, caching)
Network 5-10% Low (region selection)
Orchestration 5-10% Low (right-sizing)

Strategy 1: GPU Utilization

Problem: GPUs idle most of the time.

Solutions:

# 1. Request Batching (critical for GPUs)
from transformers import pipeline

pipe = pipeline("text-classification", device=0)  # any HF pipeline works here

# BAD: Sequential requests
for text in texts:
    result = pipe(text)  # GPU sits idle between requests

# GOOD: Batched requests
results = pipe(texts, batch_size=32)  # Full GPU utilization

# 2. Dynamic Batching (Triton Inference Server)
# Accumulate requests until batch_size or timeout
config = {
    "max_batch_size": 32,
    "batch_timeout_micros": 5000,  # 5ms max wait
}

GPU Utilization Metrics:

import pynvml

def get_gpu_utilization():
    pynvml.nvmlInit()
    handle = pynvml.nvmlDeviceGetHandleByIndex(0)
    util = pynvml.nvmlDeviceGetUtilizationRates(handle)
    return {
        "gpu": util.gpu,  # GPU compute %
        "memory": util.memory  # GPU memory %
    }

# Target: GPU utilization > 70%, Memory > 80%

Strategy 2: Spot Instances

What: Spare cloud capacity at 60-90% discount.

Provider Spot Discount Use Case
AWS Spot 70-90% Training, batch inference
GCP Preemptible 60-80% Training jobs
Lambda Labs 50-70% GPU-intensive workloads
Azure Spot 60-80% Stateful + stateless

Spot Instance Strategy:

# Spot instance with checkpointing
import torch

class SpotAwareTrainer:
    def __init__(self, checkpoint_interval=100):
        self.checkpoint_interval = checkpoint_interval

    def train_step(self, batch_idx):
        # Check for preemption signal
        if self.check_preemption():
            self.save_checkpoint("emergency")
            return False

        if batch_idx % self.checkpoint_interval == 0:
            self.save_checkpoint(f"step_{batch_idx}")

        # ... training logic
        return True

    def check_preemption(self):
        # AWS: http://169.254.169.254/latest/meta-data/spot/termination-time
        # GCP: http://metadata.google.internal/computeMetadata/v1/instance/preempted
        import requests
        try:
            resp = requests.get(
                "http://169.254.169.254/latest/meta-data/spot/termination-time",
                timeout=1
            )
            return resp.status_code == 200
        except requests.RequestException:
            return False

When to Use Spot:
- ✅ Training jobs (checkpointable)
- ✅ Batch inference (resumable)
- ✅ Data preprocessing
- ❌ Real-time inference (need guaranteed capacity)
- ❌ Distributed training with tight coupling

Strategy 3: Model Right-Sizing

Match Model to Task:

Task Type Recommended Model Cost/1M tokens
Simple classification DistilBERT, RoBERTa-tiny $0.01-0.05
Summarization T5-small, BART-base $0.05-0.10
RAG embeddings all-MiniLM-L6-v2 $0.01
Complex reasoning Llama-3-8B, Mistral-7B $0.20-0.50
Enterprise grade GPT-4, Claude Opus $15-30

Decision Framework:

def select_model(task_complexity, latency_requirement, budget):
    """
    Select optimal model based on constraints.

    task_complexity: 1-5 (1=simple, 5=very complex)
    latency_requirement: latency in ms
    budget: cost per 1M tokens
    """
    if task_complexity <= 2 and latency_requirement < 50:
        return "distilbert-base"  # $0.01/1M
    elif task_complexity <= 3 and budget < 1:
        return "llama-3-8b"  # $0.20/1M
    elif task_complexity >= 4:
        return "gpt-4" if budget > 10 else "llama-3-70b"
    else:
        return "mistral-7b"  # Default middle ground

Strategy 4: Inference Cost Per Prediction

Metrics to Track:

# Cost per prediction formula
cost_per_prediction = (
    gpu_hourly_cost * inference_time_seconds / 3600
) / batch_size

# Example calculation
# A100 GPU: $2.50/hour
# Inference time: 100ms
# Batch size: 1
cost_per_prediction = 2.50 * 0.1 / 3600 / 1  # = $0.000069 ≈ $0.07/1000

# With batching (batch_size=32)
cost_per_prediction = 2.50 * 0.1 / 3600 / 32  # = $0.000002 ≈ $0.002/1000

LLM Token Cost Calculator:

def calculate_llm_cost(
    input_tokens: int,
    output_tokens: int,
    model: str
) -> float:
    """Calculate cost for LLM API call."""

    pricing = {
        "gpt-4-turbo": {"input": 0.01, "output": 0.03},  # per 1K tokens
        "gpt-3.5-turbo": {"input": 0.0005, "output": 0.0015},
        "claude-3-opus": {"input": 0.015, "output": 0.075},
        "claude-3-sonnet": {"input": 0.003, "output": 0.015},
        "llama-3-8b-self-hosted": {"input": 0.0002, "output": 0.0002},
    }

    rates = pricing.get(model, pricing["llama-3-8b-self-hosted"])
    return (
        input_tokens * rates["input"] / 1000 +
        output_tokens * rates["output"] / 1000
    )

# Example: 500 input + 200 output tokens with GPT-4
cost = calculate_llm_cost(500, 200, "gpt-4-turbo")  # = $0.011

Strategy 5: Caching

Semantic Caching for LLMs:

import hashlib
from sentence_transformers import SentenceTransformer
import numpy as np

class SemanticCache:
    def __init__(self, similarity_threshold=0.95):
        self.encoder = SentenceTransformer('all-MiniLM-L6-v2')
        self.cache = {}  # embedding tuple -> (response, hit_count)
        self.threshold = similarity_threshold

    def get(self, query: str):
        query_emb = self.encoder.encode(query)

        # Note: linear scan; for large caches use a vector index instead
        for cached_emb, (response, count) in self.cache.items():
            similarity = np.dot(query_emb, cached_emb) / (
                np.linalg.norm(query_emb) * np.linalg.norm(cached_emb)
            )
            if similarity > self.threshold:
                # Persist the hit count so cache usage can be monitored
                self.cache[cached_emb] = (response, count + 1)
                return response, True

        return None, False

    def set(self, query: str, response: str):
        query_emb = self.encoder.encode(query)
        self.cache[tuple(query_emb)] = (response, 1)

# Usage
cache = SemanticCache()

def query_llm(prompt: str) -> str:
    cached, hit = cache.get(prompt)
    if hit:
        return cached  # Skip expensive API call

    response = call_llm_api(prompt)
    cache.set(prompt, response)
    return response

Strategy 6: Auto-Scaling

# Kubernetes HPA for inference
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: llm-inference-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: llm-inference
  minReplicas: 1
  maxReplicas: 10
  metrics:
  # Note: GPU utilization is not a built-in HPA resource metric; exposing it
  # typically requires a custom/external metrics adapter (e.g., DCGM exporter)
  - type: Resource
    resource:
      name: nvidia.com/gpu-utilization
      target:
        type: Utilization
        averageUtilization: 70  # Scale up at 70% GPU util
  - type: External
    external:
      metric:
        name: queue_length
      target:
        type: AverageValue
        averageValue: 100  # Scale up when queue > 100

Cost Optimization Decision Matrix

Technique Effort Savings When to Use
Batching Low 2-5x Always
Spot instances Low 60-80% Non-critical workloads
Model quantization Medium 2-4x Latency-sensitive
Model distillation High 5-10x High volume, fixed task
Semantic caching Medium 10-30% Repetitive queries
Right-sizing Low 2-5x Over-provisioned

Interview Questions

Q: How do you reduce inference costs by 70%?

(1) Batching: Dynamic batching increases GPU utilization from 20% to 80% (4x improvement). (2) Quantization: INT8 quantization halves memory and compute (2x). (3) Model selection: Use smaller models when possible (2-5x). Combined: 4 * 2 * 2 = 16x cost reduction, exceeding 70%.

Q: When should you use spot instances for ML?

Use spot for fault-tolerant workloads: training jobs (with checkpointing), batch inference, data preprocessing, hyperparameter tuning. Avoid for real-time inference where latency SLA matters, or tightly coupled distributed training without checkpointing. Always implement preemption detection and graceful shutdown.

Q: How do you calculate cost per prediction?

Cost/prediction = (GPU_hourly_cost * inference_time / 3600) / batch_size. For A100 ($2.50/hr), 100ms inference, batch=1: $0.000069/prediction. With batch=32: $0.000002/prediction. Track this metric over time to catch cost regressions.

Q: What is semantic caching and when is it useful?

Semantic caching stores LLM responses keyed by meaning rather than exact string match. Uses embedding similarity (>0.95) to identify semantically equivalent queries. Useful when users ask similar questions differently ("What is Python?" vs "Explain Python"). Typically achieves 10-30% cache hit rate in production, directly reducing API costs.


17. Multi-Model Serving and Model Routing

Sources: TrueFoundry "LLM Load Balancing" (2025), LogRocket "LLM Routing in Production" (2026), arXiv "Universal Model Routing" (2025)

The Multi-Model Problem

Why Multi-Model?
- Different tasks need different capabilities
- Cost optimization: small models for simple tasks
- Redundancy: avoid single point of failure
- A/B testing: compare model versions

Key Insight: Most production systems use 5-10 models simultaneously, with intelligent routing between them.

Routing Strategies Comparison

Strategy Description Use Case Complexity
Weighted Round-Robin Static % split per model A/B testing, canary Low
Latency-Based Route to fastest responding Real-time apps Medium
Cost-Aware Cheap models for simple tasks Cost optimization Medium
Confidence-Based Try cheap, escalate if uncertain Classification High
Cascade Sequential fallback chain Resilience Low
ML-Based Router Learned routing policy Production scale High

Strategy 1: Weighted Round-Robin

import random
from dataclasses import dataclass
from typing import List

@dataclass
class ModelEndpoint:
    name: str
    url: str
    weight: float  # 0.0 to 1.0

class WeightedRouter:
    def __init__(self, endpoints: List[ModelEndpoint]):
        self.endpoints = endpoints
        # Normalize weights
        total = sum(e.weight for e in endpoints)
        for e in endpoints:
            e.weight /= total

    def route(self) -> ModelEndpoint:
        """Select endpoint based on weight distribution."""
        r = random.random()
        cumulative = 0.0
        for endpoint in self.endpoints:
            cumulative += endpoint.weight
            if r <= cumulative:
                return endpoint
        return self.endpoints[-1]  # Fallback

# Usage: 80% to GPT-4, 20% to Claude
router = WeightedRouter([
    ModelEndpoint("gpt-4", "https://api.openai.com/v1/chat", 0.8),
    ModelEndpoint("claude", "https://api.anthropic.com/v1/messages", 0.2),
])

endpoint = router.route()

Strategy 2: Confidence-Based Routing (Cascade)

from dataclasses import dataclass

@dataclass
class ModelResponse:
    content: str
    confidence: float
    model: str

class CascadeRouter:
    """Try cheap model first, escalate to expensive if uncertain."""

    def __init__(self, cheap_model, expensive_model, threshold=0.85):
        self.cheap = cheap_model
        self.expensive = expensive_model
        self.threshold = threshold

    async def route(self, prompt: str) -> ModelResponse:
        # Try cheap model first
        response = await self.cheap.generate(prompt)

        if response.confidence >= self.threshold:
            # Good enough, return it
            return response

        # Low confidence, escalate to expensive model
        return await self.expensive.generate(prompt)

# Example setup
cascade = CascadeRouter(
    cheap_model=LLMClient("llama-3-8b"),
    expensive_model=LLMClient("gpt-4"),
    threshold=0.85
)

# If 70% of requests are confident, saves 70% of expensive calls

Strategy 3: Latency-Based Routing

import random
from collections import deque
from dataclasses import dataclass
from typing import Dict, Deque

@dataclass
class LatencyStats:
    samples: Deque[float]
    avg_latency: float

class LatencyAwareRouter:
    """Route to model with best recent latency."""

    def __init__(self, models: Dict[str, str], window_size=100):
        self.models = models
        self.stats = {
            name: LatencyStats(deque(maxlen=window_size), 0.0)
            for name in models
        }

    def record_latency(self, model: str, latency_ms: float):
        """Update rolling average latency."""
        stats = self.stats[model]
        stats.samples.append(latency_ms)
        stats.avg_latency = sum(stats.samples) / len(stats.samples)

    def route(self, fairness_factor: float = 1.2) -> str:
        """
        Select model with best latency, with fairness band.

        fairness_factor: choose any model within X% of fastest
        """
        best_model = None
        best_latency = float('inf')

        for name, stats in self.stats.items():
            if stats.avg_latency < best_latency:
                best_latency = stats.avg_latency
                best_model = name

        # Find all models within fairness band
        threshold = best_latency * fairness_factor
        candidates = [
            name for name, stats in self.stats.items()
            if stats.avg_latency <= threshold
        ]

        # Random choice among candidates (load balancing)
        return random.choice(candidates)

Strategy 4: Fallback Chain (Circuit Breaker)

from enum import Enum
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional

class CircuitState(Enum):
    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Failing, reject traffic
    HALF_OPEN = "half_open"  # Testing recovery

@dataclass
class CircuitBreaker:
    model: str
    state: CircuitState = CircuitState.CLOSED
    failure_count: int = 0
    last_failure: Optional[datetime] = None
    threshold: int = 5
    cooldown_minutes: int = 5

    def record_success(self):
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def record_failure(self):
        self.failure_count += 1
        self.last_failure = datetime.now()

        if self.failure_count >= self.threshold:
            self.state = CircuitState.OPEN

    def should_try(self) -> bool:
        """Check if we should attempt this model."""
        if self.state == CircuitState.CLOSED:
            return True

        if self.state == CircuitState.OPEN:
            # Check if cooldown passed
            if datetime.now() - self.last_failure > timedelta(minutes=self.cooldown_minutes):
                self.state = CircuitState.HALF_OPEN
                return True
            return False

        # HALF_OPEN: allow one test request
        return True

class FallbackRouter:
    """Chain of models with circuit breakers."""

    def __init__(self, models: List[str]):
        self.circuits = {m: CircuitBreaker(m) for m in models}
        self.order = models

    async def route(self, prompt: str) -> str:
        """Try models in order, skip failing ones."""
        for model in self.order:
            circuit = self.circuits[model]

            if not circuit.should_try():
                continue  # Model is in cooldown

            try:
                response = await self.call_model(model, prompt)
                circuit.record_success()
                return response
            except Exception as e:
                circuit.record_failure()
                continue

        raise Exception("All models failed")

    async def call_model(self, model: str, prompt: str) -> str:
        # Implementation depends on your model client
        pass

Strategy 5: A/B Testing Between Models

import hashlib
from dataclasses import dataclass
from typing import Dict

@dataclass
class ABTest:
    experiment_id: str
    variants: Dict[str, float]  # variant_name -> traffic %

class ABTestRouter:
    """A/B testing with consistent user assignment."""

    def __init__(self, experiments: Dict[str, ABTest]):
        self.experiments = experiments

    def get_variant(self, experiment_id: str, user_id: str) -> str:
        """
        Deterministically assign user to variant.
        Same user always gets same variant.
        """
        exp = self.experiments[experiment_id]

        # Hash user_id for consistent assignment
        hash_input = f"{experiment_id}:{user_id}"
        hash_value = int(hashlib.md5(hash_input.encode()).hexdigest(), 16)
        bucket = (hash_value % 100) / 100

        # Assign to variant based on bucket
        cumulative = 0.0
        for variant, traffic in sorted(exp.variants.items()):
            cumulative += traffic
            if bucket < cumulative:
                return variant

        return list(exp.variants.keys())[0]

    def route(self, experiment_id: str, user_id: str) -> str:
        variant = self.get_variant(experiment_id, user_id)
        # Map variant to model
        variant_model_map = {
            "control": "gpt-4-turbo",
            "treatment_a": "gpt-4o",
            "treatment_b": "claude-3-opus",
        }
        return variant_model_map[variant]

# Usage
ab_router = ABTestRouter({
    "chatbot_v2": ABTest(
        experiment_id="chatbot_v2",
        variants={"control": 0.5, "treatment_a": 0.25, "treatment_b": 0.25}
    )
})

# User 123 always gets same variant
model = ab_router.route("chatbot_v2", user_id="123")

Model Router Decision Matrix

Scenario Recommended Strategy Why
A/B testing new model Weighted Round-Robin Simple, deterministic split
Cost optimization Confidence-Based Cheap first, expensive only if needed
Latency SLA Latency-Based Always fastest available
High availability Fallback Chain + Circuit Breaker Graceful degradation
Multi-tenant Metadata-based routing Per-tenant model pools

Interview Questions

Q: What is cascade routing and when would you use it?

Cascade routing tries a cheap model first, then escalates to an expensive model if the cheap model's confidence is below threshold. Use it for classification tasks where models can return confidence scores. If 70% of requests are handled confidently by a cheap model, you reduce expensive model usage by 70% while maintaining quality. The overhead is the initial cheap request for the remaining 30%.

Q: How do you implement A/B testing between ML models?

Use deterministic hashing based on user_id to assign users to variants. Hash(user_id + experiment_id) mod 100 gives consistent bucket assignment — same user always sees same variant. Track metrics per variant (latency, quality, cost). Statistical significance requires enough samples per variant (typically 1000+ conversions).

Q: What is a circuit breaker and why is it important for multi-model serving?

Circuit breaker tracks failures per model and temporarily stops sending traffic to failing models. After N consecutive failures, circuit opens and rejects all traffic for a cooldown period (typically 5 minutes). Then transitions to half-open state for a test request. This prevents cascading failures and allows graceful degradation when providers have outages.

Q: How do you choose between latency-based vs cost-aware routing?

Latency-based for interactive applications (chat, autocomplete) where user experience depends on response time. Route to fastest responding endpoint. Cost-aware for batch processing or when margins matter. Classify request complexity and route simple requests to cheap models. Can combine both: cost-aware for classification, latency-based as tiebreaker.


18. Data Quality for ML in Production

Sources: Uplatz "Data Validation and Quality in MLOps" (Nov 2025), ML Journey "Data Lineage Tracking" (Sep 2025), Gartner Data Quality Report (2025)

The Data Quality Problem

Key Statistics:
- 60% of ML project failures attributed to poor data quality
- $12.9M average annual cost of poor data quality per organization
- 80% of ML project time spent on data preparation

GIGO Principle: garbage in, garbage out. Models faithfully learn the noise and errors in their training data.

Data Quality Dimensions for ML

Dimension Definition ML Impact
Accuracy Data conforms to reality Incorrect patterns learned
Completeness Absence of missing values Biased models, discarded records
Consistency Standard format/structure Fragmented categories, diluted signals
Timeliness Data is up-to-date Stale models, irrelevant predictions
Relevance Data applies to the problem Noise, computational waste
Uniqueness No duplicate records Inflated importance of certain instances
Validity Conforms to business rules Type errors, out-of-range values

Validation Types by Pipeline Stage

Stage Validation Focus Key Checks
Ingestion Raw data integrity Schema, freshness, row counts, source integrity
Preparation Cleanliness Missing values, duplicates, outliers, format
Training Fitness for model Train-test split, feature validation, distribution
Production Ongoing quality Drift detection, skew detection, schema evolution

Schema Validation with Great Expectations

import great_expectations as gx
from great_expectations.dataset import PandasDataset

# Define expectations (data contract)
def create_expectations(df: PandasDataset) -> PandasDataset:
    # Schema checks
    df.expect_column_to_exist("user_id")
    df.expect_column_to_exist("timestamp")
    df.expect_column_to_exist("features")

    # Type checks
    df.expect_column_values_to_be_of_type("user_id", "int")
    df.expect_column_values_to_be_of_type("timestamp", "datetime64[ns]")

    # Range checks
    df.expect_column_values_to_be_between("age", min_value=0, max_value=120)
    df.expect_column_values_to_be_between("score", min_value=0.0, max_value=1.0)

    # Completeness checks
    df.expect_column_values_to_not_be_null("user_id")
    df.expect_column_values_to_not_be_null("timestamp")

    # Uniqueness checks
    df.expect_column_values_to_be_unique("user_id")

    # Categorical checks
    df.expect_column_values_to_be_in_set("status", ["active", "inactive", "pending"])

    return df

# Validate data (df is a PandasDataset wrapping your DataFrame)
df = create_expectations(df)
validation_result = df.validate()

if not validation_result.success:
    print(f"Validation failed: {validation_result.statistics}")
    # Halt pipeline or raise alert

TensorFlow Data Validation (TFDV)

import tensorflow_data_validation as tfdv

# Generate statistics from training data
train_stats = tfdv.generate_statistics_from_csv(train_data_path)

# Infer schema from statistics
schema = tfdv.infer_schema(train_stats)

# Customize schema constraints (domains are set via schema_pb2 protos)
from tensorflow_metadata.proto.v0 import schema_pb2
tfdv.set_domain(schema, 'age', schema_pb2.IntDomain(name='age', min=0, max=120))
tfdv.set_domain(schema, 'category', schema_pb2.StringDomain(name='category', value=['A', 'B', 'C']))

# Validate new data against schema
new_data_stats = tfdv.generate_statistics_from_csv(new_data_path)
validation_result = tfdv.validate_statistics(
    statistics=new_data_stats,
    schema=schema
)

# Check for anomalies
if validation_result.anomaly_info:
    for feature, anomaly in validation_result.anomaly_info.items():
        print(f"Anomaly in {feature}: {anomaly.description}")

# Detect drift between training and serving
tfdv.visualize_statistics(
    lhs_statistics=train_stats,
    rhs_statistics=serving_stats,
    lhs_name='Training',
    rhs_name='Serving'
)

Schema Evolution Strategies

Strategy Description When to Use
Additive Add new optional columns Non-breaking changes
Backward Compatible New code reads old data Gradual rollout
Forward Compatible Old code reads new data Multi-version serving
Dual-Write Write to old + new schemas Migration period

class SchemaEvolution:
    """Handle schema versioning in ML pipelines."""

    def __init__(self, current_version: str):
        self.version = current_version
        self.schemas = {
            "v1": {"features": ["age", "income", "location"]},
            "v2": {"features": ["age", "income", "location", "education", "tenure"]},
            "v3": {"features": ["age", "income", "region", "education", "tenure", "segment"]},
        }

    def migrate(self, df, from_version: str, to_version: str):
        """Migrate data between schema versions."""
        source_features = self.schemas[from_version]["features"]
        target_features = self.schemas[to_version]["features"]

        # Add missing columns with defaults
        for feature in target_features:
            if feature not in df.columns:
                df[feature] = self.get_default(feature)

        # Rename columns (e.g., location -> region)
        if from_version == "v2" and to_version == "v3":
            df = df.rename(columns={"location": "region"})

        return df[target_features]

    def get_default(self, feature: str):
        defaults = {
            "education": "unknown",
            "tenure": 0,
            "segment": "default"
        }
        return defaults.get(feature, None)

Data Lineage Tracking

from dataclasses import dataclass
from datetime import datetime
from typing import List, Dict, Optional
import hashlib

@dataclass
class LineageNode:
    """A node in the data lineage graph."""
    node_id: str
    node_type: str  # "source", "transform", "model", "prediction"
    name: str
    timestamp: datetime
    metadata: Dict
    inputs: List[str]
    outputs: List[str]

class DataLineageTracker:
    """Track data lineage through ML pipeline."""

    def __init__(self):
        self.nodes: Dict[str, LineageNode] = {}
        self.edges: List[tuple] = []

    def track_source(self, name: str, source_path: str, **metadata):
        """Track data source ingestion."""
        node_id = self._generate_id(f"source:{name}")
        node = LineageNode(
            node_id=node_id,
            node_type="source",
            name=name,
            timestamp=datetime.now(),
            metadata={"path": source_path, **metadata},
            inputs=[],
            outputs=[]
        )
        self.nodes[node_id] = node
        return node_id

    def track_transform(
        self,
        name: str,
        input_ids: List[str],
        transform_code: str,
        **metadata
    ):
        """Track data transformation."""
        node_id = self._generate_id(f"transform:{name}")
        node = LineageNode(
            node_id=node_id,
            node_type="transform",
            name=name,
            timestamp=datetime.now(),
            metadata={"code_hash": hashlib.md5(transform_code.encode()).hexdigest(), **metadata},
            inputs=input_ids,
            outputs=[]
        )
        self.nodes[node_id] = node

        # Create edges
        for input_id in input_ids:
            self.edges.append((input_id, node_id))

        return node_id

    def track_model(self, name: str, training_data_id: str, model_params: Dict):
        """Track model training."""
        node_id = self._generate_id(f"model:{name}")
        node = LineageNode(
            node_id=node_id,
            node_type="model",
            name=name,
            timestamp=datetime.now(),
            metadata=model_params,
            inputs=[training_data_id],
            outputs=[]
        )
        self.nodes[node_id] = node
        self.edges.append((training_data_id, node_id))
        return node_id

    def get_lineage(self, node_id: str) -> Dict:
        """Get full lineage for a node (upstream and downstream)."""
        upstream = self._trace_upstream(node_id)
        downstream = self._trace_downstream(node_id)
        return {
            "node": self.nodes[node_id],
            "upstream": [self.nodes[n] for n in upstream],
            "downstream": [self.nodes[n] for n in downstream]
        }

    def _generate_id(self, base: str) -> str:
        return hashlib.md5(f"{base}:{datetime.now().isoformat()}".encode()).hexdigest()[:12]

    def _trace_upstream(self, node_id: str, visited: set = None) -> List[str]:
        if visited is None:
            visited = set()
        if node_id in visited:
            return []
        visited.add(node_id)

        node = self.nodes[node_id]
        result = list(node.inputs)
        for input_id in node.inputs:
            result.extend(self._trace_upstream(input_id, visited))
        return result

    def _trace_downstream(self, node_id: str, visited: set = None) -> List[str]:
        if visited is None:
            visited = set()
        if node_id in visited:
            return []
        visited.add(node_id)

        result = []
        for src, dst in self.edges:
            if src == node_id:
                result.append(dst)
                result.extend(self._trace_downstream(dst, visited))
        return result

# Usage
lineage = DataLineageTracker()

# Track pipeline
source_id = lineage.track_source("user_data", "/data/users.parquet", rows=100000)
features_id = lineage.track_transform("feature_engineering", [source_id], "features.py")
model_id = lineage.track_model("churn_model", features_id, {"algorithm": "xgboost", "auc": 0.85})

# Query lineage
print(lineage.get_lineage(model_id))

Data Quality Tools Comparison

Tool Strengths Use Case
Great Expectations Rich expectations, docs, integrations General validation
TFDV TensorFlow ecosystem, drift detection TF/TFX pipelines
Deepchecks ML-specific checks, model validation Model quality
Pandera Type-safe schemas, DataFrame contracts Python validation
Evidently AI Drift dashboards, reports Monitoring

Interview Questions

Q: What are the key dimensions of data quality for ML?

Accuracy (no factual errors), Completeness (no missing values), Consistency (standard formats), Timeliness (fresh data), Relevance (applicable features), Uniqueness (no duplicates), Validity (conforms to rules). Each dimension directly impacts model behavior - incomplete data causes bias, stale data causes drift, inconsistent data fragments categories.

Q: How do you handle schema evolution in production ML?

Three strategies: (1) Additive: Only add optional columns (backward compatible). (2) Dual-write: Write to both old and new schemas during migration. (3) Version-based routing: Route requests based on schema version metadata. Always maintain schema registry and validation at pipeline entry points. Test with both schemas before deprecating old.

Q: What is data lineage and why is it important?

Data lineage tracks the complete path of data through ML pipelines - from source ingestion through transformations to model outputs. Critical for: (1) Debugging - trace root cause of failures, (2) Compliance - GDPR/CCPA data subject requests, (3) Reproducibility - recreate experiments with exact data versions, (4) Impact analysis - understand downstream effects of upstream changes. Implement with graph-based lineage models and metadata catalogs.

Q: How do you detect data drift vs training-serving skew?

Data drift: Statistical distribution of production data shifts from training baseline (e.g., user demographics change). Detect with KS test, PSI, or divergence metrics. Training-serving skew: Same data processed differently in training vs inference (e.g., different preprocessing code). Detect by comparing feature statistics between training pipeline and serving endpoint. Drift requires retraining; skew requires code fix.


19. Foundation Models in Production

Sources: Collabnix "Multi-Tenant LLM Platform" (2025), Lilian Li "Prompt Caching" (2026), Zylos Research "LLM Caching Strategies" (2025)

Multi-Tenant LLM Serving

Key Challenges:
- Resource Isolation: GPU-intensive inference requires strict quotas to prevent noisy neighbors
- Data Privacy: Tenant requests/responses must remain isolated
- Cost Attribution: Track GPU usage, tokens, API calls per tenant
- Performance SLAs: Different tenants need different latency guarantees
- Model Versioning: Support different model versions per tenant

Multi-Tenant Architecture on Kubernetes

┌─────────────────────────────────────────────────────────────┐
│                    API Gateway / Istio                       │
│              (Auth, Rate Limiting, Routing)                  │
└─────────────────────┬───────────────────────────────────────┘
       ┌──────────────┼──────────────┐
       │              │              │
       ▼              ▼              ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ tenant-acme  │ │ tenant-beta  │ │ tenant-gamma │
│  Namespace   │ │  Namespace   │ │  Namespace   │
│              │ │              │ │              │
│ ┌──────────┐ │ │ ┌──────────┐ │ │ ┌──────────┐ │
│ │  vLLM    │ │ │ │  vLLM    │ │ │ │  vLLM    │ │
│ │ Llama-2  │ │ │ │ Mistral  │ │ │ │ GPT-4    │ │
│ └──────────┘ │ │ └──────────┘ │ │ └──────────┘ │
│              │ │              │ │              │
│  Quota: 2GPU │ │  Quota: 4GPU │ │  Quota: 1GPU │
└──────────────┘ └──────────────┘ └──────────────┘
            ┌──────────────────┐
            │  Shared GPU Pool  │
            │  (A100/H100)      │
            └──────────────────┘

Tenant Resource Quotas (Kubernetes)

apiVersion: v1
kind: Namespace
metadata:
  name: tenant-acme
  labels:
    tenant: acme
    istio-injection: enabled
---
apiVersion: v1
kind: ResourceQuota
metadata:
  name: tenant-acme-quota
  namespace: tenant-acme
spec:
  hard:
    requests.cpu: "32"
    requests.memory: 128Gi
    requests.nvidia.com/gpu: "2"
    limits.nvidia.com/gpu: "2"

Prompt Caching Economics

How It Works: Reuse Key-Value (KV) Cache from previous requests. When a model processes text, it generates KV tensors. Caching skips expensive computation for identical prefix text.

Pricing Comparison (2026):

Provider Discount Duration Strategy
OpenAI 50% Auto (24h) Zero config, same price
Anthropic 90% 5min-1h Explicit cache_control
Google 75% Configurable Vertex AI caching

Decision Framework:

Step 1: Is prompt >= 1024 tokens?
  NO → Skip caching (won't activate)
  YES → Continue

Step 2: Expected reuses >= 3 times?
  NO → Skip caching (not cost-effective)
  YES → Continue

Step 3: Match strategy to time span:
  Minutes → Anthropic ephemeral
  Hours   → OpenAI auto / Anthropic 1h
  Days    → Persistent cache + TTL
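
A toy break-even check for step 2 (the prices and the cache-write premium below are illustrative assumptions, not any provider's actual rates):

def caching_breakeven(prompt_tokens, reuses, price_per_mtok=3.0,
                      write_premium=1.25, read_discount=0.10):
    """Compare the cost of a static prefix with and without caching."""
    base = prompt_tokens / 1e6 * price_per_mtok
    uncached = base * reuses
    # First call pays the cache-write premium; later calls pay the read rate
    cached = base * write_premium + base * read_discount * (reuses - 1)
    return uncached, cached

# 10k-token prefix reused 5 times: $0.15 uncached vs ~$0.05 cached
print(caching_breakeven(10_000, 5))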

Prompt Caching Implementation

# OpenAI: Automatic caching (same price)
response = openai.chat.completions.create(
    model="gpt-4-turbo",
    messages=[
        {"role": "system", "content": "Your 10k token system prompt..."},
        {"role": "user", "content": "User query"}
    ]
    # Caching happens automatically for identical prefixes
)

# Anthropic: Explicit caching (90% discount)
response = client.messages.create(
    model="claude-3-sonnet",
    max_tokens=1024,  # required by the Anthropic API
    system=[{
        "type": "text",
        "text": "Your 10k token system prompt...",
        "cache_control": {"type": "ephemeral"}  # 5min cache
    }],
    messages=[{"role": "user", "content": "User query"}]
)

# Multi-layer caching architecture
import hashlib
import redis
from qdrant_client import QdrantClient

class MultiLayerCache:
    """L1: Exact match (Redis) → L2: Semantic (Vector) → L3: Provider cache"""

    def __init__(self):
        self.redis = redis.Redis()  # L1: <10ms
        self.vector_store = QdrantClient(host="localhost", port=6333)  # L2: 50-150ms
        # L3: Provider's built-in caching

    async def get_or_generate(self, prompt: str, generate_fn):
        # L1: Exact match
        cache_key = hashlib.sha256(prompt.encode()).hexdigest()
        if cached := self.redis.get(cache_key):
            return cached

        # L2: Semantic similarity
        similar = await self.find_similar(prompt, threshold=0.95)
        if similar:
            return similar.response

        # L3: Generate with provider caching
        response = await generate_fn(prompt)

        # Cache for future
        self.redis.setex(cache_key, 3600, response)  # 1 hour TTL
        return response

Structuring Prompts for Caching

# GOOD: Static prefix enables caching
SYSTEM_PROMPT = """
You are a helpful assistant with access to the following knowledge base:
[50k token knowledge base content here]

Always follow these rules:
1. Be concise
2. Cite sources
3. Ask clarifying questions if needed
"""

def query(user_question: str):
    return client.messages.create(
        model="claude-3-sonnet",
        max_tokens=1024,  # required by the Anthropic API
        system=[{
            "type": "text",
            "text": SYSTEM_PROMPT,  # Cached!
            "cache_control": {"type": "ephemeral"}
        }],
        messages=[{"role": "user", "content": user_question}]
    )

# BAD: Dynamic prefix breaks cache
def query_bad(user_id: str, timestamp: str, question: str):
    return client.messages.create(
        model="claude-3-sonnet",
        messages=[{
            "role": "user",
            "content": f"[{timestamp}] User {user_id} asks: {question}"
            # Cache misses every time!
        }]
    )

Token Economics and Cost Optimization

Optimization Savings Effort Use Case
Prompt caching 50-90% Low Repeated prefixes
Model routing 30-60% Medium Variable complexity
Semantic caching 10-30% Medium Similar queries
Prompt compression 20-50% Low Long contexts
Response streaming Latency only Low User experience

Fallback Strategies

import logging

class FallbackChain:
    """Graceful degradation when primary fails."""

    def __init__(self):
        self.providers = [
            {"name": "openai", "model": "gpt-4", "priority": 1},
            {"name": "anthropic", "model": "claude-3", "priority": 2},
            {"name": "local", "model": "llama-3-70b", "priority": 3},
        ]
        # One circuit breaker per provider (CircuitBreaker from section 17)
        self.circuit_breakers = {p["name"]: CircuitBreaker(p["name"]) for p in self.providers}

    async def generate(self, prompt: str) -> str:
        for provider in sorted(self.providers, key=lambda x: x["priority"]):
            cb = self.circuit_breakers[provider["name"]]
            if not cb.should_try():
                continue

            try:
                response = await self.call_provider(provider, prompt)
                cb.record_success()
                return response
            except Exception as e:
                cb.record_failure()
                logging.warning(f"{provider['name']} failed: {e}")
                continue

        raise Exception("All providers failed")

Interview Questions

Q: What are the key challenges in multi-tenant LLM serving?

(1) Resource isolation: GPU-intensive inference needs strict quotas to prevent noisy neighbors. (2) Data privacy: Tenant requests/responses must stay isolated with zero cross-contamination. (3) Cost attribution: Track GPU usage, tokens, API calls per tenant for billing. (4) Performance SLAs: Different tenants need different latency guarantees. (5) Model versioning: Support different model versions or fine-tuned variants per tenant. Solution: Namespace-based isolation with Kubernetes ResourceQuotas, Istio for network policies, separate vLLM deployments per tenant.

Q: How does prompt caching work and when should you use it?

Prompt caching reuses Key-Value (KV) Cache from previous requests. When a model processes text, it generates KV tensors; caching skips this computation for identical prefixes. When to use: (1) Prompt >= 1024 tokens, (2) Expected reuses >= 3 times, (3) Static prefix structure. Pricing: OpenAI 50% auto, Anthropic 90% explicit. Critical limitation: Prefix-based - changing first character invalidates entire cache. Structure prompts with static prefix at top, dynamic content at bottom.

Q: How do you structure prompts for optimal caching?

Put static content at the beginning, dynamic content at the end. Cache is prefix-based, so [Static 50k knowledge base] [Examples] [User query] works, but [Timestamp] [User ID] [Static content] breaks cache every request. Use system prompts for static content (knowledge bases, instructions), user messages for dynamic queries. Monitor hit rate: cache_read / (cache_read + cache_creation), target >70%.

Q: What's the fallback strategy when LLM providers fail?

Implement circuit breaker pattern with provider chain: (1) Primary provider (GPT-4/Claude) with circuit breaker, (2) Secondary provider as backup, (3) Local model as final fallback. Circuit breaker opens after N failures, closes after cooldown. Track error rates per provider, route around degraded services. Always have local inference as final fallback for zero-downtime SLAs.


20. AI Agents in Production

Sources: Iain Harper "Security for Production AI Agents in 2026", UiPath "10 Best Practices for Building Reliable AI Agents 2025", n8n "Best Practices for Deploying AI Agents", OWASP Top 10 for LLM Applications 2025

Understanding AI Agents

Agent Definition: Software that uses an LLM to autonomously perform tasks on behalf of users. Unlike chatbots (respond to questions), agents can take actions: browsing web, sending emails, querying databases, executing code, interacting with other systems.

Production Criteria (when enterprise-grade guardrails needed):
- Autonomy: Execute actions with real-world consequences, operate with delegated authority, chain multiple tool calls
- Data: Process untrusted external content, access sensitive systems/PII, operate across trust boundaries
- Consequences: Errors are costly/difficult to reverse, regulatory/reputational risk, customer-facing

Threat Landscape (OWASP 2025)

Risk Description Mitigation
LLM01: Prompt Injection Manipulating model through malicious inputs Defence-in-depth, input validation
LLM02: Sensitive Data Leakage Exposing PII, financial details Output filtering, PII detection
LLM05: Improper Output Handling Insufficient validation Schema validation with Pydantic
LLM06: Excessive Agency Too much capability without controls Tool allowlists, permission gating
LLM10: Unbounded Consumption Resource exhaustion Rate limits, budget controls

Prompt Injection Stats:
- Appears in 73%+ of production deployments
- 5 carefully crafted documents can manipulate AI 90% of the time (RAG poisoning)
- CVE-2025-53773: GitHub Copilot RCE (CVSS 9.6)

Defence-in-Depth Architecture

Six Layers:
1. Input Sanitization - Clean/validate data before AI
2. Injection Detection - Identify manipulation attempts
3. Agent Execution - Control decisions/actions
4. Tool Call Interception - Review/approve actions before execution
5. Output Validation - Check responses before users
6. Observability & Audit - Monitor everything

# Tool Allowlist with Permission Gating
class ToolGatekeeper:
    """Principle of least privilege for agent tools."""

    ALLOWED_TOOLS = {
        "customer_service": ["get_order_status", "search_faq", "create_ticket"],
        "research": ["web_search", "read_document", "summarize"],
        # NO: "delete_data", "send_email", "make_payment"
    }

    def validate_tool_call(self, agent_role: str, tool_name: str, params: dict) -> bool:
        if tool_name not in self.ALLOWED_TOOLS.get(agent_role, []):
            logging.warning(f"Blocked unauthorized tool: {tool_name}")
            return False

        # Validate parameters against schema
        return self._validate_params(tool_name, params)

    def _validate_params(self, tool_name: str, params: dict) -> bool:
        # Schema-based validation
        schema = TOOL_SCHEMAS.get(tool_name, {})
        # ... validate params match schema
        return True

Human-in-the-Loop Patterns

When to Require Human Approval:
- Irreversible operations (sending emails, payments, deleting data)
- High-cost actions (API calls exceeding threshold, bulk operations)
- Novel situations (significantly different from training)
- Regulated domains (healthcare, financial, legal)

# LangGraph HITL with interrupt()
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph

def build_hitl_agent():
    workflow = StateGraph(AgentState)

    # ... add nodes ...

    # Interrupt before high-risk action
    workflow.add_node("request_approval", request_human_approval)
    workflow.add_edge("agent_decides", "request_approval")

    # After interrupt, resume with human input
    checkpointer = MemorySaver()
    app = workflow.compile(checkpointer=checkpointer, interrupt_before=["execute_action"])

    return app

# Human approval workflow
async def human_approval_handler(thread_id: str, action: dict):
    """Human reviews and approves/rejects action."""
    # Send to Slack/Email with action details
    notification = format_action_request(action)
    await send_to_approval_channel(notification)

    # Wait for human response
    response = await wait_for_approval(thread_id, timeout_minutes=30)

    if response.approved:
        # Resume workflow with approval
        app.update_state(thread_id, {"approved": True, "approver": response.user})
    else:
        # Handle rejection
        app.update_state(thread_id, {"rejected": True, "reason": response.reason})

LLM-as-Judge Evaluation

# Judge model evaluates agent outputs
from pydantic import BaseModel

class JudgeVerdict(BaseModel):
    score: float  # 0.0 - 1.0
    reasoning: str
    issues: list[str]
    passed: bool

async def evaluate_with_judge(output: str, criteria: list[str]) -> JudgeVerdict:
    """Use separate model to evaluate agent output."""

    judge_prompt = f"""
    Evaluate this agent output against criteria.

    Criteria: {criteria}

    Agent Output: {output}

    Provide:
    1. Score (0.0-1.0)
    2. Reasoning for score
    3. List of specific issues found
    4. Whether output passes quality threshold
    """

    response = await judge_model.generate(judge_prompt, response_format=JudgeVerdict)
    return JudgeVerdict(**response)

# Best practices from research:
# - Few-shot prompting with good/bad examples
# - Chain-of-thought reasoning before scoring
# - Separate judge model from generation model
# - Calibrate against human labels

Production Best Practices (10 Commandments)

1. Design Agents That Fail Safe
- Start small with single-responsibility agents
- Modularize into specialized agents for complex workflows
- For deterministic tasks, use tools (not LLM reasoning)

2. Configure Context Properly
- Index enterprise context (knowledge bases, docs)
- Choose the right model for the task (GPT-5 for complex, Flash for simple)
- Maintain clarity in tool definitions

3. Treat Every Capability as a Tool
- Tools should have tight input/output contracts
- Schema-driven prompts with validation
- Build tools for deterministic operations (math, date comparison)

4. Write Prompts Like Product Specs
- Define: Role, Instructions, Goal, Success metrics, Guardrails
- Use structured, multi-step reasoning
- Describe what SHOULD happen, not what shouldn't

5. Evaluate for the Real World
- Build robust evaluation datasets (30+ cases per agent)
- Test breadth AND depth (accuracy, reasoning, tool-use)
- End-to-end testing in full automation context

6. Build In Safety, Governance, Compliance
- Deploy via Orchestrator/Maestro for lifecycle management
- Apply an AI Trust Layer (PII redaction, audit logs, throttling)
- Human-in-the-loop for high-risk decisions

7. Version on Purpose and Gate Releases
- Version everything: prompts, tools, datasets, evaluations
- Gate production release on evaluation thresholds
- Attach evaluations to version tags

8. Design Trust-Building Conversations
- Set clear expectations about capabilities
- Confirm irreversible actions deterministically
- Show context/reasoning where appropriate

9. Control Cost and Performance
- Right-size model choice per task
- Limit token use, cache stable responses
- Batch low-risk calls, escalate only when necessary

10. Continuous Improvement
- Trace and learn from execution logs
- Feed human feedback back into design updates
- Scale incrementally after proving stability

Agent Observability with OpenTelemetry

# OpenTelemetry GenAI semantic conventions
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode

tracer = trace.get_tracer(__name__)

async def execute_agent_with_telemetry(agent, input_data):
    # Use an explicit context manager: applying start_as_current_span as a
    # decorator on an async function would close the span when the coroutine
    # is created, not when it finishes.
    with tracer.start_as_current_span("agent.execution") as span:
        # GenAI semantic conventions
        span.set_attribute("gen_ai.system", "openai")
        span.set_attribute("gen_ai.request.model", "gpt-4")
        span.set_attribute("gen_ai.request.max_tokens", 4096)

        try:
            response = await agent.run(input_data)

            # Track token usage
            span.set_attribute("gen_ai.usage.input_tokens", response.usage.prompt_tokens)
            span.set_attribute("gen_ai.usage.output_tokens", response.usage.completion_tokens)
            span.set_attribute("gen_ai.response.finish_reason", response.finish_reason)

            span.set_status(Status(StatusCode.OK))
            return response

        except Exception as e:
            span.set_status(Status(StatusCode.ERROR, str(e)))
            span.record_exception(e)
            raise

Key Observability Metrics:
- Token consumption per request/session
- Guardrail trigger rates
- Human escalation rates
- Error rates by type
- Response latency percentiles (p50, p95, p99)
- Cost per task
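
Cost per task is typically derived from the token counts already captured in the spans above; a minimal helper (the per-million-token prices below are placeholders, not real provider rates):

# Hypothetical per-1M-token prices; substitute your provider's actual rates
PRICE_PER_M_INPUT = 2.50
PRICE_PER_M_OUTPUT = 10.00

def cost_usd(input_tokens: int, output_tokens: int) -> float:
    """Estimate request cost from token usage reported by the API."""
    return (input_tokens * PRICE_PER_M_INPUT
            + output_tokens * PRICE_PER_M_OUTPUT) / 1_000_000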

Interview Questions

Q: What makes AI agents different from chatbots, and when do you need production guardrails?

Agents autonomously execute tasks (browse web, send emails, modify databases) vs chatbots just respond. Production guardrails needed when: (1) Autonomy: Real-world consequences, delegated authority, multiple tool chains; (2) Data: Untrusted external content, sensitive/PII access, cross-boundary operations; (3) Consequences: Costly errors, regulatory risk, customer-facing. Key insight: The gap between "works in demo" and "production at scale" is where projects die.

Q: What is the OWASP Top 10 for LLM Applications and which are most critical?

2025 Top 10: LLM01 (Prompt Injection - 73% of deployments), LLM02 (Data Leakage), LLM05 (Output Handling), LLM06 (Excessive Agency), LLM10 (Unbounded Consumption). Critical: Prompt injection (no universal solution, use defence-in-depth), Excessive Agency (limit tools with allowlists). Unlike SQL injection (solved by parameterized queries), prompt injection may be inherent to LLMs - assume some attacks will succeed, limit blast radius.

Q: Explain defence-in-depth for AI agents.

Six layers: (1) Input Sanitization - clean data before AI; (2) Injection Detection - identify manipulation; (3) Agent Execution - control decisions; (4) Tool Interception - approve actions before execution; (5) Output Validation - check responses; (6) Observability - monitor everything. No single guardrail is sufficient - deterministic validators + LLM evaluation + human oversight + comprehensive logging. Each layer catches different failure categories.

Q: When should you use human-in-the-loop for agents?

Require human approval for: (1) Irreversible operations - sending emails, payments, deleting data; (2) High-cost actions - API calls exceeding threshold, bulk operations; (3) Novel situations - significantly different from training scenarios; (4) Regulated domains - healthcare, financial, legal decisions. Implementation: LangGraph interrupt() for pause/resume, notification channels (Slack/Email), timeout handling, escalation workflows. Monitor escalation rates - high rates indicate agent needs improvement.


21. Model Compression (Knowledge Distillation, Pruning, Edge Deployment)

How to compress a model 10x without quality loss for edge/mobile deployment

Why Model Compression Matters

Problem: LLMs are growing to trillions of parameters, but edge devices are constrained. Solution: compression techniques - pruning, distillation, quantization.

2025 numbers:
- 65% of mobile ML models use pruning (MLPerf 2025)
- DistilBERT: 40% smaller, 60% faster, 97% accuracy retention
- Pruning + quantization stack: 2x the compression of distillation alone


Knowledge Distillation

Concept: Teacher-student framework - a compact model learns from a large one.

Teacher (Large)        Student (Small)
    [7B params]    →       [350M params]
    [High accuracy]        [Fast inference]

Temperature Scaling

Problem: Softmax outputs are too "peaked" - [0.95, 0.02, 0.01, 0.01, 0.01]

Solution: Temperature T > 1 softens the distribution:

\[P_i^{(T)} = \frac{e^{z_i / T}}{\sum_j e^{z_j / T}}\]

With T=5: [0.4, 0.2, 0.15, 0.15, 0.1] - the student learns class relationships!
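
A quick check of this effect (the logits below are illustrative, not from a real teacher):

import torch
import torch.nn.functional as F

logits = torch.tensor([5.0, 1.0, 0.5, 0.5, 0.0])  # illustrative teacher logits

for T in (1.0, 5.0):
    probs = F.softmax(logits / T, dim=0)
    print(f"T={T}:", [round(p, 3) for p in probs.tolist()])
# T=1.0 puts ~0.95 on the top class; T=5.0 spreads the mass across classes,
# exposing which wrong answers the teacher considers "close".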

Distillation Loss

import torch
import torch.nn.functional as F

def distillation_loss(student_logits, teacher_logits, labels,
                      temperature=4.0, alpha=0.5):
    """
    Combined loss for knowledge distillation.

    L_total = α * CE(y_true, y_student) + (1 - α) * T² * KL(p_teacher^T || p_student^T)
    """
    # Hard loss: Cross-entropy with ground truth
    hard_loss = F.cross_entropy(student_logits, labels)

    # Soft loss: KL divergence between teacher and student
    soft_teacher = F.softmax(teacher_logits / temperature, dim=1)
    soft_student = F.log_softmax(student_logits / temperature, dim=1)

    # Scale by T^2 to match gradient magnitudes
    soft_loss = F.kl_div(soft_student, soft_teacher, reduction='batchmean')
    soft_loss *= (temperature ** 2)

    # Combined
    return alpha * hard_loss + (1 - alpha) * soft_loss

Types of Distillation

| Type | What's Transferred | Best For |
|------|--------------------|----------|
| Response-Based | Output logits | Classification |
| Feature-Based | Intermediate activations | CNNs, Vision |
| Attention-Based | Attention maps | Transformers (DistilBERT) |
| Multi-Teacher | Ensemble outputs | Critical accuracy |

Complete Training Loop

def train_with_distillation(teacher, student, train_loader, epochs=10,
                            temperature=4.0, alpha=0.5, lr=1e-4):
    """
    Train student model with knowledge distillation.
    """
    teacher.eval()  # Freeze teacher
    optimizer = torch.optim.Adam(student.parameters(), lr=lr)

    for epoch in range(epochs):
        student.train()
        total_loss = 0

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            with torch.no_grad():
                teacher_logits = teacher(inputs)

            student_logits = student(inputs)

            loss = distillation_loss(
                student_logits, teacher_logits, labels,
                temperature=temperature, alpha=alpha
            )

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        print(f"Epoch {epoch+1}: Loss = {total_loss/len(train_loader):.4f}")

    return student

Distilled Models (Production-Ready)

| Model | Size Reduction | Speedup | Accuracy |
|-------|----------------|---------|----------|
| DistilBERT | 40% | 60% | 97% of BERT |
| TinyBERT | 72% | 80% | 96% of BERT |
| MobileBERT | 4x | 5.5x | Optimized for mobile |
| DistilWhisper | 50% | 60% | Speech recognition |
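
These size claims are easy to verify by comparing parameter counts directly; a quick sketch with Hugging Face transformers (assumes the library and hub access; model names are the public hub IDs):

from transformers import AutoModel

for name in ("bert-base-uncased", "distilbert-base-uncased"):
    model = AutoModel.from_pretrained(name)
    print(f"{name}: {model.num_parameters() / 1e6:.0f}M parameters")
# bert-base-uncased: ~110M; distilbert-base-uncased: ~66M (~40% smaller)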

Neural Network Pruning

Concept: Remove redundant weights while preserving accuracy.

Lottery Ticket Hypothesis (Frankle & Carbin 2019):

Dense networks contain sparse subnetworks ("winning tickets") that achieve full accuracy when trained in isolation.
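
The training recipe the hypothesis implies - train, prune, rewind the survivors to their initial values, retrain - can be sketched as follows (one round shown; train_fn is an assumed training routine):

import torch
import torch.nn as nn
import torch.nn.utils.prune as prune

def lottery_ticket_round(model: nn.Module, train_fn, amount: float = 0.2):
    """One round of magnitude pruning with weight rewinding."""
    # 0. Snapshot the initialization
    init_weights = {n: p.detach().clone() for n, p in model.named_parameters()}

    train_fn(model)  # 1. Train the dense network

    # 2. Globally prune the smallest-magnitude weights
    to_prune = [(m, 'weight') for m in model.modules()
                if isinstance(m, (nn.Linear, nn.Conv2d))]
    prune.global_unstructured(to_prune, pruning_method=prune.L1Unstructured,
                              amount=amount)

    # 3. Rewind surviving weights to their initial values (masks stay in place)
    with torch.no_grad():
        for module, _ in to_prune:
            name = next(n for n, m in model.named_modules() if m is module)
            module.weight_orig.copy_(init_weights[f"{name}.weight"])

    train_fn(model)  # 4. Retrain the sparse "winning ticket"
    return model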

Pruning Types

| Type | What's Removed | Hardware Impact |
|------|----------------|-----------------|
| Unstructured | Individual weights | Needs sparse kernels |
| Structured | Entire filters/channels | GPU-friendly speedup |
| Global | Cross-layer ranking | Balanced compression |
| Iterative | Gradual increase | Better accuracy |

Pruning Criteria

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.nn.utils.prune as prune

# 1. Magnitude Pruning (L1 norm)
# Remove weights where |w_i| < θ
prune.l1_unstructured(model.conv1, name='weight', amount=0.5)

# 2. Global Pruning (cross-layer)
prune.global_unstructured(
    [(module, 'weight') for module in model.modules()
     if isinstance(module, (nn.Conv2d, nn.Linear))],
    pruning_method=prune.L1Unstructured,
    amount=0.8  # 80% sparsity
)

# 3. Structured Pruning (channel removal)
prune.ln_structured(model.conv1, name='weight', amount=0.3, n=2, dim=0)

# Make permanent
prune.remove(model.conv1, 'weight')

Iterative Pruning Pipeline

def iterative_prune(model, train_loader, sparsity_schedule=[0.2, 0.5, 0.8],
                    fine_tune_epochs=5):
    """
    Gradually increase sparsity with retraining between steps.

    Key insight: 90% sparsity with iterative approach retains 99% accuracy
    on Vision Transformers (NeurIPS 2024 findings).
    """
    for target_sparsity in sparsity_schedule:
        # Prune
        params_to_prune = [
            (m, 'weight') for m in model.modules()
            if isinstance(m, (nn.Conv2d, nn.Linear))
        ]
        prune.global_unstructured(
            params_to_prune,
            pruning_method=prune.L1Unstructured,
            amount=target_sparsity
        )

        # Fine-tune to recover accuracy
        model.train()
        optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

        for epoch in range(fine_tune_epochs):
            for data, target in train_loader:
                optimizer.zero_grad()
                output = model(data)
                loss = F.cross_entropy(output, target)
                loss.backward()
                optimizer.step()

        # Permanentize pruning
        for m, _ in params_to_prune:
            if hasattr(m, 'weight_mask'):
                prune.remove(m, 'weight')

    return model

Pruning Benchmarks (ResNet-50, ImageNet, RTX 4090)

| Model | Inference (ms) | Top-1 Acc | Size (MB) | FLOPs (G) |
|-------|----------------|-----------|-----------|-----------|
| Dense Baseline | 35.4 | 76.1% | 98 | 4.1 |
| Prune 90% | 8.2 | 75.8% | 12 | 2.1 |
| INT8 Quant | 12.5 | 74.2% | 25 | 4.1 |
| Distilled | 15.1 | 75.3% | 49 | 2.8 |

Hybrid Compression Pipeline

Best practice 2025: Pruning → Quantization → Distillation stack

Original Model (7B params, 28 GB FP32)
    ↓ Pruning (90% sparsity)
Compressed (700M params, 2.8 GB)
    ↓ Quantization (INT8)
Smaller (~700 MB)
    ↓ Distillation (student model)
Final (~100 MB, 10x faster)

def hybrid_compress(teacher_model, train_loader, val_loader):
    """
    P-Q-D pipeline: Pruning → Quantization → Distillation
    Achieves 100x compression with <5% accuracy loss.
    """
    # Step 1: Prune teacher
    pruned = iterative_prune(
        teacher_model, train_loader,
        sparsity_schedule=[0.3, 0.6, 0.9]
    )

    # Step 2: Quantize (INT8)
    quantized = torch.quantization.quantize_dynamic(
        pruned, {nn.Linear}, dtype=torch.qint8
    )

    # Step 3: Distill to student
    student = SmallModel()  # placeholder: any architecture ~10x smaller
    distilled = train_with_distillation(
        teacher=quantized,
        student=student,
        train_loader=train_loader
    )

    return distilled

Edge Deployment Optimization

Target Devices:
- Smartphones (5.4 billion users, 2025)
- IoT sensors (TinyML)
- Edge servers (Jetson Orin)

Model Size Targets

| Device | RAM Limit | Model Size Target | Techniques |
|--------|-----------|-------------------|------------|
| Mobile CPU | 2-4 GB | <100 MB | Pruning + INT8 |
| IoT/TinyML | 256 KB | <1 MB | Quantization + Pruning |
| Edge GPU | 8-16 GB | <500 MB | Structured pruning |
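
A rough way to sanity-check these targets against a candidate model: estimate dense size from parameter count and precision (ignores sparse-index and metadata overhead):

BYTES_PER_PARAM = {"fp32": 4, "fp16": 2, "int8": 1, "int4": 0.5}

def model_size_mb(num_params: float, dtype: str = "fp32") -> float:
    """Approximate on-disk size of a dense model."""
    return num_params * BYTES_PER_PARAM[dtype] / 1e6

print(model_size_mb(700e6, "int8"))  # 700.0 -> a 700M-param INT8 model is ~700 MB
print(model_size_mb(700e6, "int4"))  # 350.0 -> INT4 halves that to ~350 MB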

Mobile Deployment (TensorFlow Lite)

import tensorflow as tf

def optimize_for_mobile(keras_model):
    """Optimize model for mobile deployment."""

    # Convert to TFLite
    converter = tf.lite.TFLiteConverter.from_keras_model(keras_model)

    # Enable optimizations
    converter.optimizations = [tf.lite.Optimize.DEFAULT]

    # INT8 quantization with representative dataset
    # (train_dataset is assumed to be a tf.data.Dataset available in scope)
    def representative_dataset():
        for data in train_dataset.take(100):
            yield [tf.cast(data, tf.float32)]

    converter.representative_dataset = representative_dataset
    converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
    converter.inference_input_type = tf.uint8
    converter.inference_output_type = tf.uint8

    tflite_model = converter.convert()

    # Save
    with open('model_int8.tflite', 'wb') as f:
        f.write(tflite_model)

    return tflite_model

ONNX Export for Cross-Platform

import torch.onnx

def export_to_onnx(model, input_shape=(1, 3, 224, 224), output_path="model.onnx"):
    """Export PyTorch model to ONNX for TensorRT/CoreML deployment."""

    model.eval()
    dummy_input = torch.randn(input_shape)

    torch.onnx.export(
        model,
        dummy_input,
        output_path,
        export_params=True,
        opset_version=14,
        do_constant_folding=True,
        input_names=['input'],
        output_names=['output'],
        dynamic_axes={
            'input': {0: 'batch_size'},
            'output': {0: 'batch_size'}
        }
    )

    print(f"Exported to {output_path}")

Cost Savings (2025 Benchmarks)

| Metric | Dense | Pruned 90% | Savings |
|--------|-------|------------|---------|
| GPU Hours (Fine-tune) | $4,000 | $1,500 | 62% |
| Storage/Month | $300 | $50 | 83% |
| Edge Inference | $500 | $100 | 80% |
| Total | $4,800 | $1,650 | 66% |

Interview Questions

Q: Explain knowledge distillation and why temperature matters.

Teacher-student framework where compact model learns from large model's outputs. Temperature T > 1 softens softmax outputs: softmax(z_i/T) spreads probability mass across classes, revealing class relationships. T=2-5 typical; higher T = softer targets = more "dark knowledge" transferred. Without temperature, student only learns the top prediction, not the nuanced relationships between classes.

Q: What is the Lottery Ticket Hypothesis and how does it relate to pruning?

Frankle & Carbin (2019): Dense networks contain sparse subnetworks ("winning tickets") that match full accuracy when trained in isolation. Implication: Most parameters are redundant. Key insight - must reset remaining weights to initialization before retraining. Iterative magnitude pruning (IMP) achieves 90% sparsity with <2% accuracy loss by gradually pruning and retraining.

Q: When would you use structured vs unstructured pruning?

Structured (remove entire filters/channels) when: GPU inference needed, no sparse kernel support, want actual speedup on stock PyTorch. Unstructured (remove individual weights) when: Maximum compression needed, sparse tensor cores available (Ampere+ GPUs), storage-constrained (model size). Structured gives direct speedup but may lose more accuracy; unstructured achieves higher sparsity but needs hardware support.

Q: How do you deploy a 7B parameter LLM on a smartphone?

Multi-stage pipeline: (1) Pruning - 90% unstructured sparsity → 700M params; (2) Quantization - INT8/INT4 → ~700MB/~350MB; (3) Distillation - student model ~10x smaller → ~70-100MB. Target: <100MB for mobile. Use TFLite/ONNX Mobile for deployment. Consider model sharding for larger models. Test latency targets: <50ms for interactive, <200ms for background tasks.


Sources

  • LabelYourData "Knowledge Distillation" (2025)
  • Arik Poz "Knowledge Distillation in PyTorch" (Apr 2025)
  • Johal.in "Neural Network Pruning with Torch-Prune" (Nov 2025)
  • Frontiers in Robotics and AI "Survey of Model Compression" (2025)
  • Hinton et al. "Distilling the Knowledge in a Neural Network" (2015, canonical)

22. Monitoring & Observability for ML

Prometheus + Grafana + structured logging for production ML

Why ML Monitoring Is Critical

Problem: 70% of AI deployments fail because of undetected performance drift (Gartner 2025). Unique ML challenges: data drift, concept drift, and model degradation silently erode performance.

Four Pillars of Observability:
1. Metrics - Prometheus (latency, accuracy, throughput)
2. Logs - Structured JSON (errors, predictions, audit)
3. Traces - Distributed tracing (Jaeger)
4. Dashboards - Grafana visualizations


Key Metrics to Monitor

Model Performance Metrics

| Metric | Type | Prometheus Metric | Purpose |
|--------|------|-------------------|---------|
| Accuracy | Gauge | ml_accuracy | Track model quality |
| Prediction latency | Histogram | ml_latency_seconds | P50/P95/P99 |
| Prediction count | Counter | ml_predictions_total | Throughput |
| Confidence scores | Histogram | ml_confidence_scores | Distribution |
| Error rate | Counter | ml_errors_total | By error type |

Data Quality Metrics

| Metric | Detection Method |
|--------|------------------|
| Feature Drift | KS test, PSI |
| Missing Values | % per feature |
| Outliers | Isolation Forest |
| Data Freshness | Timestamp check |

System Metrics

| Metric | Threshold |
|--------|-----------|
| API Latency | P99 < 500ms |
| CPU/GPU Utilization | < 80% |
| Memory | < 85% |
| Request Rate | Track trends |

Prometheus Instrumentation

from prometheus_client import Counter, Gauge, Histogram, generate_latest, REGISTRY
from flask import Flask, Response, request

# Define ML-specific metrics
predictions_total = Counter(
    'ml_predictions_total',
    'Total predictions',
    ['model_name', 'model_version', 'status']
)
prediction_accuracy = Gauge(
    'ml_prediction_accuracy',
    'Rolling accuracy',
    ['model_name']
)
inference_latency = Histogram(
    'ml_inference_latency_seconds',
    'Inference latency',
    buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0]
)
feature_drift_score = Gauge(
    'ml_feature_drift_score',
    'Drift score per feature',
    ['feature_name']
)

class MLModelMonitor:
    def __init__(self, model, model_name, model_version):
        self.model = model
        self.model_name = model_name
        self.model_version = model_version
        self.app = Flask(__name__)
        self._setup_routes()

    def _setup_routes(self):
        @self.app.route('/metrics')
        def metrics():
            return Response(generate_latest(REGISTRY), mimetype='text/plain')

        @self.app.route('/predict', methods=['POST'])
        def predict():
            data = request.json

            with inference_latency.time():
                try:
                    prediction = self.model.predict(data['features'])
                    predictions_total.labels(
                        model_name=self.model_name,
                        model_version=self.model_version,
                        status='success'
                    ).inc()
                    return {'prediction': prediction.tolist()}
                except Exception as e:
                    predictions_total.labels(
                        model_name=self.model_name,
                        model_version=self.model_version,
                        status='error'
                    ).inc()
                    raise

    def run(self, port=5000):
        self.app.run(host='0.0.0.0', port=port)

Prometheus Configuration

# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: 'ml-models'
    static_configs:
      - targets: ['model-server:5000']
    metrics_path: '/metrics'

rule_files:
  - 'alerts.yml'

alerting:
  alertmanagers:
    - static_configs:
        - targets: ['alertmanager:9093']

Alerting Rules

# alerts.yml
groups:
  - name: ml-model-alerts
    rules:
      # Accuracy degradation
      - alert: ModelAccuracyDropped
        expr: ml_prediction_accuracy < 0.85
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Model accuracy dropped below 85%"
          description: "{{ $labels.model_name }} accuracy is {{ $value }}"

      # Latency spike
      - alert: HighInferenceLatency
        expr: histogram_quantile(0.99, rate(ml_inference_latency_seconds_bucket[5m])) > 0.5
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "P99 latency exceeds 500ms"

      # Data drift detected
      - alert: DataDriftDetected
        expr: ml_feature_drift_score > 0.1
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Feature {{ $labels.feature_name }} showing drift"

      # Error rate spike
      - alert: HighErrorRate
        expr: rate(ml_predictions_total{status="error"}[5m]) / rate(ml_predictions_total[5m]) > 0.05
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Error rate exceeds 5%"

Grafana Dashboard Design

Dashboard Hierarchy

Overview Dashboard
├── Model Health Panel (status indicators)
├── Key Metrics Row (accuracy, latency, throughput)
└── Alert Summary

Model Detail Dashboard
├── Accuracy Trend (time series)
├── Prediction Distribution (histogram)
├── Feature Drift Heatmap
└── Confusion Matrix

Troubleshooting Dashboard
├── Error Logs
├── Resource Utilization
└── Request Traces

PromQL Queries

# Prediction rate (requests/second)
rate(ml_predictions_total[5m])

# P95 latency
histogram_quantile(0.95, rate(ml_inference_latency_seconds_bucket[5m]))

# Accuracy trend (if logged)
avg_over_time(ml_prediction_accuracy[1h])

# Error rate percentage
100 * sum(rate(ml_predictions_total{status="error"}[5m])) / sum(rate(ml_predictions_total[5m]))

# Feature drift score
ml_feature_drift_score > 0.1

Structured Logging Best Practices

import structlog
from datetime import datetime
from typing import Any

# Configure structured logging
structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer()
    ],
    wrapper_class=structlog.stdlib.BoundLogger,
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
)

logger = structlog.get_logger()

def log_prediction(model_name: str, model_version: str,
                   input_features: dict, prediction: Any,
                   latency_ms: float, confidence: float):
    """Structured log for each prediction."""
    logger.info(
        "prediction",
        model_name=model_name,
        model_version=model_version,
        input_hash=hash(frozenset(input_features.items())) % 10000,  # Anonymized
        prediction=str(prediction),
        latency_ms=latency_ms,
        confidence=confidence,
        timestamp=datetime.utcnow().isoformat()
    )

def log_error(model_name: str, error_type: str,
              error_message: str, stack_trace: str = None):
    """Structured error logging."""
    logger.error(
        "model_error",
        model_name=model_name,
        error_type=error_type,
        error_message=error_message,
        stack_trace=stack_trace,
        timestamp=datetime.utcnow().isoformat()
    )

Log Schema Example

{
  "timestamp": "2025-09-14T14:30:00.123Z",
  "level": "INFO",
  "event": "prediction",
  "model_name": "fraud_detector_v2",
  "model_version": "2.3.1",
  "input_hash": 4521,
  "prediction": "not_fraud",
  "latency_ms": 23.5,
  "confidence": 0.92,
  "request_id": "req-abc123",
  "user_id": "user-456"
}

Drift Detection Implementation

from scipy import stats
import numpy as np

class DriftMonitor:
    """Statistical drift detection for feature monitoring."""

    def __init__(self, reference_data: np.ndarray, feature_names: list):
        self.reference = reference_data
        self.feature_names = feature_names
        self.baseline_stats = self._compute_baseline()

    def _compute_baseline(self):
        """Compute reference statistics."""
        return {
            'mean': np.mean(self.reference, axis=0),
            'std': np.std(self.reference, axis=0),
            'percentiles': np.percentile(self.reference, [5, 50, 95], axis=0)
        }

    def ks_test_drift(self, current_data: np.ndarray) -> dict:
        """
        Kolmogorov-Smirnov test for drift detection.
        Returns p-value and statistic per feature.
        """
        drift_scores = {}
        for i, name in enumerate(self.feature_names):
            stat, p_value = stats.ks_2samp(
                self.reference[:, i],
                current_data[:, i]
            )
            drift_scores[name] = {
                'statistic': stat,
                'p_value': p_value,
                'drift_detected': p_value < 0.05
            }
            # Update Prometheus gauge
            feature_drift_score.labels(feature_name=name).set(stat)

        return drift_scores

    def psi_score(self, current_data: np.ndarray, bins=10) -> dict:
        """
        Population Stability Index (PSI) for drift.
        PSI < 0.1: No drift
        PSI 0.1-0.25: Moderate drift
        PSI > 0.25: Significant drift
        """
        psi_scores = {}
        for i, name in enumerate(self.feature_names):
            ref_hist, bin_edges = np.histogram(self.reference[:, i], bins=bins)
            cur_hist, _ = np.histogram(current_data[:, i], bins=bin_edges)

            # Normalize
            ref_pct = ref_hist / len(self.reference)
            cur_pct = cur_hist / len(current_data)

            # PSI formula (epsilon keeps empty bins from producing inf/NaN)
            eps = 1e-10
            psi = np.sum((cur_pct - ref_pct) * np.log((cur_pct + eps) / (ref_pct + eps)))
            psi_scores[name] = psi

        return psi_scores
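
A quick usage sketch on synthetic data, reusing the DriftMonitor class and the Prometheus gauge defined earlier (feature names and the injected shift are illustrative):

import numpy as np

rng = np.random.default_rng(42)
reference = rng.normal(0.0, 1.0, size=(10_000, 2))   # training-time snapshot
production = np.column_stack([
    rng.normal(0.5, 1.0, 10_000),                    # feature_a: mean shifted
    rng.normal(0.0, 1.0, 10_000),                    # feature_b: stable
])

monitor = DriftMonitor(reference, feature_names=['feature_a', 'feature_b'])
print(monitor.ks_test_drift(production)['feature_a']['drift_detected'])  # True
print(monitor.psi_score(production))  # elevated PSI for feature_a only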

Multi-Level Alerting Strategy

| Level | Severity | Response Time | Example |
|-------|----------|---------------|---------|
| P0 | Critical | <5 min | Service down, accuracy <70% |
| P1 | High | <30 min | Latency P99 > 1s, error rate >10% |
| P2 | Warning | <4 hours | Drift detected, accuracy <85% |
| P3 | Info | Next business day | Minor degradation trends |

Alert Fatigue Prevention:
1. Group related alerts - single notification for related issues
2. Set appropriate `for:` hold durations - avoid firing on transient spikes
3. Use composite alerts - multiple conditions must be true
4. Implement acknowledgments - track response
5. Regular threshold review - tune based on SLOs


Monitoring Stack Comparison

| Tool | Purpose | Strengths |
|------|---------|-----------|
| Prometheus | Metrics collection | Pull model, PromQL, alerting |
| Grafana | Visualization | Multi-source, dashboards, alerts |
| MLflow | Experiment tracking | Model registry, versioning |
| Evidently AI | ML-specific monitoring | Drift dashboards, reports |
| Datadog | Full-stack observability | Logs+metrics+traces |

Interview Questions

Q: What metrics would you monitor for a production ML model?

Three categories: Model Performance (accuracy, F1, confidence distribution, prediction latency histogram), Data Quality (feature drift via KS test/PSI, missing value rates, outlier frequency, data freshness), System Health (API latency P50/P95/P99, CPU/GPU utilization, memory, request throughput). Key insight: Monitor prediction confidence distributions - shifts indicate input changes even before accuracy drops.

Q: How do you detect data drift in production?

Two main methods: KS Test (Kolmogorov-Smirnov) - compares CDFs of reference vs current data, D = sup_x |F_1(x) - F_2(x)|, alert if p < 0.05. PSI (Population Stability Index) - bin-based comparison, PSI < 0.1 no drift, 0.1-0.25 moderate, >0.25 significant. Implementation: Store reference distributions from training, compute rolling statistics on production data, update Prometheus gauges, trigger alerts when thresholds exceeded.

Q: What's the difference between monitoring and observability?

Monitoring = "Is it working?" (dashboards, alerts, known unknowns). Observability = "Why isn't it working?" (logs + metrics + traces, unknown unknowns). Monitoring answers predefined questions; observability enables ad-hoc debugging. For ML: monitoring tracks accuracy, latency; observability helps diagnose WHY accuracy dropped through correlated logs, traces, and metric drill-downs.

Q: How do you design alerts to avoid alert fatigue?

(1) Multi-level severity - P0 critical <5min, P3 info next day. (2) Composite alerts - require multiple conditions (accuracy low AND latency high). (3) Appropriate `for:` hold durations - avoid firing on transient spikes. (4) Group related alerts - single notification for cascading failures. (5) Regular tuning - review false positive rates weekly. Target: <5 actionable alerts/day, <1 false positive/week.


Sources

  • ML Journey "Monitoring ML Models with Prometheus and Grafana" (Sep 2025)
  • Johal.in "MLOps Monitoring with Prometheus and Grafana" (Sep 2025)
  • Diousoft "Model Monitoring & Logging" (2025)
  • Grafana Labs "Observability Survey" (Mar 2025)
  • UpCloud "Observability with Prometheus" (2025)

23. Security for ML (Adversarial, Model Extraction, Privacy)

Protecting ML models from attacks: adversarial examples, model stealing, privacy breaches

Why ML Security Is Critical

Problem: 41% of enterprises reported AI security incidents by late 2024 (Gartner). First agentic AI attack: September 2025 - AI performed 80-90% of the attack work autonomously.

ML-specific attack surface:
- Models learn from data → attackers manipulate data or learning
- Black-box access via API → can probe and extract
- Confidence scores leak information → enable inversion attacks


Attack Taxonomy

| Attack Type | Phase | Target | Access Required |
|-------------|-------|--------|-----------------|
| Evasion | Inference | Model predictions | Query access |
| Data Poisoning | Training | Training data | Data pipeline |
| Model Extraction | Inference | Model IP | Query access |
| Model Inversion | Inference | Training data | Query access |
| Membership Inference | Inference | Training set membership | Query access |

1. Adversarial Attacks (Evasion)

Concept: Imperceptible input modifications that fool ML models.

Original Image: [cat] → Model: "cat" (99%)
+ Adversarial Noise (invisible to human)
Adversarial Image: [cat+ε] → Model: "dog" (99%)

Attack Methods

FGSM (Fast Gradient Sign Method):

\[x_{adv} = x + \epsilon \cdot \text{sign}(\nabla_x J(\theta, x, y))\]

Where:
- \(x\) = original input
- \(\epsilon\) = perturbation magnitude
- \(J\) = loss function
- \(\nabla_x J\) = gradient of loss w.r.t. input

PGD (Projected Gradient Descent): Iterative FGSM with projection back to ε-ball.

import torch
import torch.nn.functional as F

def fgsm_attack(model, x, y, epsilon=0.1):
    """Fast Gradient Sign Method attack."""
    x_adv = x.clone().detach().requires_grad_(True)

    output = model(x_adv)
    loss = F.cross_entropy(output, y)

    loss.backward()

    # Add perturbation in gradient direction
    x_adv = x_adv + epsilon * x_adv.grad.sign()

    # Clip to valid range
    x_adv = torch.clamp(x_adv, 0, 1)

    return x_adv.detach()

def pgd_attack(model, x, y, epsilon=0.1, alpha=0.01, iterations=40):
    """Projected Gradient Descent attack (stronger)."""
    x_adv = x.clone().detach()

    for _ in range(iterations):
        x_adv.requires_grad_(True)
        output = model(x_adv)
        loss = F.cross_entropy(output, y)
        loss.backward()

        # Step in gradient direction
        x_adv = x_adv + alpha * x_adv.grad.sign()

        # Project back to epsilon ball
        delta = torch.clamp(x_adv - x, -epsilon, epsilon)
        x_adv = torch.clamp(x + delta, 0, 1)
        x_adv = x_adv.detach()

    return x_adv

White-box vs Black-box

| Setting | Attacker Knowledge | Attack Strength |
|---------|--------------------|-----------------|
| White-box | Architecture, weights, data | FGSM, PGD, CW |
| Black-box | Query access only | Query-based, transfer attacks |

2. Model Extraction Attacks

Concept: Recreate proprietary model through systematic queries.

Real case: DeepSeek used GPT-4 API outputs for distillation (Dec 2024); OpenAI revoked access.

class ModelExtractionDefense:
    """Defenses against model stealing."""

    def __init__(self, rate_limit=1000, noise_scale=0.01):
        self.rate_limit = rate_limit
        self.noise_scale = noise_scale
        self.query_counts = {}  # Track per-user queries

    def check_rate_limit(self, user_id):
        """Prevent mass querying."""
        count = self.query_counts.get(user_id, 0)
        if count > self.rate_limit:
            raise Exception("Rate limit exceeded")
        self.query_counts[user_id] = count + 1

    def perturb_output(self, probs):
        """Add noise to obscure exact model behavior."""
        noise = torch.randn_like(probs) * self.noise_scale
        perturbed = probs + noise
        return F.softmax(perturbed, dim=-1)

    def limit_output_precision(self, probs, top_k=5):
        """Only return top-k probabilities, not full distribution."""
        top_probs, top_indices = torch.topk(probs, k=top_k)
        return {idx.item(): prob.item() for idx, prob in zip(top_indices, top_probs)}

3. Model Inversion Attacks

Concept: Reconstruct training data from model outputs.

Example: Medical imaging model → reconstruct patient SSNs, names from confidence scores.

NIST Classification (Mar 2025):
- Confidence score attacks: Analyze probability distributions
- Label-only attacks: Use only hard labels (more queries needed)
- Attribute inference: Infer specific sensitive features

class ModelInversionDefense:
    """Defenses against training data reconstruction."""

    def __init__(self, model, epsilon=1.0):
        self.model = model
        self.epsilon = epsilon  # Differential privacy parameter

    def apply_differential_privacy(self, gradients):
        """
        DP-SGD: Clip gradients + add Gaussian noise.
        Prevents any single training sample from dominating.
        """
        # Clip per-sample gradients
        max_norm = 1.0
        total_norm = torch.norm(gradients)
        if total_norm > max_norm:
            gradients = gradients * (max_norm / total_norm)

        # Add calibrated noise: scale is inversely proportional to epsilon
        # (smaller epsilon = stronger privacy = more noise)
        noise = torch.randn_like(gradients) * (max_norm / self.epsilon)
        return gradients + noise

    def reduce_output_precision(self, logits, decimals=2):
        """Round logits to reduce information leakage."""
        return torch.round(logits * (10 ** decimals)) / (10 ** decimals)

4. Membership Inference Attacks (MIA)

Concept: Determine if a specific sample was in training data.

Risk: HIPAA/GDPR violations - reveals if patient data was used.

class MembershipInferenceDefense:
    """Prevent determination of training set membership."""

    def __init__(self, threshold=0.5):
        self.threshold = threshold

    def regularize_confidence(self, probs):
        """
        Reduce overconfidence on training samples.
        Models often show higher confidence on training data.
        """
        # Label smoothing effect
        smoothed = probs * 0.9 + 0.1 / probs.size(-1)
        return smoothed

    def detect_attack_pattern(self, query_history):
        """Detect MIA attack patterns: many similar queries."""
        # MIA attackers query similar inputs to probe membership
        # Monitor for suspicious query patterns
        pass

5. Differential Privacy

Definition: A mechanism M satisfies (ε, δ)-DP if for all datasets D, D' differing by one record:

\[P[M(D) \in S] \leq e^{\epsilon} \cdot P[M(D') \in S] + \delta\]

Key idea: Output doesn't change significantly if any one record is removed.
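
Before the gradient-level machinery below, the definition is easiest to see on a scalar query; a minimal sketch of the Laplace mechanism for a counting query, whose sensitivity is 1 (one record changes the count by at most 1):

import numpy as np

def private_count(true_count: int, epsilon: float = 1.0) -> float:
    """Release a count with (epsilon, 0)-DP via the Laplace mechanism.
    Noise scale = sensitivity / epsilon; here sensitivity = 1."""
    noise = np.random.laplace(loc=0.0, scale=1.0 / epsilon)
    return true_count + noise

print(private_count(1000, epsilon=0.5))  # e.g. ~1000, perturbed by a few units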

import torch
from torch import nn

class DPSGDTrainer:
    """Differentially Private Stochastic Gradient Descent."""

    def __init__(self, model, epsilon=1.0, delta=1e-5, max_grad_norm=1.0):
        self.model = model
        self.epsilon = epsilon
        self.delta = delta
        self.max_grad_norm = max_grad_norm

        # Calculate noise scale
        self.noise_scale = self._compute_noise_scale()

    def _compute_noise_scale(self):
        """Compute Gaussian noise scale for (ε, δ)-DP."""
        # Simplified: sigma = sqrt(2 * ln(1.25/delta)) / epsilon
        import math
        return math.sqrt(2 * math.log(1.25 / self.delta)) / self.epsilon

    def step(self, loss, optimizer):
        """Perform one (simplified) DP-SGD step."""
        # Simplification: clips the aggregate batch gradient; true DP-SGD
        # clips per-sample gradients (see e.g. the Opacus library)
        loss.backward()

        # Clip gradients
        torch.nn.utils.clip_grad_norm_(
            self.model.parameters(),
            self.max_grad_norm
        )

        # Add Gaussian noise to gradients
        for param in self.model.parameters():
            if param.grad is not None:
                noise = torch.randn_like(param.grad) * self.noise_scale * self.max_grad_norm
                param.grad += noise

        optimizer.step()
        optimizer.zero_grad()

Defense Summary Table

| Attack | Primary Defense | Secondary Defense |
|--------|-----------------|-------------------|
| Evasion | Adversarial training | Input preprocessing, detection |
| Poisoning | Data validation, provenance | Robust aggregation, DP-SGD |
| Extraction | Rate limiting, output perturbation | Watermarking, query monitoring |
| Inversion | Differential privacy | Limit output precision |
| MIA | Regularization, DP | Confidence calibration |

Multi-Layer Defense Architecture

Layer 1: Data Security
├── Validate all inputs
├── Track data provenance
├── Apply differential privacy to sensitive data

Layer 2: Training Security
├── Adversarial training
├── DP-SGD for privacy
├── Model versioning with attestations

Layer 3: Access Control
├── Zero trust for model APIs
├── Rate limiting per user
├── Authentication required

Layer 4: Output Protection
├── Limit confidence score precision
├── Add calibrated noise to outputs
├── Return top-k only, not full distributions

Layer 5: Monitoring
├── Detect unusual query patterns
├── Alert on extraction attempts
├── Audit trail for compliance

Interview Questions

Q: What is an adversarial attack and how does FGSM work?

Adversarial attacks add imperceptible perturbations to inputs that fool ML models. FGSM (Fast Gradient Sign Method) computes: x_adv = x + ε·sign(∇_x J(θ, x, y)) - adds perturbation in the direction that maximizes loss. ε controls perturbation magnitude (typically 0.01-0.1). Works in white-box setting where attacker has model access. Defenses include adversarial training (include adversarial examples in training data), input preprocessing (feature squeezing), and detection methods.

Q: How do model extraction attacks work and how do you prevent them?

Attackers query the model API systematically with diverse inputs, collect (input, output) pairs, then train a "copycat" model to mimic the original. Defenses: (1) Rate limiting - prevent mass queries; (2) Output perturbation - add noise to confidence scores; (3) Limit information - return only top-k predictions, not full distributions; (4) Watermarking - embed unique patterns in predictions to prove theft; (5) Query monitoring - detect suspicious patterns (high volume, diverse inputs probing decision boundaries).

Q: What is differential privacy and why does it help against model inversion?

DP ensures model output doesn't change significantly if any single training record is removed - formally: P[M(D)∈S] ≤ e^ε · P[M(D')∈S] + δ. Implementation: DP-SGD clips per-sample gradients and adds Gaussian noise calibrated to (ε, δ). This prevents attackers from reconstructing training data because no individual record can significantly influence the model. Trade-off: higher ε = less privacy, better utility; typical ε = 1-10.

Q: How would you secure a model deployed as a public API?

Multi-layer defense: (1) Authentication - API keys, JWT tokens; (2) Rate limiting - prevent extraction attacks, e.g., 1000 queries/day per key; (3) Output limiting - return predictions only, not confidence distributions; (4) Monitoring - alert on unusual patterns (many similar queries, probing behavior); (5) Logging - audit trail for compliance; (6) Versioning - ability to rollback if compromised; (7) Input validation - reject malformed or suspicious inputs; (8) Consider differential privacy if training data is sensitive.


Sources

  • SentinelOne "Model Inversion Attacks" (Jan 2026)
  • YASH Technologies "Adversarial Attack Defenses" (Jan 2026)
  • ByteJournal "Model Security" (Feb 2025)
  • NIST Adversarial ML Taxonomy (Mar 2025)
  • ResearchGate "Security and Privacy-Preserving for ML" (Jul 2025)

Video Resources

YouTube

Stanford CS329S: Machine Learning Systems Design - URL: https://stanford-cs329s.github.io/ - Lectures on ML deployment, monitoring

Full Stack LLM Bootcamp - Production LLM applications - URL: https://fullstackdeeplearning.com/llm-bootcamp/


Common Misconceptions

Misconception: a feature store is only for big companies

Training-serving skew is the #1 cause of silent model degradation in production. Even on a 3-person ML team, without a feature store features get computed differently in training (Spark/Pandas) and serving (Python/SQL), causing a 5-15% accuracy drop in production. Feast (open-source) can be deployed in a day.

Misconception: A/B testing an ML model is just a random split of users

With network effects (social networks, marketplaces), a random split violates SUTVA - user A's behavior affects user B. For RecSys: a user in treatment sees new recommendations and shares them with a user in control. Solution: cluster-based randomization, geo-based split, or switchback experiments.
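
A minimal sketch of switchback assignment: the whole system flips between arms on a fixed time grid, so interference is contained within a window (the experiment ID and interval length are illustrative):

import hashlib
from datetime import datetime, timezone

def switchback_arm(ts: datetime, experiment_id: str, interval_minutes: int = 60) -> str:
    """Assign the entire system to one arm per time window (switchback design)."""
    window = int(ts.timestamp() // (interval_minutes * 60))
    digest = hashlib.sha256(f"{experiment_id}:{window}".encode()).hexdigest()
    return "treatment" if int(digest, 16) % 2 == 0 else "control"

print(switchback_arm(datetime.now(timezone.utc), "recsys_v2_rollout"))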

Misconception: drift detected = the model must be retrained

PSI > 0.25 on features means a distribution shift, but NOT necessarily a quality drop. Seasonality in e-commerce (Black Friday) causes a PSI spike even when the model performs fine. The right order: (1) record the drift, (2) check model quality on fresh data, (3) retrain only if quality dropped. Automatically retraining on PSI without a quality check is an expensive mistake.
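
That ordering can be captured in a small gate: drift alone triggers investigation, but only drift plus a measured quality drop triggers retraining (thresholds are illustrative):

def should_retrain(psi: float, fresh_auc: float,
                   psi_threshold: float = 0.25, auc_floor: float = 0.80) -> bool:
    """Drift alone only triggers an investigation; quality decides the retrain."""
    drifted = psi > psi_threshold
    degraded = fresh_auc < auc_floor
    if drifted and not degraded:
        print("Drift logged, model still healthy - no retrain (e.g. seasonality).")
    return drifted and degraded

print(should_retrain(psi=0.4, fresh_auc=0.86))  # False: shift without quality loss
print(should_retrain(psi=0.4, fresh_auc=0.72))  # True: shift AND degradation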

Questions with Graded Answers

Why use a multi-stage recommender instead of a single model?

❌ "To improve accuracy via a cascade of models" - misses the main reason

✅ "Scale and latency. With 1B items and an SLA < 200ms, you cannot score every item with a heavy model. Multi-stage funnel: Retrieval (Two-Tower + ANN) narrows to ~1000 in ~10ms, Ranking (deep model) scores ~500 in ~100ms, Re-ranking applies diversity and business rules in ~20ms. Total ~150ms. This is the only viable architecture at YouTube/Netflix scale."

How do you choose between Feast (OSS) and Tecton (managed) as a feature store?

❌ "Feast is free, so it's always better" - ignores total cost of ownership

✅ "Depends on scale and team. Feast (OSS) - free but self-managed: fits startups, learning, tight budgets; needs DevOps for Redis + Spark. Tecton (managed) - expensive but enterprise-grade: rich real-time transformations, advanced monitoring, full support; pays off with 5+ ML engineers, when time-to-production matters more than cost. Hopsworks - middle ground: open-core, good real-time support."


Related Topics

  • dl_004_optimizers - Training optimization
  • llm_001_rag_pipeline - RAG architecture
  • llm_012_llm_prod - LLM production (in more detail)
