Loading...
Loading...
02-reusable-code-python/pipelines/hooks.py
"""
파이프라인 생명주기 훅
PipelineOrchestrator에 등록하여 파이프라인 실행 과정을 모니터링하거나
체크포인트를 저장하는 훅 구현체를 제공한다.
@source claude-world/notebooklm-skill
@extracted 2026-03-18
@version 1.0.0
의존성:
- 표준 라이브러리만 사용 (json, logging, pathlib, time)
사용법:
from pipelines.hooks import LoggingHook, TimingHook, CheckpointHook
orchestrator = PipelineOrchestrator()
logging_hook = LoggingHook()
timing_hook = TimingHook()
checkpoint_hook = CheckpointHook(checkpoint_dir="/tmp/pipeline-checkpoints")
orchestrator.register_hook("before_stage", logging_hook.before)
orchestrator.register_hook("after_stage", logging_hook.after)
orchestrator.register_hook("on_error", logging_hook.on_error)
orchestrator.register_hook("before_stage", timing_hook.before)
orchestrator.register_hook("after_stage", timing_hook.after)
orchestrator.register_hook("after_stage", checkpoint_hook.after)
"""
from __future__ import annotations
import json
import logging
import time
from pathlib import Path
from .models import StageResult
from .types import PipelineStage, StageStatus
logger = logging.getLogger(__name__)
class LoggingHook:
"""단계 실행 로깅 훅.
각 단계의 시작, 종료, 오류 발생 시 구조화된 로그를 남긴다.
사용 예::
hook = LoggingHook(log_level=logging.INFO)
orchestrator.register_hook("before_stage", hook.before)
orchestrator.register_hook("after_stage", hook.after)
orchestrator.register_hook("on_error", hook.on_error)
"""
def __init__(self, log_level: int = logging.INFO) -> None:
self._log_level = log_level
async def before(self, stage: PipelineStage, result: StageResult | None) -> None:
"""단계 시작 직전 호출된다."""
logger.log(
self._log_level,
"[PIPELINE] 단계 시작 → stage=%s",
stage.value,
)
async def after(self, stage: PipelineStage, result: StageResult | None) -> None:
"""단계 실행 완료 후 호출된다."""
if result is None:
return
status_icon = "✓" if result.status == StageStatus.COMPLETED else "✗"
logger.log(
self._log_level,
"[PIPELINE] 단계 완료 %s stage=%s, status=%s, duration=%.2fs, outputs=%d개",
status_icon,
stage.value,
result.status.value,
result.duration_seconds,
len(result.outputs),
)
async def on_error(self, stage: PipelineStage, result: StageResult | None) -> None:
"""단계 실패 시 호출된다."""
error_msg = result.error if result else "Unknown error"
logger.error(
"[PIPELINE] 단계 오류 ✗ stage=%s, error=%s",
stage.value,
error_msg,
)
class TimingHook:
"""단계별 실행 시간 추적 훅.
각 단계의 시작 시각을 기록하고, 완료 후 소요 시간을
내부 딕셔너리에 누적한다. get_report()로 요약을 확인할 수 있다.
사용 예::
hook = TimingHook()
orchestrator.register_hook("before_stage", hook.before)
orchestrator.register_hook("after_stage", hook.after)
report = await orchestrator.run(config)
print(hook.get_report())
"""
def __init__(self) -> None:
# 단계별 시작 시각 (monotonic)
self._start_times: dict[str, float] = {}
# 단계별 소요 시간 기록
self._durations: dict[str, float] = {}
async def before(self, stage: PipelineStage, result: StageResult | None) -> None:
"""단계 시작 시각을 기록한다."""
self._start_times[stage.value] = time.monotonic()
logger.debug("[TIMING] 시작 기록: stage=%s", stage.value)
async def after(self, stage: PipelineStage, result: StageResult | None) -> None:
"""단계 소요 시간을 계산하고 누적한다."""
start = self._start_times.get(stage.value)
if start is None:
return
elapsed = time.monotonic() - start
self._durations[stage.value] = elapsed
logger.debug("[TIMING] 소요 시간: stage=%s, elapsed=%.3fs", stage.value, elapsed)
def get_report(self) -> str:
"""누적된 단계별 실행 시간 요약 문자열을 반환한다.
Returns:
단계명과 소요 시간을 포함한 텍스트 보고서
"""
if not self._durations:
return "[TIMING] 기록된 단계 없음"
lines = ["[TIMING] 단계별 실행 시간:"]
total = 0.0
for stage_name, duration in self._durations.items():
lines.append(f" {stage_name:>12}: {duration:.3f}s")
total += duration
lines.append(f" {'합계':>12}: {total:.3f}s")
return "\n".join(lines)
@property
def total_seconds(self) -> float:
"""전체 누적 실행 시간(초)을 반환한다."""
return sum(self._durations.values())
class CheckpointHook:
"""단계 완료 후 체크포인트 저장 훅.
각 단계가 성공적으로 완료될 때마다 결과를 JSON 파일로
저장하여 파이프라인 재개 시 이전 결과를 활용할 수 있게 한다.
파일 이름 형식: ``{pipeline_name}_{stage_name}.json``
사용 예::
hook = CheckpointHook(checkpoint_dir="/tmp/checkpoints")
orchestrator.register_hook("after_stage", hook.after)
"""
def __init__(self, checkpoint_dir: str = "/tmp/pipeline-checkpoints") -> None:
self._checkpoint_dir = Path(checkpoint_dir)
# 디렉토리가 없으면 생성
self._checkpoint_dir.mkdir(parents=True, exist_ok=True)
logger.debug("[CHECKPOINT] 저장 경로: %s", self._checkpoint_dir)
async def after(self, stage: PipelineStage, result: StageResult | None) -> None:
"""단계 완료 후 결과를 JSON 파일로 저장한다.
SKIPPED 및 FAILED 상태는 저장하지 않는다.
Args:
stage: 완료된 단계
result: 단계 실행 결과
"""
if result is None or result.status != StageStatus.COMPLETED:
return
checkpoint_path = self._checkpoint_dir / f"{stage.value}_checkpoint.json"
payload: dict[str, object] = {
"stage": result.stage.value,
"status": result.status.value,
"outputs": result.outputs,
"duration_seconds": result.duration_seconds,
"metadata": result.metadata,
"saved_at": time.time(),
}
try:
checkpoint_path.write_text(
json.dumps(payload, ensure_ascii=False, indent=2),
encoding="utf-8",
)
logger.info(
"[CHECKPOINT] 저장 완료: stage=%s, path=%s",
stage.value,
checkpoint_path,
)
except OSError as exc:
logger.warning(
"[CHECKPOINT] 저장 실패 (무시): stage=%s, error=%s",
stage.value,
exc,
)
def load_checkpoint(self, stage: PipelineStage) -> dict | None:
"""저장된 체크포인트를 로드한다.
Args:
stage: 로드할 단계
Returns:
체크포인트 데이터 딕셔너리, 파일이 없으면 None
"""
checkpoint_path = self._checkpoint_dir / f"{stage.value}_checkpoint.json"
if not checkpoint_path.exists():
logger.debug("[CHECKPOINT] 파일 없음: stage=%s", stage.value)
return None
try:
data = json.loads(checkpoint_path.read_text(encoding="utf-8"))
logger.info("[CHECKPOINT] 로드 완료: stage=%s", stage.value)
return data
except (OSError, json.JSONDecodeError) as exc:
logger.warning(
"[CHECKPOINT] 로드 실패: stage=%s, error=%s", stage.value, exc
)
return None
def clear_checkpoints(self) -> int:
"""저장된 모든 체크포인트 파일을 삭제한다.
Returns:
삭제된 파일 수
"""
deleted = 0
for path in self._checkpoint_dir.glob("*_checkpoint.json"):
try:
path.unlink()
deleted += 1
except OSError as exc:
logger.warning("[CHECKPOINT] 삭제 실패: path=%s, error=%s", path, exc)
logger.info("[CHECKPOINT] 체크포인트 초기화: %d개 삭제", deleted)
return deleted