Loading...
Loading...
02-reusable-code-python/news/collector.py
"""
멀티소스 비동기 뉴스 수집 오케스트레이터.
다중 뉴스 소스에서 비동기 병렬 수집을 수행하고,
중복 제거 및 소스별 집계를 포함한 CollectionResult를 반환한다.
@source: 00-general-pro
@extracted: 2026-03-08
@version: 1.0.0
의존성:
- pydantic >= 2.0
- httpx >= 0.24 (소스별 수집기)
사용법:
```python
import asyncio
from news.collector import NewsCollector
collector = NewsCollector(
sources=["hackernews"],
keywords=["AI", "LLM"],
days=7,
limit_per_source=50,
)
result = asyncio.run(collector.collect())
print(f"수집 완료: {len(result.items)}건")
```
"""
import asyncio
import logging
import time
from collections import Counter
from .models import CollectionResult, NewsItem
from .sources import AVAILABLE_SOURCES
logger = logging.getLogger(__name__)
# ============================================
# 도메인별 기본 키워드
# ============================================
_DOMAIN_KEYWORDS: dict[str, list[str]] = {
"ai": [
"AI",
"LLM",
"GPT",
"machine learning",
"artificial intelligence",
],
"saas": [
"SaaS",
"micro SaaS",
"B2B software",
"productivity tool",
],
"mobile": [
"mobile app",
"Android",
"iOS",
"React Native",
"Flutter",
],
"fintech": [
"fintech",
"payment",
"crypto",
"trading",
],
}
_DEFAULT_KEYWORDS = [
"software",
"app",
"startup",
"side project",
]
# ============================================
# 뉴스 수집 오케스트레이터
# ============================================
class NewsCollector:
"""멀티소스 뉴스 수집기.
AVAILABLE_SOURCES에 등록된 소스들을 asyncio.gather로 병렬 수집하고,
URL 기준 중복 제거 후 CollectionResult를 반환한다.
Attributes:
sources: 사용할 소스 이름 목록.
keywords: 검색 키워드 목록.
days: 수집 기간 (일).
limit_per_source: 소스당 최대 수집 건수.
"""
def __init__(
self,
sources: list[str] | None = None,
keywords: list[str] | None = None,
days: int = 7,
limit_per_source: int = 50,
domains: list[str] | None = None,
) -> None:
"""초기화.
Args:
sources: 사용할 소스 목록 (기본: 전체 등록 소스).
keywords: 검색 키워드 (기본: 도메인별 자동 생성).
days: 수집 기간 (일). 기본 7.
limit_per_source: 소스당 최대 수집 건수. 기본 50.
domains: 키워드 자동 생성에 사용할 도메인 목록.
"""
self.sources = sources
self.days = days
self.limit_per_source = limit_per_source
self.domains = domains or ["ai"]
self.keywords = keywords or self._get_default_keywords(
self.domains
)
def _get_default_keywords(
self, domains: list[str]
) -> list[str]:
"""도메인별 기본 키워드를 반환한다.
Args:
domains: 도메인 목록 (ai, saas, mobile, fintech 등).
Returns:
도메인에 매핑된 키워드 목록. 매핑 없으면 기본 키워드.
"""
keywords: list[str] = []
for domain in domains:
domain_lower = domain.lower()
if domain_lower in _DOMAIN_KEYWORDS:
keywords.extend(_DOMAIN_KEYWORDS[domain_lower])
else:
keywords.extend(_DEFAULT_KEYWORDS)
# 중복 제거 (순서 유지)
seen: set[str] = set()
unique: list[str] = []
for kw in keywords:
if kw not in seen:
seen.add(kw)
unique.append(kw)
return unique
async def collect(self) -> CollectionResult:
"""모든 소스에서 병렬 수집을 수행한다.
부분 실패를 허용하며, 실패한 소스는 errors에 기록한다.
URL 기준으로 중복을 제거하고 소스별 수집 건수를 집계한다.
Returns:
수집 결과 (항목, 에러, 소요 시간, 소스별 건수).
"""
start = time.monotonic()
# 소스 인스턴스 생성
source_names = self.sources or list(
AVAILABLE_SOURCES.keys()
)
source_instances = []
errors: list[str] = []
for name in source_names:
if name not in AVAILABLE_SOURCES:
msg = f"알 수 없는 소스: {name}"
logger.warning(msg)
errors.append(msg)
continue
source_instances.append(AVAILABLE_SOURCES[name]())
if not source_instances:
logger.warning("사용 가능한 소스가 없음")
return CollectionResult(
items=[],
errors=errors,
duration_seconds=time.monotonic() - start,
)
# 병렬 수집 (부분 실패 허용)
tasks = [
source.fetch(
self.keywords,
days=self.days,
limit=self.limit_per_source,
)
for source in source_instances
]
results = await asyncio.gather(
*tasks, return_exceptions=True
)
# 결과 통합
all_items: list[NewsItem] = []
for source, result in zip(source_instances, results):
if isinstance(result, Exception):
msg = f"{source.name} 수집 실패: {result}"
logger.error(msg)
errors.append(msg)
elif isinstance(result, list):
all_items.extend(result)
logger.info(
"%s에서 %d건 수집", source.name, len(result)
)
else:
msg = f"{source.name} 예상치 못한 결과 타입: {type(result)}"
logger.warning(msg)
errors.append(msg)
# URL 기준 중복 제거 (먼저 수집된 항목 유지)
seen_urls: set[str] = set()
unique_items: list[NewsItem] = []
for item in all_items:
if item.url not in seen_urls:
seen_urls.add(item.url)
unique_items.append(item)
dedup_count = len(all_items) - len(unique_items)
if dedup_count > 0:
logger.info("중복 제거: %d건", dedup_count)
# 소스별 수집 건수 집계
items_by_source = dict(
Counter(item.source for item in unique_items)
)
duration = time.monotonic() - start
logger.info(
"전체 수집 완료: %d건 (%.1f초)",
len(unique_items),
duration,
)
return CollectionResult(
items=unique_items,
errors=errors,
duration_seconds=round(duration, 2),
items_by_source=items_by_source,
)
# ============================================
# CLI 엔트리포인트
# ============================================
def _parse_args() -> "argparse.Namespace":
"""CLI 인수를 파싱한다."""
import argparse
parser = argparse.ArgumentParser(
description="멀티소스 뉴스 수집기",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=(
"예시:\n"
" uv run -m news.collector --domains ai,saas"
" --days 7 --output raw.json\n"
" uv run -m news.collector --sources hackernews"
" --keywords AI,LLM --limit 30"
),
)
parser.add_argument(
"--domains",
type=str,
default="ai",
help="쉼표 구분 도메인 (기본: ai)",
)
parser.add_argument(
"--days",
type=int,
default=7,
help="수집 기간 - 일 (기본: 7)",
)
parser.add_argument(
"--sources",
type=str,
default=None,
help="사용할 소스 (쉼표 구분, 기본: 전체)",
)
parser.add_argument(
"--limit",
type=int,
default=50,
help="소스당 최대 건수 (기본: 50)",
)
parser.add_argument(
"--output",
type=str,
default=None,
help="JSON 출력 파일 경로 (기본: stdout)",
)
parser.add_argument(
"--keywords",
type=str,
default=None,
help="추가 키워드 (쉼표 구분)",
)
return parser.parse_args()
async def _main() -> None:
"""비동기 메인 함수."""
import json
import sys
from pathlib import Path
args = _parse_args()
domains = [d.strip() for d in args.domains.split(",")]
sources = (
[s.strip() for s in args.sources.split(",")]
if args.sources
else None
)
keywords = (
[k.strip() for k in args.keywords.split(",")]
if args.keywords
else None
)
collector = NewsCollector(
sources=sources,
keywords=keywords,
days=args.days,
limit_per_source=args.limit,
domains=domains,
)
result = await collector.collect()
# JSON 직렬화
output = result.model_dump(mode="json")
json_str = json.dumps(output, ensure_ascii=False, indent=2)
if args.output:
output_path = Path(args.output)
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(json_str, encoding="utf-8")
print(f"결과 저장: {output_path} ({len(result.items)}건)")
else:
sys.stdout.buffer.write((json_str + "\n").encode("utf-8"))
if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
asyncio.run(_main())