CDP 실험 관리

Lambda + EventBridge
서버리스 파이프라인

수동으로 실행하던 Training Job을 자동으로 트리거합니다.

서버리스 파이프라인

이벤트가 발생하면 Lambda가 Training Job을 자동 실행합니다.

"코드만 올려두면 AWS가 알아서 서버를 띄우고, 실행하고, 끕니다."
수동 실행서버리스 파이프라인
사람이 매일 아침 직접 실행스케줄에 맞춰 자동 실행
노트북을 켜놔야 함서버 불필요
실수로 빠뜨릴 수 있음놓치지 않고 반드시 실행
로그 추적 어려움CloudWatch에 자동 기록

전체 파이프라인 흐름:

Trigger
EventBridge
스케줄 / 이벤트
Execute
Lambda 함수
Python 핸들러
Train
SageMaker
Training Job
Store
S3 + DDB
결과 저장
ℹ️ 실행 환경 표기: 📓 = SageMaker 노트북 셀 | 💻 = 맥북 터미널 | 🌐 = AWS 콘솔

Part 1 · Lambda 기초 — Zip 배포

Lambda가 동작하는 원리

항목EC2 (일반 서버)Lambda (서버리스)
서버 관리직접 띄우고 관리AWS가 알아서
비용24시간 과금실행된 시간만 과금
스케일링직접 설정자동 (동시 1000개도 가능)
상태서버에 파일/메모리 유지실행 끝나면 사라짐

lambda_handler 기본 구조

import json
from datetime import datetime

def lambda_handler(event, context):
    # event: 호출자가 보낸 데이터 (dict)
    # context: 실행 환경 정보 (남은 시간, 함수 이름 등)

    name = event.get('name', 'World')

    result = {
        'message': f'Hello, {name}!',
        'timestamp': datetime.now().isoformat(),
        'received_event': event
    }

    return {
        'statusCode': 200,
        'body': json.dumps(result, ensure_ascii=False)
    }
ℹ️ event 구조는 호출자에 따라 달라집니다. EventBridge → {"source": "...", "detail": {...}}, S3 → {"Records": [...]}. Lambda 코드를 짤 때 "누가 호출할 건지"를 먼저 정하세요.

Step 1: Lambda용 IAM Role 만들기

📓 노트북에서 실행 · 📋 복사해서 붙여넣기

import boto3, json, time

iam = boto3.client('iam', region_name='us-east-2')

trust_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "lambda.amazonaws.com"},
        "Action": "sts:AssumeRole"
    }]
}

role_name = "lambda-basic-execution-role"

try:
    response = iam.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(trust_policy),
    )
    role_arn = response['Role']['Arn']
    print(f"Role 생성 완료: {role_arn}")
except iam.exceptions.EntityAlreadyExistsException:
    role_arn = iam.get_role(RoleName=role_name)['Role']['Arn']
    print(f"이미 존재하는 Role 사용: {role_arn}")

iam.attach_role_policy(
    RoleName=role_name,
    PolicyArn="arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
)
print("권한 추가 완료")

print("대기 중 (10초)...")
time.sleep(10)
ℹ️ 두 가지 Role을 헷갈리지 마세요:
· SageMaker Role → "노트북이 Lambda를 만들 수 있는" 권한
· Lambda Role → "Lambda가 실행될 때 CloudWatch에 로그를 쓸 수 있는" 권한

Step 2: Lambda 함수 등록

📓 노트북에서 실행 · 📋 복사해서 붙여넣기

import boto3, zipfile, io, json

lambda_client = boto3.client('lambda', region_name='us-east-2')

lambda_code = """
import json
from datetime import datetime

def lambda_handler(event, context):
    name = event.get('name', 'World')
    return {
        'statusCode': 200,
        'body': json.dumps({
            'message': f'Hello, {name}!',
            'timestamp': datetime.now().isoformat()
        }, ensure_ascii=False)
    }
"""

zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
    zf.writestr('lambda_function.py', lambda_code)
zip_buffer.seek(0)

function_name = "my-first-lambda"

try:
    response = lambda_client.create_function(
        FunctionName=function_name,
        Runtime='python3.12',
        Role=role_arn,
        Handler='lambda_function.lambda_handler',
        Code={'ZipFile': zip_buffer.read()},
        Timeout=30,
        MemorySize=128,
    )
    print(f"Lambda 생성 완료: {response['FunctionArn']}")
except lambda_client.exceptions.ResourceConflictException:
    print("이미 존재합니다.")

Step 3: 함수 테스트

📋 복사해서 붙여넣기

import json

