Loading...
Loading...
02-reusable-code-python/async/file_message_queue.py
"""
F-MES (File-based Message Exchange System) - 파일 기반 메시지 큐
폴더를 메일박스로 사용하는 경량 에이전트 간 메시지 교환 시스템.
API 서버 없이 파일 시스템만으로 에이전트 협업을 구현합니다.
@source QHDE 연구 (docs/intel/qhde-research/03-communication/)
@extracted 2026-03-17
@version 1.0.0
의존성:
- 없음 (Python 3.10+ 내장 모듈만 사용)
사용법:
from async.file_message_queue import FileMessageQueue
# 메시지 큐 초기화
queue = FileMessageQueue("/path/to/project/_COMM")
# 메시지 전송
queue.send("claude", "antigravity", {
"type": "MISSION_PLAN",
"payload": {"task": "implement auth module"}
})
# 메시지 수신 (폴링)
messages = queue.receive("claude")
for msg in messages:
print(msg)
queue.archive(msg["path"])
"""
from __future__ import annotations
import json
import os
import shutil
import time
import uuid
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
@dataclass
class FMESConfig:
"""F-MES 설정."""
comm_root: Path
agent_names: list[str] = field(default_factory=lambda: ["antigravity", "claude", "cursor", "vscode"])
archive_dir: str = "ARCHIVE"
poll_interval: float = 2.0 # seconds
class FileMessageQueue:
"""파일 기반 메시지 교환 큐.
디렉토리 구조:
_COMM/
├── To_Antigravity/ → Antigravity 에이전트 메일박스
├── To_Claude/ → Claude Code 에이전트 메일박스
├── To_Cursor/ → Cursor 에이전트 메일박스
├── To_VSCode/ → VS Code 에이전트 메일박스
└── ARCHIVE/ → 처리 완료 메시지 보관
"""
def __init__(self, comm_root: str | Path, agent_names: list[str] | None = None) -> None:
self.config = FMESConfig(
comm_root=Path(comm_root),
agent_names=agent_names or FMESConfig.agent_names,
)
self._ensure_directories()
def _ensure_directories(self) -> None:
"""메일박스 디렉토리 구조 생성."""
for agent in self.config.agent_names:
mailbox = self.config.comm_root / f"To_{agent.capitalize()}"
mailbox.mkdir(parents=True, exist_ok=True)
archive = self.config.comm_root / self.config.archive_dir
archive.mkdir(parents=True, exist_ok=True)
def _mailbox_path(self, agent: str) -> Path:
return self.config.comm_root / f"To_{agent.capitalize()}"
def send(
self,
from_agent: str,
to_agent: str,
payload: dict[str, Any],
*,
message_type: str = "TASK",
priority: int = 5,
) -> Path:
"""메시지를 대상 에이전트의 메일박스에 전송.
Args:
from_agent: 발신 에이전트 이름
to_agent: 수신 에이전트 이름
payload: 메시지 본문 (JSON 직렬화 가능)
message_type: 메시지 유형 (TASK, REPORT, REQUEST, ALERT)
priority: 우선순위 (1=최우선, 9=최저)
Returns:
생성된 메시지 파일 경로
"""
message = {
"id": str(uuid.uuid4()),
"from": from_agent,
"to": to_agent,
"type": message_type,
"priority": priority,
"timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"payload": payload,
}
timestamp = time.strftime("%Y%m%d_%H%M%S")
filename = f"{priority}_{timestamp}_{from_agent}_{message_type}.json"
filepath = self._mailbox_path(to_agent) / filename
# 원자적 쓰기: 임시 파일 → rename
tmp_path = filepath.with_suffix(".tmp")
tmp_path.write_text(json.dumps(message, ensure_ascii=False, indent=2), encoding="utf-8")
tmp_path.rename(filepath)
return filepath
def receive(self, agent: str, *, message_type: str | None = None) -> list[dict[str, Any]]:
"""에이전트 메일박스에서 메시지 수신.
Args:
agent: 수신 에이전트 이름
message_type: 필터링할 메시지 유형 (None이면 전체)
Returns:
우선순위 순으로 정렬된 메시지 목록 (파일 경로 포함)
"""
mailbox = self._mailbox_path(agent)
messages = []
for filepath in sorted(mailbox.glob("*.json")):
try:
data = json.loads(filepath.read_text(encoding="utf-8"))
data["_path"] = str(filepath)
if message_type is None or data.get("type") == message_type:
messages.append(data)
except (json.JSONDecodeError, OSError):
continue
return messages
def archive(self, message_path: str | Path) -> Path | None:
"""처리 완료 메시지를 아카이브로 이동.
Args:
message_path: 메시지 파일 경로
Returns:
아카이브된 파일 경로 (실패 시 None)
"""
src = Path(message_path)
if not src.exists():
return None
archive_dir = self.config.comm_root / self.config.archive_dir
dst = archive_dir / src.name
# 이름 충돌 방지
if dst.exists():
dst = archive_dir / f"{src.stem}_{uuid.uuid4().hex[:6]}{src.suffix}"
shutil.move(str(src), str(dst))
return dst
def peek(self, agent: str) -> int:
"""메일박스에 대기 중인 메시지 수 확인."""
mailbox = self._mailbox_path(agent)
return len(list(mailbox.glob("*.json")))
def broadcast(self, from_agent: str, payload: dict[str, Any], *, message_type: str = "ALERT") -> list[Path]:
"""모든 에이전트에게 메시지 브로드캐스트."""
paths = []
for agent in self.config.agent_names:
if agent != from_agent:
path = self.send(from_agent, agent, payload, message_type=message_type, priority=1)
paths.append(path)
return paths
def cleanup_stale(self, max_age_hours: float = 24.0) -> int:
"""오래된 메시지 자동 아카이브.
Args:
max_age_hours: 이 시간 이상 된 메시지를 아카이브
Returns:
정리된 메시지 수
"""
count = 0
cutoff = time.time() - (max_age_hours * 3600)
for agent in self.config.agent_names:
mailbox = self._mailbox_path(agent)
for filepath in mailbox.glob("*.json"):
if filepath.stat().st_mtime < cutoff:
self.archive(filepath)
count += 1
return count