
Data Engineering: Study Materials

~3 min read

Prerequisite: Data Engineering Interview Prep | Classical ML

Data leakage is the number one cause of ML model failure in production: according to Kaggle (2025), 23% of competition solutions contain some form of data leakage. PySpark can process petabytes of data, yet 80% of errors stem from shuffle and skew. Below are materials for two tasks, Data Leakage and Spark (RDD/DataFrame), with code for detecting leaks and optimizing pipelines.

Updated: 2026-02-11


Task Overview

ID      Task                    Difficulty  Key Topics
de_001  Data Leakage            Medium      Target leakage, train-test contamination, temporal leakage
de_002  Spark (RDD/DataFrame)   Medium      PySpark, distributed computing, ML pipelines

1. Data Leakage

Best Sources

Articles:
- Data Leakage in Machine Learning: Prevention Guide — North Haven Analytics (Dec 2025)
- Will You Spot the Leaks? A Data Science Challenge — Towards Data Science (May 2025)
- 3 Subtle Ways Data Leakage Can Ruin Your Models — ML Mastery (Dec 2025)
- Interview Questions On Data Leakage — Medium (Feb 2025)

Papers:
- Don't push the button! Exploring data leakage risks in ML — Springer (2025)

Types of Data Leakage

1. Target Leakage
   - A feature correlates with the target but is unavailable in production
   - Example: an "is_fraud" column present in the training data for fraud detection

2. Train-Test Contamination
   - Information from the test set influences training
   - Example: imputation/scaling fitted on the entire dataset

3. Temporal Leakage
   - Future data ends up in the training set
   - Example: a random split applied to time series

4. Group Leakage
   - The same entities appear in both train and test
   - Example: different records of the same customer
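Group leakage is avoided by splitting on the entity, not on the row. A minimal stdlib sketch of a group-aware split (the function name `group_train_test_split` and the hashing scheme are illustrative; in practice sklearn's `GroupShuffleSplit` or `GroupKFold` does this):

```python
import hashlib

def group_train_test_split(rows, group_key, test_size=0.2):
    """Split rows so every group lands entirely in train or in test.

    Each group id is hashed to a stable float in [0, 1); groups whose
    hash falls below test_size go to test. Deterministic across runs.
    """
    train, test = [], []
    for row in rows:
        digest = hashlib.md5(str(row[group_key]).encode()).hexdigest()
        bucket = int(digest, 16) / 16**32  # stable float in [0, 1)
        (test if bucket < test_size else train).append(row)
    return train, test

# Toy data: 20 records spread over 5 clients
rows = [{"client_id": i % 5, "amount": i} for i in range(20)]
train, test = group_train_test_split(rows, "client_id", test_size=0.4)

# No client appears on both sides of the split
assert not ({r["client_id"] for r in train} & {r["client_id"] for r in test})
```

Hashing the group id (rather than sampling rows) guarantees that every record of a customer stays on the same side of the split.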

Code: Detecting Data Leakage

import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split

class DataLeakageDetector:
    """Detect common data leakage patterns"""

    @staticmethod
    def check_target_correlation(df, target, threshold=0.9):
        """Find features highly correlated with target"""
        # Exclude the target itself (its self-correlation is always 1.0)
        numeric_cols = df.select_dtypes(include=[np.number]).columns.drop(target, errors="ignore")
        correlations = df[numeric_cols].corrwith(df[target]).abs()

        suspicious = correlations[correlations > threshold]
        if len(suspicious) > 0:
            print(f"Potential target leakage: {suspicious}")
        return suspicious

    @staticmethod
    def check_train_test_overlap(train, test, id_column):
        """Check for duplicate IDs in train and test"""
        train_ids = set(train[id_column])
        test_ids = set(test[id_column])
        overlap = train_ids & test_ids

        if overlap:
            print(f"Found {len(overlap)} overlapping IDs")
        return overlap

    @staticmethod
    def check_temporal_ordering(df, date_column):
        """Check that rows are sorted by date"""
        dates = pd.to_datetime(df[date_column])
        if not dates.is_monotonic_increasing:
            print("Warning: Data not temporally ordered")
        return dates.is_monotonic_increasing

# Usage example: `df` is assumed to be a pandas DataFrame with a 'target' column
detector = DataLeakageDetector()
detector.check_target_correlation(df, 'target', threshold=0.95)

Code: A Correct Pipeline Without Leakage

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score

# WRONG: fitting on the entire dataset
# scaler = StandardScaler()
# X_scaled = scaler.fit_transform(X)  # LEAKAGE!
# X_train, X_test, y_train, y_test = train_test_split(X_scaled, y)

# CORRECT: split first, then fit inside a Pipeline
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

pipeline = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler()),
    ('model', RandomForestClassifier())
])

# fit on train only; transform is applied to both splits
pipeline.fit(X_train, y_train)
score = pipeline.score(X_test, y_test)

Temporal Split for Time Series

def temporal_train_test_split(df, date_column, test_size=0.2):
    """
    Proper temporal split for time series data
    Prevents temporal leakage
    """
    df = df.sort_values(date_column)
    split_idx = int(len(df) * (1 - test_size))

    train = df.iloc[:split_idx]
    test = df.iloc[split_idx:]

    return train, test

# Example: `df` is assumed to have a 'date' column
train, test = temporal_train_test_split(df, 'date', test_size=0.2)
print(f"Train: {train['date'].min()} to {train['date'].max()}")
print(f"Test: {test['date'].min()} to {test['date'].max()}")

2. Spark (RDD/DataFrame)

Best Sources

Official documentation:
- PySpark 4.1.0 Documentation
- Spark MLlib Guide

Articles:
- Unleashing ML with PySpark: A Comprehensive Guide — Medium
- Spark MLlib Integration with Python PySpark — Johal
- Building End-to-End Data Engineering Pipeline — MarkTechPost (Nov 2025)

RDD vs DataFrame

RDD (Resilient Distributed Dataset):
- Low-level API
- Type-safe (compile-time)
- Manual optimization
- Use case: unstructured data, custom partitioning

DataFrame:
- High-level API (SQL-like)
- Catalyst optimizer
- Lazy evaluation
- Use case: structured data, ML pipelines (preferred)

Code: PySpark DataFrame Operations

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Initialize Spark
spark = SparkSession.builder \
    .appName("ML_Pipeline") \
    .getOrCreate()

# Read data
df = spark.read.csv("data.csv", header=True, inferSchema=True)

# Basic operations
df = df.filter(F.col("value") > 0) \
       .withColumn("log_value", F.log(F.col("value"))) \
       .groupBy("category") \
       .agg(F.mean("log_value").alias("mean_log_value"))

# Show schema
df.printSchema()
df.show(5)

Code: ML Pipeline in PySpark

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import LogisticRegression

# Feature engineering
categorical_cols = ["category", "region"]
numeric_cols = ["age", "income", "score"]

# Create pipeline stages
stages = []

# 1. String indexing for categorical columns
for col in categorical_cols:
    indexer = StringIndexer(inputCol=col, outputCol=f"{col}_indexed")
    stages.append(indexer)

# 2. One-hot encoding
for col in categorical_cols:
    encoder = OneHotEncoder(inputCol=f"{col}_indexed", outputCol=f"{col}_vec")
    stages.append(encoder)

# 3. Assemble features
assembler_inputs = [f"{col}_vec" for col in categorical_cols] + numeric_cols
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")
stages.append(assembler)

# 4. Model
lr = LogisticRegression(featuresCol="features", labelCol="label")
stages.append(lr)

# Create pipeline
pipeline = Pipeline(stages=stages)

# Train-test split
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

# Fit pipeline
model = pipeline.fit(train_data)

# Predictions
predictions = model.transform(test_data)
predictions.select("label", "prediction", "probability").show(5)

Code: RDD Operations (Legacy)

# Create RDD
rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])

# Transformations (lazy)
mapped = rdd.map(lambda x: x * 2)
filtered = mapped.filter(lambda x: x > 4)

# Actions (trigger execution)
result = filtered.collect()  # [6, 8, 10]
count = filtered.count()      # 3

# RDD to DataFrame
df = rdd.map(lambda x: (x, x*2)).toDF(["value", "doubled"])

# DataFrame to RDD
rdd_from_df = df.rdd.map(lambda row: row.value)

Performance Tips

# Caching frequently used DataFrames
df.cache()
df.count()  # Trigger caching

# Broadcast join for small tables
from pyspark.sql.functions import broadcast
result = large_df.join(broadcast(small_df), "key")

# Repartition for skewed data
df = df.repartition(100, "key")

# Coalesce to reduce partitions (no shuffle)
df = df.coalesce(10)
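Note that `repartition(100, "key")` does not help when a single hot key dominates: all of its rows still hash to the same partition. The standard remedy is key salting. A pure-Python sketch of the idea, outside Spark (`NUM_SALTS` and the deterministic salt-by-value scheme are illustrative; real Spark jobs usually salt with a random integer column):

```python
from collections import Counter

NUM_SALTS = 4  # how many synthetic sub-keys a hot key is split into

def salt(key, value):
    """Derive a deterministic sub-key so one hot key spreads evenly."""
    return f"{key}#{value % NUM_SALTS}"

# Heavy skew: 97 rows share the key "hot", three other keys have 1 row each
rows = [("hot", i) for i in range(97)] + [("a", 0), ("b", 1), ("c", 2)]

sizes = Counter(salt(key, value) for key, value in rows)

# The 97-row group becomes four groups of ~24 rows. In Spark you would
# aggregate on the salted key first, then strip the "#n" suffix and
# combine the partial aggregates in a second, much smaller pass.
assert max(sizes.values()) == 25
```

The two-pass pattern (aggregate salted keys, then merge) trades one extra small shuffle for eliminating the straggler task that a skewed key creates.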

Misconception: StandardScaler can be fitted before the train-test split

Fitting the scaler on the full dataset leaks information from test into train: the mean and std computed over test rows influence how train rows are transformed. The correct order is: split, fit the scaler on train ONLY, then transform both splits. Use sklearn.pipeline.Pipeline to enforce this automatically.
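A tiny numeric illustration of the leak, using toy values and only the stdlib:

```python
from statistics import mean, pstdev

# Toy 1-D feature: train values are small, test values are outliers
train = [1.0, 2.0, 3.0, 4.0]
test = [100.0, 101.0]

# WRONG: statistics computed on the full dataset include test information
full = train + test
mu_leaky, sd_leaky = mean(full), pstdev(full)

# CORRECT: statistics come from train only, then applied to both splits
mu, sd = mean(train), pstdev(train)
train_scaled = [(x - mu) / sd for x in train]
test_scaled = [(x - mu) / sd for x in test]

assert mu == 2.5
assert mu_leaky > 30  # the test outliers pulled the "training" mean up
```

The leaky mean (about 35) is nowhere near the true training mean (2.5): any model trained on data scaled with leaky statistics has already seen a summary of the test set.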

Misconception: a random split works for time series

A random split on time series causes temporal leakage: the model sees future data during training. Use a temporal split instead: train on data before a cutoff point T, test on data after T. For cross-validation, use TimeSeriesSplit.
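The expanding-window scheme behind TimeSeriesSplit can be sketched in a few lines of plain Python (the function name `time_series_splits` is illustrative, not a library API):

```python
def time_series_splits(n_samples, n_splits=3):
    """Yield (train_idx, test_idx) pairs with an expanding train window.

    Mirrors the idea behind sklearn's TimeSeriesSplit: every test fold
    lies strictly after its train fold, so no future data leaks in.
    """
    fold = n_samples // (n_splits + 1)
    for k in range(1, n_splits + 1):
        train_idx = list(range(0, fold * k))
        test_idx = list(range(fold * k, fold * (k + 1)))
        yield train_idx, test_idx

for train_idx, test_idx in time_series_splits(12, n_splits=3):
    # Every test index is later than every train index
    assert max(train_idx) < min(test_idx)
```

With 12 samples and 3 splits this produces train windows of 3, 6, and 9 points, each validated on the next 3 points, the opposite of a random split that would scatter future points into training.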

Misconception: the DataFrame API in Spark is always faster than RDD

The DataFrame API with the Catalyst optimizer is faster for roughly 95% of tasks. But for custom partitioning, complex UDFs, or unstructured data, RDDs offer more control. A broadcast join (10 MB threshold by default) speeds up joins against small tables by 10-100x.

Cross-References

Links to other categories:
- Data Leakage <- ML System Design (A/B Testing)
- Data Leakage <- Statistics (train/test split)
- Spark <- Classical ML (distributed algorithms)


Video Resources

  1. Databricks Academy — PySpark fundamentals
  2. Spark Official YouTube — MLlib tutorials
  3. Databricks MLflow — end-to-end ML