response = lambda_client.invoke(
    FunctionName=function_name,
    InvocationType='RequestResponse',   # 동기 호출
    Payload=json.dumps({"name": "Lambda 초보자"})
)

result = json.loads(response['Payload'].read())
print(json.dumps(result, indent=2, ensure_ascii=False))

Part 2 · 컨테이너 이미지로 Lambda 배포
비교Zip 배포컨테이너 이미지
패키지 크기최대 250MB최대 10GB
ML 라이브러리Layer로 추가 (번거로움)Dockerfile에서 자유롭게
로컬 테스트어려움Docker로 동일 환경 테스트
ℹ️ ML 모델 파일이 크거나 pandas/scikit-learn 같은 무거운 라이브러리가 필요하면 컨테이너 이미지 방식을 쓰세요.

이미지 빌드 + ECR 등록 + Lambda 배포

📋 복사해서 붙여넣기

%%bash
ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
REGION="us-east-2"
REPO_NAME="my-lambda-container"
ECR_URI="${ACCOUNT_ID}.dkr.ecr.${REGION}.amazonaws.com"

# 이미지 빌드
cd ~/lambda-container
docker build -t ${REPO_NAME}:latest .

# ECR 로그인 + 푸시
aws ecr get-login-password --region ${REGION} | \
  docker login --username AWS --password-stdin ${ECR_URI}

docker tag ${REPO_NAME}:latest ${ECR_URI}/${REPO_NAME}:latest
docker push ${ECR_URI}/${REPO_NAME}:latest

echo "Push 완료: ${ECR_URI}/${REPO_NAME}:latest"
# 컨테이너 이미지로 Lambda 등록
account_id = boto3.client('sts', region_name='us-east-2').get_caller_identity()['Account']
image_uri  = f"{account_id}.dkr.ecr.us-east-2.amazonaws.com/my-lambda-container:latest"

lambda_client.create_function(
    FunctionName="my-container-lambda",
    Role=role_arn,
    Code={'ImageUri': image_uri},
    PackageType='Image',    # ← 컨테이너 이미지
    Timeout=60,
    MemorySize=256,
)
print("컨테이너 Lambda 등록 완료")

Part 3 · Lambda에서 SageMaker Training Job 호출

estimator.fit() vs boto3 — 무엇이 다른가?

항목estimator.fit()boto3.create_training_job()
패키지sagemaker (무거움, ~50MB)boto3 (Lambda 기본 내장)
사용 환경노트북, 로컬 스크립트Lambda, CI/CD, 어디서든
파라미터 방식Python 객체 (Estimator, TrainingInput)JSON 딕셔너리
기능 차이없음 — 내부적으로 동일한 SageMaker API 호출
ℹ️ estimator.fit()은 내부에서 boto3create_training_job을 래핑해 호출합니다. 두 방법의 결과는 완전히 동일합니다.

estimator.fit() 코드 (노트북 방식 — 참고)

# 📓 노트북에서 실행하는 방식
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
import sagemaker

REGION      = "ap-northeast-2"
ACCOUNT_ID  = boto3.client("sts").get_caller_identity()["Account"]
BUCKET      = f"gs-cdp-mllab-{ctx.model['env_nm']}"
IMAGE_URI   = f"{ACCOUNT_ID}.dkr.ecr.{REGION}.amazonaws.com/gs-automl-base-containers/boilerplate312:latest"
ROLE_ARN    = sagemaker.get_execution_role()

estimator = Estimator(
    image_uri=IMAGE_URI,
    role=ROLE_ARN,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    volume_size=30,
    max_run=3600,
    output_path=f"s3://{BUCKET}/sm-output/",
    hyperparameters={
        "run_id":    ctx.run_id,
        "env_nm":    ctx.model["env_nm"],
        "worker_id": ctx.model["worker_id"],
    },
    environment={"AWS_DEFAULT_REGION": REGION},
)

estimator.fit(
    inputs={
        "train": TrainingInput(
            s3_data=f"s3://{BUCKET}/{ctx.model['env_nm']}/{ctx.project['project_nm']}/data/",
            content_type="text/csv",
        )
    },
    job_name=f"cdp-train-{ctx.run_id}",
    wait=False,
)

boto3로 동일하게 호출 (Lambda 방식)

estimator.fit() 호출과 1:1로 대응됩니다. Lambda 함수 안에서 바로 사용할 수 있습니다.

📋 복사해서 붙여넣기

import boto3, json
from datetime import datetime

