Loading...
Loading...
02-reusable-code-python/pipelines/orchestrator.py
"""
4단계 파이프라인 오케스트레이션 엔진
Ingest → Synthesize → Create → Publish 파이프라인을 실행하는
범용 오케스트레이터. 각 단계는 StageHandler 콜백으로 구현한다.
생명주기 훅(before_stage, after_stage, on_error)을 통해 확장 가능하다.
@source claude-world/notebooklm-skill
@extracted 2026-03-18
@version 1.0.0
의존성:
- pydantic>=2.0 (필수)
사용법:
from pipelines.orchestrator import PipelineOrchestrator
from pipelines.types import PipelineStage
async def my_ingest_handler(config, previous_results):
# 소스 데이터 수집 로직
return StageResult(stage=PipelineStage.INGEST, status=StageStatus.COMPLETED)
orchestrator = PipelineOrchestrator()
orchestrator.register_handler(PipelineStage.INGEST, my_ingest_handler)
report = await orchestrator.run(config)
"""
from __future__ import annotations
import asyncio
import logging
import time
from collections.abc import Awaitable, Callable
from typing import Any
from .models import PipelineConfig, PipelineReport, StageResult
from .types import PipelineStage, StageStatus
logger = logging.getLogger(__name__)
# 스테이지 핸들러 타입: (설정, 이전 결과 목록) → 단계 결과
StageHandler = Callable[[PipelineConfig, list[StageResult]], Awaitable[StageResult]]
# 생명주기 훅 타입: (단계, 결과 또는 None) → None
LifecycleHook = Callable[[PipelineStage, StageResult | None], Awaitable[None]]
class PipelineOrchestrator:
"""4단계 파이프라인 오케스트레이터.
각 파이프라인 단계에 핸들러를 등록하고, 순차적으로 실행하여
최종 PipelineReport를 반환한다.
생명주기 훅:
- ``before_stage``: 단계 실행 직전 호출 (result=None)
- ``after_stage``: 단계 실행 직후 호출 (result=완료/실패 결과)
- ``on_error``: 단계 실패 시 추가 호출 (result=실패 결과)
사용 예::
orchestrator = PipelineOrchestrator()
orchestrator.register_handler(PipelineStage.INGEST, ingest_fn)
orchestrator.register_hook("before_stage", logging_hook)
report = await orchestrator.run(config)
"""
# 지원하는 훅 이벤트 이름 목록
SUPPORTED_HOOK_EVENTS = frozenset({"before_stage", "after_stage", "on_error"})
def __init__(self) -> None:
# 단계별 핸들러 맵: {PipelineStage → StageHandler}
self._handlers: dict[PipelineStage, StageHandler] = {}
# 생명주기 훅 맵: {event_name → [LifecycleHook, ...]}
self._hooks: dict[str, list[LifecycleHook]] = {
event: [] for event in self.SUPPORTED_HOOK_EVENTS
}
def register_handler(self, stage: PipelineStage, handler: StageHandler) -> None:
"""특정 파이프라인 단계에 실행 핸들러를 등록한다.
Args:
stage: 핸들러를 등록할 파이프라인 단계
handler: 비동기 핸들러 함수 (StageHandler 시그니처)
Raises:
ValueError: 지원하지 않는 단계 값이 전달된 경우
"""
if not isinstance(stage, PipelineStage):
raise ValueError(f"Invalid pipeline stage: {stage}")
self._handlers[stage] = handler
logger.debug("핸들러 등록: stage=%s, handler=%s", stage.value, handler.__name__)
def register_hook(self, event: str, hook: LifecycleHook) -> None:
"""파이프라인 생명주기 훅을 등록한다.
Args:
event: 훅 이벤트 이름 (before_stage | after_stage | on_error)
hook: 비동기 훅 함수 (LifecycleHook 시그니처)
Raises:
ValueError: 지원하지 않는 이벤트 이름인 경우
"""
if event not in self.SUPPORTED_HOOK_EVENTS:
raise ValueError(
f"Invalid hook event '{event}'. "
f"Supported events: {sorted(self.SUPPORTED_HOOK_EVENTS)}"
)
self._hooks[event].append(hook)
logger.debug("훅 등록: event=%s, hook=%s", event, hook.__name__ if hasattr(hook, "__name__") else repr(hook))
async def _fire_hooks(
self,
event: str,
stage: PipelineStage,
result: StageResult | None = None,
) -> None:
"""등록된 훅을 순차적으로 실행한다.
훅 실행 중 발생하는 예외는 로그에 기록하고 무시한다.
훅 실패가 파이프라인 실행을 중단시키지 않도록 설계되었다.
Args:
event: 훅 이벤트 이름
stage: 현재 단계
result: 단계 실행 결과 (before_stage 시 None)
"""
for hook in self._hooks.get(event, []):
try:
await hook(stage, result)
except Exception as exc:
logger.warning(
"훅 실행 중 오류 무시: event=%s, stage=%s, error=%s",
event,
stage.value,
exc,
)
async def _run_stage(
self,
stage: PipelineStage,
config: PipelineConfig,
previous_results: list[StageResult],
) -> StageResult:
"""단일 파이프라인 단계를 실행한다.
핸들러가 등록되지 않은 단계는 SKIPPED 상태로 처리된다.
Args:
stage: 실행할 단계
config: 파이프라인 설정
previous_results: 이전 단계 실행 결과 목록
Returns:
단계 실행 결과 (StageResult)
"""
# before_stage 훅 실행
await self._fire_hooks("before_stage", stage, None)
handler = self._handlers.get(stage)
if handler is None:
# 핸들러 미등록 단계는 건너뜀
logger.info("핸들러 없음, 단계 건너뜀: stage=%s", stage.value)
skipped = StageResult(stage=stage, status=StageStatus.SKIPPED)
await self._fire_hooks("after_stage", stage, skipped)
return skipped
logger.info("단계 시작: stage=%s", stage.value)
start = time.monotonic()
try:
# 타임아웃 적용 (단계별 타임아웃은 전체 타임아웃 공유)
result = await asyncio.wait_for(
handler(config, previous_results),
timeout=config.timeout_seconds,
)
except asyncio.TimeoutError:
duration = time.monotonic() - start
logger.error(
"단계 타임아웃: stage=%s, timeout=%ds",
stage.value,
config.timeout_seconds,
)
result = StageResult(
stage=stage,
status=StageStatus.FAILED,
duration_seconds=duration,
error=f"Stage timed out after {config.timeout_seconds}s",
)
await self._fire_hooks("on_error", stage, result)
except Exception as exc:
duration = time.monotonic() - start
logger.error(
"단계 실패: stage=%s, error=%s", stage.value, exc, exc_info=True
)
result = StageResult(
stage=stage,
status=StageStatus.FAILED,
duration_seconds=duration,
error=str(exc),
)
await self._fire_hooks("on_error", stage, result)
else:
duration = time.monotonic() - start
logger.info(
"단계 완료: stage=%s, status=%s, duration=%.2fs",
stage.value,
result.status.value,
duration,
)
await self._fire_hooks("after_stage", stage, result)
return result
async def run(self, config: PipelineConfig) -> PipelineReport:
"""파이프라인 전체를 순차 실행하고 최종 보고서를 반환한다.
config.stages 순서대로 각 단계를 실행한다.
stop_on_error=True(기본값)이면 첫 번째 실패 단계에서 중단하고,
나머지 단계는 SKIPPED로 처리된다.
Args:
config: 파이프라인 실행 설정
Returns:
전체 실행 결과를 담은 PipelineReport
"""
logger.info(
"파이프라인 시작: name=%s, stages=%s",
config.name,
[s.value for s in config.stages],
)
pipeline_start = time.monotonic()
results: list[StageResult] = []
pipeline_failed = False
for stage in config.stages:
# 이전 단계 실패 시 나머지 건너뜀 (stop_on_error 옵션)
if pipeline_failed and config.stop_on_error:
logger.info("이전 단계 실패로 건너뜀: stage=%s", stage.value)
results.append(StageResult(stage=stage, status=StageStatus.SKIPPED))
continue
result = await self._run_stage(stage, config, results)
results.append(result)
if result.status == StageStatus.FAILED:
pipeline_failed = True
if config.stop_on_error:
logger.warning(
"stop_on_error=True: 파이프라인 중단 결정, stage=%s", stage.value
)
total_duration = time.monotonic() - pipeline_start
success = not pipeline_failed and all(
r.status in (StageStatus.COMPLETED, StageStatus.SKIPPED) for r in results
)
report = PipelineReport(
config=config,
results=results,
total_duration=total_duration,
success=success,
)
logger.info(
"파이프라인 종료: %s",
report.summary(),
)
return report