-
Notifications
You must be signed in to change notification settings - Fork 0
KR_Security
somaz edited this page Apr 23, 2025
·
4 revisions
import re
from html import escape
from urllib.parse import quote
def validate_input(user_input: str) -> bool:
# 기본적인 입력 검증
if not user_input:
return False
# SQL 인젝션 패턴 검사
sql_patterns = r'(\b(SELECT|INSERT|UPDATE|DELETE|DROP|UNION|ALTER)\b)'
if re.search(sql_patterns, user_input, re.IGNORECASE):
return False
# XSS 패턴 검사
xss_patterns = r'<script|javascript:|on\w+='
if re.search(xss_patterns, user_input, re.IGNORECASE):
return False
return True
def sanitize_html(html_content: str) -> str:
return escape(html_content)
def sanitize_url(url: str) -> str:
return quote(url)✅ 특징:
- 입력 검증
- SQL 인젝션 방지
- XSS 방지
from cryptography.fernet import Fernet
import hashlib
import secrets
import base64
from typing import Tuple
class Encryption:
def __init__(self):
self.key = Fernet.generate_key()
self.cipher_suite = Fernet(self.key)
def encrypt(self, data: str) -> bytes:
return self.cipher_suite.encrypt(data.encode())
def decrypt(self, encrypted_data: bytes) -> str:
return self.cipher_suite.decrypt(encrypted_data).decode()
def hash_password(password: str, salt: bytes = None) -> Tuple[bytes, bytes]:
if salt is None:
salt = secrets.token_bytes(32)
key = hashlib.pbkdf2_hmac(
'sha256',
password.encode(),
salt,
100000 # 반복 횟수
)
return key, salt✅ 특징:
- 대칭 암호화
- 비밀번호 해싱
- 솔트 사용
from datetime import datetime, timedelta
import jwt
from typing import Dict, Optional
class SessionManager:
def __init__(self, secret_key: str):
self.secret_key = secret_key
self.sessions: Dict[str, dict] = {}
def create_session(self, user_id: str) -> str:
session_id = secrets.token_urlsafe(32)
self.sessions[session_id] = {
'user_id': user_id,
'created_at': datetime.now(),
'expires_at': datetime.now() + timedelta(hours=24)
}
return session_id
class JWTManager:
def __init__(self, secret_key: str):
self.secret_key = secret_key
def create_token(self, user_data: dict) -> str:
payload = {
**user_data,
'exp': datetime.utcnow() + timedelta(hours=24)
}
return jwt.encode(payload, self.secret_key, algorithm='HS256')✅ 특징:
- 세션 관리
- JWT 토큰
- 만료 시간 설정
import ssl
import socket
from http.server import HTTPServer, SimpleHTTPRequestHandler
def create_https_server(
certfile: str,
keyfile: str,
port: int = 443
) -> HTTPServer:
context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
context.load_cert_chain(certfile, keyfile)
server = HTTPServer(('0.0.0.0', port), SimpleHTTPRequestHandler)
server.socket = context.wrap_socket(server.socket, server_side=True)
return server✅ 특징:
- SSL/TLS 설정
- HTTPS 서버
- 인증서 관리
import secrets
import hmac
import base64
import os
import re
from typing import Dict, List, Any, Tuple, Optional
import bleach
from werkzeug.utils import secure_filename
class CSRFProtection:
"""
CSRF 공격 방어를 위한 토큰 관리
"""
def __init__(self, secret_key: str):
self.secret_key = secret_key.encode() if isinstance(secret_key, str) else secret_key
self.tokens: Dict[str, float] = {} # {session_id: 만료시간}
def generate_token(self, session_id: str) -> str:
"""
CSRF 토큰 생성
Args:
session_id: 사용자 세션 ID
Returns:
str: 생성된 CSRF 토큰
"""
# 랜덤 토큰 생성
csrf_token = secrets.token_hex(32)
# HMAC으로 서명
signature = hmac.new(
self.secret_key,
f"{session_id}:{csrf_token}".encode(),
digestmod='sha256'
).digest()
# 토큰과 서명 결합 및 인코딩
token = base64.urlsafe_b64encode(
f"{csrf_token}:{base64.b64encode(signature).decode()}".encode()
).decode()
return token
def validate_token(self, session_id: str, token: str) -> bool:
"""
CSRF 토큰 검증
Args:
session_id: 사용자 세션 ID
token: 검증할 CSRF 토큰
Returns:
bool: 토큰 유효성 여부
"""
try:
# 토큰 디코딩
decoded = base64.urlsafe_b64decode(token.encode()).decode()
csrf_token, received_signature_b64 = decoded.split(':', 1)
received_signature = base64.b64decode(received_signature_b64)
# 서명 재생성하여 비교
expected_signature = hmac.new(
self.secret_key,
f"{session_id}:{csrf_token}".encode(),
digestmod='sha256'
).digest()
# 일정 시간 내 비교를 방지하는 상수 시간 비교
return hmac.compare_digest(received_signature, expected_signature)
except Exception:
return False
class ContentSecurityPolicy:
"""
콘텐츠 보안 정책(CSP) 관리
"""
def __init__(self):
self.policies = {
'default-src': ["'self'"],
'script-src': ["'self'"],
'style-src': ["'self'"],
'img-src': ["'self'"],
'connect-src': ["'self'"],
'font-src': ["'self'"],
'object-src': ["'none'"],
'media-src': ["'self'"],
'frame-src': ["'none'"],
}
def add_source(self, directive: str, source: str) -> None:
"""
CSP 소스 추가
Args:
directive: CSP 지시문
source: 추가할 소스
"""
if directive in self.policies:
if source not in self.policies[directive]:
self.policies[directive].append(source)
else:
self.policies[directive] = [source]
def get_header(self) -> str:
"""
CSP 헤더 생성
Returns:
str: CSP 헤더 값
"""
policy_parts = []
for directive, sources in self.policies.items():
policy_parts.append(f"{directive} {' '.join(sources)}")
return "; ".join(policy_parts)
class SecureFileUpload:
"""
안전한 파일 업로드 처리
"""
def __init__(self, upload_dir: str, allowed_extensions: List[str], max_size: int = 10 * 1024 * 1024):
self.upload_dir = upload_dir
self.allowed_extensions = allowed_extensions
self.max_size = max_size
# 업로드 디렉토리 생성
os.makedirs(upload_dir, exist_ok=True)
def is_allowed_file(self, filename: str) -> bool:
"""
파일 확장자 검증
Args:
filename: 검증할 파일명
Returns:
bool: 허용된 확장자 여부
"""
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in self.allowed_extensions
def sanitize_filename(self, filename: str) -> str:
"""
안전한 파일명으로 변환
Args:
filename: 원본 파일명
Returns:
str: 안전한 파일명
"""
# werkzeug의 secure_filename으로 기본 처리
safe_name = secure_filename(filename)
# 추가 제한: 알파벳, 숫자, 일부 특수문자만 허용
safe_name = re.sub(r'[^a-zA-Z0-9._-]', '', safe_name)
# 중복 방지를 위한 유니크 식별자 추가
name_parts = safe_name.rsplit('.', 1)
name_parts[0] = f"{name_parts[0]}_{secrets.token_hex(8)}"
return '.'.join(name_parts)
def save_file(self, file_data: bytes, original_filename: str) -> Tuple[bool, str]:
"""
파일 저장
Args:
file_data: 파일 데이터
original_filename: 원본 파일명
Returns:
Tuple[bool, str]: (성공 여부, 메시지 또는 저장 경로)
"""
if len(file_data) > self.max_size:
return False, "파일 크기가 제한을 초과합니다."
if not self.is_allowed_file(original_filename):
return False, "허용되지 않는 파일 형식입니다."
safe_filename = self.sanitize_filename(original_filename)
file_path = os.path.join(self.upload_dir, safe_filename)
try:
with open(file_path, 'wb') as f:
f.write(file_data)
return True, file_path
except Exception as e:
return False, f"파일 저장 오류: {str(e)}"
class XSSProtection:
"""
XSS 공격 방어
"""
def __init__(self):
# bleach 기본 설정
self.allowed_tags = [
'a', 'abbr', 'acronym', 'b', 'blockquote', 'code', 'em',
'i', 'li', 'ol', 'strong', 'ul', 'p', 'br', 'h1', 'h2',
'h3', 'h4', 'h5', 'h6', 'pre', 'span'
]
self.allowed_attributes = {
'a': ['href', 'title', 'rel'],
'abbr': ['title'],
'acronym': ['title'],
'*': ['class', 'id']
}
def sanitize(self, html_content: str) -> str:
"""
HTML 콘텐츠 정화
Args:
html_content: 정화할 HTML 문자열
Returns:
str: 정화된 HTML
"""
return bleach.clean(
html_content,
tags=self.allowed_tags,
attributes=self.allowed_attributes,
strip=True
)
def get_security_headers(self) -> Dict[str, str]:
"""
XSS 방지를 위한 보안 헤더
Returns:
Dict[str, str]: 보안 헤더 목록
"""
return {
'X-XSS-Protection': '1; mode=block',
'X-Content-Type-Options': 'nosniff',
'Referrer-Policy': 'strict-origin-when-cross-origin'
}✅ 특징:
- CSRF 토큰 관리와 검증
- 콘텐츠 보안 정책(CSP) 구현
- 안전한 파일 업로드 처리
- XSS 방어 및 콘텐츠 정화
- bleach를 활용한 HTML 필터링
- 보안 헤더 설정
- HMAC 서명 검증
- 상수 시간 비교로 타이밍 공격 방지
- 파일명 안전화 및 유효성 검사
- 업로드 크기 제한
✅ 모범 사례:
- 정기적인 보안 감사
- 의존성 취약점 검사
- 접근 제어 구현
- 로깅과 모니터링
- 에러 처리와 정보 노출
- 안전한 파일 처리
- 데이터베이스 보안
- API 보안
- 백업과 복구
- 보안 업데이트
- 최소 권한 원칙 적용
- 파이썬 패키지 무결성 검증
- 컨테이너화된 환경에서의 보안
- 정규 표현식 DoS 공격(ReDoS) 방지
- 시크릿 관리 도구 활용
- 보안 코드 리뷰 프로세스 구축
- 런타임 보안 모니터링
- 멀티팩터 인증(MFA) 구현
- 패스워드 정책 강화
- API 요청 제한 및 조절