Loading...
Loading...
02-reusable-code-python/pipelines/models.py
"""
파이프라인 데이터 모델 (Pydantic v2)
파이프라인 설정, 단계 실행 결과, 최종 보고서, 레시피 정의에
사용되는 Pydantic v2 데이터 모델을 정의한다.
@source claude-world/notebooklm-skill
@extracted 2026-03-18
@version 1.0.0
의존성:
- pydantic>=2.0 (필수)
사용법:
from pipelines.models import PipelineConfig, StageResult, PipelineReport
source = SourceInput(type="url", content="https://example.com/article")
config = PipelineConfig(
name="research-pipeline",
stages=[PipelineStage.INGEST, PipelineStage.SYNTHESIZE],
outputs=[OutputType.REPORT],
source=source,
)
"""
from __future__ import annotations
import logging
from datetime import datetime
from typing import Any
from pydantic import BaseModel, ConfigDict, Field, field_validator
from .types import OutputType, PipelineStage, StageStatus
logger = logging.getLogger(__name__)
# 지원하는 소스 타입 목록
VALID_SOURCE_TYPES = {"url", "text", "file"}
class SourceInput(BaseModel):
"""파이프라인 입력 소스.
Attributes:
type: 소스 유형 (url, text, file 중 하나)
content: 소스 내용 (URL 문자열, 원문 텍스트, 파일 경로)
metadata: 소스 추가 메타데이터 (언어, 작성자, 날짜 등)
"""
model_config = ConfigDict(frozen=True)
type: str = Field(..., description="소스 유형: url | text | file")
content: str = Field(..., min_length=1, description="소스 내용")
metadata: dict[str, Any] = Field(default_factory=dict, description="추가 메타데이터")
@field_validator("type")
@classmethod
def validate_source_type(cls, v: str) -> str:
"""소스 타입이 지원 범위 내인지 검증한다."""
if v not in VALID_SOURCE_TYPES:
raise ValueError(
f"Invalid source type '{v}'. Must be one of: {VALID_SOURCE_TYPES}"
)
return v
class StageResult(BaseModel):
"""단계 실행 결과.
각 파이프라인 단계 실행 후 반환되는 결과 데이터.
Attributes:
stage: 실행된 단계 이름
status: 실행 상태
outputs: 생성된 산출물 경로 또는 식별자 목록
duration_seconds: 실행 소요 시간 (초)
error: 실패 시 오류 메시지
metadata: 단계별 추가 메타데이터
"""
model_config = ConfigDict(frozen=True)
stage: PipelineStage
status: StageStatus = StageStatus.PENDING
outputs: list[str] = Field(default_factory=list, description="산출물 경로/식별자 목록")
duration_seconds: float = Field(default=0.0, ge=0.0, description="실행 소요 시간 (초)")
error: str | None = Field(default=None, description="실패 시 오류 메시지")
metadata: dict[str, Any] = Field(default_factory=dict, description="단계 메타데이터")
@property
def is_success(self) -> bool:
"""단계가 성공적으로 완료되었는지 반환한다."""
return self.status == StageStatus.COMPLETED
class PipelineConfig(BaseModel):
"""파이프라인 실행 설정.
Attributes:
name: 파이프라인 이름 (식별자)
stages: 실행할 단계 목록 (순서대로 실행됨)
outputs: 생성할 산출물 타입 목록
source: 입력 소스
options: 파이프라인 실행 옵션 (stop_on_error, timeout_seconds 등)
created_at: 설정 생성 시각
"""
model_config = ConfigDict(frozen=False)
name: str = Field(..., min_length=1, description="파이프라인 이름")
stages: list[PipelineStage] = Field(
default_factory=lambda: list(PipelineStage),
description="실행 단계 목록 (기본: 전체 4단계)",
)
outputs: list[OutputType] = Field(
default_factory=list, description="생성할 산출물 타입 목록"
)
source: SourceInput
options: dict[str, Any] = Field(
default_factory=lambda: {"stop_on_error": True, "timeout_seconds": 300},
description="실행 옵션",
)
created_at: datetime = Field(default_factory=datetime.utcnow)
@field_validator("stages")
@classmethod
def validate_stages_not_empty(cls, v: list[PipelineStage]) -> list[PipelineStage]:
"""단계 목록이 비어있지 않은지 검증한다."""
if not v:
raise ValueError("Pipeline stages list must not be empty")
return v
@property
def stop_on_error(self) -> bool:
"""오류 발생 시 파이프라인을 중단할지 반환한다."""
return bool(self.options.get("stop_on_error", True))
@property
def timeout_seconds(self) -> int:
"""파이프라인 전체 타임아웃 (초) 반환한다."""
return int(self.options.get("timeout_seconds", 300))
class PipelineReport(BaseModel):
"""파이프라인 실행 최종 보고서.
Attributes:
config: 실행에 사용된 파이프라인 설정
results: 각 단계별 실행 결과 목록
total_duration: 전체 실행 소요 시간 (초)
success: 파이프라인 전체 성공 여부
cost_estimate: 예상 비용 (USD)
finished_at: 실행 완료 시각
"""
model_config = ConfigDict(frozen=True)
config: PipelineConfig
results: list[StageResult] = Field(default_factory=list)
total_duration: float = Field(default=0.0, ge=0.0, description="전체 소요 시간 (초)")
success: bool = Field(default=False)
cost_estimate: float = Field(default=0.0, ge=0.0, description="예상 비용 (USD)")
finished_at: datetime = Field(default_factory=datetime.utcnow)
@property
def failed_stages(self) -> list[StageResult]:
"""실패한 단계 목록을 반환한다."""
return [r for r in self.results if r.status == StageStatus.FAILED]
@property
def completed_stages(self) -> list[StageResult]:
"""완료된 단계 목록을 반환한다."""
return [r for r in self.results if r.status == StageStatus.COMPLETED]
def summary(self) -> str:
"""실행 결과 요약 문자열을 반환한다."""
total = len(self.results)
done = len(self.completed_stages)
failed = len(self.failed_stages)
status = "SUCCESS" if self.success else "FAILED"
return (
f"[{status}] {self.config.name}: "
f"{done}/{total} stages completed, {failed} failed, "
f"{self.total_duration:.2f}s, ~${self.cost_estimate:.4f}"
)
class RecipeDefinition(BaseModel):
"""사전 정의 파이프라인 레시피.
재사용 가능한 파이프라인 설정 템플릿.
Attributes:
name: 레시피 식별자 (kebab-case)
description: 레시피 설명 (한국어)
stages: 실행 단계 목록
default_outputs: 기본 산출물 타입 목록
estimated_cost: 예상 비용 (USD)
tags: 분류 태그 목록
"""
model_config = ConfigDict(frozen=True)
name: str = Field(..., description="레시피 식별자 (kebab-case)")
description: str = Field(..., description="레시피 설명")
stages: list[PipelineStage] = Field(..., description="실행 단계 목록")
default_outputs: list[OutputType] = Field(default_factory=list)
estimated_cost: float = Field(default=0.0, ge=0.0, description="예상 비용 (USD)")
tags: list[str] = Field(default_factory=list, description="분류 태그")