Loading...
Loading...
02-reusable-code-python/api/websocket_handler.py
"""
FastAPI WebSocket 스트리밍 핸들러 - 실시간 오디오 스트리밍 패턴
@source voice-to-text-v2
@extracted 2026-02-15
@version 1.0.0
의존성:
- fastapi
- numpy
사용법:
from fastapi import FastAPI, WebSocket
from api.websocket_handler import create_websocket_handler
app = FastAPI()
# 프로세서 콜백 정의
async def process_audio(audio_buffer, language, **kwargs):
# 오디오 처리 로직 (STT, 분석 등)
return {"text": "변환 결과", "confidence": 0.95}
# 핸들러 생성
ws_handler = create_websocket_handler(process_fn=process_audio)
@app.websocket("/ws/stream")
async def websocket_endpoint(websocket: WebSocket):
await ws_handler(websocket)
프로토콜:
Client → Server:
{"action": "start", "language": "ko", ...} # 스트리밍 시작
[binary audio chunks] # 오디오 데이터
{"action": "stop"} # 스트리밍 중지
{"action": "ping"} # 연결 유지
Server → Client:
{"type": "status", "message": "ready|processing|stopped"}
{"type": "level", "value": 0.25} # 오디오 레벨
{"type": "result", ...} # 처리 결과
{"type": "error", "message": "..."} # 에러
{"type": "ping"} / {"type": "pong"} # 하트비트
"""
import asyncio
import json
import logging
from typing import Any, Callable
import numpy as np
from fastapi import WebSocket, WebSocketDisconnect
logger = logging.getLogger(__name__)
# 기본 설정
DEFAULT_TIMEOUT = 60.0 # WebSocket 수신 타임아웃 (초)
DEFAULT_MIN_BUFFER_SECONDS = 0.5 # 최소 처리 버퍼 길이 (초)
DEFAULT_SAMPLE_RATE = 16000
class StreamingSession:
"""
WebSocket 스트리밍 세션 관리.
오디오 청크를 수신하여 버퍼에 누적하고,
오디오 레벨을 클라이언트에 전송합니다.
"""
def __init__(
self,
websocket: WebSocket,
params: dict,
sample_rate: int = DEFAULT_SAMPLE_RATE,
):
"""
Args:
websocket: FastAPI WebSocket 연결
params: 클라이언트가 start 시 전달한 파라미터
sample_rate: 오디오 샘플레이트
"""
self.ws = websocket
self.params = params
self.sample_rate = sample_rate
self.audio_buffer = np.array([], dtype=np.float32)
self.is_active = False
async def send_message(self, msg_type: str, **kwargs):
"""타입이 지정된 JSON 메시지 전송"""
message = {"type": msg_type, **kwargs}
try:
await self.ws.send_json(message)
except Exception as e:
logger.error(f"WS 메시지 전송 실패: {e}")
async def process_audio_chunk(
self,
audio_bytes: bytes,
converter_fn: Callable | None = None,
):
"""
오디오 청크 처리 및 버퍼 누적.
Args:
audio_bytes: 원본 오디오 바이트
converter_fn: 바이트 → float32 변환 함수 (None이면 기본 PCM float32 해석)
"""
try:
if converter_fn:
audio_chunk = converter_fn(audio_bytes)
else:
# 기본: raw PCM float32 해석
audio_chunk = np.frombuffer(audio_bytes, dtype=np.float32)
self.audio_buffer = np.concatenate([self.audio_buffer, audio_chunk])
# 오디오 레벨 전송 (시각화용)
level = float(np.abs(audio_chunk).mean())
await self.send_message("level", value=round(level, 4))
except Exception as e:
logger.error(f"오디오 청크 처리 에러: {e}")
await self.send_message("error", message=f"오디오 처리 에러: {e}")
def get_buffer(self) -> np.ndarray:
"""현재 오디오 버퍼 반환"""
return self.audio_buffer
def reset(self):
"""오디오 버퍼 초기화"""
self.audio_buffer = np.array([], dtype=np.float32)
ProcessFunction = Callable[[np.ndarray, dict], Any]
def create_websocket_handler(
process_fn: ProcessFunction,
converter_fn: Callable[[bytes], np.ndarray] | None = None,
timeout: float = DEFAULT_TIMEOUT,
min_buffer_seconds: float = DEFAULT_MIN_BUFFER_SECONDS,
sample_rate: int = DEFAULT_SAMPLE_RATE,
on_start: Callable | None = None,
) -> Callable:
"""
WebSocket 스트리밍 핸들러 팩토리.
Args:
process_fn: 오디오 처리 함수.
시그니처: async (audio_buffer: np.ndarray, params: dict) -> dict
params에는 클라이언트가 start 시 전달한 모든 파라미터가 포함됨.
converter_fn: 바이트 → float32 변환 함수 (None이면 raw PCM float32)
timeout: WebSocket 수신 타임아웃 (초)
min_buffer_seconds: 처리를 위한 최소 버퍼 길이 (초)
sample_rate: 오디오 샘플레이트
on_start: 스트리밍 시작 시 호출되는 콜백 (params를 인자로 받음)
Returns:
WebSocket 핸들러 함수
"""
async def handler(websocket: WebSocket):
await websocket.accept()
logger.info("WebSocket 연결 열림")
session: StreamingSession | None = None
try:
while True:
# 메시지 수신 (타임아웃 적용)
try:
data = await asyncio.wait_for(
websocket.receive(),
timeout=timeout,
)
except asyncio.TimeoutError:
await websocket.send_json({"type": "ping"})
continue
if "text" in data:
# JSON 컨트롤 메시지
try:
msg = json.loads(data["text"])
except json.JSONDecodeError:
continue
action = msg.get("action", "")
if action == "start":
params = {k: v for k, v in msg.items() if k != "action"}
session = StreamingSession(websocket, params, sample_rate)
session.is_active = True
if on_start:
await on_start(params) if asyncio.iscoroutinefunction(on_start) else on_start(params)
await session.send_message("status", message="ready")
logger.info(f"스트리밍 시작: {params}")
elif action == "stop":
if session and session.is_active:
session.is_active = False
# 최소 버퍼 길이 확인 후 처리
buffer = session.get_buffer()
if len(buffer) >= sample_rate * min_buffer_seconds:
await session.send_message("status", message="processing")
try:
result = await process_fn(buffer, session.params) if asyncio.iscoroutinefunction(process_fn) else process_fn(buffer, session.params)
if result:
await session.send_message("result", **result if isinstance(result, dict) else {"data": result})
except Exception as e:
logger.error(f"처리 에러: {e}")
await session.send_message("error", message=str(e))
session.reset()
await session.send_message("status", message="stopped")
logger.info("스트리밍 중지")
elif action == "ping":
await websocket.send_json({"type": "pong"})
elif "bytes" in data:
# 바이너리 오디오 데이터
if session and session.is_active:
await session.process_audio_chunk(data["bytes"], converter_fn)
except WebSocketDisconnect:
logger.info("WebSocket 연결 해제")
except Exception as e:
logger.error(f"WebSocket 에러: {e}")
finally:
if session:
session.reset()
logger.info("WebSocket 연결 종료")
return handler