CDP 실험 관리

DynamoDB
실험 이력 & 메타데이터 저장

S3+MLflow 외에 구조화된 쿼리가 필요할 때 DynamoDB를 보완적으로 사용합니다.

DynamoDB 연동

실험 이력 구조화 저장 — S3 없이 직접 조회, 빠른 검색.

ℹ️ 샘플 노트북 안내: samples/hjsong/ddb/ 세 노트북은 boilerplate312 방식(수동 YAML 로드, 수동 run_id 생성)으로 작성되어 있습니다. 이 문서는 CDP 프레임워크 기준으로 동일한 패턴을 설명합니다.

왜 DynamoDB인가?

"S3는 파일 창고, MLflow는 실험 비교 UI, DynamoDB는 검색 가능한 실험 대장."

ctx.finish()가 S3와 MLflow를 자동으로 처리하지만, 아래 질문은 DynamoDB가 훨씬 빠릅니다.

질문MLflow로 할 때DynamoDB로 할 때
"hjsong의 최신 run 결과는?"UI 필터 or Python SDK 검색get_item 한 번
"AUC 0.87 이상 run 전체 목록"mlflow.search_runs()GSI 쿼리 한 번
"run의 차트 PNG를 직접 꺼내고 싶다"artifact 다운로드base64 decode 바로 사용
"실험 설정 스냅샷을 코드로 조회"MLflow params 하나씩METRICS item get 바로 가능
ℹ️ DynamoDB는 NoSQL입니다. 키(PK + SK)만 정해두면, 나머지 속성은 항목마다 자유롭게 추가할 수 있습니다.

CDP에서 DynamoDB의 위치

ctx.finish() 호출
   ↓ 자동 처리
   ├── S3   runs/{run_id}/output/ 전체 업로드
   └── MLflow run 등록 (metrics, params, artifacts)

   ↓ 팀이 필요한 경우 추가
   └── DynamoDB run 결과 기록 (구조화 쿼리 & 모델 직접 접근용)

DynamoDB는 ctx.finish() 이후에 팀 선택으로 추가합니다. 필수가 아니며, 구조화된 조회가 필요한 프로젝트에서 사용합니다.


DynamoDB 핵심 개념

개념설명비유
파티션 키 (PK)데이터를 나누는 기준 (필수)책장 번호
정렬 키 (SK)같은 PK 안에서 정렬/필터 (선택)책장 안 책 제목

CDP 기준 테이블 설계

테이블명:  gs-cdp-mlops-{project_nm}
PK:  experiment_id   → EXP#{worker_id}#{project_nm}#{model_dir_name}
                        RUN#{worker_id}#{project_nm}#{model_dir_name}#{run_id}
SK:  entity_type     → META | CONF | DATA#train | METRICS | MODEL#chunk_n | CHARTS | ...

실험 하나가 DDB에 저장되는 형태:

EXP#hjsong#ml_lab_titanic#titanic_survival_lightgbm  |  META       → 실험 기본 정보
EXP#hjsong#ml_lab_titanic#titanic_survival_lightgbm  |  CONF       → model.yaml 스냅샷
EXP#hjsong#ml_lab_titanic#titanic_survival_lightgbm  |  DATA#train → 학습 CSV (base64)

RUN#hjsong#ml_lab_titanic#titanic_survival_lightgbm#{run_id}  |  METRICS       → AUC, F1 등
RUN#hjsong#ml_lab_titanic#titanic_survival_lightgbm#{run_id}  |  CHARTS        → 차트 PNG (base64)
RUN#hjsong#ml_lab_titanic#titanic_survival_lightgbm#{run_id}  |  MODEL#chunk_000 → 모델 분할 저장
RUN#hjsong#ml_lab_titanic#titanic_survival_lightgbm#{run_id}  |  MANIFEST      → 실행 컨텍스트

실습 준비 — 타입 변환 헬퍼

