CDP 실험 관리
모델링 실습
타이타닉 & Bike Sharing
ctx API로 실험을 관리하고, 결과를 S3와 MLflow에 자동 기록합니다.
모델링 실습
타이타닉 & Bike Sharing Demand — ctx API 실험 흐름을 익힙니다.
ℹ️ 샘플 노트북 안내: 저장소의
samples/awesome_sean/modeling/titanic_modeling.ipynb은 boilerplate312 방식(수동 yaml 로드, 수동 artifact 저장)으로 작성되어 있습니다. 이 문서는 실무 기준인 CDP 프레임워크(gsr_ml_lab_cdp)로 동일한 실험을 구성하는 방법을 다룹니다.실험 재현의 전제 조건
"모델을 만드는 것보다 다시 만들 수 있는 것이 더 중요합니다."
| 요소 | 없을 때 | CDP 프레임워크에서 |
|---|---|---|
| Config 파일 | 파라미터를 코드 안에서 직접 수정 | model.yaml 한 파일만 수정 |
| 실험 ID | 어떤 결과인지 헷갈림 | {YYYYMMDD}_{HHMMSS}_{6hex} 자동 생성 |
| Artifacts | 결과가 로컬에만 존재 | ctx.finish() 한 줄로 S3 + MLflow 동시 기록 |
ℹ️ 이 실습에서 여러분이 집중할 것: 모델 성능이 아니라 실험 흐름 구조입니다.
실습 1 · 타이타닉 생존 예측
| 항목 | 내용 |
|---|---|
| 문제 유형 | 이진 분류 (생존 여부 예측) |
| 타겟 변수 | Survived (0: 사망, 1: 생존) |
| 주요 피처 | Pclass, Sex, Age, SibSp, Parch, Fare, Embarked |
| 평가 지표 | Accuracy, ROC-AUC, F1-Score |
1단계 model.yaml 설정
⚠️ 디렉토리명은 반드시
titanic_survival_lightgbm/ 이어야 합니다. (experiment_topic + "_" + algorithm)📋 복사해서 붙여넣기
# project/titanic_survival_lightgbm/config/model.yaml
worker_id: YOUR_ID # ← 본인 ID로 수정
experiment_topic: titanic_survival
algorithm: lightgbm
env_nm: dev
data_sources:
- name: train
path: s3://gs-cdp-mllab-dev/titanic/data/train.csv
- name: test
path: s3://gs-cdp-mllab-dev/titanic/data/test.csv
algorithm_params: # 하이퍼파라미터 — 코드에 하드코딩 금지
objective: binary
boosting: gbdt
max_depth: 6
learning_rate: 0.05
num_leaves: 31
num_iterations: 200
feature_fraction: 0.8
bagging_fraction: 0.8
bagging_freq: 5
seed: 2026
data_params:
valid_ratio: 0.2
features:
target_col: Survived
numeric_cols: [Age, Fare, SibSp, Parch]
categorical_cols: [Sex, Pclass, Embarked]
task_type: classification
target_col: Survived
2단계 라이브러리 임포트
ℹ️
ctx 객체는 노트북 실행 환경에서 프레임워크가 자동으로 초기화합니다. 설정 값은 ctx.model[...] / ctx.project[...]로 접근합니다.📋 복사해서 붙여넣기
import warnings
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
accuracy_score, precision_score, recall_score, f1_score,
roc_auc_score, log_loss, confusion_matrix, roc_curve,
)
import lightgbm as lgb
warnings.filterwarnings('ignore')
plt.style.use('seaborn-v0_8-whitegrid')
3단계 데이터 로드 & 전처리
ℹ️ 파생 피처 3개(
FamilySize, IsAlone, FarePerPerson)가 이 실습의 핵심 피처 엔지니어링입니다.📋 복사해서 붙여넣기
# ── ctx에서 설정값 읽기 ────────────────────────────────────────────
target_col = ctx.model['target_col']
valid_ratio = ctx.model['data_params']['valid_ratio']
seed = ctx.model['algorithm_params']['seed']
numeric_cols = ctx.model['features']['numeric_cols']
# ── 데이터 로드 ──────────────────────────────────────────────────
train_path = ctx.model['data_sources'][0]['path']
train_df = pd.read_csv(train_path)
def preprocess(df):
df = df.copy()
# 결측치 처리
df['Age'] = df['Age'].fillna(df['Age'].median())
df['Embarked'] = df['Embarked'].fillna(df['Embarked'].mode()[0])
df['Fare'] = df['Fare'].fillna(df['Fare'].median())
# 파생 피처
df['FamilySize'] = df['SibSp'] + df['Parch'] + 1
df['IsAlone'] = (df['FamilySize'] == 1).astype(int)
df['FarePerPerson'] = df['Fare'] / df['FamilySize']
# 인코딩
df['Sex'] = df['Sex'].map({'male': 1, 'female': 0})
embarked_dummies = pd.get_dummies(df['Embarked'], prefix='Embarked')
df = pd.concat([df, embarked_dummies], axis=1)
feature_cols = numeric_cols + ['Sex', 'Pclass', 'FamilySize', 'IsAlone', 'FarePerPerson']
feature_cols += [c for c in df.columns if c.startswith('Embarked_')]
X = df[feature_cols]
y = df[target_col] if target_col in df.columns else None
return X, y, feature_cols
X_full, y_full, feature_cols = preprocess(train_df)
X_train, X_valid, y_train, y_valid = train_test_split(
X_full, y_full,
test_size=valid_ratio,
random_state=seed,
stratify=y_full,
)
print(f"Train: {len(X_train)}, Valid: {len(X_valid)}, Features: {len(feature_cols)}")
# ── 전처리 데이터 저장 (Tier 1) ──────────────────────────────────
ctx.save_data(X_train, "X_train.parquet")
ctx.save_data(X_valid, "X_valid.parquet")
ctx.save_data(y_train.to_frame(), "y_train.parquet")
ctx.save_data(y_valid.to_frame(), "y_valid.parquet")
4단계 모델 학습
ℹ️
lgb.train() API를 사용합니다. evals_result에 iteration별 AUC / Log Loss가 기록되어 학습 곡선을 그릴 수 있습니다.📋 복사해서 붙여넣기
# ── Tier 1에서 데이터 읽기 ──────────────────────────────────────
X_train = ctx.read_data("X_train.parquet")
X_valid = ctx.read_data("X_valid.parquet")
y_train = ctx.read_data("y_train.parquet")[target_col]
y_valid = ctx.read_data("y_valid.parquet")[target_col]
# ── ctx에서 하이퍼파라미터 읽기 ────────────────────────────────
hp = ctx.model['algorithm_params']
train_data = lgb.Dataset(X_train, label=y_train)
valid_data = lgb.Dataset(X_valid, label=y_valid, reference=train_data)
params = {k: v for k, v in hp.items() if k != 'num_iterations'}
params['metric'] = ['auc', 'binary_logloss']
params['verbosity'] = -1
evals_result = {} # iteration별 지표 기록용
model = lgb.train(
params,
train_data,
num_boost_round=hp['num_iterations'],
valid_sets=[train_data, valid_data],
valid_names=['train', 'valid'],
callbacks=[
lgb.log_evaluation(period=50),
lgb.record_evaluation(evals_result),
],
)
print(f"Best iteration: {model.best_iteration}")
# ── 모델 저장 (Tier 2) ────────────────────────────────────────
ctx.save_step(model, "model.pkl")
model.save_model("/tmp/model.txt")
ctx.save_step(open("/tmp/model.txt").read(), "model.txt")
5단계 평가 & 아티팩트 저장
📋 복사해서 붙여넣기
def evaluate(model, X, y, name=''):
y_prob = model.predict(X)
y_pred = (y_prob >= 0.5).astype(int)
metrics = {
'accuracy': round(accuracy_score(y, y_pred), 4),
'precision': round(precision_score(y, y_pred), 4),
'recall': round(recall_score(y, y_pred), 4),
'f1_score': round(f1_score(y, y_pred), 4),
'auc_roc': round(roc_auc_score(y, y_prob), 4),
'log_loss': round(log_loss(y, y_prob), 4),
}
print(f"\n[{name}]", *[f" {k}: {v}" for k, v in metrics.items()], sep='\n')
return metrics, y_pred, y_prob
train_metrics, _, _ = evaluate(model, X_train, y_train, 'Train')
valid_metrics, y_valid_pred, y_valid_proba = evaluate(model, X_valid, y_valid, 'Valid')
cm = confusion_matrix(y_valid, y_valid_pred)
# ── 피처 중요도 ──────────────────────────────────────────────────
fi = pd.DataFrame({
'feature': feature_cols,
'importance': model.feature_importance(importance_type='gain'),
}).sort_values('importance', ascending=False)
fi['importance'] = fi['importance'] / fi['importance'].sum()
# ── 차트 4종 생성 & output 저장 (Tier 3) ─────────────────────
# 1. Feature Importance
fig, ax = plt.subplots(figsize=(10, 8))
sns.barplot(data=fi.head(10), x='importance', y='feature', palette='Blues_d', ax=ax)
ax.set_title('Feature Importance (Top 10)')
plt.tight_layout()
ctx.save_output(fig, "feature_importance.png", folder="charts")
plt.close()
# 2. ROC Curve
fpr, tpr, _ = roc_curve(y_valid, y_valid_proba)
fig, ax = plt.subplots(figsize=(8, 8))
ax.plot(fpr, tpr, label=f"AUC = {valid_metrics['auc_roc']:.4f}")
ax.plot([0, 1], [0, 1], 'k--')
ax.set_xlabel('FPR'); ax.set_ylabel('TPR')
ax.set_title('ROC Curve'); ax.legend()
plt.tight_layout()
ctx.save_output(fig, "roc_curve.png", folder="charts")
plt.close()
# 3. Confusion Matrix
tn, fp, fn, tp = cm.ravel()
fig, ax = plt.subplots(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=ax,
xticklabels=['Died', 'Survived'], yticklabels=['Died', 'Survived'])
ax.set_xlabel('Predicted'); ax.set_ylabel('Actual')
ax.set_title('Confusion Matrix')
plt.tight_layout()
ctx.save_output(fig, "confusion_matrix.png", folder="charts")
plt.close()
# 4. Learning Curve
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
for ax, metric, label in zip(axes, ['auc', 'binary_logloss'], ['AUC', 'Log Loss']):
ax.plot(evals_result['train'][metric], label='Train')
ax.plot(evals_result['valid'][metric], label='Valid')
ax.set_title(f'Learning Curve — {label}')
ax.set_xlabel('Iteration'); ax.set_ylabel(label)
ax.legend(); ax.grid(alpha=0.3)
plt.tight_layout()
ctx.save_output(fig, "learning_curve.png", folder="charts")
plt.close()
6단계 ctx.finish() — 실험 마무리
📋 복사해서 붙여넣기
ctx.finish(
metrics={
"auc": valid_metrics['auc_roc'],
"f1": valid_metrics['f1_score'],
"accuracy": valid_metrics['accuracy'],
"precision": valid_metrics['precision'],
"recall": valid_metrics['recall'],
},
y_true=y_valid,
y_pred=y_valid_pred,
y_prob=y_valid_proba,
extra={
"best_iteration": model.best_iteration,
"n_features": len(feature_cols),
"train_samples": len(X_train),
"valid_samples": len(X_valid),
},
)
ctx.finish() 호출 후 자동으로 처리되는 것:
| 자동 처리 항목 | 저장 위치 |
|---|---|
metrics.json | output/metrics/ |
| 표준 차트 (ROC, Confusion Matrix, Pred Distribution 등) | output/charts/ |
experiment_report.html | output/reports/ |
| MLflow run 등록 | MLflow 서버 |
run_manifest.yaml | output/ |
| S3 전체 업로드 | runs/{run_id}/output/ |
MLflow 로깅 이해
ctx.finish()가 내부에서 하는 일 (참고)
mlflow.set_experiment(f"{ctx.project['project_nm']}_{ctx.model['worker_id']}")
with mlflow.start_run(run_name=ctx.run_name):
mlflow.log_params(ctx.model['algorithm_params'])
mlflow.log_metrics(metrics)
mlflow.log_artifacts(str(ctx.output))
mlflow.set_tags({
"worker_id": ctx.model['worker_id'],
"env_nm": ctx.model['env_nm'],
"task_type": ctx.model['task_type'],
})
iteration별 학습 곡선 추가 등록 (선택)
ctx.finish()는 최종 지표만 MLflow에 기록합니다. Iteration별 차트가 필요하면 별도로 기록합니다.
📋 복사해서 붙여넣기
import mlflow
mlflow.set_tracking_uri(ctx.project['environments'][ctx.model['env_nm']]['mlflow_url'])
mlflow.set_experiment(f"{ctx.project['project_nm']}_{ctx.model['worker_id']}")
# ctx.finish() 이후 동일 run에 iteration별 곡선 추가
with mlflow.start_run(run_name=ctx.run_name):
for i, (auc, logloss) in enumerate(zip(
evals_result['valid']['auc'],
evals_result['valid']['binary_logloss'],
)):
mlflow.log_metric("valid_auc_iter", auc, step=i)
mlflow.log_metric("valid_logloss_iter", logloss, step=i)
여러 run 한 번에 비교
📋 복사해서 붙여넣기
runs_df = mlflow.search_runs(
experiment_names=[f"{ctx.project['project_nm']}_{ctx.model['worker_id']}"],
filter_string="metrics.auc > 0.85",
order_by=["metrics.auc DESC"],
)
cols = ['run_id', 'start_time', 'metrics.auc', 'metrics.f1',
'params.learning_rate', 'params.num_leaves']
print(runs_df[cols].head(5).to_string(index=False))
시스템 자원 모니터링
ℹ️ SageMaker 학습 비용은 인스턴스 시간 × 단가입니다. CPU / 메모리 / 디스크 사용량을 기록해두면 적정 인스턴스 타입 선택과 병목 진단에 활용할 수 있습니다.
pip install psutil
스냅샷 — 현재 자원 상태
📋 복사해서 붙여넣기
import psutil
def get_system_snapshot(label=""):
mem = psutil.virtual_memory()
disk = psutil.disk_usage('/')
snap = {
"cpu_pct": psutil.cpu_percent(interval=1),
"mem_used_gb": round(mem.used / 1024**3, 2),
"mem_total_gb": round(mem.total / 1024**3, 2),
"mem_pct": mem.percent,
"disk_used_gb": round(disk.used / 1024**3, 2),
"disk_free_gb": round(disk.free / 1024**3, 2),
"swap_pct": psutil.swap_memory().percent,
}
print(f"\n[{label}]")
print(f" CPU: {snap['cpu_pct']:.1f}%")
print(f" 메모리: {snap['mem_used_gb']} / {snap['mem_total_gb']} GB ({snap['mem_pct']:.1f}%)")
print(f" 디스크: {snap['disk_used_gb']} GB 사용 / {snap['disk_free_gb']} GB 여유")
print(f" Swap: {snap['swap_pct']:.1f}%")
return snap
실시간 모니터링 — 학습 중 폴링
📋 복사해서 붙여넣기
import threading, time
resource_log = []
def monitor_resources(interval=5, stop_event=None):
while not stop_event.is_set():
mem = psutil.virtual_memory()
resource_log.append({
"ts": time.time(),
"cpu_pct": psutil.cpu_percent(interval=None),
"mem_pct": mem.percent,
"mem_used_gb": round(mem.used / 1024**3, 2),
})
time.sleep(interval)
stop_event = threading.Event()
monitor_thread = threading.Thread(
target=monitor_resources,
kwargs={"interval": 5, "stop_event": stop_event},
daemon=True,
)
monitor_thread.start()
model = lgb.train(params, train_data, ...) # 학습
stop_event.set()
monitor_thread.join()
log_df = pd.DataFrame(resource_log)
print(f"CPU 최대: {log_df['cpu_pct'].max():.1f}%")
print(f"메모리 최대: {log_df['mem_pct'].max():.1f}% ({log_df['mem_used_gb'].max():.2f} GB)")
시스템 지표를 ctx.finish()와 함께 기록
📋 복사해서 붙여넣기
ctx.finish(
metrics={
"auc": valid_metrics['auc_roc'],
"f1": valid_metrics['f1_score'],
# ── 시스템 지표 추가 ───────────────────────
"sys_cpu_max": log_df['cpu_pct'].max(),
"sys_mem_max_gb": log_df['mem_used_gb'].max(),
"sys_mem_pct": log_df['mem_pct'].max(),
},
y_true=y_valid,
y_pred=y_valid_pred,
y_prob=y_valid_proba,
)
실습 2 · Bike Sharing Demand 수요 예측
| 항목 | 내용 |
|---|---|
| 문제 유형 | 회귀 (자전거 대여 수 예측) |
| 타겟 변수 | count (시간당 대여 수) |
| 주요 피처 | season, weather, temp, humidity, windspeed, hour, weekday |
| 평가 지표 | RMSLE (Root Mean Squared Log Error) |
ℹ️ Bike Sharing은 타이타닉과 동일한 CDP 구조를 회귀 문제에 적용하는 실습입니다. conf, ctx API,
ctx.finish() 흐름은 그대로이며, 피처 엔지니어링과 평가 지표만 다릅니다.model.yaml 핵심 차이점
task_type: regression # classification → regression
target_col: count
algorithm_params:
objective: regression
metric: rmse
num_iterations: 500
learning_rate: 0.03
max_depth: 8
num_leaves: 50
시간 기반 피처 엔지니어링
📋 복사해서 붙여넣기
df['datetime'] = pd.to_datetime(df['datetime'])
df['hour'] = df['datetime'].dt.hour
df['day_of_week'] = df['datetime'].dt.dayofweek
df['month'] = df['datetime'].dt.month
df['is_peak_hour'] = df['hour'].isin([7, 8, 17, 18, 19]).astype(int)
df['is_weekend'] = (df['day_of_week'] >= 5).astype(int)
ctx.save_data(X_train, "X_train.parquet")
ctx.save_data(y_train.to_frame(), "y_train.parquet")
RMSLE 평가 & 마무리
📋 복사해서 붙여넣기
import numpy as np
from sklearn.metrics import mean_squared_error
# log1p 변환 후 학습 → 역변환 → RMSLE
y_log = np.log1p(y_train)
ctx.save_step(model, "model.pkl")
pred_log = model.predict(X_valid)
rmsle = np.sqrt(mean_squared_error(np.log1p(y_valid), pred_log))
print(f"RMSLE: {rmsle:.4f}")
ctx.finish(
metrics={"rmsle": rmsle},
extra={"log_transform": True},
)
전체 결과 확인 체크리스트
| ☐ | 확인 항목 |
|---|---|
| ☐ | model.yaml의 worker_id가 project.yaml workers_id 목록에 있는가? |
| ☐ | 디렉토리명이 experiment_topic + "_" + algorithm과 일치하는가? |
| ☐ | ctx.save_data()로 전처리 데이터를 저장했는가? |
| ☐ | ctx.save_step(model, "model.pkl")로 모델을 저장했는가? |
| ☐ | 차트를 ctx.save_output(fig, "*.png", folder="charts")로 저장했는가? |
| ☐ | ctx.finish(metrics=...)를 마지막 step 끝에 호출했는가? |
| ☐ | MLflow UI에서 내 run이 보이는가? |
| ☐ | S3 runs/{run_id}/output/에 결과가 업로드됐는가? |