CDP 실험 관리

모델링 실습
타이타닉 & Bike Sharing

ctx API로 실험을 관리하고, 결과를 S3와 MLflow에 자동 기록합니다.

모델링 실습

타이타닉 & Bike Sharing Demand — ctx API 실험 흐름을 익힙니다.

ℹ️ 샘플 노트북 안내: 저장소의 samples/awesome_sean/modeling/titanic_modeling.ipynb은 boilerplate312 방식(수동 yaml 로드, 수동 artifact 저장)으로 작성되어 있습니다. 이 문서는 실무 기준인 CDP 프레임워크(gsr_ml_lab_cdp)로 동일한 실험을 구성하는 방법을 다룹니다.

실험 재현의 전제 조건

"모델을 만드는 것보다 다시 만들 수 있는 것이 더 중요합니다."
요소없을 때CDP 프레임워크에서
Config 파일파라미터를 코드 안에서 직접 수정model.yaml 한 파일만 수정
실험 ID어떤 결과인지 헷갈림{YYYYMMDD}_{HHMMSS}_{6hex} 자동 생성
Artifacts결과가 로컬에만 존재ctx.finish() 한 줄로 S3 + MLflow 동시 기록
ℹ️ 이 실습에서 여러분이 집중할 것: 모델 성능이 아니라 실험 흐름 구조입니다.

실습 1 · 타이타닉 생존 예측

항목내용
문제 유형이진 분류 (생존 여부 예측)
타겟 변수Survived (0: 사망, 1: 생존)
주요 피처Pclass, Sex, Age, SibSp, Parch, Fare, Embarked
평가 지표Accuracy, ROC-AUC, F1-Score

1단계
model.yaml 설정

⚠️ 디렉토리명은 반드시 titanic_survival_lightgbm/ 이어야 합니다. (experiment_topic + "_" + algorithm)

📋 복사해서 붙여넣기

# project/titanic_survival_lightgbm/config/model.yaml
worker_id:         YOUR_ID           # ← 본인 ID로 수정
experiment_topic:  titanic_survival
algorithm:         lightgbm

env_nm: dev

data_sources:
  - name: train
    path: s3://gs-cdp-mllab-dev/titanic/data/train.csv
  - name: test
    path: s3://gs-cdp-mllab-dev/titanic/data/test.csv

algorithm_params:            # 하이퍼파라미터 — 코드에 하드코딩 금지
  objective:        binary
  boosting:         gbdt
  max_depth:        6
  learning_rate:    0.05
  num_leaves:       31
  num_iterations:   200
  feature_fraction: 0.8
  bagging_fraction: 0.8
  bagging_freq:     5
  seed:             2026

data_params:
  valid_ratio: 0.2

features:
  target_col:       Survived
  numeric_cols:     [Age, Fare, SibSp, Parch]
  categorical_cols: [Sex, Pclass, Embarked]

task_type:  classification
target_col: Survived

2단계
라이브러리 임포트

ℹ️ ctx 객체는 노트북 실행 환경에서 프레임워크가 자동으로 초기화합니다. 설정 값은 ctx.model[...] / ctx.project[...]로 접근합니다.

📋 복사해서 붙여넣기

import warnings
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    roc_auc_score, log_loss, confusion_matrix, roc_curve,
)
import lightgbm as lgb

warnings.filterwarnings('ignore')
plt.style.use('seaborn-v0_8-whitegrid')

3단계
데이터 로드 & 전처리

ℹ️ 파생 피처 3개(FamilySize, IsAlone, FarePerPerson)가 이 실습의 핵심 피처 엔지니어링입니다.

📋 복사해서 붙여넣기

# ── ctx에서 설정값 읽기 ────────────────────────────────────────────
target_col   = ctx.model['target_col']
valid_ratio  = ctx.model['data_params']['valid_ratio']
seed         = ctx.model['algorithm_params']['seed']
numeric_cols = ctx.model['features']['numeric_cols']

