Loading...
Loading...
02-reusable-code-python/async/agent_lock.py
"""
Agent_Lock 프로토콜 - 파일 기반 에이전트 간 동기화 락
JSON 메타데이터를 포함한 의도 기반(Intent-based) 파일 락으로,
멀티 에이전트 환경에서 파일 충돌을 방지합니다.
@source QHDE 연구 (docs/intel/qhde-research/03-communication/03-semaphore-lock.md)
@extracted 2026-03-17
@version 1.0.0
의존성:
- 없음 (Python 3.10+ 내장 모듈만 사용)
사용법:
from async.agent_lock import AgentLock, LockIntent
lock = AgentLock(
locks_dir="/path/to/project/.locks",
agent_id="agent-auth",
session_id="kdyswarm-001"
)
# 락 획득
intent = LockIntent(action="WRITE", scope=["src/features/auth/**"])
acquired = lock.acquire("src/features/auth/login.ts", intent=intent, ttl_seconds=300)
if acquired:
# ... 파일 작업 수행 ...
lock.release("src/features/auth/login.ts")
"""
from __future__ import annotations
import hashlib
import json
import os
import time
import uuid
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any
@dataclass
class LockIntent:
"""락의 목적과 범위를 명시하는 의도 메타데이터."""
action: str = "WRITE" # READ, WRITE, REFACTOR, DELETE
scope: list[str] = field(default_factory=list)
description: str = ""
priority: int = 5 # 1=최우선, 9=최저
@dataclass
class LockOwner:
"""락 소유자 정보."""
agent_id: str
process_id: int
session_id: str
host: str = ""
@dataclass
class LockMetadata:
"""Agent_Lock JSON 스키마."""
schema_version: str = "1.0"
lock_id: str = ""
owner: LockOwner | None = None
intent: LockIntent | None = None
resource: str = ""
created_timestamp: str = ""
expiry_timestamp: str = ""
def to_dict(self) -> dict[str, Any]:
return {
"schemaVersion": self.schema_version,
"lockId": self.lock_id,
"ownerAgent": asdict(self.owner) if self.owner else {},
"intent": asdict(self.intent) if self.intent else {},
"resource": self.resource,
"createdTimestamp": self.created_timestamp,
"expiryTimestamp": self.expiry_timestamp,
}
@classmethod
def from_dict(cls, data: dict[str, Any]) -> LockMetadata:
owner_data = data.get("ownerAgent", {})
intent_data = data.get("intent", {})
return cls(
schema_version=data.get("schemaVersion", "1.0"),
lock_id=data.get("lockId", ""),
owner=LockOwner(
agent_id=owner_data.get("agent_id", ""),
process_id=owner_data.get("process_id", 0),
session_id=owner_data.get("session_id", ""),
host=owner_data.get("host", ""),
),
intent=LockIntent(
action=intent_data.get("action", "WRITE"),
scope=intent_data.get("scope", []),
description=intent_data.get("description", ""),
priority=intent_data.get("priority", 5),
),
resource=data.get("resource", ""),
created_timestamp=data.get("createdTimestamp", ""),
expiry_timestamp=data.get("expiryTimestamp", ""),
)
class AgentLock:
"""파일 기반 에이전트 간 동기화 락.
락 파일 위치: {locks_dir}/{file_path_hash}.agent_lock
"""
LOCK_SUFFIX = ".agent_lock"
def __init__(self, locks_dir: str | Path, agent_id: str, session_id: str = "") -> None:
self.locks_dir = Path(locks_dir)
self.locks_dir.mkdir(parents=True, exist_ok=True)
self.owner = LockOwner(
agent_id=agent_id,
process_id=os.getpid(),
session_id=session_id or str(uuid.uuid4()),
host=os.uname().nodename if hasattr(os, "uname") else os.environ.get("COMPUTERNAME", "unknown"),
)
self._held_locks: dict[str, Path] = {}
def _lock_path(self, resource: str) -> Path:
"""리소스 경로를 해시하여 락 파일 경로 생성."""
h = hashlib.sha256(resource.encode()).hexdigest()[:16]
return self.locks_dir / f"{h}{self.LOCK_SUFFIX}"
def acquire(
self,
resource: str,
*,
intent: LockIntent | None = None,
ttl_seconds: int = 300,
max_retries: int = 3,
retry_delay: float = 1.0,
) -> bool:
"""락 획득 시도.
Args:
resource: 잠글 리소스 경로 (예: "src/features/auth/login.ts")
intent: 락 의도 메타데이터
ttl_seconds: 락 자동 만료 시간 (초)
max_retries: 최대 재시도 횟수
retry_delay: 재시도 간 대기 시간 (초)
Returns:
True면 락 획득 성공
"""
lock_path = self._lock_path(resource)
intent = intent or LockIntent()
now = time.time()
metadata = LockMetadata(
lock_id=str(uuid.uuid4()),
owner=self.owner,
intent=intent,
resource=resource,
created_timestamp=time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(now)),
expiry_timestamp=time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(now + ttl_seconds)),
)
for attempt in range(max_retries + 1):
try:
# 원자적 생성 시도 (O_CREAT | O_EXCL)
fd = os.open(str(lock_path), os.O_CREAT | os.O_EXCL | os.O_WRONLY)
try:
content = json.dumps(metadata.to_dict(), ensure_ascii=False, indent=2)
os.write(fd, content.encode("utf-8"))
os.fsync(fd)
finally:
os.close(fd)
self._held_locks[resource] = lock_path
return True
except FileExistsError:
# 기존 락 확인: 만료 또는 좀비 여부
if self._try_steal_stale_lock(lock_path, metadata):
self._held_locks[resource] = lock_path
return True
if attempt < max_retries:
time.sleep(retry_delay * (2**attempt)) # exponential backoff
return False
def release(self, resource: str) -> bool:
"""락 해제.
Args:
resource: 해제할 리소스 경로
Returns:
True면 정상 해제
"""
lock_path = self._held_locks.pop(resource, None)
if lock_path is None:
lock_path = self._lock_path(resource)
try:
# 자기 소유 확인 후 삭제
if lock_path.exists():
data = json.loads(lock_path.read_text(encoding="utf-8"))
owner = data.get("ownerAgent", {})
if owner.get("agent_id") == self.owner.agent_id:
lock_path.unlink()
return True
except (json.JSONDecodeError, OSError):
pass
return False
def release_all(self) -> int:
"""보유한 모든 락 해제."""
count = 0
for resource in list(self._held_locks.keys()):
if self.release(resource):
count += 1
return count
def is_locked(self, resource: str) -> LockMetadata | None:
"""리소스의 락 상태 확인.
Returns:
락이 걸려있으면 LockMetadata, 아니면 None
"""
lock_path = self._lock_path(resource)
if not lock_path.exists():
return None
try:
data = json.loads(lock_path.read_text(encoding="utf-8"))
metadata = LockMetadata.from_dict(data)
# 만료 확인
if metadata.expiry_timestamp:
expiry = time.mktime(time.strptime(metadata.expiry_timestamp, "%Y-%m-%dT%H:%M:%SZ"))
if time.time() > expiry:
return None # 만료됨
return metadata
except (json.JSONDecodeError, OSError, ValueError):
return None
def list_locks(self) -> list[LockMetadata]:
"""현재 활성 락 목록 조회."""
locks = []
for lock_file in self.locks_dir.glob(f"*{self.LOCK_SUFFIX}"):
try:
data = json.loads(lock_file.read_text(encoding="utf-8"))
locks.append(LockMetadata.from_dict(data))
except (json.JSONDecodeError, OSError):
continue
return locks
def cleanup_stale(self) -> int:
"""만료/좀비 락 정리.
Returns:
정리된 락 수
"""
count = 0
now = time.time()
for lock_file in self.locks_dir.glob(f"*{self.LOCK_SUFFIX}"):
try:
data = json.loads(lock_file.read_text(encoding="utf-8"))
# 만료 확인
expiry_str = data.get("expiryTimestamp", "")
if expiry_str:
expiry = time.mktime(time.strptime(expiry_str, "%Y-%m-%dT%H:%M:%SZ"))
if now > expiry:
lock_file.unlink()
count += 1
continue
# 좀비 감지 (PID 확인)
owner = data.get("ownerAgent", {})
pid = owner.get("process_id", 0)
if pid and not self._is_process_alive(pid):
lock_file.unlink()
count += 1
except (json.JSONDecodeError, OSError, ValueError):
lock_file.unlink(missing_ok=True)
count += 1
return count
def _try_steal_stale_lock(self, lock_path: Path, new_metadata: LockMetadata) -> bool:
"""만료/좀비 락을 강제 탈취."""
try:
data = json.loads(lock_path.read_text(encoding="utf-8"))
# 만료 확인
expiry_str = data.get("expiryTimestamp", "")
if expiry_str:
expiry = time.mktime(time.strptime(expiry_str, "%Y-%m-%dT%H:%M:%SZ"))
if time.time() > expiry:
lock_path.unlink()
return self._atomic_write_lock(lock_path, new_metadata)
# 좀비 감지
owner = data.get("ownerAgent", {})
pid = owner.get("process_id", 0)
if pid and not self._is_process_alive(pid):
lock_path.unlink()
return self._atomic_write_lock(lock_path, new_metadata)
except (json.JSONDecodeError, OSError, ValueError):
# 손상된 락 파일 — 제거 후 재획득
lock_path.unlink(missing_ok=True)
return self._atomic_write_lock(lock_path, new_metadata)
return False
def _atomic_write_lock(self, lock_path: Path, metadata: LockMetadata) -> bool:
"""원자적 락 파일 쓰기."""
try:
fd = os.open(str(lock_path), os.O_CREAT | os.O_EXCL | os.O_WRONLY)
try:
content = json.dumps(metadata.to_dict(), ensure_ascii=False, indent=2)
os.write(fd, content.encode("utf-8"))
os.fsync(fd)
finally:
os.close(fd)
return True
except FileExistsError:
return False
@staticmethod
def _is_process_alive(pid: int) -> bool:
"""PID로 프로세스 생존 확인."""
try:
os.kill(pid, 0)
return True
except (OSError, ProcessLookupError):
return False