CDP 실험 관리
DynamoDB
실험 이력 & 메타데이터 저장
S3+MLflow 외에 구조화된 쿼리가 필요할 때 DynamoDB를 보완적으로 사용합니다.
DynamoDB 연동
실험 이력 구조화 저장 — S3 없이 직접 조회, 빠른 검색.
ℹ️ 샘플 노트북 안내:
samples/hjsong/ddb/ 세 노트북은 boilerplate312 방식(수동 YAML 로드, 수동 run_id 생성)으로 작성되어 있습니다. 이 문서는 CDP 프레임워크 기준으로 동일한 패턴을 설명합니다.왜 DynamoDB인가?
"S3는 파일 창고, MLflow는 실험 비교 UI, DynamoDB는 검색 가능한 실험 대장."
ctx.finish()가 S3와 MLflow를 자동으로 처리하지만, 아래 질문은 DynamoDB가 훨씬 빠릅니다.
| 질문 | MLflow로 할 때 | DynamoDB로 할 때 |
|---|---|---|
| "hjsong의 최신 run 결과는?" | UI 필터 or Python SDK 검색 | get_item 한 번 |
| "AUC 0.87 이상 run 전체 목록" | mlflow.search_runs() | GSI 쿼리 한 번 |
| "run의 차트 PNG를 직접 꺼내고 싶다" | artifact 다운로드 | base64 decode 바로 사용 |
| "실험 설정 스냅샷을 코드로 조회" | MLflow params 하나씩 | METRICS item get 바로 가능 |
ℹ️ DynamoDB는 NoSQL입니다. 키(PK + SK)만 정해두면, 나머지 속성은 항목마다 자유롭게 추가할 수 있습니다.
CDP에서 DynamoDB의 위치
ctx.finish() 호출
↓ 자동 처리
├── S3 runs/{run_id}/output/ 전체 업로드
└── MLflow run 등록 (metrics, params, artifacts)
↓ 팀이 필요한 경우 추가
└── DynamoDB run 결과 기록 (구조화 쿼리 & 모델 직접 접근용)
DynamoDB는 ctx.finish() 이후에 팀 선택으로 추가합니다. 필수가 아니며, 구조화된 조회가 필요한 프로젝트에서 사용합니다.
DynamoDB 핵심 개념
| 개념 | 설명 | 비유 |
|---|---|---|
| 파티션 키 (PK) | 데이터를 나누는 기준 (필수) | 책장 번호 |
| 정렬 키 (SK) | 같은 PK 안에서 정렬/필터 (선택) | 책장 안 책 제목 |
CDP 기준 테이블 설계
테이블명: gs-cdp-mlops-{project_nm}
PK: experiment_id → EXP#{worker_id}#{project_nm}#{model_dir_name}
RUN#{worker_id}#{project_nm}#{model_dir_name}#{run_id}
SK: entity_type → META | CONF | DATA#train | METRICS | MODEL#chunk_n | CHARTS | ...
실험 하나가 DDB에 저장되는 형태:
EXP#hjsong#ml_lab_titanic#titanic_survival_lightgbm | META → 실험 기본 정보
EXP#hjsong#ml_lab_titanic#titanic_survival_lightgbm | CONF → model.yaml 스냅샷
EXP#hjsong#ml_lab_titanic#titanic_survival_lightgbm | DATA#train → 학습 CSV (base64)
RUN#hjsong#ml_lab_titanic#titanic_survival_lightgbm#{run_id} | METRICS → AUC, F1 등
RUN#hjsong#ml_lab_titanic#titanic_survival_lightgbm#{run_id} | CHARTS → 차트 PNG (base64)
RUN#hjsong#ml_lab_titanic#titanic_survival_lightgbm#{run_id} | MODEL#chunk_000 → 모델 분할 저장
RUN#hjsong#ml_lab_titanic#titanic_survival_lightgbm#{run_id} | MANIFEST → 실행 컨텍스트
실습 준비 — 타입 변환 헬퍼
pip install boto3
⚠️ DynamoDB는 Python
float를 직접 저장하지 못합니다. 저장 전 Decimal로 변환, 로드 후 float로 역변환해야 합니다.📋 복사해서 붙여넣기
from decimal import Decimal
def to_ddb(obj):
"""저장 전: float → Decimal 재귀 변환"""
if isinstance(obj, float): return Decimal(str(obj))
elif isinstance(obj, bool): return obj
elif isinstance(obj, dict): return {k: to_ddb(v) for k, v in obj.items()}
elif isinstance(obj, list): return [to_ddb(v) for v in obj]
return obj
def from_ddb(obj):
"""로드 후: Decimal → int/float 재귀 변환"""
if isinstance(obj, Decimal):
f = float(obj)
return int(f) if f == int(f) else f
elif isinstance(obj, dict): return {k: from_ddb(v) for k, v in obj.items()}
elif isinstance(obj, list): return [from_ddb(v) for v in obj]
return obj
실습 1 · 테이블 생성
📋 복사해서 붙여넣기
import boto3
# ── ctx에서 설정값 읽기 ──────────────────────────────────────────
REGION = ctx.project['region']
PROJECT_NM = ctx.project['project_nm']
TABLE_NAME = f"gs-cdp-mlops-{PROJECT_NM}"
client = boto3.client('dynamodb', region_name=REGION)
try:
client.describe_table(TableName=TABLE_NAME)
print(f"테이블 [{TABLE_NAME}] 존재 확인")
except client.exceptions.ResourceNotFoundException:
print(f"테이블 [{TABLE_NAME}] 생성 중...")
client.create_table(
TableName=TABLE_NAME,
KeySchema=[
{"AttributeName": "experiment_id", "KeyType": "HASH"}, # PK
{"AttributeName": "entity_type", "KeyType": "RANGE"}, # SK
],
AttributeDefinitions=[
{"AttributeName": "experiment_id", "AttributeType": "S"},
{"AttributeName": "entity_type", "AttributeType": "S"},
],
BillingMode="PAY_PER_REQUEST", # 사용한 만큼만 과금
)
client.get_waiter("table_exists").wait(TableName=TABLE_NAME)
print(f"테이블 [{TABLE_NAME}] 생성 완료")
ℹ️
BillingMode="PAY_PER_REQUEST": 프로비저닝 용량을 미리 설정하지 않아도 됩니다. 개발·실습 환경에서는 이 모드를 사용합니다.실습 2 · 실험 설정 등록 (EXP# 아이템)
ctx.finish() 전에 한 번만 실행합니다. 실험 설정(CONF)과 학습 데이터(DATA)를 DDB에 저장해두면, 나중에 어떤 노트북에서든 DDB 조회 한 번으로 재현 가능한 환경을 복원할 수 있습니다.
📋 복사해서 붙여넣기
import boto3
from datetime import datetime
REGION = ctx.project['region']
WORKER_ID = ctx.model['worker_id']
PROJECT_NM = ctx.project['project_nm']
MODEL_DIR = ctx.model_dir_name # experiment_topic + "_" + algorithm
TABLE_NAME = f"gs-cdp-mlops-{PROJECT_NM}"
ddb = boto3.resource('dynamodb', region_name=REGION)
table = ddb.Table(TABLE_NAME)
# EXP PK 생성 규칙
EXP_PK = f"EXP#{WORKER_ID}#{PROJECT_NM}#{MODEL_DIR}"
now = datetime.utcnow().isoformat()
print(f"EXP_PK: {EXP_PK}")
# ── META 저장 ──────────────────────────────────────────────────
table.put_item(Item={
"experiment_id": EXP_PK,
"entity_type": "META",
"worker_id": WORKER_ID,
"project_nm": PROJECT_NM,
"model_dir": MODEL_DIR,
"env_nm": ctx.model['env_nm'],
"task_type": ctx.model['task_type'],
"created_at": now,
"status": "active",
})
print("[저장] META")
# ── CONF 저장 (model.yaml 스냅샷) ─────────────────────────────
table.put_item(Item=to_ddb({
"experiment_id": EXP_PK,
"entity_type": "CONF",
"algorithm_params": ctx.model['algorithm_params'],
"data_params": ctx.model.get('data_params', {}),
"features": ctx.model.get('features', {}),
"uploaded_at": now,
}))
print("[저장] CONF → algorithm_params 포함")
학습 데이터를 DDB에 저장 (선택)
📋 복사해서 붙여넣기
import base64, hashlib, io
def put_dataset(table, exp_pk, split, df, now):
csv_bytes = df.to_csv(index=False).encode('utf-8')
csv_b64 = base64.b64encode(csv_bytes).decode('utf-8')
checksum = "sha256:" + hashlib.sha256(csv_bytes).hexdigest()[:16] + "..."
table.put_item(Item={
"experiment_id": exp_pk,
"entity_type": f"DATA#{split}",
"split": split,
"row_count": len(df),
"size_bytes": len(csv_bytes),
"checksum": checksum,
"csv_b64": csv_b64,
"uploaded_at": now,
})
print(f"[저장] DATA#{split} {len(df)}rows {len(csv_bytes)/1024:.1f}KB")
put_dataset(table, EXP_PK, "train", X_train.assign(target=y_train), now)
put_dataset(table, EXP_PK, "validation", X_valid.assign(target=y_valid), now)
DDB에서 데이터 복원
📋 복사해서 붙여넣기
def load_split_from_ddb(table, exp_pk, split):
resp = table.get_item(Key={"experiment_id": exp_pk, "entity_type": f"DATA#{split}"})
csv_bytes = base64.b64decode(resp["Item"]["csv_b64"])
return pd.read_csv(io.BytesIO(csv_bytes))
train_df = load_split_from_ddb(table, EXP_PK, "train")
valid_df = load_split_from_ddb(table, EXP_PK, "validation")
print(f"train: {len(train_df)}rows, valid: {len(valid_df)}rows")
실습 3 · ctx.finish() 이후 run 결과 DDB 저장
METRICS 저장
📋 복사해서 붙여넣기
# ctx.finish() 이후 실행
RUN_PK = f"RUN#{WORKER_ID}#{PROJECT_NM}#{MODEL_DIR}#{ctx.run_id}"
print(f"RUN_PK: {RUN_PK}")
table.put_item(Item=to_ddb({
"experiment_id": RUN_PK,
"entity_type": "METRICS",
"experiment_key": EXP_PK,
"run_id": ctx.run_id,
"worker_id": WORKER_ID,
**valid_metrics,
"confusion_matrix": {
"true_negative": int(tn), "false_positive": int(fp),
"false_negative": int(fn), "true_positive": int(tp),
"specificity": round(float(tn / (tn + fp)), 4),
"sensitivity": round(float(tp / (tp + fn)), 4),
},
"feature_importance": [
{"feature": r["feature"], "importance": round(float(r["importance"]), 4), "rank": int(r["rank"])}
for _, r in fi.head(10).iterrows()
],
"saved_at": datetime.utcnow().isoformat(),
}))
print("[저장] METRICS")
모델 저장 — base64 청킹
⚠️ DynamoDB 아이템 최대 크기는 400KB입니다. LightGBM
.pkl은 이를 초과할 수 있어 250KB 단위로 분할 저장합니다.📋 복사해서 붙여넣기
import pickle, math
CHUNK_SIZE = 250_000 # base64 문자 단위 (~187KB raw)
pkl_bytes = pickle.dumps(model)
b64_str = base64.b64encode(pkl_bytes).decode("utf-8")
total_chunks = math.ceil(len(b64_str) / CHUNK_SIZE)
print(f"model.pkl: {len(pkl_bytes)/1024:.1f}KB → base64 {len(b64_str)/1024:.1f}KB → {total_chunks}개 청크")
now = datetime.utcnow().isoformat()
for i in range(total_chunks):
chunk_data = b64_str[i * CHUNK_SIZE : (i + 1) * CHUNK_SIZE]
table.put_item(Item={
"experiment_id": RUN_PK,
"entity_type": f"MODEL#chunk_{i:03d}", # 3자리 0패딩 → 정렬 보장
"experiment_key": EXP_PK,
"chunk_index": i,
"total_chunks": total_chunks,
"algorithm": ctx.model['algorithm'],
"format": "pickle_base64",
"data": chunk_data,
"saved_at": now,
})
print(f" [저장] MODEL#chunk_{i:03d} ({len(chunk_data)/1024:.1f}KB)")
모델 복원
📋 복사해서 붙여넣기
from boto3.dynamodb.conditions import Key
resp = table.query(
KeyConditionExpression=Key("experiment_id").eq(RUN_PK)
& Key("entity_type").begins_with("MODEL#chunk_")
)
items = sorted(resp["Items"], key=lambda x: int(x["chunk_index"]))
model = pickle.loads(base64.b64decode("".join(i["data"] for i in items)))
print(f"모델 복원 완료: {type(model)}")
차트 저장 (메모리 → base64 → DDB)
ℹ️ 파일을 디스크에 쓰지 않고 메모리에서 바로 DDB에 저장합니다.
io.BytesIO로 PNG 바이트를 캡처합니다.📋 복사해서 붙여넣기
import io
def fig_to_bytes(fig):
"""matplotlib Figure → PNG bytes"""
buf = io.BytesIO()
fig.savefig(buf, format="png", dpi=150, bbox_inches="tight")
plt.close(fig)
return buf.getvalue()
charts_bytes = {}
fig, ax = plt.subplots(figsize=(10, 8))
sns.barplot(data=fi.head(10), x="importance", y="feature", palette="Blues_d", ax=ax)
ax.set_title("Feature Importance (Top 10)"); plt.tight_layout()
charts_bytes["feature_importance"] = fig_to_bytes(fig)
# roc_curve, confusion_matrix, learning_curve 동일 패턴으로 추가
# 차트 4종 → CHARTS item (base64 map으로 한 아이템에 묶음)
table.put_item(Item={
"experiment_id": RUN_PK,
"entity_type": "CHARTS",
"experiment_key": EXP_PK,
"charts": {name: base64.b64encode(bts).decode("utf-8")
for name, bts in charts_bytes.items()},
"saved_at": datetime.utcnow().isoformat(),
})
print(f"[저장] CHARTS → {list(charts_bytes.keys())}")
MANIFEST 저장
📋 복사해서 붙여넣기
table.put_item(Item=to_ddb({
"experiment_id": RUN_PK,
"entity_type": "MANIFEST",
"experiment_key": EXP_PK,
"run_id": ctx.run_id,
"worker_id": WORKER_ID,
"project_nm": PROJECT_NM,
"model_dir": MODEL_DIR,
"env_nm": ctx.model["env_nm"],
"algorithm": ctx.model["algorithm"],
"status": "completed",
"storage": {
"s3_run_path": str(ctx.run_dir),
"ddb_table": TABLE_NAME,
"ddb_run_pk": RUN_PK,
},
"summary": {
"valid_auc": valid_metrics["auc_roc"],
"valid_f1": valid_metrics["f1_score"],
"n_features": len(feature_cols),
},
"saved_at": datetime.utcnow().isoformat(),
}))
print("[저장] MANIFEST")
실습 4 · DDB 조회
run에 저장된 모든 entity 목록
📋 복사해서 붙여넣기
from boto3.dynamodb.conditions import Key
resp = table.query(
KeyConditionExpression=Key("experiment_id").eq(RUN_PK),
ProjectionExpression="entity_type",
)
print(f"RUN_PK: {RUN_PK}")
print("=" * 70)
for item in sorted(resp["Items"], key=lambda x: x["entity_type"]):
print(f" entity_type={item['entity_type']}")
print(f"총 {len(resp['Items'])}개 아이템")
예상 결과:
RUN_PK: RUN#hjsong#ml_lab_titanic#titanic_survival_lightgbm#20260526_143012_a3f9bc
======================================================================
entity_type=CHARTS
entity_type=MANIFEST
entity_type=METRICS
entity_type=MODEL#chunk_000
entity_type=MODEL#chunk_001
entity_type=REPORT
======================================================================
총 6개 아이템
특정 run의 메트릭 조회
📋 복사해서 붙여넣기
resp = table.get_item(Key={"experiment_id": RUN_PK, "entity_type": "METRICS"})
metrics = from_ddb(resp["Item"])
print(f"AUC-ROC: {metrics['auc_roc']}")
print(f"F1 Score: {metrics['f1_score']}")
print(f"Accuracy: {metrics['accuracy']}")
차트 PNG 꺼내기
📋 복사해서 붙여넣기
from IPython.display import Image, display
resp = table.get_item(Key={"experiment_id": RUN_PK, "entity_type": "CHARTS"})
charts = resp["Item"]["charts"]
# 원하는 차트: feature_importance / roc_curve / confusion_matrix / learning_curve
png_bytes = base64.b64decode(charts["roc_curve"])
display(Image(data=png_bytes))
MLflow 시스템 메트릭 자동 수집
ℹ️ MLflow >= 2.8에서는
enable_system_metrics_logging()을 호출하면 학습 중 CPU / 메모리 / 디스크를 iteration별로 자동 기록합니다.📋 복사해서 붙여넣기
import mlflow
mlflow.set_tracking_uri(ctx.project['environments'][ctx.model['env_nm']]['mlflow_url'])
mlflow.enable_system_metrics_logging() # CPU / 메모리 / 디스크 iteration별 자동 기록
with mlflow.start_run(run_name=ctx.run_name):
mlflow.log_params(ctx.model['algorithm_params'])
mlflow.log_metrics({f"valid_{k}": v for k, v in valid_metrics.items() if k != 'samples'})
mlflow.lightgbm.log_model(model, artifact_path="model")
mlflow.set_tags({
"worker_id": ctx.model['worker_id'],
"env_nm": ctx.model['env_nm'],
"ddb_run_pk": RUN_PK, # DDB PK를 MLflow 태그에도 기록
})
| 메트릭 이름 | 내용 |
|---|---|
system/cpu_utilization_percentage | 전체 CPU 사용률 (%) |
system/memory_usage_megabytes | 메모리 사용량 (MB) |
system/available_memory_megabytes | 여유 메모리 (MB) |
system/disk_available_megabytes | 여유 디스크 (MB) |
system/disk_usage_megabytes | 디스크 사용량 (MB) |
결과 확인 체크리스트
| ☐ | 확인 항목 |
|---|---|
| ☐ | gs-cdp-mlops-{project_nm} 테이블이 생성됐는가? |
| ☐ | EXP_PK 아래 META / CONF 아이템이 있는가? |
| ☐ | RUN_PK 아래 METRICS 아이템에 AUC 값이 있는가? |
| ☐ | MODEL#chunk_000 이상의 아이템이 있는가? |
| ☐ | CHARTS 아이템에서 PNG를 꺼내 Jupyter에서 시각화했는가? |
| ☐ | MLflow UI에서 system/cpu_utilization_percentage 차트가 보이는가? |
| ☐ | MLflow 태그에 ddb_run_pk가 기록됐는가? |