def lambda_handler(event, context):
    """
    EventBridge 또는 직접 호출로 SageMaker Training Job을 시작합니다.
    event 예시:
      {"run_id": "20260526_143012_a3f9bc", "env_nm": "dev", "worker_id": "hjsong"}
    """
    run_id    = event.get("run_id",    datetime.now().strftime("%Y%m%d_%H%M%S"))
    env_nm    = event.get("env_nm",    "dev")
    worker_id = event.get("worker_id", "unknown")

    REGION     = "ap-northeast-2"
    ACCOUNT_ID = boto3.client("sts").get_caller_identity()["Account"]
    BUCKET     = f"gs-cdp-mllab-{env_nm}"
    IMAGE_URI  = f"{ACCOUNT_ID}.dkr.ecr.{REGION}.amazonaws.com/gs-automl-base-containers/boilerplate312:latest"
    ROLE_ARN   = f"arn:aws:iam::{ACCOUNT_ID}:role/gs-cdp-sagemaker-role"

    sm = boto3.client("sagemaker", region_name=REGION)

    response = sm.create_training_job(
        TrainingJobName=f"cdp-train-{run_id}",

        # estimator(image_uri=...) 에 해당
        AlgorithmSpecification={
            "TrainingImage":     IMAGE_URI,
            "TrainingInputMode": "File",
        },

        # estimator(role=...) 에 해당
        RoleArn=ROLE_ARN,

        # estimator.fit(inputs={...}) 에 해당
        InputDataConfig=[{
            "ChannelName": "train",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType":             "S3Prefix",
                    "S3Uri":                  f"s3://{BUCKET}/{env_nm}/data/",
                    "S3DataDistributionType": "FullyReplicated",
                }
            },
            "ContentType": "text/csv",
            "InputMode":   "File",
        }],

        # estimator(output_path=...) 에 해당
        OutputDataConfig={
            "S3OutputPath": f"s3://{BUCKET}/sm-output/",
        },

        # estimator(instance_type=..., instance_count=..., volume_size=...) 에 해당
        ResourceConfig={
            "InstanceType":   "ml.m5.xlarge",
            "InstanceCount":  1,
            "VolumeSizeInGB": 30,
        },

        # estimator(max_run=...) 에 해당
        StoppingCondition={
            "MaxRuntimeInSeconds": 3600,
        },

        # estimator(hyperparameters={...}) 에 해당
        # ⚠️ 값은 반드시 문자열이어야 합니다
        HyperParameters={
            "run_id":    run_id,
            "env_nm":    env_nm,
            "worker_id": worker_id,
        },

        # estimator(environment={...}) 에 해당
        Environment={
            "AWS_DEFAULT_REGION": REGION,
        },
    )

    job_arn = response["TrainingJobArn"]
    print(f"Training Job 시작: {job_arn}")

    return {
        "statusCode": 200,
        "body": json.dumps({
            "training_job_name": f"cdp-train-{run_id}",
            "training_job_arn":  job_arn,
            "run_id":            run_id,
        })
    }

파라미터 대응표

Estimator / fit() 파라미터create_training_job JSON 키비고
image_uriAlgorithmSpecification.TrainingImage
roleRoleArn
instance_typeResourceConfig.InstanceType
instance_countResourceConfig.InstanceCount
volume_sizeResourceConfig.VolumeSizeInGB
max_runStoppingCondition.MaxRuntimeInSeconds
output_pathOutputDataConfig.S3OutputPath
hyperparametersHyperParameters값이 반드시 str
environmentEnvironment
fit(inputs={"train": TrainingInput(...)})InputDataConfig[].DataSource.S3DataSource.S3Uri
fit(job_name=...)TrainingJobName
fit(wait=False)create_training_job은 항상 비동기완료 대기는 별도 polling 필요
⚠️ HyperParameters의 모든 값은 문자열이어야 합니다. 정수나 float를 넣으면 API 오류가 납니다.
# 틀림
"n_estimators": 300
# 맞음
"n_estimators": "300"

Training Job 상태 확인 (polling)

estimator.fit(wait=True)에 해당하는 boto3 패턴입니다.

📋 복사해서 붙여넣기

import time

def wait_for_training_job(job_name, region="ap-northeast-2", poll_sec=30):
    sm     = boto3.client("sagemaker", region_name=region)
    status = "InProgress"

    while status in ("InProgress", "Stopping"):
        resp   = sm.describe_training_job(TrainingJobName=job_name)
        status = resp["TrainingJobStatus"]
        secs   = resp.get("TrainingTimeInSeconds", 0)
        print(f"[{status}] {secs}s elapsed")
        if status in ("InProgress", "Stopping"):
            time.sleep(poll_sec)

    if status == "Completed":
        s3_model = resp["ModelArtifacts"]["S3ModelArtifacts"]
        print(f"완료: {s3_model}")
    else:
        reason = resp.get("FailureReason", "unknown")
        raise RuntimeError(f"Training Job 실패: {status} — {reason}")

    return status, resp