# ── 데이터 로드 ──────────────────────────────────────────────────
train_path = ctx.model['data_sources'][0]['path']
train_df   = pd.read_csv(train_path)

def preprocess(df):
    df = df.copy()

    # 결측치 처리
    df['Age']      = df['Age'].fillna(df['Age'].median())
    df['Embarked'] = df['Embarked'].fillna(df['Embarked'].mode()[0])
    df['Fare']     = df['Fare'].fillna(df['Fare'].median())

    # 파생 피처
    df['FamilySize']    = df['SibSp'] + df['Parch'] + 1
    df['IsAlone']       = (df['FamilySize'] == 1).astype(int)
    df['FarePerPerson'] = df['Fare'] / df['FamilySize']

    # 인코딩
    df['Sex'] = df['Sex'].map({'male': 1, 'female': 0})
    embarked_dummies = pd.get_dummies(df['Embarked'], prefix='Embarked')
    df = pd.concat([df, embarked_dummies], axis=1)

    feature_cols  = numeric_cols + ['Sex', 'Pclass', 'FamilySize', 'IsAlone', 'FarePerPerson']
    feature_cols += [c for c in df.columns if c.startswith('Embarked_')]

    X = df[feature_cols]
    y = df[target_col] if target_col in df.columns else None
    return X, y, feature_cols

X_full, y_full, feature_cols = preprocess(train_df)

X_train, X_valid, y_train, y_valid = train_test_split(
    X_full, y_full,
    test_size=valid_ratio,
    random_state=seed,
    stratify=y_full,
)
print(f"Train: {len(X_train)}, Valid: {len(X_valid)}, Features: {len(feature_cols)}")

# ── 전처리 데이터 저장 (Tier 1) ──────────────────────────────────
ctx.save_data(X_train, "X_train.parquet")
ctx.save_data(X_valid, "X_valid.parquet")
ctx.save_data(y_train.to_frame(), "y_train.parquet")
ctx.save_data(y_valid.to_frame(), "y_valid.parquet")

4단계
모델 학습

ℹ️ lgb.train() API를 사용합니다. evals_result에 iteration별 AUC / Log Loss가 기록되어 학습 곡선을 그릴 수 있습니다.

📋 복사해서 붙여넣기

# ── Tier 1에서 데이터 읽기 ──────────────────────────────────────
X_train = ctx.read_data("X_train.parquet")
X_valid = ctx.read_data("X_valid.parquet")
y_train = ctx.read_data("y_train.parquet")[target_col]
y_valid = ctx.read_data("y_valid.parquet")[target_col]

# ── ctx에서 하이퍼파라미터 읽기 ────────────────────────────────
hp = ctx.model['algorithm_params']

train_data = lgb.Dataset(X_train, label=y_train)
valid_data = lgb.Dataset(X_valid, label=y_valid, reference=train_data)

params = {k: v for k, v in hp.items() if k != 'num_iterations'}
params['metric']    = ['auc', 'binary_logloss']
params['verbosity'] = -1

evals_result = {}   # iteration별 지표 기록용

model = lgb.train(
    params,
    train_data,
    num_boost_round=hp['num_iterations'],
    valid_sets=[train_data, valid_data],
    valid_names=['train', 'valid'],
    callbacks=[
        lgb.log_evaluation(period=50),
        lgb.record_evaluation(evals_result),
    ],
)
print(f"Best iteration: {model.best_iteration}")

# ── 모델 저장 (Tier 2) ────────────────────────────────────────
ctx.save_step(model, "model.pkl")
model.save_model("/tmp/model.txt")
ctx.save_step(open("/tmp/model.txt").read(), "model.txt")

5단계
평가 & 아티팩트 저장

📋 복사해서 붙여넣기

