MLOps: Study Materials¶
~4 minute read
Prerequisites: MLOps Interview Preparation | LLMOps vs MLOps
According to Gartner (2025), only 53% of ML models make it to production. MLOps is the set of practices that cuts time-to-production from 6 months to 2 weeks. Interview questions cover model serving (FastAPI + A/B), experiment tracking (MLflow vs W&B), CI/CD for ML, drift detection, and feature stores. Below are the materials, with production-ready code.
Updated: 2026-02-12
Task Overview¶
| ID | Task | Difficulty | Key Topics |
|---|---|---|---|
| mlops_001 | Model Serving (FastAPI + A/B) | Medium | API design, A/B routing, monitoring |
Model Serving with FastAPI¶
Best Resources¶
Articles:

- The Complete MLOps/LLMOps Roadmap for 2026 — Medium (2026)
- Model Deployment with Docker and FastAPI — ML Mastery (July 2025)
- MLOps at Scale: Serving Transformers — Donald Simpson (Dec 2025)
- From Notebook to Production: MLOps Guide 2025 — Medium

Code: FastAPI Model Serving¶
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import joblib
import numpy as np
from contextlib import asynccontextmanager
# Model loading at startup
@asynccontextmanager
async def lifespan(app: FastAPI):
# Load models
app.state.model_a = joblib.load("model_a.pkl")
app.state.model_b = joblib.load("model_b.pkl")
yield
# Cleanup
pass
app = FastAPI(lifespan=lifespan)
class PredictRequest(BaseModel):
features: list[float]
class PredictResponse(BaseModel):
prediction: float
model_version: str
@app.post("/predict", response_model=PredictResponse)
async def predict(request: PredictRequest):
features = np.array([request.features])
prediction = app.state.model_a.predict(features)[0]
return PredictResponse(prediction=prediction, model_version="a")
@app.get("/health")
async def health():
return {"status": "healthy"}
Code: A/B Testing Router¶
import hashlib
import random

from fastapi import Request

@app.post("/predict_ab", response_model=PredictResponse)
async def predict_ab(request: PredictRequest, req: Request):
    """A/B test routing between two models"""
    # Get user ID from header, or fall back to a random one
    user_id = req.headers.get("X-User-ID", str(random.random()))
    # Deterministic routing based on user_id. Note: built-in hash() is salted
    # per process (PYTHONHASHSEED), so a stable hash is needed to keep routing
    # consistent across workers and restarts.
    bucket = int(hashlib.md5(user_id.encode()).hexdigest(), 16) % 100
    use_model_b = bucket < 20  # 20% of traffic to model B
    features = np.array([request.features])
    if use_model_b:
        prediction = app.state.model_b.predict(features)[0]
        model_version = "b"
    else:
        prediction = app.state.model_a.predict(features)[0]
        model_version = "a"
    # Log for offline A/B analysis (log_prediction sketched below)
    log_prediction(user_id, model_version, prediction)
    return PredictResponse(prediction=prediction, model_version=model_version)
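log_prediction is not shown in the original snippet; a minimal sketch, assuming plain JSON-lines logging is enough for offline A/B analysis:
import json
import time

def log_prediction(user_id: str, model_version: str, prediction: float) -> None:
    """Append one prediction event as a JSON line (hypothetical helper)."""
    event = {
        "ts": time.time(),
        "user_id": user_id,
        "model_version": model_version,
        "prediction": float(prediction),
    }
    with open("ab_predictions.jsonl", "a") as f:
        f.write(json.dumps(event) + "\n")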
Dockerfile¶
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY model_a.pkl model_b.pkl ./
COPY main.py .
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
Monitoring¶
from prometheus_client import Counter, Histogram, generate_latest
from fastapi import Response
# Metrics
PREDICTION_COUNT = Counter('predictions_total', 'Total predictions', ['model_version'])
PREDICTION_LATENCY = Histogram('prediction_latency_seconds', 'Prediction latency')
@app.get("/metrics")
async def metrics():
return Response(content=generate_latest(), media_type="text/plain")
@app.post("/predict")
@PREDICTION_LATENCY.time()
async def predict(request: PredictRequest):
PREDICTION_COUNT.labels(model_version="a").inc()
# ... prediction logic
Experiment Tracking (MLflow vs W&B)¶
Best Resources¶
Articles:

- MLflow Official Docs — LF AI Foundation
- Weights & Biases Docs — W&B
- MLflow vs W&B vs Neptune Comparison — Neptune.ai (2025)

Framework Comparison¶
| Feature | MLflow | W&B | Neptune |
|---|---|---|---|
| Self-hosted | Yes | No (SaaS) | Hybrid |
| Open-source | Yes (Apache 2.0) | Client only | No |
| UI quality | Basic | Excellent | Good |
| Model registry | Built-in | Artifacts | Built-in |
| Pricing | Free | Free tier, paid teams | Paid |
| GPU monitoring | No | Yes (nvidia-smi) | Yes |
Code: MLflow Tracking¶
import mlflow
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score

# Assumes X_train, y_train, X_test, y_test are already defined
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("fraud-detection-v2")
with mlflow.start_run(run_name="rf-baseline"):
# Log hyperparameters
params = {"n_estimators": 200, "max_depth": 10, "min_samples_leaf": 5}
mlflow.log_params(params)
# Train
model = RandomForestClassifier(**params)
model.fit(X_train, y_train)
# Evaluate
y_pred = model.predict(X_test)
metrics = {
"accuracy": accuracy_score(y_test, y_pred),
"f1": f1_score(y_test, y_pred),
}
mlflow.log_metrics(metrics)
    # Log model with signature (inputs paired with their matching predictions)
    from mlflow.models import infer_signature
    signature = infer_signature(X_test, y_pred)
    mlflow.sklearn.log_model(model, "model", signature=signature)
# Register model
mlflow.register_model(
f"runs:/{mlflow.active_run().info.run_id}/model",
"fraud-detection"
)
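To browse runs in the UI, start the tracking server that set_tracking_uri points to:
mlflow server --host 127.0.0.1 --port 5000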
Code: W&B Tracking¶
import wandb
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score
wandb.init(project="fraud-detection", name="rf-baseline", config={
"n_estimators": 200,
"max_depth": 10,
"min_samples_leaf": 5,
})
model = RandomForestClassifier(**wandb.config)
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
wandb.log({"accuracy": accuracy_score(y_test, y_pred), "f1": f1_score(y_test, y_pred)})
# Log confusion matrix
wandb.log({"confusion_matrix": wandb.plot.confusion_matrix(
y_true=y_test, preds=y_pred, class_names=["legit", "fraud"]
)})
wandb.finish()
ML CI/CD Pipeline¶
Best Resources¶
Articles:

- CI/CD for Machine Learning — Made With ML
- Testing ML Systems — Made With ML
- ML Testing: Beyond Unit Tests — Eugene Yan (2025)
GitHub Actions Pipeline¶
# .github/workflows/ml-pipeline.yml
name: ML CI/CD
on:
push:
branches: [main]
paths: ['models/**', 'features/**', 'tests/**']
pull_request:
branches: [main]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: '3.11'
- run: pip install -r requirements.txt
# Unit tests
- name: Unit tests
run: pytest tests/unit/ -v
# Data validation
- name: Data quality checks
run: pytest tests/data/ -v
# Model tests
- name: Model quality tests
run: pytest tests/model/ -v --timeout=300
train:
needs: test
runs-on: ubuntu-latest
if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - run: pip install -r requirements.txt
      - name: Train model
        run: python train.py --config configs/prod.yaml
- name: Validate metrics
run: python validate.py --min-accuracy 0.85 --min-f1 0.80
- name: Register model
run: python register.py --stage staging
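The pipeline calls validate.py, which is not shown above; a minimal sketch of such a quality gate, assuming train.py writes its metrics to metrics.json (both the file name and layout are assumptions):
# validate.py (hypothetical gate script)
import argparse
import json
import sys

parser = argparse.ArgumentParser()
parser.add_argument("--min-accuracy", type=float, required=True)
parser.add_argument("--min-f1", type=float, required=True)
args = parser.parse_args()

with open("metrics.json") as f:
    metrics = json.load(f)

failures = []
if metrics["accuracy"] < args.min_accuracy:
    failures.append(f"accuracy {metrics['accuracy']:.3f} < {args.min_accuracy}")
if metrics["f1"] < args.min_f1:
    failures.append(f"f1 {metrics['f1']:.3f} < {args.min_f1}")

if failures:
    print("Validation failed:", "; ".join(failures))
    sys.exit(1)  # non-zero exit fails the CI step
print("Validation passed")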
Code: ML-Specific Tests¶
# tests/data/test_data_quality.py
# train_df, test_df, df are pytest fixtures; a conftest sketch appears after this section
import pandas as pd
import pytest
def test_no_data_leakage(train_df, test_df):
"""Train and test sets must not overlap"""
overlap = set(train_df["user_id"]) & set(test_df["user_id"])
assert len(overlap) == 0, f"Leakage: {len(overlap)} shared users"
def test_feature_distributions(train_df, test_df):
"""Feature distributions should be similar (PSI < 0.25)"""
for col in train_df.select_dtypes(include="number").columns:
        psi = calculate_psi(train_df[col], test_df[col])  # defined in the PSI section below
assert psi < 0.25, f"PSI({col}) = {psi:.3f} > 0.25"
def test_no_nulls_in_required(df):
"""Required features must not have nulls"""
required = ["user_id", "amount", "timestamp"]
for col in required:
assert df[col].notna().all(), f"Nulls in {col}: {df[col].isna().sum()}"
# tests/model/test_model_quality.py
import numpy as np
from sklearn.metrics import accuracy_score
def test_model_not_random(model, X_test, y_test):
"""Model must beat random baseline"""
y_pred = model.predict(X_test)
acc = accuracy_score(y_test, y_pred)
random_baseline = max(y_test.mean(), 1 - y_test.mean())
assert acc > random_baseline, f"Model ({acc:.3f}) worse than random ({random_baseline:.3f})"
def test_model_latency(model, X_test):
"""Single prediction must be < 50ms"""
import time
start = time.perf_counter()
model.predict(X_test[:1])
latency_ms = (time.perf_counter() - start) * 1000
assert latency_ms < 50, f"Latency {latency_ms:.1f}ms > 50ms"
def test_model_invariance(model):
"""Doubling feature should not flip prediction"""
x = np.array([[100, 0.5, 1]])
pred1 = model.predict_proba(x)[0][1]
x_doubled = x * 2
pred2 = model.predict_proba(x_doubled)[0][1]
assert abs(pred1 - pred2) < 0.5, "Model too sensitive to scaling"
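The fixtures used above (train_df, test_df, df, model, X_test, y_test) are not shown in the source; a minimal conftest.py sketch, where the file paths and the is_fraud target column are assumptions:
# tests/conftest.py (hypothetical fixtures backing the tests above)
import joblib
import pandas as pd
import pytest

@pytest.fixture(scope="session")
def train_df():
    return pd.read_parquet("data/train.parquet")  # path is an assumption

@pytest.fixture(scope="session")
def test_df():
    return pd.read_parquet("data/test.parquet")

@pytest.fixture(scope="session")
def df(train_df):
    # test_no_nulls_in_required runs against the training frame
    return train_df

@pytest.fixture(scope="session")
def model():
    return joblib.load("model_a.pkl")

@pytest.fixture(scope="session")
def X_test(test_df):
    return test_df.drop(columns=["is_fraud"]).to_numpy()

@pytest.fixture(scope="session")
def y_test(test_df):
    return test_df["is_fraud"].to_numpy()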
Monitoring & Drift Detection¶
Best Resources¶
Articles:

- ML Monitoring Best Practices — Evidently AI
- Data Drift vs Concept Drift — Evidently AI (2025)
- NannyML: Estimating Performance — NannyML Docs

Drift Types¶
| Type | What Changes | Example | Metric |
|---|---|---|---|
| Data drift | P(X) | New user geography | PSI, KS-test |
| Concept drift | P(Y\|X) | Fraud tactics evolve | Performance drop |
| Label drift | P(Y) | Seasonal class imbalance | Label distribution |
| Upstream drift | Feature pipeline | API format changed | Schema validation |
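The table lists the KS-test next to PSI for data drift; a short sketch using scipy (the alpha threshold is a common default, not from the source):
import numpy as np
from scipy import stats

def ks_drift_check(baseline: np.ndarray, current: np.ndarray, alpha: float = 0.05) -> bool:
    """Two-sample Kolmogorov-Smirnov test: True means drift detected."""
    statistic, p_value = stats.ks_2samp(baseline, current)
    return p_value < alpha  # reject "same distribution" at significance level alpha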
Code: PSI (Population Stability Index)¶
import numpy as np
def calculate_psi(baseline: np.ndarray, current: np.ndarray, bins: int = 10) -> float:
"""
PSI < 0.1: No change
PSI 0.1-0.25: Moderate change
PSI > 0.25: Significant change (retrain!)
"""
    # Create bins from baseline quantiles
    # (assumes a continuous feature; heavy ties can produce duplicate breakpoints)
    breakpoints = np.percentile(baseline, np.linspace(0, 100, bins + 1))
    breakpoints[-1] = np.inf
    breakpoints[0] = -np.inf
# Calculate proportions
baseline_counts = np.histogram(baseline, bins=breakpoints)[0] / len(baseline)
current_counts = np.histogram(current, bins=breakpoints)[0] / len(current)
# Avoid division by zero
baseline_counts = np.clip(baseline_counts, 1e-4, None)
current_counts = np.clip(current_counts, 1e-4, None)
# PSI formula
psi = np.sum((current_counts - baseline_counts) * np.log(current_counts / baseline_counts))
return psi
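A quick sanity check on synthetic data (a sketch; exact values vary by seed):
rng = np.random.default_rng(42)
baseline = rng.normal(0, 1, 10_000)
stable = rng.normal(0, 1, 10_000)      # same distribution
shifted = rng.normal(0.75, 1, 10_000)  # mean shift simulates drift

print(f"PSI stable:  {calculate_psi(baseline, stable):.3f}")   # near 0: no change
print(f"PSI shifted: {calculate_psi(baseline, shifted):.3f}")  # > 0.25: retrain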
Code: Evidently AI Monitoring¶
# Legacy Evidently 0.4-style API
from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset

column_mapping = ColumnMapping(
    target="is_fraud",
    prediction="prediction",
    numerical_features=["amount", "hour", "velocity"],
    categorical_features=["country", "device_type"],
)

# Data drift report
drift_report = Report(metrics=[DataDriftPreset()])
drift_report.run(reference_data=train_df, current_data=prod_df, column_mapping=column_mapping)
drift_report.save_html("drift_report.html")

# Extract results programmatically.
# DataDriftPreset expands to [DatasetDriftMetric, DataDriftTable]
result = drift_report.as_dict()
dataset_drift = result["metrics"][0]["result"]["dataset_drift"]  # DatasetDriftMetric
drifted_features = [
    col for col, info in result["metrics"][1]["result"]["drift_by_columns"].items()  # DataDriftTable
    if info["drift_detected"]
]
print(f"Dataset drift: {dataset_drift}, Drifted features: {drifted_features}")
Model Registry & Governance¶
Best Resources¶
Articles:

- MLflow Model Registry — MLflow Docs
- ML Model Governance — Neptune.ai
Misconception: a model that does well on the test set will do well in production
Data drift (a change in P(X)) and concept drift (a change in P(Y|X)) erode performance. According to Evidently AI (2025), 91% of production models degrade within their first 3 months. PSI > 0.25 means retraining is needed. Monitoring is mandatory.
Misconception: MLflow and W&B are interchangeable
MLflow: open-source, self-hosted, strong model registry, free. W&B: SaaS, best-in-class visualization and collaboration, GPU monitoring. Many teams use both, W&B for experiments and MLflow for the registry and deployment, as sketched below. Interviewers expect you to know the trade-offs.
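A minimal sketch of that split (project names, metric values, and the tracking URI are illustrative):
import mlflow
import wandb

# Experiments live in W&B
run = wandb.init(project="fraud-detection", config={"n_estimators": 200})
# ... train `model`, compute metrics ...
wandb.log({"f1": 0.91})  # illustrative value
wandb.finish()

# The registry lives in MLflow
mlflow.set_tracking_uri("http://localhost:5000")
with mlflow.start_run():
    mlflow.log_metric("f1", 0.91)
    mlflow.sklearn.log_model(model, "model")  # `model` is the estimator trained above
    mlflow.register_model(f"runs:/{mlflow.active_run().info.run_id}/model", "fraud-detection")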
Misconception: only big companies need a feature store
Train-serving skew (separate code paths for offline and online features) causes 60% of production ML bugs (Google, 2022). A feature store fixes this with a single feature definition backing both stores. Feast is open-source and can be stood up in an hour.
Lifecycle Stages¶
graph LR
DEV[Development<br/>Train, Log, Test] --> STG[Staging<br/>Validate, Shadow, A/B]
STG --> PROD[Production<br/>Monitor, Alert, Retrain]
PROD --> ARCH[Archived]
style DEV fill:#e8eaf6,stroke:#3f51b5
style STG fill:#fff3e0,stroke:#ef6c00
style PROD fill:#e8f5e9,stroke:#4caf50
style ARCH fill:#f3e5f5,stroke:#9c27b0
Code: MLflow Model Promotion¶
from mlflow import MlflowClient
client = MlflowClient()
# Get latest staging model (assumes at least one version is in Staging)
latest = client.get_latest_versions("fraud-detection", stages=["Staging"])
staging_version = latest[0].version
# Compare staging vs production
prod_versions = client.get_latest_versions("fraud-detection", stages=["Production"])
if prod_versions:
prod_run = client.get_run(prod_versions[0].run_id)
prod_f1 = float(prod_run.data.metrics["f1"])
else:
prod_f1 = 0.0
staging_run = client.get_run(latest[0].run_id)
staging_f1 = float(staging_run.data.metrics["f1"])
# Promote if better
if staging_f1 > prod_f1 + 0.01: # 1% improvement threshold
client.transition_model_version_stage(
name="fraud-detection",
version=staging_version,
stage="Production",
archive_existing_versions=True,
)
print(f"Promoted v{staging_version}: F1 {staging_f1:.4f} > {prod_f1:.4f}")
else:
print(f"No promotion: staging F1 {staging_f1:.4f} vs prod {prod_f1:.4f}")
Feature Stores¶
Best Resources¶
Articles:

- Feast: Open Source Feature Store — Feast Docs
- Feature Store Architecture — Tecton (2025)
- Why Feature Stores Matter — Eugene Yan

Architecture¶
graph TD
BJ[Batch Jobs<br/>Spark, Airflow] -->|write| OFF[Offline Store<br/>S3/BigQuery/Parquet]
OFF -->|materialize| ON[Online Store<br/>Redis/DynamoDB]
SJ[Stream Jobs<br/>Kafka + Flink] -->|write| ON
OFF -->|read| TR[Training]
ON -->|read| SRV[Serving]
style BJ fill:#e8eaf6,stroke:#3f51b5
style SJ fill:#e8eaf6,stroke:#3f51b5
style OFF fill:#fff3e0,stroke:#ef6c00
style ON fill:#e8f5e9,stroke:#4caf50
style TR fill:#f3e5f5,stroke:#9c27b0
style SRV fill:#f3e5f5,stroke:#9c27b0
Code: Feast Feature Store¶
# feature_repo/features.py
from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float32, Int64
from datetime import timedelta
# Entity
user = Entity(name="user_id", join_keys=["user_id"])
# Source
user_features_source = FileSource(
path="data/user_features.parquet",
timestamp_field="event_timestamp",
)
# Feature view
user_features = FeatureView(
name="user_features",
entities=[user],
ttl=timedelta(days=1),
schema=[
Field(name="transaction_count_7d", dtype=Int64),
Field(name="avg_amount_30d", dtype=Float32),
Field(name="unique_merchants_7d", dtype=Int64),
Field(name="velocity_1h", dtype=Float32),
],
source=user_features_source,
online=True,
)
# --- Separate serving script; register and materialize first (see below) ---
# Serving: fetch features for real-time inference
from feast import FeatureStore
store = FeatureStore(repo_path="feature_repo/")
# Online serving (low-latency)
features = store.get_online_features(
features=["user_features:transaction_count_7d", "user_features:avg_amount_30d"],
entity_rows=[{"user_id": 12345}],
).to_dict()
# Training: historical features with point-in-time join
training_df = store.get_historical_features(
entity_df=entity_df, # user_id + event_timestamp
features=["user_features:transaction_count_7d", "user_features:avg_amount_30d"],
).to_df()
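get_online_features only returns data after the definitions are registered and the offline store is materialized into the online store (standard Feast CLI, run from feature_repo/):
# Register entities and feature views from the repo
feast apply
# Push the latest offline feature values into the online store
feast materialize-incremental $(date -u +"%Y-%m-%dT%H:%M:%S")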
Why a Feature Store¶
| Problem | Without a Feature Store | With a Feature Store |
|---|---|---|
| Train-serve skew | Separate code for batch and online | One definition, two stores |
| Feature reuse | Copy-paste across teams | Centralized catalog |
| Point-in-time correctness | Manual timestamp joins | Automatic PIT join |
| Feature freshness | Ad-hoc checks | Monitoring + SLA |
Cost Optimization for ML Infrastructure¶
Best Resources¶
Articles:

- Reducing ML Inference Costs — Neptune.ai
- Spot Instances for ML Training — AWS

Strategies¶
| Strategy | Savings | When to Apply |
|---|---|---|
| Spot/Preemptible instances | 60-90% | Training (with checkpointing) |
| Model quantization (FP32 -> INT8) | ~4x compute | Inference |
| Knowledge distillation | 2-10x | Large model -> small |
| Batch inference | 3-5x throughput | Non-real-time predictions |
| Auto-scaling (scale to zero) | Variable | Low-traffic endpoints |
| Caching frequent predictions | 50-90% | Repeated inputs (see sketch below) |
| Right-sizing GPU | 20-50% | Overprovisioned instances |
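For the caching row, a minimal in-process sketch with functools.lru_cache (a production setup would more likely use Redis with a TTL; `model` is assumed to be a loaded estimator):
from functools import lru_cache
import numpy as np

@lru_cache(maxsize=100_000)
def cached_predict(features: tuple[float, ...]) -> float:
    """Memoize predictions for repeated inputs; tuples are hashable cache keys."""
    return float(model.predict(np.array([features]))[0])

# Round inputs so float noise doesn't defeat the cache
prediction = cached_predict(tuple(round(x, 6) for x in [120.0, 0.5, 1.0]))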
Code: Auto-scaling Config (K8s)¶
# HorizontalPodAutoscaler for ML serving
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: fraud-model-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: fraud-model
minReplicas: 1
maxReplicas: 10
  metrics:
    # Custom pod metric: requires a metrics adapter (e.g., prometheus-adapter)
    - type: Pods
      pods:
        metric:
          name: inference_queue_length
        target:
          type: AverageValue
          averageValue: "5"  # Scale up when avg queue > 5 per pod
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 30
    scaleDown:
      stabilizationWindowSeconds: 300  # Wait 5 min before scaling down