pip install boto3
⚠️ DynamoDB는 Python float를 직접 저장하지 못합니다. 저장 전 Decimal로 변환, 로드 후 float로 역변환해야 합니다.

📋 복사해서 붙여넣기

from decimal import Decimal

def to_ddb(obj):
    """저장 전: float → Decimal 재귀 변환"""
    if isinstance(obj, float):  return Decimal(str(obj))
    elif isinstance(obj, bool): return obj
    elif isinstance(obj, dict): return {k: to_ddb(v) for k, v in obj.items()}
    elif isinstance(obj, list): return [to_ddb(v) for v in obj]
    return obj

def from_ddb(obj):
    """로드 후: Decimal → int/float 재귀 변환"""
    if isinstance(obj, Decimal):
        f = float(obj)
        return int(f) if f == int(f) else f
    elif isinstance(obj, dict): return {k: from_ddb(v) for k, v in obj.items()}
    elif isinstance(obj, list): return [from_ddb(v) for v in obj]
    return obj

실습 1 · 테이블 생성

📋 복사해서 붙여넣기

import boto3

# ── ctx에서 설정값 읽기 ──────────────────────────────────────────
REGION     = ctx.project['region']
PROJECT_NM = ctx.project['project_nm']
TABLE_NAME = f"gs-cdp-mlops-{PROJECT_NM}"

client = boto3.client('dynamodb', region_name=REGION)

try:
    client.describe_table(TableName=TABLE_NAME)
    print(f"테이블 [{TABLE_NAME}] 존재 확인")
except client.exceptions.ResourceNotFoundException:
    print(f"테이블 [{TABLE_NAME}] 생성 중...")
    client.create_table(
        TableName=TABLE_NAME,
        KeySchema=[
            {"AttributeName": "experiment_id", "KeyType": "HASH"},   # PK
            {"AttributeName": "entity_type",   "KeyType": "RANGE"},  # SK
        ],
        AttributeDefinitions=[
            {"AttributeName": "experiment_id", "AttributeType": "S"},
            {"AttributeName": "entity_type",   "AttributeType": "S"},
        ],
        BillingMode="PAY_PER_REQUEST",   # 사용한 만큼만 과금
    )
    client.get_waiter("table_exists").wait(TableName=TABLE_NAME)
    print(f"테이블 [{TABLE_NAME}] 생성 완료")
ℹ️ BillingMode="PAY_PER_REQUEST": 프로비저닝 용량을 미리 설정하지 않아도 됩니다. 개발·실습 환경에서는 이 모드를 사용합니다.

