diff --git a/src/agentready/services/llm_cache.py b/src/agentready/services/llm_cache.py index 3ce2db7a..600a2453 100644 --- a/src/agentready/services/llm_cache.py +++ b/src/agentready/services/llm_cache.py @@ -34,7 +34,11 @@ def get(self, cache_key: str) -> DiscoveredSkill | None: Returns: Cached DiscoveredSkill or None if miss/expired """ - cache_file = self.cache_dir / f"{cache_key}.json" + # Security: Validate cache_key to prevent path traversal + cache_file = self._get_safe_cache_path(cache_key) + if cache_file is None: + logger.warning(f"Invalid cache key rejected: {cache_key}") + return None if not cache_file.exists(): logger.debug(f"Cache miss: {cache_key}") @@ -65,7 +69,11 @@ def set(self, cache_key: str, skill: DiscoveredSkill): cache_key: Unique cache key skill: DiscoveredSkill to cache """ - cache_file = self.cache_dir / f"{cache_key}.json" + # Security: Validate cache_key to prevent path traversal + cache_file = self._get_safe_cache_path(cache_key) + if cache_file is None: + logger.warning(f"Invalid cache key rejected: {cache_key}") + return try: data = { @@ -81,6 +89,38 @@ def set(self, cache_key: str, skill: DiscoveredSkill): except Exception as e: logger.warning(f"Cache write error for {cache_key}: {e}") + def _get_safe_cache_path(self, cache_key: str) -> Path | None: + """Validate cache key and return safe path. + + Security: Prevents path traversal attacks by validating cache_key + contains no directory separators and resolves within cache_dir. + + Args: + cache_key: Cache key to validate + + Returns: + Validated Path or None if invalid + """ + # Reject keys with path separators (/, \) + if "/" in cache_key or "\\" in cache_key: + return None + + # Reject keys with null bytes or other dangerous characters + if "\0" in cache_key or ".." in cache_key: + return None + + # Construct path and resolve to canonical form + cache_file = (self.cache_dir / f"{cache_key}.json").resolve() + + # Ensure resolved path is within cache directory + try: + cache_file.relative_to(self.cache_dir.resolve()) + except ValueError: + # Path is outside cache_dir + return None + + return cache_file + @staticmethod def generate_key(attribute_id: str, score: float, evidence_hash: str) -> str: """Generate cache key from finding attributes.