CDP 규칙 가이드

데이터 저장 규칙
3-Tier ctx API

전처리 데이터 / step 아티팩트 / 최종 산출물을 각 계층에 맞게 저장하는 방법

데이터 저장 계층

3-tier ctx API — 데이터 종류에 따라 올바른 계층에 저장합니다.

왜 3계층인가?

실험 하나 = 여러 개의 step (노트북)
  01_preprocess.ipynb → 02_feature.ipynb → 03_train.ipynb → 04_evaluate.ipynb

각 step이 생성하는 데이터의 성격이 다르다:
  대용량 DataFrame  →  Tier 1 (data)     ctx.save_data
  모델·메타 파일    →  Tier 2 (step)     ctx.save_step
  최종 실험 기록    →  Tier 3 (output)   ctx.save_output / ctx.finish

DS가 tier를 선택하면, 경로와 S3 sync는 프레임워크가 자동으로 처리합니다.


Tier 1

전처리 데이터 — save_data / read_data

여러 step이 공유하는 대용량 데이터 (DataFrame splits 등)를 저장하는 공간.

  • test_dt가 같으면 run이 달라도 같은 경로를 공유 (스토리지 절약)
  • runs/ 최종 산출물에 복사되지 않음

저장 경로:

로컬:  project/{model_dir}/data/{test_dt}/
S3:    s3://{bucket}/{env_nm}/{project_nm}/{worker_id}/{model_dir}/data/{test_dt}/

📋 복사해서 붙여넣기

# ── 01_preprocess: 전처리 결과 저장 ──────────────────────────
ctx.save_data(df_train,   "X_train.parquet")
ctx.save_data(y_train.to_frame("label"), "y_train.parquet")
ctx.save_data(df_test,    "X_test.parquet")
ctx.save_data({"n_features": 42}, "schema.json")

# ── 02_feature / 03_train: 전처리 결과 읽기 ─────────────────
X_train = ctx.read_data("X_train.parquet")    # → DataFrame
y_train = ctx.read_data("y_train.parquet")["label"]
schema  = ctx.read_data("schema.json")        # → dict
확장자직렬화 방식
.parquetpandas.to_parquet / read_parquet
.csvpandas.to_csv / read_csv
.jsonjson.dumps / json.loads
.joblibjoblib.dump / joblib.load
.pklpickle.dump / pickle.load
미지원 확장자pickle fallback + run.log WARNING

Tier 2

step 간 경량 아티팩트 — save_step / read_step

모델, 인코더, 수치 파일 등 step 사이에 주고받는 경량 파일을 저장하는 공간.

  • 각 step이 자신의 폴더(steps/{step_name}/)에만 씀
  • S3에는 latest/로 관리 (이전 run 결과를 덮어씀)

저장 경로:

로컬:  project/{model_dir}/steps/{step_name}/
S3:    s3://{bucket}/{project_nm}/{worker_id}/{model_dir}/steps/{step_name}/latest/

📋 복사해서 붙여넣기

# ── 현재 step 산출물 저장 ─────────────────────────────────
ctx.save_step(scaler,  "scaler.joblib")
ctx.save_step({"train_shape": list(df.shape)}, "split_info.json")
ctx.save_step(model,   "model.pkl")   # predict 있으면 → finalize 시 output/model/ 자동 복사

# ── 이전 step 산출물 읽기 (파일명만으로 검색) ────────────────
scaler     = ctx.read_step("scaler.joblib")
split_info = ctx.read_step("split_info.json")
model      = ctx.read_step("model.pkl")

# ── 2-arg 형식 (step 명시, Path 반환) ───────────────────────
path = ctx.read_step("01_preprocess", "scaler.joblib")
데이터 종류권장 확장자
DataFrame (전처리·피처 결과).parquet
sklearn 인코더·스케일러·파이프라인.joblib
dict / metrics / 설정값.json
LightGBM / XGBoost 모델.txt (ctx.save_step(model, "model.txt"))
기타 Python 객체.pkl
⚠️ step 간 파일명은 반드시 고유해야 합니다. 동일 파일명이 두 step에 있으면 ConflictError가 발생합니다.

latest/ 동작 규칙

step 종료 시: 현재 step의 기존 latest/ 객체 전부 삭제 → 현재 결과만 유지
다음 step 시작 시: 모든 step의 latest/ 파일을 평탄하게(flat) 다운로드
첫 step 재실행 = 새 run_id 발급

Tier 3

최종 산출물 — save_output / ctx.finish

실험 한 번(run)의 기록 전체. 프레임워크가 대부분 자동 생성하고, DS는 save_output으로 추가 산출물만 채웁니다.

save_output 허용 폴더 (4개만)

# folder 인자는 반드시 아래 4개 중 하나. 그 외 즉시 ValueError
ctx.save_output(obj, "filename", folder="model")    # 모델 아티팩트
ctx.save_output(obj, "filename", folder="charts")   # 차트 이미지 / HTML
ctx.save_output(obj, "filename", folder="metrics")  # 지표 파일
ctx.save_output(obj, "filename", folder="reports")  # HTML 리포트