def evaluate(model, X, y, name=''):
    y_prob = model.predict(X)
    y_pred = (y_prob >= 0.5).astype(int)
    metrics = {
        'accuracy':  round(accuracy_score(y, y_pred), 4),
        'precision': round(precision_score(y, y_pred), 4),
        'recall':    round(recall_score(y, y_pred), 4),
        'f1_score':  round(f1_score(y, y_pred), 4),
        'auc_roc':   round(roc_auc_score(y, y_prob), 4),
        'log_loss':  round(log_loss(y, y_prob), 4),
    }
    print(f"\n[{name}]", *[f"  {k}: {v}" for k, v in metrics.items()], sep='\n')
    return metrics, y_pred, y_prob

train_metrics, _, _                        = evaluate(model, X_train, y_train, 'Train')
valid_metrics, y_valid_pred, y_valid_proba = evaluate(model, X_valid, y_valid, 'Valid')
cm = confusion_matrix(y_valid, y_valid_pred)

# ── 피처 중요도 ──────────────────────────────────────────────────
fi = pd.DataFrame({
    'feature':    feature_cols,
    'importance': model.feature_importance(importance_type='gain'),
}).sort_values('importance', ascending=False)
fi['importance'] = fi['importance'] / fi['importance'].sum()

# ── 차트 4종 생성 & output 저장 (Tier 3) ─────────────────────
# 1. Feature Importance
fig, ax = plt.subplots(figsize=(10, 8))
sns.barplot(data=fi.head(10), x='importance', y='feature', palette='Blues_d', ax=ax)
ax.set_title('Feature Importance (Top 10)')
plt.tight_layout()
ctx.save_output(fig, "feature_importance.png", folder="charts")
plt.close()

# 2. ROC Curve
fpr, tpr, _ = roc_curve(y_valid, y_valid_proba)
fig, ax = plt.subplots(figsize=(8, 8))
ax.plot(fpr, tpr, label=f"AUC = {valid_metrics['auc_roc']:.4f}")
ax.plot([0, 1], [0, 1], 'k--')
ax.set_xlabel('FPR'); ax.set_ylabel('TPR')
ax.set_title('ROC Curve'); ax.legend()
plt.tight_layout()
ctx.save_output(fig, "roc_curve.png", folder="charts")
plt.close()

# 3. Confusion Matrix
tn, fp, fn, tp = cm.ravel()
fig, ax = plt.subplots(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=ax,
            xticklabels=['Died', 'Survived'], yticklabels=['Died', 'Survived'])
ax.set_xlabel('Predicted'); ax.set_ylabel('Actual')
ax.set_title('Confusion Matrix')
plt.tight_layout()
ctx.save_output(fig, "confusion_matrix.png", folder="charts")
plt.close()

# 4. Learning Curve
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
for ax, metric, label in zip(axes, ['auc', 'binary_logloss'], ['AUC', 'Log Loss']):
    ax.plot(evals_result['train'][metric], label='Train')
    ax.plot(evals_result['valid'][metric], label='Valid')
    ax.set_title(f'Learning Curve — {label}')
    ax.set_xlabel('Iteration'); ax.set_ylabel(label)
    ax.legend(); ax.grid(alpha=0.3)
plt.tight_layout()
ctx.save_output(fig, "learning_curve.png", folder="charts")
plt.close()

6단계
ctx.finish() — 실험 마무리

📋 복사해서 붙여넣기

ctx.finish(
    metrics={
        "auc":       valid_metrics['auc_roc'],
        "f1":        valid_metrics['f1_score'],
        "accuracy":  valid_metrics['accuracy'],
        "precision": valid_metrics['precision'],
        "recall":    valid_metrics['recall'],
    },
    y_true=y_valid,
    y_pred=y_valid_pred,
    y_prob=y_valid_proba,
    extra={
        "best_iteration": model.best_iteration,
        "n_features":     len(feature_cols),
        "train_samples":  len(X_train),
        "valid_samples":  len(X_valid),
    },
)

ctx.finish() 호출 후 자동으로 처리되는 것:

자동 처리 항목저장 위치
metrics.jsonoutput/metrics/
표준 차트 (ROC, Confusion Matrix, Pred Distribution 등)output/charts/
experiment_report.htmloutput/reports/
MLflow run 등록MLflow 서버
run_manifest.yamloutput/
S3 전체 업로드runs/{run_id}/output/

