Loading...
Loading...
02-reusable-code-python/httpx_utils/httpx_client.py
"""
비동기 HTTP 클라이언트 - httpx AsyncClient 수명주기 + 재시도 + 동시성 제한
@source: GitHub-커뮤니티
@extracted: 2026-02-16
@version: 1.0.0
의존성:
- httpx (필수, pip install httpx)
- tenacity (권장, pip install tenacity)
사용법:
from httpx_utils.httpx_client import get_client, fetch_json
async with get_client() as client:
data = await fetch_json(client, "https://api.example.com/users")
"""
import asyncio
import logging
from contextlib import asynccontextmanager
from typing import Any
import httpx
logger = logging.getLogger(__name__)
# ============================================
# 클라이언트 설정
# ============================================
DEFAULT_TIMEOUT = httpx.Timeout(
connect=5.0,
read=30.0,
write=10.0,
pool=5.0,
)
DEFAULT_LIMITS = httpx.Limits(
max_connections=100,
max_keepalive_connections=20,
keepalive_expiry=30,
)
@asynccontextmanager
async def get_client(
base_url: str = "",
timeout: httpx.Timeout | None = None,
limits: httpx.Limits | None = None,
headers: dict[str, str] | None = None,
):
"""비동기 HTTP 클라이언트 컨텍스트 매니저
Args:
base_url: API 기본 URL
timeout: 요청 타임아웃 설정
limits: 연결 풀 제한
headers: 기본 헤더
Yields:
httpx.AsyncClient 인스턴스
Example:
async with get_client("https://api.example.com") as client:
response = await client.get("/users")
"""
async with httpx.AsyncClient(
base_url=base_url,
timeout=timeout or DEFAULT_TIMEOUT,
limits=limits or DEFAULT_LIMITS,
headers=headers or {},
follow_redirects=True,
) as client:
yield client
# ============================================
# 재시도 유틸
# ============================================
async def fetch_with_retry(
client: httpx.AsyncClient,
method: str,
url: str,
*,
max_retries: int = 3,
backoff_factor: float = 1.0,
retry_status_codes: set[int] | None = None,
**kwargs: Any,
) -> httpx.Response:
"""재시도 로직이 포함된 HTTP 요청
Args:
client: httpx 클라이언트
method: HTTP 메서드 (GET, POST 등)
url: 요청 URL
max_retries: 최대 재시도 횟수 (기본: 3)
backoff_factor: 지수 백오프 팩터 (기본: 1.0)
retry_status_codes: 재시도할 HTTP 상태 코드 (기본: 429, 500, 502, 503, 504)
**kwargs: httpx 요청 추가 파라미터
Returns:
httpx.Response
Raises:
httpx.HTTPStatusError: 최대 재시도 후 실패
"""
if retry_status_codes is None:
retry_status_codes = {429, 500, 502, 503, 504}
last_error: Exception | None = None
for attempt in range(max_retries + 1):
try:
response = await client.request(method, url, **kwargs)
if response.status_code not in retry_status_codes:
response.raise_for_status()
return response
if attempt == max_retries:
response.raise_for_status()
return response
# 429 Too Many Requests: Retry-After 헤더 확인
retry_after = response.headers.get("retry-after")
if retry_after:
wait_time = float(retry_after)
else:
wait_time = backoff_factor * (2 ** attempt)
logger.warning(
"HTTP %d 응답, %d번째 재시도 (%.1f초 후): %s",
response.status_code,
attempt + 1,
wait_time,
url,
)
await asyncio.sleep(wait_time)
except httpx.RequestError as e:
last_error = e
if attempt == max_retries:
raise
wait_time = backoff_factor * (2 ** attempt)
logger.warning(
"요청 에러, %d번째 재시도 (%.1f초 후): %s - %s",
attempt + 1,
wait_time,
url,
e,
)
await asyncio.sleep(wait_time)
raise last_error or httpx.RequestError("최대 재시도 횟수 초과")
# ============================================
# 편의 함수
# ============================================
async def fetch_json(
client: httpx.AsyncClient,
url: str,
*,
params: dict[str, Any] | None = None,
max_retries: int = 3,
) -> Any:
"""JSON 응답 가져오기 (GET + 재시도)
Args:
client: httpx 클라이언트
url: 요청 URL
params: 쿼리 파라미터
max_retries: 최대 재시도 횟수
Returns:
JSON 응답 파싱 결과
"""
response = await fetch_with_retry(
client, "GET", url, params=params, max_retries=max_retries
)
return response.json()
async def post_json(
client: httpx.AsyncClient,
url: str,
*,
data: dict[str, Any] | None = None,
max_retries: int = 3,
) -> Any:
"""JSON 데이터 전송 (POST + 재시도)
Args:
client: httpx 클라이언트
url: 요청 URL
data: 전송할 JSON 데이터
max_retries: 최대 재시도 횟수
Returns:
JSON 응답 파싱 결과
"""
response = await fetch_with_retry(
client, "POST", url, json=data, max_retries=max_retries
)
return response.json()
# ============================================
# 동시성 제한 요청
# ============================================
async def fetch_many(
client: httpx.AsyncClient,
urls: list[str],
*,
concurrency: int = 10,
max_retries: int = 3,
) -> list[Any]:
"""여러 URL을 동시성 제한으로 병렬 요청
Args:
client: httpx 클라이언트
urls: 요청 URL 목록
concurrency: 최대 동시 요청 수 (기본: 10)
max_retries: 각 요청의 최대 재시도 횟수
Returns:
JSON 응답 목록 (순서 보장)
"""
semaphore = asyncio.Semaphore(concurrency)
async def _fetch_one(url: str) -> Any:
async with semaphore:
return await fetch_json(client, url, max_retries=max_retries)
tasks = [_fetch_one(url) for url in urls]
return await asyncio.gather(*tasks)