# 사용 예시
status, info = wait_for_training_job(f"cdp-train-{run_id}")

Part 4 · EventBridge로 자동 트리거 연결

EventBridge는 자동 알람 시계입니다. "매일 오전 8시에 Lambda를 실행해" 또는 "S3에 파일이 올라오면 Lambda를 실행해" 같은 규칙을 만들 수 있습니다.

스케줄 트리거 만들기

📓 노트북에서 실행 · 📋 복사해서 붙여넣기

events = boto3.client('events', region_name='us-east-2')
account_id = boto3.client('sts', region_name='us-east-2').get_caller_identity()['Account']
REGION = 'us-east-2'

# 규칙 생성 (매일 오전 9시 UTC = 한국시간 오후 6시)
rule_response = events.put_rule(
    Name='daily-ml-training',
    ScheduleExpression='cron(0 9 * * ? *)',     # UTC 기준
    State='ENABLED',
    Description='매일 오전 9시 (UTC) ML 모델 학습 자동 실행',
)
rule_arn = rule_response['RuleArn']
print(f"EventBridge Rule: {rule_arn}")

# Lambda에 EventBridge 호출 권한 부여
lambda_client.add_permission(
    FunctionName="my-container-lambda",
    StatementId='eventbridge-invoke',
    Action='lambda:InvokeFunction',
    Principal='events.amazonaws.com',
    SourceArn=rule_arn,
)

# Lambda를 트리거 타겟으로 등록
events.put_targets(
    Rule='daily-ml-training',
    Targets=[{
        'Id': 'ml-training-target',
        'Arn': f'arn:aws:lambda:{REGION}:{account_id}:function:my-container-lambda',
        'Input': json.dumps({
            "task": "daily-training",
            "user_id": "YOUR_ID",
            "project": "titanic-survival-prediction"
        })
    }]
)
print("EventBridge 트리거 등록 완료")
⚠️ cron 표현식은 UTC 기준입니다. 한국시간(KST)은 UTC+9이므로, 한국시간 오전 6시 실행 = cron(0 21 * * ? *) (전날 21시 UTC).

CloudWatch 로그 확인

📋 복사해서 붙여넣기

logs = boto3.client('logs', region_name='us-east-2')

log_group = '/aws/lambda/my-container-lambda'

streams = logs.describe_log_streams(
    logGroupName=log_group,
    orderBy='LastEventTime',
    descending=True,
    limit=1
)

if streams['logStreams']:
    stream_name = streams['logStreams'][0]['logStreamName']
    events_resp = logs.get_log_events(
        logGroupName=log_group,
        logStreamName=stream_name,
        limit=20
    )
    for e in events_resp['events']:
        print(e['message'])

결과 확인 체크리스트

확인 항목방법
Lambda 함수가 생성됐는가?AWS 콘솔 → Lambda → Functions
직접 호출 테스트가 성공했는가?노트북 invoke 결과 확인
컨테이너 이미지가 ECR에 올라갔는가?aws ecr list-images
boto3.create_training_job()TrainingJobArn을 반환했는가?Lambda invoke 응답 확인
SageMaker 콘솔에서 Training Job이 Completed됐는가?AWS 콘솔 → SageMaker → Training jobs
EventBridge 규칙이 ENABLED 상태인가?AWS 콘솔 → EventBridge → Rules
CloudWatch 로그에 실행 기록이 있는가?위 로그 조회 코드 실행
여기까지 완료했다면, 여러분은 "이벤트 발생 → Lambda 실행 → SageMaker Training → 결과 저장"의 완전 자동화된 ML 파이프라인을 직접 구축한 겁니다.

이제 새벽에 일어나 학습 버튼을 누르지 않아도 됩니다. 시스템이 알아서 합니다.

전체 과정 마무리

Linux/Git/Docker                 →  개발 환경 표준화
SageMaker + ECR                  →  재현 가능한 학습 환경
S3 + ctx API                     →  실험 결과 자동 저장
DynamoDB                         →  실험 이력 구조화 저장
boto3.create_training_job()      →  어디서든 Training Job 호출
Lambda + EventBridge             →  파이프라인 자동화
"내 노트북에서만 되는 코드"는 이제 없습니다. 시스템이 실험을 기억하고, 재현하고, 자동으로 실행합니다.