Loading...
Loading...
02-reusable-code-python/async/asyncio_patterns.py
"""
asyncio 패턴 - TaskGroup, Semaphore, Queue, gather+에러처리
@source GitHub-커뮤니티
@extracted 2026-02-16
@version 1.0.0
의존성:
- 없음 (Python 3.11+ asyncio 내장)
사용법:
from async.asyncio_patterns import run_with_limit, process_queue
"""
import asyncio
import logging
from typing import Any, Awaitable, Callable, TypeVar
logger = logging.getLogger(__name__)
T = TypeVar("T")
R = TypeVar("R")
# ============================================
# 동시성 제한 실행
# ============================================
async def run_with_limit(
tasks: list[Callable[[], Awaitable[T]]],
limit: int = 10,
) -> list[T]:
"""동시성 제한으로 비동기 작업 병렬 실행
Args:
tasks: 비동기 함수 목록 (인자 없는 코루틴 팩토리)
limit: 최대 동시 실행 수 (기본: 10)
Returns:
결과 목록 (순서 보장)
Example:
urls = ["https://api.example.com/1", "https://api.example.com/2"]
results = await run_with_limit(
[lambda u=url: fetch(u) for url in urls],
limit=5,
)
"""
semaphore = asyncio.Semaphore(limit)
async def _limited(task: Callable[[], Awaitable[T]]) -> T:
async with semaphore:
return await task()
return await asyncio.gather(*[_limited(task) for task in tasks])
# ============================================
# TaskGroup 패턴 (Python 3.11+)
# ============================================
async def run_task_group(
tasks: list[Callable[[], Awaitable[T]]],
) -> list[T]:
"""TaskGroup으로 구조화된 동시성 실행
하나의 태스크가 실패하면 나머지도 취소됨 (구조화된 동시성)
Args:
tasks: 비동기 함수 목록
Returns:
결과 목록
Raises:
ExceptionGroup: 여러 태스크가 실패한 경우
Example:
results = await run_task_group([
lambda: fetch_users(),
lambda: fetch_posts(),
lambda: fetch_comments(),
])
"""
results: list[T] = [None] * len(tasks) # type: ignore
async with asyncio.TaskGroup() as tg:
for i, task in enumerate(tasks):
async def _run(idx: int = i, fn: Callable = task):
results[idx] = await fn()
tg.create_task(_run())
return results
# ============================================
# gather + 에러 처리
# ============================================
async def gather_with_errors(
*coros: Awaitable[T],
return_exceptions: bool = False,
) -> list[T | Exception]:
"""asyncio.gather + 에러 로깅
Args:
*coros: 코루틴 목록
return_exceptions: True면 에러도 결과에 포함
Returns:
결과/에러 목록
Example:
results = await gather_with_errors(
fetch("url1"),
fetch("url2"),
fetch("url3"),
return_exceptions=True,
)
successes = [r for r in results if not isinstance(r, Exception)]
failures = [r for r in results if isinstance(r, Exception)]
"""
results = await asyncio.gather(*coros, return_exceptions=return_exceptions)
for i, result in enumerate(results):
if isinstance(result, Exception):
logger.error("태스크 %d 실패: %s", i, result)
return results
# ============================================
# 비동기 큐 처리 (생산자-소비자)
# ============================================
async def process_queue(
items: list[T],
processor: Callable[[T], Awaitable[R]],
*,
num_workers: int = 5,
on_error: Callable[[T, Exception], None] | None = None,
) -> list[R]:
"""생산자-소비자 패턴으로 큐 처리
Args:
items: 처리할 항목 목록
processor: 각 항목 처리 함수
num_workers: 워커 수 (기본: 5)
on_error: 에러 콜백 (기본: 로깅)
Returns:
성공한 결과 목록 (순서 보장 안됨)
Example:
async def download(url: str) -> bytes:
async with httpx.AsyncClient() as client:
response = await client.get(url)
return response.content
results = await process_queue(
urls,
download,
num_workers=10,
)
"""
queue: asyncio.Queue[tuple[int, T]] = asyncio.Queue()
results: dict[int, R] = {}
# 큐에 항목 추가
for i, item in enumerate(items):
await queue.put((i, item))
async def worker():
while True:
try:
idx, item = queue.get_nowait()
except asyncio.QueueEmpty:
break
try:
result = await processor(item)
results[idx] = result
except Exception as e:
if on_error:
on_error(item, e)
else:
logger.error("항목 처리 실패: %s - %s", item, e)
finally:
queue.task_done()
# 워커 실행
workers = [asyncio.create_task(worker()) for _ in range(num_workers)]
await asyncio.gather(*workers)
# 인덱스 순서로 정렬하여 반환
return [results[i] for i in sorted(results.keys())]
# ============================================
# 타임아웃 래퍼
# ============================================
async def with_timeout(
coro: Awaitable[T],
timeout: float,
default: T | None = None,
) -> T | None:
"""타임아웃이 있는 비동기 실행
Args:
coro: 코루틴
timeout: 타임아웃 (초)
default: 타임아웃 시 반환 값 (기본: None)
Returns:
코루틴 결과 또는 기본값
Example:
result = await with_timeout(
slow_api_call(),
timeout=5.0,
default={"error": "timeout"},
)
"""
try:
return await asyncio.wait_for(coro, timeout=timeout)
except asyncio.TimeoutError:
logger.warning("타임아웃 (%.1f초 초과)", timeout)
return default
# ============================================
# 주기적 태스크
# ============================================
async def periodic_task(
fn: Callable[[], Awaitable[None]],
interval: float,
*,
stop_event: asyncio.Event | None = None,
) -> None:
"""주기적으로 실행되는 백그라운드 태스크
Args:
fn: 실행할 비동기 함수
interval: 실행 간격 (초)
stop_event: 중지 이벤트 (설정 시 종료)
Example:
stop = asyncio.Event()
async def cleanup():
logger.info("캐시 정리 중...")
task = asyncio.create_task(periodic_task(cleanup, 300, stop_event=stop))
# 종료 시
stop.set()
await task
"""
while True:
if stop_event and stop_event.is_set():
break
try:
await fn()
except Exception as e:
logger.error("주기적 태스크 실패: %s", e)
if stop_event:
try:
await asyncio.wait_for(stop_event.wait(), timeout=interval)
break
except asyncio.TimeoutError:
pass
else:
await asyncio.sleep(interval)