CDP 규칙 가이드
데이터 저장 규칙
3-Tier ctx API
전처리 데이터 / step 아티팩트 / 최종 산출물을 각 계층에 맞게 저장하는 방법
데이터 저장 계층
3-tier ctx API — 데이터 종류에 따라 올바른 계층에 저장합니다.
왜 3계층인가?
실험 하나 = 여러 개의 step (노트북)
01_preprocess.ipynb → 02_feature.ipynb → 03_train.ipynb → 04_evaluate.ipynb
각 step이 생성하는 데이터의 성격이 다르다:
대용량 DataFrame → Tier 1 (data) ctx.save_data
모델·메타 파일 → Tier 2 (step) ctx.save_step
최종 실험 기록 → Tier 3 (output) ctx.save_output / ctx.finish
DS가 tier를 선택하면, 경로와 S3 sync는 프레임워크가 자동으로 처리합니다.
Tier 1
전처리 데이터 — save_data / read_data
여러 step이 공유하는 대용량 데이터 (DataFrame splits 등)를 저장하는 공간.
test_dt가 같으면 run이 달라도 같은 경로를 공유 (스토리지 절약)runs/최종 산출물에 복사되지 않음
저장 경로:
로컬: project/{model_dir}/data/{test_dt}/
S3: s3://{bucket}/{env_nm}/{project_nm}/{worker_id}/{model_dir}/data/{test_dt}/
📋 복사해서 붙여넣기
# ── 01_preprocess: 전처리 결과 저장 ──────────────────────────
ctx.save_data(df_train, "X_train.parquet")
ctx.save_data(y_train.to_frame("label"), "y_train.parquet")
ctx.save_data(df_test, "X_test.parquet")
ctx.save_data({"n_features": 42}, "schema.json")
# ── 02_feature / 03_train: 전처리 결과 읽기 ─────────────────
X_train = ctx.read_data("X_train.parquet") # → DataFrame
y_train = ctx.read_data("y_train.parquet")["label"]
schema = ctx.read_data("schema.json") # → dict
| 확장자 | 직렬화 방식 |
|---|---|
.parquet | pandas.to_parquet / read_parquet |
.csv | pandas.to_csv / read_csv |
.json | json.dumps / json.loads |
.joblib | joblib.dump / joblib.load |
.pkl | pickle.dump / pickle.load |
| 미지원 확장자 | pickle fallback + run.log WARNING |
Tier 2
step 간 경량 아티팩트 — save_step / read_step
모델, 인코더, 수치 파일 등 step 사이에 주고받는 경량 파일을 저장하는 공간.
- 각 step이 자신의 폴더(
steps/{step_name}/)에만 씀 - S3에는
latest/로 관리 (이전 run 결과를 덮어씀)
저장 경로:
로컬: project/{model_dir}/steps/{step_name}/
S3: s3://{bucket}/{project_nm}/{worker_id}/{model_dir}/steps/{step_name}/latest/
📋 복사해서 붙여넣기
# ── 현재 step 산출물 저장 ─────────────────────────────────
ctx.save_step(scaler, "scaler.joblib")
ctx.save_step({"train_shape": list(df.shape)}, "split_info.json")
ctx.save_step(model, "model.pkl") # predict 있으면 → finalize 시 output/model/ 자동 복사
# ── 이전 step 산출물 읽기 (파일명만으로 검색) ────────────────
scaler = ctx.read_step("scaler.joblib")
split_info = ctx.read_step("split_info.json")
model = ctx.read_step("model.pkl")
# ── 2-arg 형식 (step 명시, Path 반환) ───────────────────────
path = ctx.read_step("01_preprocess", "scaler.joblib")
| 데이터 종류 | 권장 확장자 |
|---|---|
| DataFrame (전처리·피처 결과) | .parquet |
| sklearn 인코더·스케일러·파이프라인 | .joblib |
| dict / metrics / 설정값 | .json |
| LightGBM / XGBoost 모델 | .txt (ctx.save_step(model, "model.txt")) |
| 기타 Python 객체 | .pkl |
⚠️ step 간 파일명은 반드시 고유해야 합니다. 동일 파일명이 두 step에 있으면
ConflictError가 발생합니다.latest/ 동작 규칙
step 종료 시: 현재 step의 기존 latest/ 객체 전부 삭제 → 현재 결과만 유지
다음 step 시작 시: 모든 step의 latest/ 파일을 평탄하게(flat) 다운로드
첫 step 재실행 = 새 run_id 발급
Tier 3
최종 산출물 — save_output / ctx.finish
실험 한 번(run)의 기록 전체. 프레임워크가 대부분 자동 생성하고, DS는 save_output으로 추가 산출물만 채웁니다.
save_output 허용 폴더 (4개만)
# folder 인자는 반드시 아래 4개 중 하나. 그 외 즉시 ValueError
ctx.save_output(obj, "filename", folder="model") # 모델 아티팩트
ctx.save_output(obj, "filename", folder="charts") # 차트 이미지 / HTML
ctx.save_output(obj, "filename", folder="metrics") # 지표 파일
ctx.save_output(obj, "filename", folder="reports") # HTML 리포트
📋 복사해서 붙여넣기
# ── 세분화 지표 ──────────────────────────────────────────────
ctx.save_output(df_per_category, "per_category.csv", folder="metrics")
ctx.save_output({"bias": 0.03}, "bias_by_promo.json", folder="metrics")
# ── 추가 차트 ────────────────────────────────────────────────
ctx.save_output(fig, "feature_importance.png", folder="charts")
ctx.save_output(html, "pred_vs_actual.html", folder="charts")
# ── 추가 모델 아티팩트 ───────────────────────────────────────
ctx.save_output(encoder, "label_encoder.joblib", folder="model")
ctx.save_output(schema, "feature_schema.json", folder="model")
ctx.finish() — 실험 마무리
마지막 step 끝에 반드시 호출합니다.
📋 복사해서 붙여넣기
ctx.finish(
metrics={"auc": 0.912, "f1": 0.873}, # 선택 — 있으면 metrics.json 자동 저장
y_true=y_valid, # 선택 — 있으면 표준 차트 자동 생성
y_pred=y_pred,
y_prob=y_prob, # 선택 (분류만)
extra={"experiment_note": "v2 적용"}, # 선택 — run_manifest.yaml에 기록
)
| 산출물 | 조건 | 저장 위치 |
|---|---|---|
metrics.json | metrics= 인자 전달 시 | output/metrics/ |
| 표준 차트 | y_true / y_pred 전달 시 | output/charts/ |
experiment_report.html | 항상 | output/reports/ |
| MLflow 등록 | 항상 | MLflow 서버 |
run_manifest.yaml | 항상 | output/ |
task_type별 자동 생성 차트
| task_type | 자동 생성 차트 |
|---|---|
classification | Confusion Matrix, ROC Curve, Precision-Recall Curve, Pred Distribution |
regression | Actual vs Predicted, Residual Plot, Error Distribution |
ranking | Score Distribution, Cumulative Gain |
clustering | Cluster Distribution |
전체 4-step 노트북 예시
📋 복사해서 붙여넣기
# ── step 01_preprocess ────────────────────────────────────────
ctx.save_data(df_clean, "df_clean.parquet")
ctx.save_data({"n_features": 42}, "schema.json")
# ── step 02_feature ───────────────────────────────────────────
df_clean = ctx.read_data("df_clean.parquet")
scaler = StandardScaler().fit(df_clean)
X_scaled = scaler.transform(df_clean)
ctx.save_step(scaler, "scaler.joblib")
ctx.save_step(X_scaled, "X_train.parquet")
# ── step 03_train ─────────────────────────────────────────────
scaler = ctx.read_step("scaler.joblib")
X_train = ctx.read_step("X_train.parquet")
model = lgb.LGBMClassifier(**ctx.model['algorithm_params'])
model.fit(X_train, y_train)
ctx.save_step(model, "model.pkl") # predict 있음 → output/model/ 자동 복사
# ── step 04_evaluate (마지막 step) ────────────────────────────
model = ctx.read_step("model.pkl")
y_pred = model.predict(X_test)
y_prob = model.predict_proba(X_test)[:, 1]
# 추가 산출물
df_per_cat = compute_per_category_metrics(y_test, y_pred)
ctx.save_output(df_per_cat, "per_category.csv", folder="metrics")
fig = plot_feature_importance(model)
ctx.save_output(fig, "feature_importance.png", folder="charts")
# 실험 마무리 (자동: MLflow 등록 + 리포트 + S3 업로드)
ctx.finish(
metrics={"auc": roc_auc_score(y_test, y_prob), "f1": f1_score(y_test, y_pred)},
y_true=y_test,
y_pred=y_pred,
y_prob=y_prob,
)
ctx 메서드 전체 요약
# ── Tier 1: data ──────────────────────────────────────────────
ctx.save_data(obj, "filename.parquet") # data/{test_dt}/ 저장
ctx.read_data("filename.parquet") # → 역직렬화 객체 반환
# ── Tier 2: step ──────────────────────────────────────────────
ctx.save_step(obj, "filename.joblib") # steps/{step_name}/ 저장
ctx.read_step("filename.joblib") # steps/ 전체 검색 → 역직렬화 객체
ctx.read_step("01_preprocess", "file.joblib") # 명시적 step → Path 반환
# ── Tier 3: output ────────────────────────────────────────────
ctx.save_output(obj, "name.csv", folder="metrics") # output/{folder}/ 저장
ctx.finish(metrics={...}, y_true=..., y_pred=...) # 자동: MLflow + 리포트 + S3
# ── 경로 속성 (참조만, 직접 조립 금지) ─────────────────────────
ctx.data # data/{test_dt}/
ctx.step # steps/{step_name}/
ctx.output # runs/{run_id}/output/
ctx.run_dir # runs/{run_id}/
⚠️
ctx.output / "predictions" / "pred.parquet" 형태의 직접 경로 조립은 동작하지만 권장하지 않습니다. S3 sync 시 누락될 수 있습니다.