Loading...
Loading...
02-reusable-code-python/security/field_encryption.py
"""
@source: 260313 heath-infer-step01
@extracted: 2026-03-14
@description: AES-256-GCM 기반 필드 레벨 암호화. 객체의 특정 필드만 선택적으로 암호화/복호화.
사용법:
encryptor = FieldEncryptor()
# 단일 문자열 암호화/복호화
encrypted = encryptor.encrypt("민감한 데이터")
decrypted = encryptor.decrypt(encrypted)
# 객체의 특정 필드만 암호화
data = {"name": "홍길동", "ssn": "123456-1234567"}
encrypted_data = encryptor.encrypt_fields(data, ["ssn"])
환경 변수:
FIELD_ENCRYPTION_KEY: 암호화 키 (32바이트 base64)
FIELD_ENCRYPTION_ENABLED: 활성화 여부 (기본: true)
의존성:
pip install cryptography
"""
import base64
import json
import os
import secrets as crypto_secrets
from dataclasses import dataclass
from typing import Any, Optional
# 암호화 라이브러리 (선택적)
try:
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
CRYPTO_AVAILABLE = True
except ImportError:
CRYPTO_AVAILABLE = False
print("[WARN] cryptography 미설치 - 필드 암호화 비활성화")
# 암호화 접두사 (암호화된 데이터 식별용)
ENCRYPTED_PREFIX = "ENC::"
# Nonce 크기 (GCM 권장: 12바이트)
NONCE_SIZE = 12
@dataclass
class EncryptedField:
"""암호화된 필드 데이터 구조"""
nonce: bytes
ciphertext: bytes
tag: bytes # GCM 인증 태그
def to_string(self) -> str:
"""Base64 인코딩된 문자열로 변환"""
combined = self.nonce + self.ciphertext
return ENCRYPTED_PREFIX + base64.b64encode(combined).decode('utf-8')
@classmethod
def from_string(cls, encrypted_str: str) -> Optional['EncryptedField']:
"""문자열에서 파싱"""
if not encrypted_str.startswith(ENCRYPTED_PREFIX):
return None
try:
combined = base64.b64decode(encrypted_str[len(ENCRYPTED_PREFIX):])
nonce = combined[:NONCE_SIZE]
ciphertext = combined[NONCE_SIZE:]
return cls(nonce=nonce, ciphertext=ciphertext, tag=b'')
except Exception:
return None
class FieldEncryptor:
"""
필드 레벨 암호화 클래스
AES-256-GCM을 사용하여 개별 필드를 암호화합니다.
민감 필드 목록을 커스터마이즈하여 다양한 도메인에 적용 가능합니다.
"""
# 민감 필드 기본 목록 (범용)
DEFAULT_SENSITIVE_FIELDS = [
'ssn', # 주민등록번호
'social_security',
'phone', # 전화번호
'phone_number',
'email', # 이메일
'address', # 주소
'birth_date', # 생년월일
'birthdate',
'password', # 비밀번호 (해시 권장)
'credit_card', # 신용카드
'bank_account', # 계좌번호
'api_key', # API 키
'secret_key', # 비밀 키
]
def __init__(
self,
key: Optional[bytes] = None,
sensitive_fields: Optional[list[str]] = None,
):
"""
암호화기 초기화
Args:
key: 32바이트 AES 키. None이면 환경 변수에서 로드
sensitive_fields: 도메인별 민감 필드 목록. None이면 DEFAULT_SENSITIVE_FIELDS 사용
"""
self._enabled = os.environ.get('FIELD_ENCRYPTION_ENABLED', 'true').lower() == 'true'
self._aesgcm: Optional[Any] = None
if sensitive_fields is not None:
self.DEFAULT_SENSITIVE_FIELDS = sensitive_fields
if not CRYPTO_AVAILABLE:
self._enabled = False
return
if key:
self._key = key
else:
# 환경 변수에서 키 로드
key_str = os.environ.get('FIELD_ENCRYPTION_KEY')
if key_str:
try:
self._key = base64.b64decode(key_str)
if len(self._key) != 32:
raise ValueError("키는 32바이트여야 합니다")
except Exception as e:
print(f"[ERROR] 암호화 키 파싱 실패: {e}")
self._enabled = False
return
else:
print("[WARN] FIELD_ENCRYPTION_KEY 미설정 - 암호화 비활성화")
self._enabled = False
return
self._aesgcm = AESGCM(self._key)
@property
def is_enabled(self) -> bool:
"""암호화 활성화 여부"""
return self._enabled and self._aesgcm is not None
def encrypt(self, plaintext: str) -> str:
"""
문자열 암호화
Args:
plaintext: 평문
Returns:
암호화된 문자열 (비활성화 시 원본 반환)
"""
if not self.is_enabled:
return plaintext
try:
# 랜덤 nonce 생성
nonce = crypto_secrets.token_bytes(NONCE_SIZE)
# 암호화
ciphertext = self._aesgcm.encrypt(
nonce,
plaintext.encode('utf-8'),
None # 추가 인증 데이터 (AAD) 없음
)
# Base64 인코딩
combined = nonce + ciphertext
return ENCRYPTED_PREFIX + base64.b64encode(combined).decode('utf-8')
except Exception as e:
print(f"[ERROR] 암호화 실패: {e}")
return plaintext
def decrypt(self, encrypted: str) -> str:
"""
문자열 복호화
Args:
encrypted: 암호화된 문자열
Returns:
복호화된 문자열 (암호화되지 않은 경우 원본 반환)
"""
if not encrypted.startswith(ENCRYPTED_PREFIX):
return encrypted
if not self.is_enabled:
print("[WARN] 암호화 비활성화 상태에서 복호화 시도")
return encrypted
try:
# Base64 디코딩
combined = base64.b64decode(encrypted[len(ENCRYPTED_PREFIX):])
nonce = combined[:NONCE_SIZE]
ciphertext = combined[NONCE_SIZE:]
# 복호화
plaintext = self._aesgcm.decrypt(nonce, ciphertext, None)
return plaintext.decode('utf-8')
except Exception as e:
print(f"[ERROR] 복호화 실패: {e}")
return encrypted
def encrypt_fields(
self,
data: dict[str, Any],
fields: Optional[list[str]] = None,
recursive: bool = True
) -> dict[str, Any]:
"""
객체의 특정 필드들을 암호화
Args:
data: 원본 데이터
fields: 암호화할 필드 목록 (None이면 기본 민감 필드 사용)
recursive: 중첩 객체도 처리할지 여부
Returns:
암호화된 데이터 (새 객체)
"""
if not self.is_enabled:
return data
target_fields = set(fields or self.DEFAULT_SENSITIVE_FIELDS)
return self._encrypt_recursive(data, target_fields, recursive)
def _encrypt_recursive(
self,
data: Any,
fields: set[str],
recursive: bool
) -> Any:
"""재귀적 암호화"""
if isinstance(data, dict):
result = {}
for key, value in data.items():
if key in fields and isinstance(value, str):
result[key] = self.encrypt(value)
elif recursive and isinstance(value, (dict, list)):
result[key] = self._encrypt_recursive(value, fields, recursive)
else:
result[key] = value
return result
elif isinstance(data, list):
return [
self._encrypt_recursive(item, fields, recursive)
for item in data
]
return data
def decrypt_fields(
self,
data: dict[str, Any],
fields: Optional[list[str]] = None,
recursive: bool = True
) -> dict[str, Any]:
"""
객체의 암호화된 필드들을 복호화
Args:
data: 암호화된 데이터
fields: 복호화할 필드 목록 (None이면 기본 민감 필드 사용)
recursive: 중첩 객체도 처리할지 여부
Returns:
복호화된 데이터 (새 객체)
"""
if not self.is_enabled:
return data
target_fields = set(fields or self.DEFAULT_SENSITIVE_FIELDS)
return self._decrypt_recursive(data, target_fields, recursive)
def _decrypt_recursive(
self,
data: Any,
fields: set[str],
recursive: bool
) -> Any:
"""재귀적 복호화"""
if isinstance(data, dict):
result = {}
for key, value in data.items():
if key in fields and isinstance(value, str):
result[key] = self.decrypt(value)
elif recursive and isinstance(value, (dict, list)):
result[key] = self._decrypt_recursive(value, fields, recursive)
else:
result[key] = value
return result
elif isinstance(data, list):
return [
self._decrypt_recursive(item, fields, recursive)
for item in data
]
return data
def is_encrypted(self, value: str) -> bool:
"""값이 암호화되어 있는지 확인"""
return isinstance(value, str) and value.startswith(ENCRYPTED_PREFIX)
# 싱글톤
_encryptor: Optional[FieldEncryptor] = None
def get_field_encryptor() -> FieldEncryptor:
"""필드 암호화기 싱글톤"""
global _encryptor
if _encryptor is None:
_encryptor = FieldEncryptor()
return _encryptor
def generate_encryption_key() -> str:
"""
새 암호화 키 생성
Returns:
Base64 인코딩된 32바이트 키
"""
key = crypto_secrets.token_bytes(32)
return base64.b64encode(key).decode('utf-8')