CDP 실험 관리
Lambda + EventBridge
서버리스 파이프라인
수동으로 실행하던 Training Job을 자동으로 트리거합니다.
서버리스 파이프라인
이벤트가 발생하면 Lambda가 Training Job을 자동 실행합니다.
"코드만 올려두면 AWS가 알아서 서버를 띄우고, 실행하고, 끕니다."
| 수동 실행 | 서버리스 파이프라인 |
|---|---|
| 사람이 매일 아침 직접 실행 | 스케줄에 맞춰 자동 실행 |
| 노트북을 켜놔야 함 | 서버 불필요 |
| 실수로 빠뜨릴 수 있음 | 놓치지 않고 반드시 실행 |
| 로그 추적 어려움 | CloudWatch에 자동 기록 |
전체 파이프라인 흐름:
Trigger
EventBridge
스케줄 / 이벤트
→
Execute
Lambda 함수
Python 핸들러
→
Train
SageMaker
Training Job
→
Store
S3 + DDB
결과 저장
ℹ️ 실행 환경 표기: 📓 = SageMaker 노트북 셀 | 💻 = 맥북 터미널 | 🌐 = AWS 콘솔
Part 1 · Lambda 기초 — Zip 배포
Lambda가 동작하는 원리
| 항목 | EC2 (일반 서버) | Lambda (서버리스) |
|---|---|---|
| 서버 관리 | 직접 띄우고 관리 | AWS가 알아서 |
| 비용 | 24시간 과금 | 실행된 시간만 과금 |
| 스케일링 | 직접 설정 | 자동 (동시 1000개도 가능) |
| 상태 | 서버에 파일/메모리 유지 | 실행 끝나면 사라짐 |
lambda_handler 기본 구조
import json
from datetime import datetime
def lambda_handler(event, context):
# event: 호출자가 보낸 데이터 (dict)
# context: 실행 환경 정보 (남은 시간, 함수 이름 등)
name = event.get('name', 'World')
result = {
'message': f'Hello, {name}!',
'timestamp': datetime.now().isoformat(),
'received_event': event
}
return {
'statusCode': 200,
'body': json.dumps(result, ensure_ascii=False)
}
ℹ️ event 구조는 호출자에 따라 달라집니다. EventBridge →
{"source": "...", "detail": {...}}, S3 → {"Records": [...]}. Lambda 코드를 짤 때 "누가 호출할 건지"를 먼저 정하세요.Step 1: Lambda용 IAM Role 만들기
📓 노트북에서 실행 · 📋 복사해서 붙여넣기
import boto3, json, time
iam = boto3.client('iam', region_name='us-east-2')
trust_policy = {
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Principal": {"Service": "lambda.amazonaws.com"},
"Action": "sts:AssumeRole"
}]
}
role_name = "lambda-basic-execution-role"
try:
response = iam.create_role(
RoleName=role_name,
AssumeRolePolicyDocument=json.dumps(trust_policy),
)
role_arn = response['Role']['Arn']
print(f"Role 생성 완료: {role_arn}")
except iam.exceptions.EntityAlreadyExistsException:
role_arn = iam.get_role(RoleName=role_name)['Role']['Arn']
print(f"이미 존재하는 Role 사용: {role_arn}")
iam.attach_role_policy(
RoleName=role_name,
PolicyArn="arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
)
print("권한 추가 완료")
print("대기 중 (10초)...")
time.sleep(10)
ℹ️ 두 가지 Role을 헷갈리지 마세요:
· SageMaker Role → "노트북이 Lambda를 만들 수 있는" 권한
· Lambda Role → "Lambda가 실행될 때 CloudWatch에 로그를 쓸 수 있는" 권한
· SageMaker Role → "노트북이 Lambda를 만들 수 있는" 권한
· Lambda Role → "Lambda가 실행될 때 CloudWatch에 로그를 쓸 수 있는" 권한
Step 2: Lambda 함수 등록
📓 노트북에서 실행 · 📋 복사해서 붙여넣기
import boto3, zipfile, io, json
lambda_client = boto3.client('lambda', region_name='us-east-2')
lambda_code = """
import json
from datetime import datetime
def lambda_handler(event, context):
name = event.get('name', 'World')
return {
'statusCode': 200,
'body': json.dumps({
'message': f'Hello, {name}!',
'timestamp': datetime.now().isoformat()
}, ensure_ascii=False)
}
"""
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
zf.writestr('lambda_function.py', lambda_code)
zip_buffer.seek(0)
function_name = "my-first-lambda"
try:
response = lambda_client.create_function(
FunctionName=function_name,
Runtime='python3.12',
Role=role_arn,
Handler='lambda_function.lambda_handler',
Code={'ZipFile': zip_buffer.read()},
Timeout=30,
MemorySize=128,
)
print(f"Lambda 생성 완료: {response['FunctionArn']}")
except lambda_client.exceptions.ResourceConflictException:
print("이미 존재합니다.")
Step 3: 함수 테스트
📋 복사해서 붙여넣기
import json
response = lambda_client.invoke(
FunctionName=function_name,
InvocationType='RequestResponse', # 동기 호출
Payload=json.dumps({"name": "Lambda 초보자"})
)
result = json.loads(response['Payload'].read())
print(json.dumps(result, indent=2, ensure_ascii=False))
Part 2 · 컨테이너 이미지로 Lambda 배포
| 비교 | Zip 배포 | 컨테이너 이미지 |
|---|---|---|
| 패키지 크기 | 최대 250MB | 최대 10GB |
| ML 라이브러리 | Layer로 추가 (번거로움) | Dockerfile에서 자유롭게 |
| 로컬 테스트 | 어려움 | Docker로 동일 환경 테스트 |
ℹ️ ML 모델 파일이 크거나 pandas/scikit-learn 같은 무거운 라이브러리가 필요하면 컨테이너 이미지 방식을 쓰세요.
이미지 빌드 + ECR 등록 + Lambda 배포
📋 복사해서 붙여넣기
%%bash
ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
REGION="us-east-2"
REPO_NAME="my-lambda-container"
ECR_URI="${ACCOUNT_ID}.dkr.ecr.${REGION}.amazonaws.com"
# 이미지 빌드
cd ~/lambda-container
docker build -t ${REPO_NAME}:latest .
# ECR 로그인 + 푸시
aws ecr get-login-password --region ${REGION} | \
docker login --username AWS --password-stdin ${ECR_URI}
docker tag ${REPO_NAME}:latest ${ECR_URI}/${REPO_NAME}:latest
docker push ${ECR_URI}/${REPO_NAME}:latest
echo "Push 완료: ${ECR_URI}/${REPO_NAME}:latest"
# 컨테이너 이미지로 Lambda 등록
account_id = boto3.client('sts', region_name='us-east-2').get_caller_identity()['Account']
image_uri = f"{account_id}.dkr.ecr.us-east-2.amazonaws.com/my-lambda-container:latest"
lambda_client.create_function(
FunctionName="my-container-lambda",
Role=role_arn,
Code={'ImageUri': image_uri},
PackageType='Image', # ← 컨테이너 이미지
Timeout=60,
MemorySize=256,
)
print("컨테이너 Lambda 등록 완료")
Part 3 · Lambda에서 SageMaker Training Job 호출
estimator.fit() vs boto3 — 무엇이 다른가?
| 항목 | estimator.fit() | boto3.create_training_job() |
|---|---|---|
| 패키지 | sagemaker (무거움, ~50MB) | boto3 (Lambda 기본 내장) |
| 사용 환경 | 노트북, 로컬 스크립트 | Lambda, CI/CD, 어디서든 |
| 파라미터 방식 | Python 객체 (Estimator, TrainingInput) | JSON 딕셔너리 |
| 기능 차이 | 없음 — 내부적으로 동일한 SageMaker API 호출 | |
ℹ️
estimator.fit()은 내부에서 boto3의 create_training_job을 래핑해 호출합니다. 두 방법의 결과는 완전히 동일합니다.estimator.fit() 코드 (노트북 방식 — 참고)
# 📓 노트북에서 실행하는 방식
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
import sagemaker
REGION = "ap-northeast-2"
ACCOUNT_ID = boto3.client("sts").get_caller_identity()["Account"]
BUCKET = f"gs-cdp-mllab-{ctx.model['env_nm']}"
IMAGE_URI = f"{ACCOUNT_ID}.dkr.ecr.{REGION}.amazonaws.com/gs-automl-base-containers/boilerplate312:latest"
ROLE_ARN = sagemaker.get_execution_role()
estimator = Estimator(
image_uri=IMAGE_URI,
role=ROLE_ARN,
instance_count=1,
instance_type="ml.m5.xlarge",
volume_size=30,
max_run=3600,
output_path=f"s3://{BUCKET}/sm-output/",
hyperparameters={
"run_id": ctx.run_id,
"env_nm": ctx.model["env_nm"],
"worker_id": ctx.model["worker_id"],
},
environment={"AWS_DEFAULT_REGION": REGION},
)
estimator.fit(
inputs={
"train": TrainingInput(
s3_data=f"s3://{BUCKET}/{ctx.model['env_nm']}/{ctx.project['project_nm']}/data/",
content_type="text/csv",
)
},
job_name=f"cdp-train-{ctx.run_id}",
wait=False,
)
boto3로 동일하게 호출 (Lambda 방식)
위 estimator.fit() 호출과 1:1로 대응됩니다. Lambda 함수 안에서 바로 사용할 수 있습니다.
📋 복사해서 붙여넣기
import boto3, json
from datetime import datetime
def lambda_handler(event, context):
"""
EventBridge 또는 직접 호출로 SageMaker Training Job을 시작합니다.
event 예시:
{"run_id": "20260526_143012_a3f9bc", "env_nm": "dev", "worker_id": "hjsong"}
"""
run_id = event.get("run_id", datetime.now().strftime("%Y%m%d_%H%M%S"))
env_nm = event.get("env_nm", "dev")
worker_id = event.get("worker_id", "unknown")
REGION = "ap-northeast-2"
ACCOUNT_ID = boto3.client("sts").get_caller_identity()["Account"]
BUCKET = f"gs-cdp-mllab-{env_nm}"
IMAGE_URI = f"{ACCOUNT_ID}.dkr.ecr.{REGION}.amazonaws.com/gs-automl-base-containers/boilerplate312:latest"
ROLE_ARN = f"arn:aws:iam::{ACCOUNT_ID}:role/gs-cdp-sagemaker-role"
sm = boto3.client("sagemaker", region_name=REGION)
response = sm.create_training_job(
TrainingJobName=f"cdp-train-{run_id}",
# estimator(image_uri=...) 에 해당
AlgorithmSpecification={
"TrainingImage": IMAGE_URI,
"TrainingInputMode": "File",
},
# estimator(role=...) 에 해당
RoleArn=ROLE_ARN,
# estimator.fit(inputs={...}) 에 해당
InputDataConfig=[{
"ChannelName": "train",
"DataSource": {
"S3DataSource": {
"S3DataType": "S3Prefix",
"S3Uri": f"s3://{BUCKET}/{env_nm}/data/",
"S3DataDistributionType": "FullyReplicated",
}
},
"ContentType": "text/csv",
"InputMode": "File",
}],
# estimator(output_path=...) 에 해당
OutputDataConfig={
"S3OutputPath": f"s3://{BUCKET}/sm-output/",
},
# estimator(instance_type=..., instance_count=..., volume_size=...) 에 해당
ResourceConfig={
"InstanceType": "ml.m5.xlarge",
"InstanceCount": 1,
"VolumeSizeInGB": 30,
},
# estimator(max_run=...) 에 해당
StoppingCondition={
"MaxRuntimeInSeconds": 3600,
},
# estimator(hyperparameters={...}) 에 해당
# ⚠️ 값은 반드시 문자열이어야 합니다
HyperParameters={
"run_id": run_id,
"env_nm": env_nm,
"worker_id": worker_id,
},
# estimator(environment={...}) 에 해당
Environment={
"AWS_DEFAULT_REGION": REGION,
},
)
job_arn = response["TrainingJobArn"]
print(f"Training Job 시작: {job_arn}")
return {
"statusCode": 200,
"body": json.dumps({
"training_job_name": f"cdp-train-{run_id}",
"training_job_arn": job_arn,
"run_id": run_id,
})
}
파라미터 대응표
| Estimator / fit() 파라미터 | create_training_job JSON 키 | 비고 |
|---|---|---|
image_uri | AlgorithmSpecification.TrainingImage | |
role | RoleArn | |
instance_type | ResourceConfig.InstanceType | |
instance_count | ResourceConfig.InstanceCount | |
volume_size | ResourceConfig.VolumeSizeInGB | |
max_run | StoppingCondition.MaxRuntimeInSeconds | |
output_path | OutputDataConfig.S3OutputPath | |
hyperparameters | HyperParameters | 값이 반드시 str |
environment | Environment | |
fit(inputs={"train": TrainingInput(...)}) | InputDataConfig[].DataSource.S3DataSource.S3Uri | |
fit(job_name=...) | TrainingJobName | |
fit(wait=False) | create_training_job은 항상 비동기 | 완료 대기는 별도 polling 필요 |
⚠️
HyperParameters의 모든 값은 문자열이어야 합니다. 정수나 float를 넣으면 API 오류가 납니다.
# 틀림 "n_estimators": 300 # 맞음 "n_estimators": "300"
Training Job 상태 확인 (polling)
estimator.fit(wait=True)에 해당하는 boto3 패턴입니다.
📋 복사해서 붙여넣기
import time
def wait_for_training_job(job_name, region="ap-northeast-2", poll_sec=30):
sm = boto3.client("sagemaker", region_name=region)
status = "InProgress"
while status in ("InProgress", "Stopping"):
resp = sm.describe_training_job(TrainingJobName=job_name)
status = resp["TrainingJobStatus"]
secs = resp.get("TrainingTimeInSeconds", 0)
print(f"[{status}] {secs}s elapsed")
if status in ("InProgress", "Stopping"):
time.sleep(poll_sec)
if status == "Completed":
s3_model = resp["ModelArtifacts"]["S3ModelArtifacts"]
print(f"완료: {s3_model}")
else:
reason = resp.get("FailureReason", "unknown")
raise RuntimeError(f"Training Job 실패: {status} — {reason}")
return status, resp
# 사용 예시
status, info = wait_for_training_job(f"cdp-train-{run_id}")
Part 4 · EventBridge로 자동 트리거 연결
EventBridge는 자동 알람 시계입니다. "매일 오전 8시에 Lambda를 실행해" 또는 "S3에 파일이 올라오면 Lambda를 실행해" 같은 규칙을 만들 수 있습니다.
스케줄 트리거 만들기
📓 노트북에서 실행 · 📋 복사해서 붙여넣기
events = boto3.client('events', region_name='us-east-2')
account_id = boto3.client('sts', region_name='us-east-2').get_caller_identity()['Account']
REGION = 'us-east-2'
# 규칙 생성 (매일 오전 9시 UTC = 한국시간 오후 6시)
rule_response = events.put_rule(
Name='daily-ml-training',
ScheduleExpression='cron(0 9 * * ? *)', # UTC 기준
State='ENABLED',
Description='매일 오전 9시 (UTC) ML 모델 학습 자동 실행',
)
rule_arn = rule_response['RuleArn']
print(f"EventBridge Rule: {rule_arn}")
# Lambda에 EventBridge 호출 권한 부여
lambda_client.add_permission(
FunctionName="my-container-lambda",
StatementId='eventbridge-invoke',
Action='lambda:InvokeFunction',
Principal='events.amazonaws.com',
SourceArn=rule_arn,
)
# Lambda를 트리거 타겟으로 등록
events.put_targets(
Rule='daily-ml-training',
Targets=[{
'Id': 'ml-training-target',
'Arn': f'arn:aws:lambda:{REGION}:{account_id}:function:my-container-lambda',
'Input': json.dumps({
"task": "daily-training",
"user_id": "YOUR_ID",
"project": "titanic-survival-prediction"
})
}]
)
print("EventBridge 트리거 등록 완료")
⚠️
cron 표현식은 UTC 기준입니다. 한국시간(KST)은 UTC+9이므로, 한국시간 오전 6시 실행 = cron(0 21 * * ? *) (전날 21시 UTC).CloudWatch 로그 확인
📋 복사해서 붙여넣기
logs = boto3.client('logs', region_name='us-east-2')
log_group = '/aws/lambda/my-container-lambda'
streams = logs.describe_log_streams(
logGroupName=log_group,
orderBy='LastEventTime',
descending=True,
limit=1
)
if streams['logStreams']:
stream_name = streams['logStreams'][0]['logStreamName']
events_resp = logs.get_log_events(
logGroupName=log_group,
logStreamName=stream_name,
limit=20
)
for e in events_resp['events']:
print(e['message'])
결과 확인 체크리스트
| ☐ | 확인 항목 | 방법 |
|---|---|---|
| ☐ | Lambda 함수가 생성됐는가? | AWS 콘솔 → Lambda → Functions |
| ☐ | 직접 호출 테스트가 성공했는가? | 노트북 invoke 결과 확인 |
| ☐ | 컨테이너 이미지가 ECR에 올라갔는가? | aws ecr list-images |
| ☐ | boto3.create_training_job()이 TrainingJobArn을 반환했는가? | Lambda invoke 응답 확인 |
| ☐ | SageMaker 콘솔에서 Training Job이 Completed됐는가? | AWS 콘솔 → SageMaker → Training jobs |
| ☐ | EventBridge 규칙이 ENABLED 상태인가? | AWS 콘솔 → EventBridge → Rules |
| ☐ | CloudWatch 로그에 실행 기록이 있는가? | 위 로그 조회 코드 실행 |
✅ 여기까지 완료했다면, 여러분은 "이벤트 발생 → Lambda 실행 → SageMaker Training → 결과 저장"의 완전 자동화된 ML 파이프라인을 직접 구축한 겁니다.
이제 새벽에 일어나 학습 버튼을 누르지 않아도 됩니다. 시스템이 알아서 합니다.
이제 새벽에 일어나 학습 버튼을 누르지 않아도 됩니다. 시스템이 알아서 합니다.
전체 과정 마무리
Linux/Git/Docker → 개발 환경 표준화
SageMaker + ECR → 재현 가능한 학습 환경
S3 + ctx API → 실험 결과 자동 저장
DynamoDB → 실험 이력 구조화 저장
boto3.create_training_job() → 어디서든 Training Job 호출
Lambda + EventBridge → 파이프라인 자동화
✅ "내 노트북에서만 되는 코드"는 이제 없습니다. 시스템이 실험을 기억하고, 재현하고, 자동으로 실행합니다.