
MLOps: Study Materials

~4 minute read

Prerequisites: MLOps Interview Prep | LLMOps vs MLOps

According to Gartner (2025), only 53% of ML models make it to production. MLOps is a set of practices that cuts time-to-production from 6 months to 2 weeks. Interviews cover model serving (FastAPI + A/B), experiment tracking (MLflow vs W&B), CI/CD for ML, drift detection, and feature stores. Below are materials with production-ready code.

Updated: 2026-02-11


Task Overview

ID | Task | Difficulty | Key topics
mlops_001 | Model Serving (FastAPI + A/B) | Medium | API design, A/B routing, monitoring

Model Serving с FastAPI

Best Resources

Articles:
- The Complete MLOps/LLMOps Roadmap for 2026 — Medium (2026)
- Model Deployment with Docker and FastAPI — ML Mastery (July 2025)
- MLOps at Scale: Serving Transformers — Donald Simpson (Dec 2025)
- From Notebook to Production: MLOps Guide 2025 — Medium

Code: FastAPI Model Serving

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import joblib
import numpy as np
from contextlib import asynccontextmanager

# Model loading at startup
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Load models
    app.state.model_a = joblib.load("model_a.pkl")
    app.state.model_b = joblib.load("model_b.pkl")
    yield
    # Cleanup
    pass

app = FastAPI(lifespan=lifespan)

class PredictRequest(BaseModel):
    features: list[float]

class PredictResponse(BaseModel):
    prediction: float
    model_version: str

@app.post("/predict", response_model=PredictResponse)
async def predict(request: PredictRequest):
    features = np.array([request.features])
    prediction = app.state.model_a.predict(features)[0]
    return PredictResponse(prediction=prediction, model_version="a")

@app.get("/health")
async def health():
    return {"status": "healthy"}
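
A quick smoke test of the endpoint, assuming the service is running locally on port 8000:

import requests

resp = requests.post(
    "http://localhost:8000/predict",
    json={"features": [0.1, 0.2, 0.3]},
)
print(resp.json())  # e.g. {"prediction": 0.87, "model_version": "a"}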

Code: A/B Testing Router

import hashlib
import logging
import random
from fastapi import Request

logger = logging.getLogger("ab_test")

def log_prediction(user_id: str, model_version: str, prediction: float) -> None:
    """Minimal logging hook for offline A/B analysis (replace with your event pipeline)."""
    logger.info("user=%s model=%s prediction=%s", user_id, model_version, prediction)

@app.post("/predict_ab", response_model=PredictResponse)
async def predict_ab(request: PredictRequest, req: Request):
    """A/B test routing between two models"""
    # Get user ID from header; anonymous requests get a random bucket
    user_id = req.headers.get("X-User-ID", str(random.random()))

    # Deterministic routing based on user_id.
    # Python's built-in hash() is salted per process, so use a stable hash
    # to keep a user in the same bucket across replicas and restarts.
    bucket = int(hashlib.sha256(user_id.encode()).hexdigest(), 16) % 100
    use_model_b = bucket < 20  # 20% traffic to model B

    features = np.array([request.features])

    if use_model_b:
        prediction = app.state.model_b.predict(features)[0]
        model_version = "b"
    else:
        prediction = app.state.model_a.predict(features)[0]
        model_version = "a"

    # Log for A/B analysis
    log_prediction(user_id, model_version, float(prediction))

    return PredictResponse(prediction=prediction, model_version=model_version)
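
The logged predictions are what the A/B analysis runs on. A minimal offline sketch comparing the two variants with a chi-square test (the aggregated counts below are made up; in practice they come from the prediction logs joined with outcome labels):

import numpy as np
from scipy.stats import chi2_contingency

# Hypothetical aggregated outcomes per variant (successes vs totals)
outcomes = {
    "a": {"success": 412, "total": 8000},
    "b": {"success": 131, "total": 2000},
}

table = np.array([
    [outcomes["a"]["success"], outcomes["a"]["total"] - outcomes["a"]["success"]],
    [outcomes["b"]["success"], outcomes["b"]["total"] - outcomes["b"]["success"]],
])

chi2, p_value, _, _ = chi2_contingency(table)
rate_a = outcomes["a"]["success"] / outcomes["a"]["total"]
rate_b = outcomes["b"]["success"] / outcomes["b"]["total"]
print(f"A: {rate_a:.3f}, B: {rate_b:.3f}, p-value: {p_value:.4f}")
# Promote model B only if the difference is both practically and statistically significant.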

Dockerfile

FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY model_a.pkl model_b.pkl ./
COPY main.py .

EXPOSE 8000

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

Monitoring

from prometheus_client import Counter, Histogram, generate_latest
from fastapi import Response

# Metrics
PREDICTION_COUNT = Counter('predictions_total', 'Total predictions', ['model_version'])
PREDICTION_LATENCY = Histogram('prediction_latency_seconds', 'Prediction latency')

@app.get("/metrics")
async def metrics():
    return Response(content=generate_latest(), media_type="text/plain")

# Instrumented version of the /predict endpoint (replaces the earlier definition)
@app.post("/predict", response_model=PredictResponse)
@PREDICTION_LATENCY.time()
async def predict(request: PredictRequest):
    PREDICTION_COUNT.labels(model_version="a").inc()
    features = np.array([request.features])
    prediction = app.state.model_a.predict(features)[0]
    return PredictResponse(prediction=prediction, model_version="a")

Experiment Tracking (MLflow vs W&B)

Best Resources

Articles:
- MLflow Official Docs — LF AI Foundation
- Weights & Biases Docs — W&B
- MLflow vs W&B vs Neptune Comparison — Neptune.ai (2025)

Framework Comparison

Feature | MLflow | W&B | Neptune
Self-hosted | Yes | No (SaaS) | Hybrid
Open-source | Yes (Apache 2.0) | Client only | No
UI quality | Basic | Excellent | Good
Model registry | Built-in | Artifacts | Built-in
Pricing | Free | Free tier, paid teams | Paid
GPU monitoring | No | Yes (nvidia-smi) | Yes

Code: MLflow Tracking

import mlflow
from mlflow.models import infer_signature
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score

# X_train, X_test, y_train, y_test are assumed to be prepared upstream

mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("fraud-detection-v2")

with mlflow.start_run(run_name="rf-baseline"):
    # Log hyperparameters
    params = {"n_estimators": 200, "max_depth": 10, "min_samples_leaf": 5}
    mlflow.log_params(params)

    # Train
    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)

    # Evaluate
    y_pred = model.predict(X_test)
    metrics = {
        "accuracy": accuracy_score(y_test, y_pred),
        "f1": f1_score(y_test, y_pred),
    }
    mlflow.log_metrics(metrics)

    # Log model with signature (input schema + output schema)
    signature = infer_signature(X_train, model.predict(X_train))
    mlflow.sklearn.log_model(model, "model", signature=signature)

    # Register model
    mlflow.register_model(
        f"runs:/{mlflow.active_run().info.run_id}/model",
        "fraud-detection"
    )
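
Once registered, the model can be loaded back by name, for example in a batch-scoring job (the URI below pins version 1; stage- and alias-based URIs also work):

import mlflow.pyfunc

model = mlflow.pyfunc.load_model("models:/fraud-detection/1")
# predictions = model.predict(X_new)  # X_new must match the logged signature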

Code: W&B Tracking

import wandb
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score

# X_train, X_test, y_train, y_test are assumed to be prepared upstream

wandb.init(project="fraud-detection", name="rf-baseline", config={
    "n_estimators": 200,
    "max_depth": 10,
    "min_samples_leaf": 5,
})

model = RandomForestClassifier(**dict(wandb.config))
model.fit(X_train, y_train)

y_pred = model.predict(X_test)
wandb.log({"accuracy": accuracy_score(y_test, y_pred), "f1": f1_score(y_test, y_pred)})

# Log confusion matrix
wandb.log({"confusion_matrix": wandb.plot.confusion_matrix(
    y_true=y_test, preds=y_pred, class_names=["legit", "fraud"]
)})
wandb.finish()

ML CI/CD Pipeline

Best Resources

Articles:
- CI/CD for Machine Learning — Made With ML
- Testing ML Systems — Made With ML
- ML Testing: Beyond Unit Tests — Eugene Yan (2025)

GitHub Actions Pipeline

# .github/workflows/ml-pipeline.yml
name: ML CI/CD

on:
  push:
    branches: [main]
    paths: ['models/**', 'features/**', 'tests/**']
  pull_request:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - run: pip install -r requirements.txt

      # Unit tests
      - name: Unit tests
        run: pytest tests/unit/ -v

      # Data validation
      - name: Data quality checks
        run: pytest tests/data/ -v

      # Model tests
      - name: Model quality tests
        run: pytest tests/model/ -v --timeout=300

  train:
    needs: test
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - run: pip install -r requirements.txt
      - name: Train model
        run: python train.py --config configs/prod.yaml
      - name: Validate metrics
        run: python validate.py --min-accuracy 0.85 --min-f1 0.80
      - name: Register model
        run: python register.py --stage staging
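
The Validate metrics step above assumes a small gate script. A possible sketch of validate.py, assuming train.py writes its metrics to a JSON file (the file name and flags are illustrative):

# validate.py -- fail the pipeline if the freshly trained model misses quality gates
import argparse
import json
import sys

parser = argparse.ArgumentParser()
parser.add_argument("--metrics-file", default="metrics.json")  # written by train.py (assumed)
parser.add_argument("--min-accuracy", type=float, default=0.85)
parser.add_argument("--min-f1", type=float, default=0.80)
args = parser.parse_args()

with open(args.metrics_file) as f:
    metrics = json.load(f)

failures = []
if metrics["accuracy"] < args.min_accuracy:
    failures.append(f"accuracy {metrics['accuracy']:.3f} < {args.min_accuracy}")
if metrics["f1"] < args.min_f1:
    failures.append(f"f1 {metrics['f1']:.3f} < {args.min_f1}")

if failures:
    print("Validation failed:", "; ".join(failures))
    sys.exit(1)
print("Validation passed:", metrics)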

Code: ML-Specific Tests

# tests/data/test_data_quality.py
import pandas as pd
import pytest

# calculate_psi is the PSI helper shown in the drift section below
# (the module path here is illustrative)
from monitoring.psi import calculate_psi

# train_df, test_df and df are assumed to be pytest fixtures defined in conftest.py

def test_no_data_leakage(train_df, test_df):
    """Train and test sets must not overlap"""
    overlap = set(train_df["user_id"]) & set(test_df["user_id"])
    assert len(overlap) == 0, f"Leakage: {len(overlap)} shared users"

def test_feature_distributions(train_df, test_df):
    """Feature distributions should be similar (PSI < 0.25)"""
    for col in train_df.select_dtypes(include="number").columns:
        psi = calculate_psi(train_df[col], test_df[col])
        assert psi < 0.25, f"PSI({col}) = {psi:.3f} > 0.25"

def test_no_nulls_in_required(df):
    """Required features must not have nulls"""
    required = ["user_id", "amount", "timestamp"]
    for col in required:
        assert df[col].notna().all(), f"Nulls in {col}: {df[col].isna().sum()}"

# tests/model/test_model_quality.py
import time

import numpy as np
from sklearn.metrics import accuracy_score

# model, X_test and y_test are assumed to be pytest fixtures defined in conftest.py

def test_model_not_random(model, X_test, y_test):
    """Model must beat random baseline"""
    y_pred = model.predict(X_test)
    acc = accuracy_score(y_test, y_pred)
    random_baseline = max(y_test.mean(), 1 - y_test.mean())
    assert acc > random_baseline, f"Model ({acc:.3f}) worse than random ({random_baseline:.3f})"

def test_model_latency(model, X_test):
    """Single prediction must be < 50ms"""
    start = time.perf_counter()
    model.predict(X_test[:1])
    latency_ms = (time.perf_counter() - start) * 1000
    assert latency_ms < 50, f"Latency {latency_ms:.1f}ms > 50ms"

def test_model_invariance(model):
    """Doubling feature values should not flip the prediction"""
    x = np.array([[100, 0.5, 1]])
    pred1 = model.predict_proba(x)[0][1]
    x_doubled = x * 2
    pred2 = model.predict_proba(x_doubled)[0][1]
    assert abs(pred1 - pred2) < 0.5, "Model too sensitive to scaling"

Monitoring & Drift Detection

Best Resources

Articles:
- ML Monitoring Best Practices — Evidently AI
- Data Drift vs Concept Drift — Evidently AI (2025)
- NannyML: Estimating Performance — NannyML Docs

Drift Types

Type | What changes | Example | Metric
Data drift | P(X) | New user geography | PSI, KS-test
Concept drift | P(Y|X) | Fraud tactics evolve | Performance drop
Label drift | P(Y) | Seasonal class imbalance | Label distribution
Upstream drift | Feature pipeline | API format changed | Schema validation
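
Alongside PSI, the KS test is the other common data-drift metric from the table; a minimal sketch with scipy (the threshold and synthetic data are illustrative):

import numpy as np
from scipy.stats import ks_2samp

def ks_drift_check(baseline: np.ndarray, current: np.ndarray, alpha: float = 0.05) -> bool:
    """Two-sample Kolmogorov-Smirnov test: drift if the distributions differ significantly."""
    statistic, p_value = ks_2samp(baseline, current)
    return p_value < alpha  # True = drift detected

rng = np.random.default_rng(42)
baseline = rng.normal(0, 1, 10_000)
current = rng.normal(0.3, 1, 10_000)   # shifted distribution
print(ks_drift_check(baseline, current))  # True, the shift is detected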

Code: PSI (Population Stability Index)

import numpy as np

def calculate_psi(baseline: np.ndarray, current: np.ndarray, bins: int = 10) -> float:
    """
    PSI < 0.1:  No change
    PSI 0.1-0.25: Moderate change
    PSI > 0.25: Significant change (retrain!)
    """
    # Create bins from baseline
    breakpoints = np.percentile(baseline, np.linspace(0, 100, bins + 1))
    breakpoints[-1] = np.inf
    breakpoints[0] = -np.inf

    # Calculate proportions
    baseline_counts = np.histogram(baseline, bins=breakpoints)[0] / len(baseline)
    current_counts = np.histogram(current, bins=breakpoints)[0] / len(current)

    # Avoid division by zero
    baseline_counts = np.clip(baseline_counts, 1e-4, None)
    current_counts = np.clip(current_counts, 1e-4, None)

    # PSI formula
    psi = np.sum((current_counts - baseline_counts) * np.log(current_counts / baseline_counts))
    return psi

Code: Evidently AI Monitoring

# Evidently Report API (older releases; newer versions restructure this interface)
from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset

column_mapping = ColumnMapping(
    target="is_fraud",
    prediction="prediction",
    numerical_features=["amount", "hour", "velocity"],
    categorical_features=["country", "device_type"],
)

# Data drift report
drift_report = Report(metrics=[DataDriftPreset()])
drift_report.run(reference_data=train_df, current_data=prod_df, column_mapping=column_mapping)
drift_report.save_html("drift_report.html")

# Extract results programmatically
result = drift_report.as_dict()
dataset_drift = result["metrics"][0]["result"]["dataset_drift"]  # DatasetDriftMetric
drifted_features = [
    col for col, info in result["metrics"][1]["result"]["drift_by_columns"].items()  # DataDriftTable
    if info["drift_detected"]
]
print(f"Dataset drift: {dataset_drift}, Drifted features: {drifted_features}")

Model Registry & Governance

Best Resources

Articles:
- MLflow Model Registry — MLflow Docs
- ML Model Governance — Neptune.ai

Misconception: if a model performs well on the test set, it will perform well in production

Data drift (a change in P(X)) and concept drift (a change in P(Y|X)) erode performance. According to Evidently AI (2025), 91% of production models degrade within the first 3 months. PSI > 0.25 means it is time to retrain. Monitoring is mandatory.
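
A minimal sketch of turning this rule into a scheduled check, reusing the calculate_psi helper from the drift section above (the data frames and the retraining hook are placeholders):

import pandas as pd

PSI_RETRAIN_THRESHOLD = 0.25

def drift_check(reference_df: pd.DataFrame, production_df: pd.DataFrame) -> list[str]:
    """Return the numeric features whose PSI exceeds the retrain threshold."""
    drifted = []
    for col in reference_df.select_dtypes(include="number").columns:
        psi = calculate_psi(reference_df[col].to_numpy(), production_df[col].to_numpy())
        if psi > PSI_RETRAIN_THRESHOLD:
            drifted.append(col)
    return drifted

# Hypothetical scheduled job (e.g. a daily Airflow task):
# drifted = drift_check(train_df, last_24h_df)
# if drifted:
#     trigger_retraining(drifted)  # assumed alerting/retraining hook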

Misconception: MLflow and W&B are interchangeable

MLflow: open-source, self-hosted, strong model registry, free. W&B: SaaS, best-in-class visualization and collaboration, GPU monitoring. Many teams use both: W&B for experiments, MLflow for registry and deployment. Interviews expect knowledge of the trade-offs.
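
Because the tools complement each other, a common pattern is logging the same run to both, for example W&B for dashboards and MLflow for the registry. A rough sketch, assuming both backends are already configured:

import mlflow
import wandb

params = {"n_estimators": 200, "max_depth": 10}

wandb.init(project="fraud-detection", config=params)  # experiments, dashboards
mlflow.set_experiment("fraud-detection-v2")

with mlflow.start_run():                              # registry, deployment lineage
    mlflow.log_params(params)
    # ... train the model, compute metrics ...
    metrics = {"f1": 0.91}                            # placeholder values
    mlflow.log_metrics(metrics)
    wandb.log(metrics)
    # Register the deployable artifact in MLflow only:
    # mlflow.sklearn.log_model(model, "model")

wandb.finish()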

Misconception: a feature store is only needed at large companies

Train-serving skew (different code paths for offline and online features) causes 60% of production ML bugs (Google, 2022). A feature store solves this with a single feature definition backing both stores. Feast is open-source and can be stood up in an hour.

Lifecycle Stages

graph LR
    DEV[Development<br/>Train, Log, Test] --> STG[Staging<br/>Validate, Shadow, A/B]
    STG --> PROD[Production<br/>Monitor, Alert, Retrain]
    PROD --> ARCH[Archived]

    style DEV fill:#e8eaf6,stroke:#3f51b5
    style STG fill:#fff3e0,stroke:#ef6c00
    style PROD fill:#e8f5e9,stroke:#4caf50
    style ARCH fill:#f3e5f5,stroke:#9c27b0

Code: MLflow Model Promotion

from mlflow import MlflowClient

client = MlflowClient()

# Get latest staging model (stage-based workflow; newer MLflow versions favor aliases)
latest = client.get_latest_versions("fraud-detection", stages=["Staging"])
if not latest:
    raise RuntimeError("No model version in Staging to evaluate")
staging_version = latest[0].version

# Compare staging vs production
prod_versions = client.get_latest_versions("fraud-detection", stages=["Production"])
if prod_versions:
    prod_run = client.get_run(prod_versions[0].run_id)
    prod_f1 = float(prod_run.data.metrics["f1"])
else:
    prod_f1 = 0.0

staging_run = client.get_run(latest[0].run_id)
staging_f1 = float(staging_run.data.metrics["f1"])

# Promote if better
if staging_f1 > prod_f1 + 0.01:  # 1% improvement threshold
    client.transition_model_version_stage(
        name="fraud-detection",
        version=staging_version,
        stage="Production",
        archive_existing_versions=True,
    )
    print(f"Promoted v{staging_version}: F1 {staging_f1:.4f} > {prod_f1:.4f}")
else:
    print(f"No promotion: staging F1 {staging_f1:.4f} vs prod {prod_f1:.4f}")

Feature Stores

Best Resources

Articles:
- Feast: Open Source Feature Store — Feast Docs
- Feature Store Architecture — Tecton (2025)
- Why Feature Stores Matter — Eugene Yan

Architecture

graph TD
    BJ[Batch Jobs<br/>Spark, Airflow] -->|write| OFF[Offline Store<br/>S3/BigQuery/Parquet]
    OFF -->|materialize| ON[Online Store<br/>Redis/DynamoDB]
    SJ[Stream Jobs<br/>Kafka + Flink] -->|write| ON
    OFF -->|read| TR[Training]
    ON -->|read| SRV[Serving]

    style BJ fill:#e8eaf6,stroke:#3f51b5
    style SJ fill:#e8eaf6,stroke:#3f51b5
    style OFF fill:#fff3e0,stroke:#ef6c00
    style ON fill:#e8f5e9,stroke:#4caf50
    style TR fill:#f3e5f5,stroke:#9c27b0
    style SRV fill:#f3e5f5,stroke:#9c27b0

Code: Feast Feature Store

# feature_repo/features.py
from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float32, Int64
from datetime import timedelta

# Entity
user = Entity(name="user_id", join_keys=["user_id"])

# Source
user_features_source = FileSource(
    path="data/user_features.parquet",
    timestamp_field="event_timestamp",
)

# Feature view
user_features = FeatureView(
    name="user_features",
    entities=[user],
    ttl=timedelta(days=1),
    schema=[
        Field(name="transaction_count_7d", dtype=Int64),
        Field(name="avg_amount_30d", dtype=Float32),
        Field(name="unique_merchants_7d", dtype=Int64),
        Field(name="velocity_1h", dtype=Float32),
    ],
    source=user_features_source,
    online=True,
)

# Serving (separate script): fetch features for real-time inference
from feast import FeatureStore

store = FeatureStore(repo_path="feature_repo/")

# Online serving (low-latency)
features = store.get_online_features(
    features=["user_features:transaction_count_7d", "user_features:avg_amount_30d"],
    entity_rows=[{"user_id": 12345}],
).to_dict()

# Training: historical features with point-in-time join
training_df = store.get_historical_features(
    entity_df=entity_df,  # user_id + event_timestamp
    features=["user_features:transaction_count_7d", "user_features:avg_amount_30d"],
).to_df()

Why a Feature Store

Problem | Without a Feature Store | With a Feature Store
Train-serve skew | Separate code for batch and online | One definition, two stores
Feature reuse | Copy-paste between teams | Centralized catalog
Point-in-time correctness | Manual joins on timestamps | Automatic PIT join
Feature freshness | Ad-hoc checks | Monitoring + SLA
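
To make the point-in-time correctness row concrete, here is what the manual approach looks like with pandas merge_asof (toy data; a feature store does this join for you via get_historical_features):

import pandas as pd

# Labelled events (what we predict on) and feature snapshots (computed periodically)
events = pd.DataFrame({
    "user_id": [1, 1, 2],
    "event_timestamp": pd.to_datetime(["2025-01-05", "2025-01-20", "2025-01-10"]),
    "is_fraud": [0, 1, 0],
})
features = pd.DataFrame({
    "user_id": [1, 1, 2],
    "feature_timestamp": pd.to_datetime(["2025-01-01", "2025-01-15", "2025-01-01"]),
    "transaction_count_7d": [3, 12, 5],
})

# Point-in-time join: for each event, take the latest feature value computed BEFORE it,
# otherwise future information leaks into training.
training_df = pd.merge_asof(
    events.sort_values("event_timestamp"),
    features.sort_values("feature_timestamp"),
    left_on="event_timestamp",
    right_on="feature_timestamp",
    by="user_id",
    direction="backward",
)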

Cost Optimization for ML Infrastructure

Best Resources

Articles:
- Reducing ML Inference Costs — Neptune.ai
- Spot Instances for ML Training — AWS

Strategies

Strategy | Savings | When to apply
Spot/Preemptible instances | 60-90% | Training (with checkpointing)
Model quantization (FP32 -> INT8) | ~4x compute | Inference
Knowledge distillation | 2-10x | Large model -> small
Batch inference | 3-5x throughput | Non-real-time predictions
Auto-scaling (scale to zero) | Variable | Low-traffic endpoints
Caching frequent predictions | 50-90% | Repeated inputs
Right-sizing GPU | 20-50% | Overprovisioned instances
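
As one concrete example from the table, caching repeated predictions can be as simple as an in-process LRU cache keyed on the feature tuple (a minimal sketch reusing model_a from the serving example above; production setups more often use a shared cache such as Redis):

from functools import lru_cache

import numpy as np

@lru_cache(maxsize=100_000)
def cached_predict(features_key: tuple[float, ...]) -> float:
    """Cache predictions for repeated inputs; cache keys must be hashable, hence the tuple."""
    features = np.array([features_key])
    return float(app.state.model_a.predict(features)[0])

# Inside the endpoint:
# prediction = cached_predict(tuple(request.features))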

Code: Auto-scaling Config (K8s)

# HorizontalPodAutoscaler for ML serving
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: fraud-model-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: fraud-model
  minReplicas: 1
  maxReplicas: 10
  metrics:
    - type: Pods
      pods:
        metric:
          name: inference_queue_length  # custom metric, requires a metrics adapter (e.g. prometheus-adapter)
        target:
          type: AverageValue
          averageValue: "5"  # Scale up when queue > 5
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 30
    scaleDown:
      stabilizationWindowSeconds: 300  # Wait 5min before scaling down