MLflow 로깅 이해

ctx.finish()가 내부에서 하는 일 (참고)

mlflow.set_experiment(f"{ctx.project['project_nm']}_{ctx.model['worker_id']}")

with mlflow.start_run(run_name=ctx.run_name):
    mlflow.log_params(ctx.model['algorithm_params'])
    mlflow.log_metrics(metrics)
    mlflow.log_artifacts(str(ctx.output))
    mlflow.set_tags({
        "worker_id":  ctx.model['worker_id'],
        "env_nm":     ctx.model['env_nm'],
        "task_type":  ctx.model['task_type'],
    })

iteration별 학습 곡선 추가 등록 (선택)

ctx.finish()는 최종 지표만 MLflow에 기록합니다. Iteration별 차트가 필요하면 별도로 기록합니다.

📋 복사해서 붙여넣기

import mlflow

mlflow.set_tracking_uri(ctx.project['environments'][ctx.model['env_nm']]['mlflow_url'])
mlflow.set_experiment(f"{ctx.project['project_nm']}_{ctx.model['worker_id']}")

# ctx.finish() 이후 동일 run에 iteration별 곡선 추가
with mlflow.start_run(run_name=ctx.run_name):
    for i, (auc, logloss) in enumerate(zip(
        evals_result['valid']['auc'],
        evals_result['valid']['binary_logloss'],
    )):
        mlflow.log_metric("valid_auc_iter",     auc,     step=i)
        mlflow.log_metric("valid_logloss_iter", logloss, step=i)

여러 run 한 번에 비교

📋 복사해서 붙여넣기

runs_df = mlflow.search_runs(
    experiment_names=[f"{ctx.project['project_nm']}_{ctx.model['worker_id']}"],
    filter_string="metrics.auc > 0.85",
    order_by=["metrics.auc DESC"],
)

cols = ['run_id', 'start_time', 'metrics.auc', 'metrics.f1',
        'params.learning_rate', 'params.num_leaves']
print(runs_df[cols].head(5).to_string(index=False))

시스템 자원 모니터링

ℹ️ SageMaker 학습 비용은 인스턴스 시간 × 단가입니다. CPU / 메모리 / 디스크 사용량을 기록해두면 적정 인스턴스 타입 선택과 병목 진단에 활용할 수 있습니다.
pip install psutil

스냅샷 — 현재 자원 상태

📋 복사해서 붙여넣기

import psutil

def get_system_snapshot(label=""):
    mem  = psutil.virtual_memory()
    disk = psutil.disk_usage('/')
    snap = {
        "cpu_pct":      psutil.cpu_percent(interval=1),
        "mem_used_gb":  round(mem.used  / 1024**3, 2),
        "mem_total_gb": round(mem.total / 1024**3, 2),
        "mem_pct":      mem.percent,
        "disk_used_gb": round(disk.used / 1024**3, 2),
        "disk_free_gb": round(disk.free / 1024**3, 2),
        "swap_pct":     psutil.swap_memory().percent,
    }
    print(f"\n[{label}]")
    print(f"  CPU:   {snap['cpu_pct']:.1f}%")
    print(f"  메모리: {snap['mem_used_gb']} / {snap['mem_total_gb']} GB ({snap['mem_pct']:.1f}%)")
    print(f"  디스크: {snap['disk_used_gb']} GB 사용 / {snap['disk_free_gb']} GB 여유")
    print(f"  Swap:  {snap['swap_pct']:.1f}%")
    return snap

실시간 모니터링 — 학습 중 폴링

📋 복사해서 붙여넣기

import threading, time

resource_log = []

def monitor_resources(interval=5, stop_event=None):
    while not stop_event.is_set():
        mem = psutil.virtual_memory()
        resource_log.append({
            "ts":          time.time(),
            "cpu_pct":     psutil.cpu_percent(interval=None),
            "mem_pct":     mem.percent,
            "mem_used_gb": round(mem.used / 1024**3, 2),
        })
        time.sleep(interval)