📋 복사해서 붙여넣기

# ── 세분화 지표 ──────────────────────────────────────────────
ctx.save_output(df_per_category, "per_category.csv",    folder="metrics")
ctx.save_output({"bias": 0.03},  "bias_by_promo.json",  folder="metrics")

# ── 추가 차트 ────────────────────────────────────────────────
ctx.save_output(fig,  "feature_importance.png", folder="charts")
ctx.save_output(html, "pred_vs_actual.html",    folder="charts")

# ── 추가 모델 아티팩트 ───────────────────────────────────────
ctx.save_output(encoder, "label_encoder.joblib", folder="model")
ctx.save_output(schema,  "feature_schema.json",  folder="model")

ctx.finish() — 실험 마무리

마지막 step 끝에 반드시 호출합니다.

📋 복사해서 붙여넣기

ctx.finish(
    metrics={"auc": 0.912, "f1": 0.873},  # 선택 — 있으면 metrics.json 자동 저장
    y_true=y_valid,                         # 선택 — 있으면 표준 차트 자동 생성
    y_pred=y_pred,
    y_prob=y_prob,                          # 선택 (분류만)
    extra={"experiment_note": "v2 적용"},   # 선택 — run_manifest.yaml에 기록
)
산출물조건저장 위치
metrics.jsonmetrics= 인자 전달 시output/metrics/
표준 차트y_true / y_pred 전달 시output/charts/
experiment_report.html항상output/reports/
MLflow 등록항상MLflow 서버
run_manifest.yaml항상output/

task_type별 자동 생성 차트

task_type자동 생성 차트
classificationConfusion Matrix, ROC Curve, Precision-Recall Curve, Pred Distribution
regressionActual vs Predicted, Residual Plot, Error Distribution
rankingScore Distribution, Cumulative Gain
clusteringCluster Distribution

전체 4-step 노트북 예시

📋 복사해서 붙여넣기

# ── step 01_preprocess ────────────────────────────────────────
ctx.save_data(df_clean, "df_clean.parquet")
ctx.save_data({"n_features": 42}, "schema.json")

# ── step 02_feature ───────────────────────────────────────────
df_clean = ctx.read_data("df_clean.parquet")

scaler   = StandardScaler().fit(df_clean)
X_scaled = scaler.transform(df_clean)

ctx.save_step(scaler,   "scaler.joblib")
ctx.save_step(X_scaled, "X_train.parquet")

# ── step 03_train ─────────────────────────────────────────────
scaler  = ctx.read_step("scaler.joblib")
X_train = ctx.read_step("X_train.parquet")

model = lgb.LGBMClassifier(**ctx.model['algorithm_params'])
model.fit(X_train, y_train)

ctx.save_step(model, "model.pkl")   # predict 있음 → output/model/ 자동 복사

# ── step 04_evaluate (마지막 step) ────────────────────────────
model  = ctx.read_step("model.pkl")
y_pred = model.predict(X_test)
y_prob = model.predict_proba(X_test)[:, 1]

# 추가 산출물
df_per_cat = compute_per_category_metrics(y_test, y_pred)
ctx.save_output(df_per_cat, "per_category.csv", folder="metrics")

fig = plot_feature_importance(model)
ctx.save_output(fig, "feature_importance.png", folder="charts")

# 실험 마무리 (자동: MLflow 등록 + 리포트 + S3 업로드)
ctx.finish(
    metrics={"auc": roc_auc_score(y_test, y_prob), "f1": f1_score(y_test, y_pred)},
    y_true=y_test,
    y_pred=y_pred,
    y_prob=y_prob,
)

ctx 메서드 전체 요약

# ── Tier 1: data ──────────────────────────────────────────────
ctx.save_data(obj, "filename.parquet")        # data/{test_dt}/ 저장
ctx.read_data("filename.parquet")             # → 역직렬화 객체 반환

# ── Tier 2: step ──────────────────────────────────────────────
ctx.save_step(obj, "filename.joblib")         # steps/{step_name}/ 저장
ctx.read_step("filename.joblib")              # steps/ 전체 검색 → 역직렬화 객체
ctx.read_step("01_preprocess", "file.joblib") # 명시적 step → Path 반환

# ── Tier 3: output ────────────────────────────────────────────
ctx.save_output(obj, "name.csv", folder="metrics")     # output/{folder}/ 저장
ctx.finish(metrics={...}, y_true=..., y_pred=...)       # 자동: MLflow + 리포트 + S3

# ── 경로 속성 (참조만, 직접 조립 금지) ─────────────────────────
ctx.data          # data/{test_dt}/
ctx.step          # steps/{step_name}/
ctx.output        # runs/{run_id}/output/
ctx.run_dir       # runs/{run_id}/
⚠️ ctx.output / "predictions" / "pred.parquet" 형태의 직접 경로 조립은 동작하지만 권장하지 않습니다. S3 sync 시 누락될 수 있습니다.