Loading...
Loading...
02-reusable-code-python/async/file_semaphore.py
"""
파일 기반 세마포어 - 동시 접근 수 제한
파일 시스템을 사용하여 N개 에이전트의 동시 접근을 제어합니다.
뮤텍스(N=1)부터 카운팅 세마포어(N>1)까지 지원합니다.
@source QHDE 연구 (docs/intel/qhde-research/03-communication/03-semaphore-lock.md)
@extracted 2026-03-17
@version 1.0.0
의존성:
- 없음 (Python 3.10+ 내장 모듈만 사용)
사용법:
from async.file_semaphore import FileSemaphore
# 최대 3개 에이전트 동시 접근 허용
sem = FileSemaphore("/path/to/.locks", "shared-resource", max_count=3)
if sem.acquire("agent-auth"):
try:
# ... 작업 수행 ...
pass
finally:
sem.release("agent-auth")
"""
from __future__ import annotations
import json
import os
import time
import uuid
from pathlib import Path
from typing import Any
class FileSemaphore:
"""파일 기반 카운팅 세마포어.
각 슬롯이 개별 파일로 존재하여 N개 동시 접근을 허용합니다.
구조:
.locks/{resource_name}.sem/
├── slot_0.json → 사용 중 (에이전트 정보 포함)
├── slot_1.json → 사용 중
└── slot_2.json → 비어있음 (파일 없음)
"""
def __init__(
self,
locks_dir: str | Path,
resource_name: str,
max_count: int = 1,
ttl_seconds: int = 300,
) -> None:
self.sem_dir = Path(locks_dir) / f"{resource_name}.sem"
self.sem_dir.mkdir(parents=True, exist_ok=True)
self.max_count = max_count
self.ttl_seconds = ttl_seconds
self.resource_name = resource_name
def acquire(self, agent_id: str, *, timeout: float = 10.0) -> bool:
"""세마포어 슬롯 획득.
Args:
agent_id: 에이전트 식별자
timeout: 최대 대기 시간 (초)
Returns:
True면 슬롯 획득 성공
"""
deadline = time.time() + timeout
delay = 0.1
while time.time() < deadline:
# 만료된 슬롯 정리
self._cleanup_expired()
# 빈 슬롯 찾기
for i in range(self.max_count):
slot_path = self.sem_dir / f"slot_{i}.json"
if self._try_acquire_slot(slot_path, agent_id):
return True
time.sleep(min(delay, deadline - time.time()))
delay = min(delay * 2, 2.0)
return False
def release(self, agent_id: str) -> bool:
"""세마포어 슬롯 해제.
Args:
agent_id: 해제할 에이전트 식별자
Returns:
True면 해제 성공
"""
for slot_path in self.sem_dir.glob("slot_*.json"):
try:
data = json.loads(slot_path.read_text(encoding="utf-8"))
if data.get("agent_id") == agent_id:
slot_path.unlink()
return True
except (json.JSONDecodeError, OSError):
continue
return False
def available(self) -> int:
"""사용 가능한 슬롯 수."""
self._cleanup_expired()
used = len(list(self.sem_dir.glob("slot_*.json")))
return max(0, self.max_count - used)
def holders(self) -> list[dict[str, Any]]:
"""현재 슬롯을 보유한 에이전트 목록."""
result = []
for slot_path in sorted(self.sem_dir.glob("slot_*.json")):
try:
data = json.loads(slot_path.read_text(encoding="utf-8"))
result.append(data)
except (json.JSONDecodeError, OSError):
continue
return result
def _try_acquire_slot(self, slot_path: Path, agent_id: str) -> bool:
"""원자적 슬롯 획득 시도."""
now = time.time()
slot_data = {
"slot_id": str(uuid.uuid4()),
"agent_id": agent_id,
"process_id": os.getpid(),
"acquired_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(now)),
"expires_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(now + self.ttl_seconds)),
}
try:
fd = os.open(str(slot_path), os.O_CREAT | os.O_EXCL | os.O_WRONLY)
try:
content = json.dumps(slot_data, ensure_ascii=False, indent=2)
os.write(fd, content.encode("utf-8"))
os.fsync(fd)
finally:
os.close(fd)
return True
except FileExistsError:
return False
def _cleanup_expired(self) -> None:
"""만료된 슬롯 정리."""
now = time.time()
for slot_path in self.sem_dir.glob("slot_*.json"):
try:
data = json.loads(slot_path.read_text(encoding="utf-8"))
expires_str = data.get("expires_at", "")
if expires_str:
expires = time.mktime(time.strptime(expires_str, "%Y-%m-%dT%H:%M:%SZ"))
if now > expires:
slot_path.unlink(missing_ok=True)
except (json.JSONDecodeError, OSError, ValueError):
slot_path.unlink(missing_ok=True)