실습 2 · 실험 설정 등록 (EXP# 아이템)

ctx.finish() 전에 한 번만 실행합니다. 실험 설정(CONF)과 학습 데이터(DATA)를 DDB에 저장해두면, 나중에 어떤 노트북에서든 DDB 조회 한 번으로 재현 가능한 환경을 복원할 수 있습니다.

📋 복사해서 붙여넣기

import boto3
from datetime import datetime

REGION        = ctx.project['region']
WORKER_ID     = ctx.model['worker_id']
PROJECT_NM    = ctx.project['project_nm']
MODEL_DIR     = ctx.model_dir_name          # experiment_topic + "_" + algorithm
TABLE_NAME    = f"gs-cdp-mlops-{PROJECT_NM}"

ddb   = boto3.resource('dynamodb', region_name=REGION)
table = ddb.Table(TABLE_NAME)

# EXP PK 생성 규칙
EXP_PK = f"EXP#{WORKER_ID}#{PROJECT_NM}#{MODEL_DIR}"
now    = datetime.utcnow().isoformat()
print(f"EXP_PK: {EXP_PK}")

# ── META 저장 ──────────────────────────────────────────────────
table.put_item(Item={
    "experiment_id": EXP_PK,
    "entity_type":   "META",
    "worker_id":     WORKER_ID,
    "project_nm":    PROJECT_NM,
    "model_dir":     MODEL_DIR,
    "env_nm":        ctx.model['env_nm'],
    "task_type":     ctx.model['task_type'],
    "created_at":    now,
    "status":        "active",
})
print("[저장] META")

# ── CONF 저장 (model.yaml 스냅샷) ─────────────────────────────
table.put_item(Item=to_ddb({
    "experiment_id":    EXP_PK,
    "entity_type":      "CONF",
    "algorithm_params": ctx.model['algorithm_params'],
    "data_params":      ctx.model.get('data_params', {}),
    "features":         ctx.model.get('features', {}),
    "uploaded_at":      now,
}))
print("[저장] CONF → algorithm_params 포함")

학습 데이터를 DDB에 저장 (선택)

📋 복사해서 붙여넣기

import base64, hashlib, io

def put_dataset(table, exp_pk, split, df, now):
    csv_bytes = df.to_csv(index=False).encode('utf-8')
    csv_b64   = base64.b64encode(csv_bytes).decode('utf-8')
    checksum  = "sha256:" + hashlib.sha256(csv_bytes).hexdigest()[:16] + "..."
    table.put_item(Item={
        "experiment_id": exp_pk,
        "entity_type":   f"DATA#{split}",
        "split":         split,
        "row_count":     len(df),
        "size_bytes":    len(csv_bytes),
        "checksum":      checksum,
        "csv_b64":       csv_b64,
        "uploaded_at":   now,
    })
    print(f"[저장] DATA#{split}  {len(df)}rows  {len(csv_bytes)/1024:.1f}KB")

put_dataset(table, EXP_PK, "train",      X_train.assign(target=y_train), now)
put_dataset(table, EXP_PK, "validation", X_valid.assign(target=y_valid), now)

DDB에서 데이터 복원

📋 복사해서 붙여넣기

def load_split_from_ddb(table, exp_pk, split):
    resp      = table.get_item(Key={"experiment_id": exp_pk, "entity_type": f"DATA#{split}"})
    csv_bytes = base64.b64decode(resp["Item"]["csv_b64"])
    return pd.read_csv(io.BytesIO(csv_bytes))

train_df = load_split_from_ddb(table, EXP_PK, "train")
valid_df = load_split_from_ddb(table, EXP_PK, "validation")
print(f"train: {len(train_df)}rows, valid: {len(valid_df)}rows")

실습 3 · ctx.finish() 이후 run 결과 DDB 저장

METRICS 저장

📋 복사해서 붙여넣기

# ctx.finish() 이후 실행
RUN_PK = f"RUN#{WORKER_ID}#{PROJECT_NM}#{MODEL_DIR}#{ctx.run_id}"
print(f"RUN_PK: {RUN_PK}")

table.put_item(Item=to_ddb({
    "experiment_id":  RUN_PK,
    "entity_type":    "METRICS",
    "experiment_key": EXP_PK,
    "run_id":         ctx.run_id,
    "worker_id":      WORKER_ID,
    **valid_metrics,
    "confusion_matrix": {
        "true_negative":  int(tn), "false_positive": int(fp),
        "false_negative": int(fn), "true_positive":  int(tp),
        "specificity":    round(float(tn / (tn + fp)), 4),
        "sensitivity":    round(float(tp / (tp + fn)), 4),
    },
    "feature_importance": [
        {"feature": r["feature"], "importance": round(float(r["importance"]), 4), "rank": int(r["rank"])}
        for _, r in fi.head(10).iterrows()
    ],
    "saved_at": datetime.utcnow().isoformat(),
}))
print("[저장] METRICS")

모델 저장 — base64 청킹

⚠️ DynamoDB 아이템 최대 크기는 400KB입니다. LightGBM .pkl은 이를 초과할 수 있어 250KB 단위로 분할 저장합니다.

📋 복사해서 붙여넣기

import pickle, math

CHUNK_SIZE = 250_000   # base64 문자 단위 (~187KB raw)

pkl_bytes    = pickle.dumps(model)
b64_str      = base64.b64encode(pkl_bytes).decode("utf-8")
total_chunks = math.ceil(len(b64_str) / CHUNK_SIZE)
print(f"model.pkl: {len(pkl_bytes)/1024:.1f}KB → base64 {len(b64_str)/1024:.1f}KB → {total_chunks}개 청크")

now = datetime.utcnow().isoformat()
for i in range(total_chunks):
    chunk_data = b64_str[i * CHUNK_SIZE : (i + 1) * CHUNK_SIZE]
    table.put_item(Item={
        "experiment_id":  RUN_PK,
        "entity_type":    f"MODEL#chunk_{i:03d}",   # 3자리 0패딩 → 정렬 보장
        "experiment_key": EXP_PK,
        "chunk_index":    i,
        "total_chunks":   total_chunks,
        "algorithm":      ctx.model['algorithm'],
        "format":         "pickle_base64",
        "data":           chunk_data,
        "saved_at":       now,
    })
    print(f"  [저장] MODEL#chunk_{i:03d}  ({len(chunk_data)/1024:.1f}KB)")

모델 복원

📋 복사해서 붙여넣기

from boto3.dynamodb.conditions import Key

resp  = table.query(
    KeyConditionExpression=Key("experiment_id").eq(RUN_PK)
        & Key("entity_type").begins_with("MODEL#chunk_")
)
items = sorted(resp["Items"], key=lambda x: int(x["chunk_index"]))
model = pickle.loads(base64.b64decode("".join(i["data"] for i in items)))
print(f"모델 복원 완료: {type(model)}")

차트 저장 (메모리 → base64 → DDB)

ℹ️ 파일을 디스크에 쓰지 않고 메모리에서 바로 DDB에 저장합니다. io.BytesIO로 PNG 바이트를 캡처합니다.

📋 복사해서 붙여넣기

import io

def fig_to_bytes(fig):
    """matplotlib Figure → PNG bytes"""
    buf = io.BytesIO()
    fig.savefig(buf, format="png", dpi=150, bbox_inches="tight")
    plt.close(fig)
    return buf.getvalue()

charts_bytes = {}

fig, ax = plt.subplots(figsize=(10, 8))
sns.barplot(data=fi.head(10), x="importance", y="feature", palette="Blues_d", ax=ax)
ax.set_title("Feature Importance (Top 10)"); plt.tight_layout()
charts_bytes["feature_importance"] = fig_to_bytes(fig)

# roc_curve, confusion_matrix, learning_curve 동일 패턴으로 추가

# 차트 4종 → CHARTS item (base64 map으로 한 아이템에 묶음)
table.put_item(Item={
    "experiment_id":  RUN_PK,
    "entity_type":    "CHARTS",
    "experiment_key": EXP_PK,
    "charts": {name: base64.b64encode(bts).decode("utf-8")
               for name, bts in charts_bytes.items()},
    "saved_at": datetime.utcnow().isoformat(),
})
print(f"[저장] CHARTS → {list(charts_bytes.keys())}")

MANIFEST 저장

📋 복사해서 붙여넣기

table.put_item(Item=to_ddb({
    "experiment_id":  RUN_PK,
    "entity_type":    "MANIFEST",
    "experiment_key": EXP_PK,
    "run_id":         ctx.run_id,
    "worker_id":      WORKER_ID,
    "project_nm":     PROJECT_NM,
    "model_dir":      MODEL_DIR,
    "env_nm":         ctx.model["env_nm"],
    "algorithm":      ctx.model["algorithm"],
    "status":         "completed",
    "storage": {
        "s3_run_path": str(ctx.run_dir),
        "ddb_table":   TABLE_NAME,
        "ddb_run_pk":  RUN_PK,
    },
    "summary": {
        "valid_auc":  valid_metrics["auc_roc"],
        "valid_f1":   valid_metrics["f1_score"],
        "n_features": len(feature_cols),
    },
    "saved_at": datetime.utcnow().isoformat(),
}))
print("[저장] MANIFEST")

실습 4 · DDB 조회

run에 저장된 모든 entity 목록

📋 복사해서 붙여넣기

from boto3.dynamodb.conditions import Key

resp = table.query(
    KeyConditionExpression=Key("experiment_id").eq(RUN_PK),
    ProjectionExpression="entity_type",
)
print(f"RUN_PK: {RUN_PK}")
print("=" * 70)
for item in sorted(resp["Items"], key=lambda x: x["entity_type"]):
    print(f"  entity_type={item['entity_type']}")
print(f"총 {len(resp['Items'])}개 아이템")

예상 결과:

RUN_PK: RUN#hjsong#ml_lab_titanic#titanic_survival_lightgbm#20260526_143012_a3f9bc
======================================================================
  entity_type=CHARTS
  entity_type=MANIFEST
  entity_type=METRICS
  entity_type=MODEL#chunk_000
  entity_type=MODEL#chunk_001
  entity_type=REPORT
======================================================================
총 6개 아이템

특정 run의 메트릭 조회

📋 복사해서 붙여넣기

resp    = table.get_item(Key={"experiment_id": RUN_PK, "entity_type": "METRICS"})
metrics = from_ddb(resp["Item"])

print(f"AUC-ROC:  {metrics['auc_roc']}")
print(f"F1 Score: {metrics['f1_score']}")
print(f"Accuracy: {metrics['accuracy']}")

차트 PNG 꺼내기

📋 복사해서 붙여넣기

from IPython.display import Image, display

resp   = table.get_item(Key={"experiment_id": RUN_PK, "entity_type": "CHARTS"})
charts = resp["Item"]["charts"]

# 원하는 차트: feature_importance / roc_curve / confusion_matrix / learning_curve
png_bytes = base64.b64decode(charts["roc_curve"])
display(Image(data=png_bytes))

MLflow 시스템 메트릭 자동 수집

ℹ️ MLflow >= 2.8에서는 enable_system_metrics_logging()을 호출하면 학습 중 CPU / 메모리 / 디스크를 iteration별로 자동 기록합니다.

📋 복사해서 붙여넣기

import mlflow

mlflow.set_tracking_uri(ctx.project['environments'][ctx.model['env_nm']]['mlflow_url'])

mlflow.enable_system_metrics_logging()   # CPU / 메모리 / 디스크 iteration별 자동 기록

with mlflow.start_run(run_name=ctx.run_name):
    mlflow.log_params(ctx.model['algorithm_params'])
    mlflow.log_metrics({f"valid_{k}": v for k, v in valid_metrics.items() if k != 'samples'})
    mlflow.lightgbm.log_model(model, artifact_path="model")
    mlflow.set_tags({
        "worker_id":  ctx.model['worker_id'],
        "env_nm":     ctx.model['env_nm'],
        "ddb_run_pk": RUN_PK,           # DDB PK를 MLflow 태그에도 기록
    })
메트릭 이름내용
system/cpu_utilization_percentage전체 CPU 사용률 (%)
system/memory_usage_megabytes메모리 사용량 (MB)
system/available_memory_megabytes여유 메모리 (MB)
system/disk_available_megabytes여유 디스크 (MB)
system/disk_usage_megabytes디스크 사용량 (MB)

결과 확인 체크리스트

확인 항목
gs-cdp-mlops-{project_nm} 테이블이 생성됐는가?
EXP_PK 아래 META / CONF 아이템이 있는가?
RUN_PK 아래 METRICS 아이템에 AUC 값이 있는가?
MODEL#chunk_000 이상의 아이템이 있는가?
CHARTS 아이템에서 PNG를 꺼내 Jupyter에서 시각화했는가?
MLflow UI에서 system/cpu_utilization_percentage 차트가 보이는가?
MLflow 태그에 ddb_run_pk가 기록됐는가?