Loading...
Loading...
02-reusable-code-python/utils/preprocessing_logger.py
"""
@source: 260313 heath-infer-step01
@extracted: 2026-03-14
@description: 파이프라인 전용 로거 + @stage_handler + @retry 데코레이터.
싱글톤 패턴, 파일+콘솔 동시 출력, Stage별 컨텍스트 자동 추가.
사용법:
# 로거 사용
logger = PipelineLogger(log_dir="./logs")
logger.stage_start("Stage 1", records=1000)
logger.info("처리 중...")
logger.stage_end("Stage 1", records_in=1000, records_out=950)
# Stage 데코레이터
@stage_handler("Stage 1", critical=True)
def stage1_parse(filepath: str) -> list[dict]:
...
# 재시도 데코레이터
@retry(max_attempts=3, delay=1.0)
def fetch_external_data():
...
"""
import logging
import sys
from datetime import datetime
from enum import Enum
from functools import wraps
from pathlib import Path
from typing import Any, Callable, TypeVar
T = TypeVar('T')
class LogLevel(Enum):
"""로그 레벨"""
DEBUG = logging.DEBUG
INFO = logging.INFO
WARNING = logging.WARNING
ERROR = logging.ERROR
CRITICAL = logging.CRITICAL
class StageContextFilter(logging.Filter):
"""Stage 컨텍스트를 로그 레코드에 추가하는 필터"""
def __init__(self, default_stage: str = "INIT"):
super().__init__()
self.current_stage = default_stage
def filter(self, record: logging.LogRecord) -> bool:
if not hasattr(record, 'stage'):
record.stage = self.current_stage
return True
def set_stage(self, stage: str) -> None:
self.current_stage = stage
class PipelineLogger:
"""
파이프라인 전용 로거
특징:
- 싱글톤 패턴
- 파일 + 콘솔 동시 출력
- Stage별 컨텍스트 자동 추가
- 구조화된 로그 포맷
사용 예시:
logger = PipelineLogger()
logger.stage_start("Stage 1", records=1000)
logger.info("처리 중...")
logger.stage_end("Stage 1", records_in=1000, records_out=950)
"""
_instance: 'PipelineLogger | None' = None
def __new__(cls, *args: Any, **kwargs: Any) -> 'PipelineLogger':
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(
self,
log_dir: str | Path | None = None,
log_level: LogLevel = LogLevel.INFO,
console_output: bool = True,
file_output: bool = True,
logger_name: str = "pipeline",
log_prefix: str = "pipeline",
):
"""
Args:
log_dir: 로그 파일 디렉토리 (None이면 ./logs)
log_level: 로그 레벨
console_output: 콘솔 출력 여부
file_output: 파일 출력 여부
logger_name: 로거 이름 (logging 네임스페이스)
log_prefix: 로그 파일명 접두사
"""
# 중복 초기화 방지
if hasattr(self, '_initialized') and self._initialized:
if log_level != LogLevel.INFO:
self.logger.setLevel(log_level.value)
return
self._initialized = True
# 로그 디렉토리 결정
if log_dir is None:
log_dir = Path.cwd() / "logs"
self.log_dir = Path(log_dir)
self.log_dir.mkdir(parents=True, exist_ok=True)
# 로그 파일명: {prefix}_YYYYMMDD_HHMMSS.log
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
self.log_file = self.log_dir / f"{log_prefix}_{timestamp}.log"
# 로거 설정
self.logger = logging.getLogger(logger_name)
self.logger.setLevel(log_level.value)
self.logger.handlers.clear()
# Stage 컨텍스트 필터
self.stage_filter = StageContextFilter("INIT")
self.logger.addFilter(self.stage_filter)
# 포맷터
formatter = logging.Formatter(
fmt="%(asctime)s | %(levelname)-8s | %(stage)-12s | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
# 파일 핸들러
if file_output:
file_handler = logging.FileHandler(self.log_file, encoding='utf-8')
file_handler.setFormatter(formatter)
self.logger.addHandler(file_handler)
# 콘솔 핸들러
if console_output:
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
self.logger.addHandler(console_handler)
self.info(f"로그 시작 - 파일: {self.log_file}")
def set_stage(self, stage: str) -> None:
"""현재 Stage 설정"""
self.stage_filter.set_stage(stage)
def debug(self, message: str, **kwargs: Any) -> None:
self.logger.debug(message, extra=kwargs)
def info(self, message: str, **kwargs: Any) -> None:
self.logger.info(message, extra=kwargs)
def warning(self, message: str, **kwargs: Any) -> None:
self.logger.warning(message, extra=kwargs)
def error(self, message: str, **kwargs: Any) -> None:
self.logger.error(message, extra=kwargs)
def critical(self, message: str, **kwargs: Any) -> None:
self.logger.critical(message, extra=kwargs)
def stage_start(self, stage_name: str, **context: Any) -> None:
"""Stage 시작 로깅"""
self.set_stage(stage_name)
ctx_str = ", ".join(f"{k}={v}" for k, v in context.items())
msg = f">> Stage 시작"
if ctx_str:
msg += f" ({ctx_str})"
self.info(msg)
def stage_end(
self,
stage_name: str,
records_in: int = 0,
records_out: int = 0,
duration_sec: float | None = None
) -> None:
"""Stage 종료 로깅"""
msg = f"[OK] Stage 완료 (입력: {records_in:,}, 출력: {records_out:,})"
if duration_sec is not None:
msg += f" [{duration_sec:.2f}s]"
self.info(msg)
def stage_error(self, stage_name: str, error: Exception) -> None:
"""Stage 에러 로깅"""
self.error(f"[FAIL] Stage 실패 - {type(error).__name__}: {str(error)}")
def progress(self, current: int, total: int, message: str = "") -> None:
"""진행률 로깅 (매 10% 마다)"""
if total == 0:
return
percentage = (current / total) * 100
if current == total or (current % max(1, total // 10) == 0):
self.debug(f"진행: {percentage:.0f}% ({current:,}/{total:,}) {message}")
@classmethod
def reset(cls) -> None:
"""싱글톤 인스턴스 리셋 (테스트용)"""
cls._instance = None
def get_logger() -> PipelineLogger:
"""전역 로거 인스턴스 반환"""
return PipelineLogger()
def stage_handler(
stage_name: str,
critical: bool = False
) -> Callable[[Callable[..., T]], Callable[..., T]]:
"""
Stage 함수 데코레이터
기능:
- 자동 시작/종료 로깅
- 실행 시간 측정
- 에러 핸들링
Args:
stage_name: Stage 이름
critical: True면 에러 시 전체 파이프라인 중단
사용 예시:
@stage_handler("Stage 1", critical=True)
def stage1_parse(filepath: str) -> list[dict]:
...
"""
def decorator(func: Callable[..., T]) -> Callable[..., T]:
@wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> T:
logger = get_logger()
logger.stage_start(stage_name)
import time
start_time = time.time()
try:
result = func(*args, **kwargs)
duration = time.time() - start_time
# 결과가 리스트나 시퀀스이면 레코드 수 계산
records_out = 0
if hasattr(result, '__len__'):
records_out = len(result)
logger.stage_end(stage_name, records_out=records_out, duration_sec=duration)
return result
except Exception as e:
logger.stage_error(stage_name, e)
if critical:
raise
return None # type: ignore
return wrapper
return decorator
def retry(
max_attempts: int = 3,
delay: float = 1.0,
backoff: float = 2.0,
exceptions: tuple[type[Exception], ...] = (Exception,)
) -> Callable[[Callable[..., T]], Callable[..., T]]:
"""
실패 시 재시도 데코레이터
Args:
max_attempts: 최대 시도 횟수
delay: 초기 대기 시간 (초)
backoff: 대기 시간 증가 배수
exceptions: 재시도할 예외 타입
사용 예시:
@retry(max_attempts=3, delay=1.0)
def fetch_external_data():
...
"""
def decorator(func: Callable[..., T]) -> Callable[..., T]:
@wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> T:
import time
logger = get_logger()
current_delay = delay
last_exception: Exception | None = None
for attempt in range(1, max_attempts + 1):
try:
return func(*args, **kwargs)
except exceptions as e:
last_exception = e
if attempt < max_attempts:
logger.warning(
f"시도 {attempt}/{max_attempts} 실패: {e}. "
f"{current_delay:.1f}초 후 재시도..."
)
time.sleep(current_delay)
current_delay *= backoff
else:
logger.error(f"최대 재시도 횟수 초과: {e}")
if last_exception:
raise last_exception
raise RuntimeError("Unexpected retry loop exit")
return wrapper
return decorator