Data Engineering: Study Materials¶
~3 min read
Prerequisites: Data Engineering Interview Prep | Classical ML
Data leakage is the number one cause of ML model failure in production: according to Kaggle (2025), 23% of competition solutions contain some form of data leakage. PySpark processes petabytes of data, yet 80% of errors come down to shuffle and skew. Below are materials for 2 tasks: Data Leakage and Spark (RDD/DataFrame), with code for detecting leakage and optimizing pipelines.
Updated: 2026-02-11
Task Overview¶
| ID | Task | Difficulty | Key Topics |
|---|---|---|---|
| de_001 | Data Leakage | Medium | Target leakage, train-test contamination, temporal leakage |
| de_002 | Spark (RDD/DataFrame) | Medium | PySpark, distributed computing, ML pipelines |
1. Data Leakage¶
Best Sources¶
Articles:
- Data Leakage in Machine Learning: Prevention Guide — North Haven Analytics (Dec 2025)
- Will You Spot the Leaks? A Data Science Challenge — Towards Data Science (May 2025)
- 3 Subtle Ways Data Leakage Can Ruin Your Models — ML Mastery (Dec 2025)
- Interview Questions On Data Leakage — Medium (Feb 2025)
Papers:
- Don't push the button! Exploring data leakage risks in ML — Springer (2025)
Types of Data Leakage¶
1. Target Leakage
- A feature correlates with the target but is not available in production
- Example: an "is_fraud" flag left among the features for fraud detection
2. Train-Test Contamination
- Information from the test set influences training
- Example: imputation/scaling fitted on the full dataset
3. Temporal Leakage
- Future data ends up in the training set
- Example: a random split for time series
4. Group Leakage
- The same entities appear in both train and test
- Example: different records of the same customer (see the group-aware split sketch below)
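A minimal sketch of a group-aware split that guards against group leakage (type 4), assuming a hypothetical customer_id column as the grouping key; scikit-learn's GroupShuffleSplit keeps all records of one entity on the same side of the split.

import pandas as pd
from sklearn.model_selection import GroupShuffleSplit

# Hypothetical dataset: several records per customer_id
df = pd.DataFrame({
    "customer_id": [1, 1, 2, 2, 3, 3, 4, 4],
    "feature": [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8],
    "target": [0, 1, 0, 0, 1, 1, 0, 1],
})

# All rows of a given customer land entirely in train or entirely in test
gss = GroupShuffleSplit(n_splits=1, test_size=0.25, random_state=42)
train_idx, test_idx = next(gss.split(df, groups=df["customer_id"]))
train, test = df.iloc[train_idx], df.iloc[test_idx]

# Sanity check: no customer appears on both sides of the split
assert set(train["customer_id"]).isdisjoint(test["customer_id"])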
Code: Detecting Data Leakage¶
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split


class DataLeakageDetector:
    """Detect common data leakage patterns"""

    @staticmethod
    def check_target_correlation(df, target, threshold=0.9):
        """Find features highly correlated with target"""
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        feature_cols = [c for c in numeric_cols if c != target]  # exclude the target itself
        correlations = df[feature_cols].corrwith(df[target]).abs()
        suspicious = correlations[correlations > threshold]
        if len(suspicious) > 0:
            print(f"Potential target leakage: {suspicious}")
        return suspicious

    @staticmethod
    def check_train_test_overlap(train, test, id_column):
        """Check for duplicate IDs in train and test"""
        train_ids = set(train[id_column])
        test_ids = set(test[id_column])
        overlap = train_ids & test_ids
        if overlap:
            print(f"Found {len(overlap)} overlapping IDs")
        return overlap

    @staticmethod
    def check_temporal_ordering(df, date_column):
        """Check if data is properly ordered in time"""
        dates = pd.to_datetime(df[date_column])
        if not dates.is_monotonic_increasing:
            print("Warning: Data not temporally ordered")
        return dates.is_monotonic_increasing


# Usage example
detector = DataLeakageDetector()
detector.check_target_correlation(df, 'target', threshold=0.95)
Code: A Correct Pipeline Without Leakage¶
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split, cross_val_score

# WRONG: fit on the full dataset
# scaler = StandardScaler()
# X_scaled = scaler.fit_transform(X)  # LEAKAGE!
# X_train, X_test, y_train, y_test = train_test_split(X_scaled, y)

# RIGHT: a Pipeline with the correct order of operations
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

pipeline = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler()),
    ('model', RandomForestClassifier())
])

# fit only on train; transform is applied to both
pipeline.fit(X_train, y_train)
score = pipeline.score(X_test, y_test)
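Because preprocessing lives inside the Pipeline, cross-validation re-fits the imputer and scaler on each training fold, so no validation fold leaks into preprocessing. A minimal sketch, assuming X_train and y_train from the example above:

# Each CV fold re-fits the imputer/scaler on that fold's training portion only
cv_scores = cross_val_score(pipeline, X_train, y_train, cv=5, scoring='accuracy')
print(f"CV accuracy: {cv_scores.mean():.3f} +/- {cv_scores.std():.3f}")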
Temporal Split for Time Series¶
def temporal_train_test_split(df, date_column, test_size=0.2):
    """
    Proper temporal split for time series data.
    Prevents temporal leakage.
    """
    df = df.sort_values(date_column)
    split_idx = int(len(df) * (1 - test_size))
    train = df.iloc[:split_idx]
    test = df.iloc[split_idx:]
    return train, test


# Example
train, test = temporal_train_test_split(df, 'date', test_size=0.2)
print(f"Train: {train['date'].min()} to {train['date'].max()}")
print(f"Test: {test['date'].min()} to {test['date'].max()}")
2. Spark (RDD/DataFrame)¶
Best Sources¶
Official documentation:
- PySpark 4.1.0 Documentation
- Spark MLlib Guide
Articles:
- Unleashing ML with PySpark: A Comprehensive Guide — Medium
- Spark MLlib Integration with Python PySpark — Johal
- Building End-to-End Data Engineering Pipeline — MarkTechPost (Nov 2025)
RDD vs DataFrame¶
RDD (Resilient Distributed Dataset):
- Low-level API
- Type-safe at compile time (Scala/Java; not enforced in PySpark)
- Manual optimization
- Use case: unstructured data, custom partitioning
DataFrame:
- High-level API (SQL-like)
- Catalyst optimizer
- Lazy evaluation (as with RDDs, execution is deferred until an action)
- Use case: structured data, ML pipelines (preferred); see the side-by-side sketch below
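A minimal sketch contrasting the two APIs on the same per-key average, assuming an active SparkSession named spark and illustrative data; the DataFrame version lets Catalyst optimize the plan, while the RDD version spells out the aggregation by hand:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("rdd_vs_dataframe").getOrCreate()
pairs = [("a", 1.0), ("a", 3.0), ("b", 2.0), ("b", 4.0)]

# RDD: manual aggregation with pair-RDD transformations
rdd_avg = (spark.sparkContext.parallelize(pairs)
           .mapValues(lambda v: (v, 1))
           .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
           .mapValues(lambda s: s[0] / s[1])
           .collect())

# DataFrame: declarative aggregation, optimized by Catalyst
df_avg = (spark.createDataFrame(pairs, ["key", "value"])
          .groupBy("key")
          .agg(F.avg("value").alias("avg_value"))
          .collect())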
Code: PySpark DataFrame Operations¶
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Initialize Spark
spark = SparkSession.builder \
    .appName("ML_Pipeline") \
    .getOrCreate()

# Read data
df = spark.read.csv("data.csv", header=True, inferSchema=True)

# Basic operations
df = df.filter(F.col("value") > 0) \
    .withColumn("log_value", F.log(F.col("value"))) \
    .groupBy("category") \
    .agg(F.mean("log_value").alias("mean_log_value"))

# Show schema
df.printSchema()
df.show(5)
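Since the DataFrame API is described above as SQL-like, the same aggregation can also be expressed through a temporary view and spark.sql. A minimal sketch assuming the same data.csv; raw_df and the view name events are illustrative:

# Register the raw data as a SQL view
raw_df = spark.read.csv("data.csv", header=True, inferSchema=True)
raw_df.createOrReplaceTempView("events")

# Equivalent aggregation in SQL; Catalyst produces the same kind of plan as the DataFrame version
sql_result = spark.sql("""
    SELECT category, AVG(LOG(value)) AS mean_log_value
    FROM events
    WHERE value > 0
    GROUP BY category
""")
sql_result.show(5)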
Code: ML Pipeline in PySpark¶
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import LogisticRegression

# Feature engineering
categorical_cols = ["category", "region"]
numeric_cols = ["age", "income", "score"]

# Create pipeline stages
stages = []

# 1. String indexing for categorical columns
for col in categorical_cols:
    indexer = StringIndexer(inputCol=col, outputCol=f"{col}_indexed")
    stages.append(indexer)

# 2. One-hot encoding
for col in categorical_cols:
    encoder = OneHotEncoder(inputCol=f"{col}_indexed", outputCol=f"{col}_vec")
    stages.append(encoder)

# 3. Assemble features
assembler_inputs = [f"{col}_vec" for col in categorical_cols] + numeric_cols
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")
stages.append(assembler)

# 4. Model
lr = LogisticRegression(featuresCol="features", labelCol="label")
stages.append(lr)

# Create pipeline
pipeline = Pipeline(stages=stages)

# Train-test split
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

# Fit pipeline
model = pipeline.fit(train_data)

# Predictions
predictions = model.transform(test_data)
predictions.select("label", "prediction", "probability").show(5)
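To score the predictions, Spark MLlib's BinaryClassificationEvaluator computes ROC AUC from the rawPrediction column produced by LogisticRegression. A minimal sketch continuing from the predictions DataFrame above:

from pyspark.ml.evaluation import BinaryClassificationEvaluator

# ROC AUC on the held-out split
evaluator = BinaryClassificationEvaluator(labelCol="label",
                                          rawPredictionCol="rawPrediction",
                                          metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print(f"Test AUC: {auc:.3f}")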
Code: RDD Operations (Legacy)¶
# Create RDD
rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
# Transformations (lazy)
mapped = rdd.map(lambda x: x * 2)
filtered = mapped.filter(lambda x: x > 4)
# Actions (trigger execution)
result = filtered.collect() # [6, 8, 10]
count = filtered.count() # 3
# RDD to DataFrame
df = rdd.map(lambda x: (x, x*2)).toDF(["value", "doubled"])
# DataFrame to RDD
rdd_from_df = df.rdd.map(lambda row: row.value)
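The RDD use cases above mention custom partitioning, which the DataFrame API does not expose directly. A minimal sketch, assuming a pair RDD with illustrative user keys, that pins keys to partitions with a user-supplied hash function:

# Pair RDD partitioned by a custom function: all records for one key land in one partition
pair_rdd = spark.sparkContext.parallelize([("user_1", 10), ("user_2", 20), ("user_1", 30)])
partitioned = pair_rdd.partitionBy(4, partitionFunc=lambda key: hash(key))

# Subsequent per-key operations (e.g. reduceByKey) reuse this partitioning and avoid an extra shuffle
totals = partitioned.reduceByKey(lambda a, b: a + b).collect()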
Performance Tips¶
# Caching frequently used DataFrames
df.cache()
df.count() # Trigger caching
# Broadcast join for small tables
from pyspark.sql.functions import broadcast
result = large_df.join(broadcast(small_df), "key")
# Repartition for skewed data
df = df.repartition(100, "key")
# Coalesce to reduce partitions (no shuffle)
df = df.coalesce(10)
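A minimal sketch for verifying that the broadcast join actually happens and for adjusting the autobroadcast threshold (10 MB by default, as noted in the misconception below); it assumes large_df and small_df from the snippet above, and uses the standard spark.conf.set and explain() APIs:

# Raise the automatic broadcast threshold to 50 MB (value is in bytes; -1 disables it)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)

# The physical plan should show BroadcastHashJoin instead of SortMergeJoin
result = large_df.join(broadcast(small_df), "key")
result.explain()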
Misconception: StandardScaler can be applied before the train-test split
Fitting the scaler on the full dataset means information leaks from test into train: the test set's means and standard deviations influence how the train set is transformed. The correct order is: split -> fit the scaler ONLY on train -> transform both. Use sklearn.pipeline.Pipeline to enforce this automatically.
Misconception: a random split is fine for time series
A random split on time series causes temporal leakage: the model sees future data during training. Use a temporal split instead: train on data before a cutoff point T, test on data after T. For cross-validation, use TimeSeriesSplit.
Misconception: the DataFrame API in Spark is always faster than RDD
The DataFrame API with the Catalyst optimizer is faster for 95% of workloads. But for custom partitioning, complex UDFs, or unstructured data, RDDs give more control. A broadcast join (10 MB threshold by default) speeds up joins against small tables by 10-100x.
Cross-References¶
Links to other categories:
- Data Leakage <- ML System Design (A/B Testing)
- Data Leakage <- Statistics (train/test split)
- Spark <- Classical ML (distributed algorithms)
Video Resources¶
- Databricks Academy — PySpark fundamentals
- Spark Official YouTube — MLlib tutorials
- Databricks MLflow — end-to-end ML