stop_event     = threading.Event()
monitor_thread = threading.Thread(
    target=monitor_resources,
    kwargs={"interval": 5, "stop_event": stop_event},
    daemon=True,
)
monitor_thread.start()

model = lgb.train(params, train_data, ...)   # 학습

stop_event.set()
monitor_thread.join()

log_df = pd.DataFrame(resource_log)
print(f"CPU 최대:    {log_df['cpu_pct'].max():.1f}%")
print(f"메모리 최대: {log_df['mem_pct'].max():.1f}% ({log_df['mem_used_gb'].max():.2f} GB)")

시스템 지표를 ctx.finish()와 함께 기록

📋 복사해서 붙여넣기

ctx.finish(
    metrics={
        "auc":            valid_metrics['auc_roc'],
        "f1":             valid_metrics['f1_score'],
        # ── 시스템 지표 추가 ───────────────────────
        "sys_cpu_max":    log_df['cpu_pct'].max(),
        "sys_mem_max_gb": log_df['mem_used_gb'].max(),
        "sys_mem_pct":    log_df['mem_pct'].max(),
    },
    y_true=y_valid,
    y_pred=y_valid_pred,
    y_prob=y_valid_proba,
)

실습 2 · Bike Sharing Demand 수요 예측

항목내용
문제 유형회귀 (자전거 대여 수 예측)
타겟 변수count (시간당 대여 수)
주요 피처season, weather, temp, humidity, windspeed, hour, weekday
평가 지표RMSLE (Root Mean Squared Log Error)
ℹ️ Bike Sharing은 타이타닉과 동일한 CDP 구조를 회귀 문제에 적용하는 실습입니다. conf, ctx API, ctx.finish() 흐름은 그대로이며, 피처 엔지니어링과 평가 지표만 다릅니다.

model.yaml 핵심 차이점

task_type:  regression      # classification → regression
target_col: count

algorithm_params:
  objective:      regression
  metric:         rmse
  num_iterations: 500
  learning_rate:  0.03
  max_depth:      8
  num_leaves:     50

시간 기반 피처 엔지니어링

📋 복사해서 붙여넣기

df['datetime']     = pd.to_datetime(df['datetime'])
df['hour']         = df['datetime'].dt.hour
df['day_of_week']  = df['datetime'].dt.dayofweek
df['month']        = df['datetime'].dt.month
df['is_peak_hour'] = df['hour'].isin([7, 8, 17, 18, 19]).astype(int)
df['is_weekend']   = (df['day_of_week'] >= 5).astype(int)

ctx.save_data(X_train, "X_train.parquet")
ctx.save_data(y_train.to_frame(), "y_train.parquet")

RMSLE 평가 & 마무리

📋 복사해서 붙여넣기

import numpy as np
from sklearn.metrics import mean_squared_error

# log1p 변환 후 학습 → 역변환 → RMSLE
y_log = np.log1p(y_train)
ctx.save_step(model, "model.pkl")

pred_log = model.predict(X_valid)
rmsle    = np.sqrt(mean_squared_error(np.log1p(y_valid), pred_log))
print(f"RMSLE: {rmsle:.4f}")

ctx.finish(
    metrics={"rmsle": rmsle},
    extra={"log_transform": True},
)

전체 결과 확인 체크리스트

확인 항목
model.yamlworker_idproject.yaml workers_id 목록에 있는가?
디렉토리명이 experiment_topic + "_" + algorithm과 일치하는가?
ctx.save_data()로 전처리 데이터를 저장했는가?
ctx.save_step(model, "model.pkl")로 모델을 저장했는가?
차트를 ctx.save_output(fig, "*.png", folder="charts")로 저장했는가?
ctx.finish(metrics=...)를 마지막 step 끝에 호출했는가?
MLflow UI에서 내 run이 보이는가?
S3 runs/{run_id}/output/에 결과가 업로드됐는가?