Loading...
Loading...
02-reusable-code-python/utils/audio_processor.py
"""
오디오 처리 유틸리티 - 포맷 변환, 리샘플링, 무음 감지, 파형 계산, 노이즈 제거, 품질 분석
@source voice-to-text-v2
@extracted 2026-02-15
@updated 2026-03-14 — denoise_audio, normalize_volume, compute_quality_score 추가
@version 1.1.0
의존성:
- numpy (필수)
- soundfile (권장, 오디오 읽기)
- librosa (선택, 리샘플링 폴백)
- noisereduce (선택, 노이즈 제거)
사용법:
from utils.audio_processor import bytes_to_float32, detect_silence, compute_waveform
# 오디오 바이트 → float32 배열 변환
audio = bytes_to_float32(audio_bytes, source_format="wav", target_sr=16000)
# 무음 구간 감지
silences = detect_silence(audio, threshold=0.01, min_silence_ms=500)
# 시각화용 파형 데이터 계산
waveform = compute_waveform(audio, num_points=200)
# 오디오 정보 조회
info = get_audio_info(audio, sample_rate=16000)
# 노이즈 제거 + 볼륨 정규화
from utils.audio_processor import denoise_audio, normalize_volume, compute_quality_score
clean = denoise_audio(audio)
normalized = normalize_volume(clean, target_rms=0.1)
quality = compute_quality_score(normalized)
print(f"품질 점수: {quality['score']}/100, SNR: {quality['snr_db']}dB")
"""
import io
import logging
import numpy as np
logger = logging.getLogger(__name__)
# 기본 샘플레이트 (Whisper, 대부분의 STT 엔진 기준)
DEFAULT_SAMPLE_RATE = 16000
def bytes_to_float32(
audio_bytes: bytes,
source_format: str = "wav",
target_sr: int = DEFAULT_SAMPLE_RATE,
) -> np.ndarray:
"""
오디오 바이트를 float32 numpy 배열로 변환 (지정 샘플레이트로 리샘플링).
지원 포맷: WAV, WebM, MP3, OGG, FLAC
soundfile로 먼저 시도하고, 실패 시 librosa로 폴백.
Args:
audio_bytes: 원본 오디오 바이트
source_format: 오디오 포맷 ("wav", "webm", "mp3", "ogg", "flac")
target_sr: 목표 샘플레이트 (기본: 16000)
Returns:
float32 numpy 배열 (모노, 정규화됨, -1.0 ~ 1.0)
Raises:
ValueError: 오디오 변환 실패 시
"""
try:
import soundfile as sf
audio_io = io.BytesIO(audio_bytes)
try:
data, sr = sf.read(audio_io, dtype="float32")
except Exception:
# soundfile 실패 시 librosa 폴백
audio_io.seek(0)
import librosa
data, sr = librosa.load(audio_io, sr=None, mono=True)
# 스테레오 → 모노 변환
if len(data.shape) > 1:
data = np.mean(data, axis=1)
# 리샘플링
if sr != target_sr:
try:
import librosa
data = librosa.resample(data, orig_sr=sr, target_sr=target_sr)
except ImportError:
# 간단한 리샘플링 폴백 (librosa 미설치 시)
ratio = target_sr / sr
new_length = int(len(data) * ratio)
indices = np.linspace(0, len(data) - 1, new_length)
data = np.interp(indices, np.arange(len(data)), data)
data = data.astype(np.float32)
# 정규화 (-1.0 ~ 1.0)
max_val = np.abs(data).max()
if max_val > 0:
data = data / max(max_val, 1.0)
return data
except Exception as e:
logger.error(f"오디오 변환 실패: {e}")
raise ValueError(f"오디오 처리 실패: {e}")
def detect_silence(
audio: np.ndarray,
threshold: float = 0.01,
min_silence_ms: int = 500,
sample_rate: int = DEFAULT_SAMPLE_RATE,
) -> list[dict]:
"""
오디오에서 무음 구간을 감지.
Args:
audio: float32 오디오 배열
threshold: 무음 판정 진폭 임계값 (0.0 ~ 1.0)
min_silence_ms: 최소 무음 구간 길이 (밀리초)
sample_rate: 오디오 샘플레이트
Returns:
무음 구간 리스트: [{"start_ms": int, "end_ms": int, "duration_ms": int}, ...]
"""
min_silence_samples = int(sample_rate * min_silence_ms / 1000)
is_silent = np.abs(audio) < threshold
silences = []
silent_start = None
for i, s in enumerate(is_silent):
if s and silent_start is None:
silent_start = i
elif not s and silent_start is not None:
duration = i - silent_start
if duration >= min_silence_samples:
silences.append({
"start_ms": int(silent_start / sample_rate * 1000),
"end_ms": int(i / sample_rate * 1000),
"duration_ms": int(duration / sample_rate * 1000),
})
silent_start = None
return silences
def split_on_silence(
audio: np.ndarray,
min_silence_ms: int = 700,
min_chunk_ms: int = 1000,
threshold: float = 0.01,
sample_rate: int = DEFAULT_SAMPLE_RATE,
) -> list[np.ndarray]:
"""
무음 구간 기준으로 오디오를 청크로 분할.
Args:
audio: float32 오디오 배열
min_silence_ms: 분할 기준 최소 무음 구간 (밀리초)
min_chunk_ms: 최소 청크 크기 (밀리초)
threshold: 무음 임계값
sample_rate: 오디오 샘플레이트
Returns:
오디오 청크 리스트
"""
silences = detect_silence(audio, threshold, min_silence_ms, sample_rate)
if not silences:
return [audio]
chunks = []
prev_end = 0
for silence in silences:
split_point = int(
(silence["start_ms"] + silence["end_ms"]) / 2 * sample_rate / 1000
)
chunk = audio[prev_end:split_point]
if len(chunk) >= int(min_chunk_ms * sample_rate / 1000):
chunks.append(chunk)
prev_end = split_point
# 마지막 청크
if prev_end < len(audio):
last_chunk = audio[prev_end:]
if len(last_chunk) >= int(min_chunk_ms * sample_rate / 1000):
chunks.append(last_chunk)
return chunks if chunks else [audio]
def get_audio_info(audio: np.ndarray, sample_rate: int = DEFAULT_SAMPLE_RATE) -> dict:
"""
오디오 기본 정보 조회.
Args:
audio: float32 오디오 배열
sample_rate: 오디오 샘플레이트
Returns:
dict: duration_seconds, sample_rate, samples, rms_level, peak_level, is_silent
"""
duration = len(audio) / sample_rate
rms = np.sqrt(np.mean(audio**2))
peak = np.abs(audio).max()
return {
"duration_seconds": round(duration, 2),
"sample_rate": sample_rate,
"samples": len(audio),
"rms_level": round(float(rms), 4),
"peak_level": round(float(peak), 4),
"is_silent": rms < 0.005,
}
def denoise_audio(
audio: np.ndarray,
sample_rate: int = DEFAULT_SAMPLE_RATE,
prop_decrease: float = 0.8,
) -> np.ndarray:
"""
Spectral gating 기반 노이즈 제거.
noisereduce 라이브러리를 사용하여 배경 노이즈를 제거.
미설치 시 원본 오디오를 그대로 반환.
Args:
audio: float32 오디오 배열
sample_rate: 오디오 샘플레이트
prop_decrease: 노이즈 감쇄 비율 (0.0~1.0, 기본: 0.8)
Returns:
노이즈 제거된 float32 오디오 배열
"""
try:
import noisereduce as nr
reduced = nr.reduce_noise(y=audio, sr=sample_rate, prop_decrease=prop_decrease)
return reduced.astype(np.float32)
except ImportError:
logger.warning("noisereduce 미설치 - 노이즈 제거 건너뜀. pip install noisereduce")
return audio
except Exception as e:
logger.error(f"노이즈 제거 실패: {e}")
return audio
def normalize_volume(
audio: np.ndarray,
target_rms: float = 0.1,
max_gain: float = 10.0,
) -> np.ndarray:
"""
RMS 기반 볼륨 정규화.
현재 RMS 레벨을 target_rms에 맞추되, 최대 증폭 배율과
클리핑을 방지.
Args:
audio: float32 오디오 배열
target_rms: 목표 RMS 레벨 (0.0~1.0, 기본: 0.1)
max_gain: 최대 증폭 배율 (기본: 10.0)
Returns:
정규화된 float32 오디오 배열
"""
current_rms = np.sqrt(np.mean(audio ** 2))
if current_rms < 1e-6:
return audio
gain = target_rms / current_rms
gain = min(gain, max_gain)
normalized = audio * gain
# 클리핑 방지
max_val = np.abs(normalized).max()
if max_val > 1.0:
normalized = normalized / max_val
return normalized.astype(np.float32)
def compute_quality_score(
audio: np.ndarray,
sample_rate: int = DEFAULT_SAMPLE_RATE,
) -> dict:
"""
오디오 품질 점수 계산 (0~100).
음량(RMS), 클리핑 비율, 신호 대 잡음비(SNR)를 종합 평가.
Args:
audio: float32 오디오 배열
sample_rate: 오디오 샘플레이트
Returns:
dict: {
"score": int (0~100),
"snr_db": float,
"clipping_ratio": float,
"rms": float,
"issues": list[str]
}
"""
issues: list[str] = []
scores: list[int] = []
# RMS (음량)
rms = float(np.sqrt(np.mean(audio ** 2)))
if rms < 0.005:
issues.append("음량이 너무 낮음")
scores.append(10)
elif rms < 0.02:
issues.append("음량이 낮음")
scores.append(50)
elif rms > 0.5:
issues.append("음량이 너무 높음")
scores.append(60)
else:
scores.append(90)
# 클리핑 비율
clipping_threshold = 0.99
clipping_ratio = float(np.mean(np.abs(audio) > clipping_threshold))
if clipping_ratio > 0.01:
issues.append("클리핑 감지")
scores.append(30)
elif clipping_ratio > 0.001:
issues.append("약간의 클리핑")
scores.append(70)
else:
scores.append(95)
# SNR 추정 (상위 10% 에너지 vs 하위 10%)
sorted_energy = np.sort(np.abs(audio))
noise_floor = float(np.mean(sorted_energy[:len(sorted_energy) // 10]) + 1e-10)
signal_level = float(np.mean(sorted_energy[-len(sorted_energy) // 10:]))
snr_db = float(20 * np.log10(signal_level / noise_floor)) if noise_floor > 0 else 0.0
if snr_db < 10:
issues.append("높은 노이즈")
scores.append(30)
elif snr_db < 20:
issues.append("약간의 노이즈")
scores.append(65)
else:
scores.append(90)
final_score = int(np.mean(scores))
return {
"score": max(0, min(100, final_score)),
"snr_db": round(snr_db, 1),
"clipping_ratio": round(clipping_ratio, 4),
"rms": round(rms, 4),
"issues": issues,
}
def compute_waveform(
audio: np.ndarray,
num_points: int = 200,
) -> list[float]:
"""
시각화용 파형 데이터 계산.
오디오를 num_points 개의 구간으로 나누어 각 구간의 평균 진폭을 반환.
Args:
audio: float32 오디오 배열
num_points: 시각화 데이터 포인트 수
Returns:
진폭 값 리스트 (0.0 ~ 1.0)
"""
if len(audio) == 0:
return [0.0] * num_points
chunk_size = max(1, len(audio) // num_points)
waveform = []
for i in range(num_points):
start = i * chunk_size
end = min(start + chunk_size, len(audio))
if start < len(audio):
chunk = audio[start:end]
waveform.append(round(float(np.abs(chunk).mean()), 4))
else:
waveform.append(0.0)
return waveform