diff --git a/tests/unit/utils/__init__.py b/tests/unit/utils/__init__.py new file mode 100644 index 00000000..ab175e8e --- /dev/null +++ b/tests/unit/utils/__init__.py @@ -0,0 +1 @@ +"""Unit tests for agentready.utils package.""" diff --git a/tests/unit/utils/test_privacy.py b/tests/unit/utils/test_privacy.py new file mode 100644 index 00000000..754d54b2 --- /dev/null +++ b/tests/unit/utils/test_privacy.py @@ -0,0 +1,471 @@ +"""Unit tests for privacy utilities.""" + +import getpass +from pathlib import Path +from unittest.mock import patch + +import pytest + +from agentready.utils.privacy import ( + sanitize_command_args, + sanitize_error_message, + sanitize_metadata, + sanitize_path, + shorten_commit_hash, +) + + +class TestSanitizePath: + """Test path sanitization for public display.""" + + def test_sanitize_path_with_relative_to(self, tmp_path): + """Test relative path calculation.""" + base = tmp_path / "base" + base.mkdir() + target = base / "subdir" / "file.txt" + target.parent.mkdir(parents=True) + target.touch() + + result = sanitize_path(target, relative_to=base) + assert result == str(Path("subdir") / "file.txt") + + def test_sanitize_path_not_relative_to(self, tmp_path): + """Test path not relative to specified directory.""" + base = tmp_path / "base" + base.mkdir() + other = tmp_path / "other" / "file.txt" + other.parent.mkdir(parents=True) + other.touch() + + # Should fallback to home directory replacement + result = sanitize_path(other, relative_to=base) + assert str(other) in result or "~" in result + + def test_sanitize_path_home_directory(self): + """Test home directory replacement.""" + home = Path.home() + test_path = home / "project" / "src" + + result = sanitize_path(test_path) + assert result.startswith("~") + assert "project" in result + + @patch("pathlib.Path.home") + def test_sanitize_path_home_error(self, mock_home, tmp_path): + """Test home directory error handling.""" + mock_home.side_effect = RuntimeError("No home directory") + test_path = tmp_path / "project" + + # Should not crash, just return path without home replacement + result = sanitize_path(test_path) + assert isinstance(result, str) + + @patch("getpass.getuser") + def test_sanitize_path_username_unix(self, mock_user): + """Test username redaction on Unix paths.""" + mock_user.return_value = "testuser" + path = "/home/testuser/project/src" + + result = sanitize_path(path) + assert result == "~/project/src" + + @patch("getpass.getuser") + def test_sanitize_path_username_macos(self, mock_user): + """Test username redaction on macOS paths.""" + mock_user.return_value = "testuser" + path = "/Users/testuser/project/src" + + result = sanitize_path(path) + assert result == "~/project/src" + + @patch("getpass.getuser") + def test_sanitize_path_username_windows(self, mock_user): + """Test username redaction on Windows paths.""" + mock_user.return_value = "testuser" + path = "C:\\Users\\testuser\\project\\src" + + result = sanitize_path(path) + assert result == "~\\project\\src" + + @patch("getpass.getuser") + def test_sanitize_path_username_in_middle(self, mock_user): + """Test username redaction in middle of path.""" + mock_user.return_value = "testuser" + path = "/var/testuser/data" + + result = sanitize_path(path) + assert "testuser" not in result or result == "/var//data" + + @patch("getpass.getuser") + def test_sanitize_path_username_error(self, mock_user, tmp_path): + """Test username error handling.""" + mock_user.side_effect = Exception("No username") + test_path = tmp_path / "project" + + # Should not crash, just return path without username replacement + result = sanitize_path(test_path) + assert isinstance(result, str) + + def test_sanitize_path_string_input(self): + """Test string path input.""" + path_str = "/tmp/test/file.txt" + result = sanitize_path(path_str) + assert isinstance(result, str) + + def test_sanitize_path_path_object(self, tmp_path): + """Test Path object input.""" + path_obj = tmp_path / "test" + result = sanitize_path(path_obj) + assert isinstance(result, str) + + +class TestSanitizeCommandArgs: + """Test command-line argument sanitization.""" + + def test_sanitize_basic_args(self): + """Test basic arguments pass through.""" + args = ["agentready", "assess", "."] + result = sanitize_command_args(args) + assert result == args + + def test_sanitize_config_flag(self): + """Test --config flag redaction.""" + args = ["agentready", "assess", ".", "--config", "/secret/config.yaml"] + result = sanitize_command_args(args) + + assert "--config" in result + assert "" in result + assert "/secret/config.yaml" not in result + + @pytest.mark.parametrize( + "flag", + ["--config", "-c", "--api-key", "--token", "--password"], + ) + def test_sanitize_sensitive_flags(self, flag): + """Test various sensitive flags are redacted.""" + args = ["cmd", flag, "secret-value"] + result = sanitize_command_args(args) + + assert flag in result + assert "" in result + assert "secret-value" not in result + + def test_sanitize_absolute_path_unix(self): + """Test absolute Unix path sanitization.""" + args = ["agentready", "assess", "/home/user/project"] + result = sanitize_command_args(args) + + # Path should be sanitized + assert result[0] == "agentready" + assert result[1] == "assess" + # Third arg should be sanitized path (contains ~) + assert "~" in result[2] or "<" in result[2] + + def test_sanitize_absolute_path_windows(self): + """Test absolute Windows path sanitization.""" + args = ["agentready", "assess", "C:\\Users\\test\\project"] + result = sanitize_command_args(args) + + # Path should be sanitized + assert result[0] == "agentready" + assert result[1] == "assess" + assert "~" in result[2] or "<" in result[2] + + def test_sanitize_tilde_path(self): + """Test tilde path sanitization.""" + args = ["agentready", "assess", "~/project"] + result = sanitize_command_args(args) + + # Tilde path should be sanitized + assert "~" in result[2] or "<" in result[2] + + def test_sanitize_api_key_in_arg(self): + """Test API key in argument value.""" + args = ["cmd", "--key", "sk-ant-api03-1234567890abcdefghij"] + result = sanitize_command_args(args) + + assert result == ["cmd", "--key", ""] + + def test_sanitize_api_key_pattern(self): + """Test API key pattern detection.""" + args = ["authenticate", "sk-test-1234567890abcdefghij"] + result = sanitize_command_args(args) + + assert "" in result + assert "sk-test" not in result + + def test_sanitize_mixed_args(self): + """Test mixed sensitive and safe arguments.""" + args = [ + "agentready", + "assess", + "/home/user/project", + "--config", + "config.yaml", + "--verbose", + ] + result = sanitize_command_args(args) + + assert result[0] == "agentready" + assert result[1] == "assess" + assert "--config" in result + assert "" in result + assert "--verbose" in result + + def test_sanitize_empty_args(self): + """Test empty arguments list.""" + args = [] + result = sanitize_command_args(args) + assert result == [] + + +class TestSanitizeErrorMessage: + """Test error message sanitization.""" + + def test_sanitize_empty_message(self): + """Test empty message handling.""" + result = sanitize_error_message("") + assert result == "" + + def test_sanitize_none_message(self): + """Test None message handling.""" + result = sanitize_error_message(None) + assert result is None + + def test_sanitize_repo_path(self, tmp_path): + """Test repository path redaction.""" + repo = tmp_path / "project" + repo.mkdir() + message = f"Error in {repo}/src/file.py" + + result = sanitize_error_message(message, repo_path=repo) + assert "" in result + assert str(repo) not in result + + def test_sanitize_home_directory(self): + """Test home directory redaction.""" + home = Path.home() + message = f"Error in {home}/project/file.py" + + result = sanitize_error_message(message) + assert "" in result + assert str(home) not in result + + @patch("pathlib.Path.home") + def test_sanitize_home_directory_error(self, mock_home): + """Test home directory error handling.""" + mock_home.side_effect = RuntimeError("No home") + message = "Error occurred" + + # Should not crash + result = sanitize_error_message(message) + assert result == "Error occurred" + + @patch("getpass.getuser") + def test_sanitize_username_unix(self, mock_user): + """Test username redaction in Unix paths.""" + mock_user.return_value = "testuser" + message = "Error in /home/testuser/project" + + result = sanitize_error_message(message) + assert "testuser" not in result or "//" in result + + @patch("getpass.getuser") + def test_sanitize_username_windows(self, mock_user): + """Test username redaction in Windows paths.""" + mock_user.return_value = "testuser" + message = "Error in C:\\Users\\testuser\\project" + + result = sanitize_error_message(message) + assert "testuser" not in result or "\\\\" in result + + @patch("getpass.getuser") + def test_sanitize_username_error(self, mock_user): + """Test username error handling.""" + mock_user.side_effect = Exception("No username") + message = "Error occurred" + + # Should not crash + result = sanitize_error_message(message) + assert result == "Error occurred" + + def test_sanitize_absolute_paths(self): + """Test absolute path redaction.""" + message = "Error in /var/log/app.log and C:\\Windows\\System32\\file.dll" + + result = sanitize_error_message(message) + assert "/var/log/app.log" not in result + assert "C:\\Windows\\System32" not in result + assert "" in result + + def test_sanitize_anthropic_api_key(self): + """Test Anthropic API key redaction.""" + message = "Auth failed: sk-ant-api03-abcdefghij1234567890" + + result = sanitize_error_message(message) + assert "sk-ant" not in result + assert "" in result + + def test_sanitize_generic_api_key(self): + """Test generic API key redaction.""" + message = "Invalid key: sk-test-1234567890abcdefghijklmnop" + + result = sanitize_error_message(message) + assert "sk-test" not in result + assert "" in result + + def test_sanitize_email_address(self): + """Test email address redaction.""" + message = "Contact user@example.com or admin@company.co.uk" + + result = sanitize_error_message(message) + assert "user@example.com" not in result + assert "admin@company.co.uk" not in result + assert "" in result + + def test_sanitize_multiple_emails(self): + """Test multiple email redaction.""" + message = "Contact john.doe@example.com or jane_smith+test@domain.io" + + result = sanitize_error_message(message) + assert "@" not in result or result.count("") >= 2 + + def test_sanitize_long_message(self): + """Test long message truncation.""" + message = "Error: " + ("x" * 2000) + + result = sanitize_error_message(message) + assert len(result) <= 1100 # 1000 + "... (truncated)" + assert "truncated" in result + + def test_sanitize_combined_patterns(self): + """Test multiple patterns in one message.""" + message = ( + "Error in /home/user/project: " + "sk-ant-api03-test123456 " + "contact admin@example.com" + ) + + result = sanitize_error_message(message) + assert "/home/user" not in result + assert "sk-ant" not in result + assert "admin@example.com" not in result + assert "" in result or "" in result + assert "" in result + assert "" in result + + +class TestShortenCommitHash: + """Test git commit hash shortening.""" + + def test_shorten_full_hash(self): + """Test shortening full 40-character hash.""" + hash_full = "abc123def456789012345678901234567890abcd" + result = shorten_commit_hash(hash_full) + assert result == "abc123de" + + def test_shorten_short_hash(self): + """Test shortening already short hash.""" + hash_short = "abc123" + result = shorten_commit_hash(hash_short) + assert result == "abc123" + + def test_shorten_8_char_hash(self): + """Test shortening exactly 8-character hash.""" + hash_8 = "abc12345" + result = shorten_commit_hash(hash_8) + assert result == "abc12345" + + def test_shorten_empty_hash(self): + """Test empty hash handling.""" + result = shorten_commit_hash("") + assert result == "" + + def test_shorten_none_hash(self): + """Test None hash handling.""" + result = shorten_commit_hash(None) + assert result is None + + +class TestSanitizeMetadata: + """Test metadata dictionary sanitization.""" + + def test_sanitize_empty_metadata(self): + """Test empty metadata.""" + metadata = {} + result = sanitize_metadata(metadata) + assert result == {} + + def test_sanitize_command_field(self): + """Test command field sanitization.""" + metadata = {"command": "agentready assess /home/user/project --config secret.yaml"} + result = sanitize_metadata(metadata) + + assert "command" in result + assert "/home/user/project" not in result["command"] + assert "" in result["command"] + + def test_sanitize_path_like_string(self): + """Test path-like string sanitization.""" + metadata = {"file_path": "/home/user/project/src/file.py"} + result = sanitize_metadata(metadata) + + assert "file_path" in result + # Should be sanitized with ~ or + assert "~" in result["file_path"] or "<" in result["file_path"] + + def test_sanitize_windows_path(self): + """Test Windows path sanitization.""" + metadata = {"path": "C:\\Users\\test\\project\\file.py"} + result = sanitize_metadata(metadata) + + assert "path" in result + assert "~" in result["path"] or "<" in result["path"] + + def test_sanitize_non_path_strings(self): + """Test non-path strings pass through.""" + metadata = { + "name": "test-project", + "version": "1.0.0", + "status": "success", + } + result = sanitize_metadata(metadata) + assert result == metadata + + def test_sanitize_mixed_metadata(self): + """Test mixed metadata with paths and values.""" + metadata = { + "name": "project", + "path": "/home/user/project", + "command": "assess /secret --config api.yaml", + "count": 42, + } + result = sanitize_metadata(metadata) + + assert result["name"] == "project" + assert result["count"] == 42 + assert "/home/user" not in result["path"] + assert "/secret" not in result["command"] + assert "" in result["command"] + + def test_sanitize_nested_values(self): + """Test that nested structures are not deeply sanitized.""" + metadata = { + "simple": "value", + "path": "/home/user/file", + } + result = sanitize_metadata(metadata) + + assert result["simple"] == "value" + assert "~" in result["path"] or "<" in result["path"] + + def test_sanitize_preserves_types(self): + """Test non-string types are preserved.""" + metadata = { + "count": 42, + "ratio": 3.14, + "enabled": True, + "items": None, + } + result = sanitize_metadata(metadata) + assert result == metadata diff --git a/tests/unit/utils/test_subprocess_utils.py b/tests/unit/utils/test_subprocess_utils.py new file mode 100644 index 00000000..1ac86438 --- /dev/null +++ b/tests/unit/utils/test_subprocess_utils.py @@ -0,0 +1,408 @@ +"""Unit tests for subprocess utilities.""" + +import subprocess +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from agentready.utils.subprocess_utils import ( + MAX_OUTPUT_SIZE, + SUBPROCESS_TIMEOUT, + SubprocessSecurityError, + safe_subprocess_run, + sanitize_subprocess_error, + validate_repository_path, +) + + +class TestValidateRepositoryPath: + """Test repository path validation.""" + + def test_validate_valid_git_repository(self, tmp_path): + """Test validation of valid git repository.""" + repo = tmp_path / "repo" + repo.mkdir() + (repo / ".git").mkdir() + + result = validate_repository_path(repo) + assert result == repo.resolve() + + def test_validate_git_file_worktree(self, tmp_path): + """Test validation of git worktree (with .git file).""" + repo = tmp_path / "repo" + repo.mkdir() + (repo / ".git").write_text("gitdir: /path/to/worktree") + + result = validate_repository_path(repo) + assert result == repo.resolve() + + def test_validate_non_git_directory(self, tmp_path): + """Test validation fails for non-git directory.""" + repo = tmp_path / "not-a-repo" + repo.mkdir() + + with pytest.raises(SubprocessSecurityError, match="Not a git repository"): + validate_repository_path(repo) + + def test_validate_forbidden_etc(self, tmp_path): + """Test validation blocks /etc directory.""" + # Create a mock path that starts with /etc + with patch.object(Path, "resolve") as mock_resolve: + mock_resolve.return_value = Path("/etc/project") + + with pytest.raises(SubprocessSecurityError, match="Cannot access sensitive directory"): + validate_repository_path(Path("/etc/project")) + + def test_validate_forbidden_sys(self, tmp_path): + """Test validation blocks /sys directory.""" + with patch.object(Path, "resolve") as mock_resolve: + mock_resolve.return_value = Path("/sys/project") + + with pytest.raises(SubprocessSecurityError, match="Cannot access sensitive directory"): + validate_repository_path(Path("/sys/project")) + + def test_validate_forbidden_proc(self, tmp_path): + """Test validation blocks /proc directory.""" + with patch.object(Path, "resolve") as mock_resolve: + mock_resolve.return_value = Path("/proc/project") + + with pytest.raises(SubprocessSecurityError, match="Cannot access sensitive directory"): + validate_repository_path(Path("/proc/project")) + + def test_validate_forbidden_root_ssh(self, tmp_path): + """Test validation blocks /.ssh directory.""" + with patch.object(Path, "resolve") as mock_resolve: + mock_resolve.return_value = Path("/.ssh") + + with pytest.raises(SubprocessSecurityError, match="Cannot access sensitive directory"): + validate_repository_path(Path("/.ssh")) + + def test_validate_symlink_resolution(self, tmp_path): + """Test that symlinks are resolved to actual path.""" + real_repo = tmp_path / "real-repo" + real_repo.mkdir() + (real_repo / ".git").mkdir() + + symlink = tmp_path / "symlink-repo" + symlink.symlink_to(real_repo) + + result = validate_repository_path(symlink) + assert result == real_repo.resolve() + + def test_validate_resolve_error(self, tmp_path): + """Test handling of path resolution errors.""" + with patch.object(Path, "resolve", side_effect=OSError("Cannot resolve")): + with pytest.raises(SubprocessSecurityError, match="Cannot resolve path"): + validate_repository_path(Path("/nonexistent")) + + def test_validate_runtime_error(self, tmp_path): + """Test handling of runtime errors during resolution.""" + with patch.object(Path, "resolve", side_effect=RuntimeError("Runtime issue")): + with pytest.raises(SubprocessSecurityError, match="Cannot resolve path"): + validate_repository_path(Path("/test")) + + +class TestSanitizeSubprocessError: + """Test subprocess error sanitization.""" + + def test_sanitize_basic_error(self): + """Test basic error message sanitization.""" + error = Exception("Command failed") + result = sanitize_subprocess_error(error) + assert result == "Command failed" + + def test_sanitize_repo_path(self, tmp_path): + """Test repository path redaction.""" + repo = tmp_path / "project" + repo.mkdir() + error = Exception(f"Error in {repo}/src/file.py") + + result = sanitize_subprocess_error(error, repo_path=repo) + assert "" in result + assert str(repo) not in result + + def test_sanitize_home_directory(self): + """Test home directory redaction.""" + home = Path.home() + error = Exception(f"Error in {home}/project") + + result = sanitize_subprocess_error(error) + assert "" in result + assert str(home) not in result + + @patch("pathlib.Path.home") + def test_sanitize_home_directory_error(self, mock_home): + """Test home directory error handling.""" + mock_home.side_effect = RuntimeError("No home") + error = Exception("Error occurred") + + # Should not crash + result = sanitize_subprocess_error(error) + assert result == "Error occurred" + + @patch("getpass.getuser") + def test_sanitize_username_unix(self, mock_user): + """Test username redaction in Unix paths.""" + mock_user.return_value = "testuser" + error = Exception("Error in /home/testuser/project") + + result = sanitize_subprocess_error(error) + assert "testuser" not in result or "//" in result + + @patch("getpass.getuser") + def test_sanitize_username_windows(self, mock_user): + """Test username redaction in Windows paths.""" + mock_user.return_value = "testuser" + error = Exception("Error in C:\\Users\\testuser\\project") + + result = sanitize_subprocess_error(error) + assert "testuser" not in result or "\\\\" in result + + @patch("getpass.getuser") + def test_sanitize_username_error(self, mock_user): + """Test username error handling.""" + mock_user.side_effect = Exception("No username") + error = Exception("Error occurred") + + # Should not crash + result = sanitize_subprocess_error(error) + assert result == "Error occurred" + + def test_sanitize_long_error(self): + """Test long error message truncation.""" + long_message = "Error: " + ("x" * 1000) + error = Exception(long_message) + + result = sanitize_subprocess_error(error) + assert len(result) <= 550 # 500 + "... (truncated)" + assert "truncated" in result + + def test_sanitize_combined_patterns(self, tmp_path): + """Test multiple patterns in error message.""" + repo = tmp_path / "project" + repo.mkdir() + home = Path.home() + error = Exception(f"Failed in {repo} and {home}") + + result = sanitize_subprocess_error(error, repo_path=repo) + assert str(repo) not in result + assert str(home) not in result + + +class TestSafeSubprocessRun: + """Test safe subprocess execution.""" + + def test_run_simple_command(self): + """Test successful command execution.""" + result = safe_subprocess_run(["echo", "hello"], capture_output=True, text=True) + + assert result.returncode == 0 + assert "hello" in result.stdout + + def test_run_failed_command(self): + """Test failed command execution.""" + # Use 'false' command which always fails + result = safe_subprocess_run(["false"]) + assert result.returncode != 0 + + def test_run_with_custom_timeout(self): + """Test command with custom timeout.""" + # Quick command should complete within short timeout + result = safe_subprocess_run( + ["echo", "test"], + timeout=1, + capture_output=True, + text=True, + ) + assert result.returncode == 0 + + def test_run_timeout_expired(self): + """Test command timeout.""" + with pytest.raises(subprocess.TimeoutExpired): + safe_subprocess_run(["sleep", "10"], timeout=0.1) + + def test_run_default_timeout(self): + """Test default timeout is enforced.""" + # Verify default timeout constant exists + assert SUBPROCESS_TIMEOUT == 120 + + # Quick command should work with default timeout + result = safe_subprocess_run(["echo", "test"], capture_output=True, text=True) + assert result.returncode == 0 + + def test_run_with_cwd(self, tmp_path): + """Test command execution with working directory.""" + # Create a git repo + repo = tmp_path / "repo" + repo.mkdir() + (repo / ".git").mkdir() + + result = safe_subprocess_run( + ["pwd"], + cwd=repo, + capture_output=True, + text=True, + ) + assert result.returncode == 0 + assert str(repo) in result.stdout + + def test_run_with_non_git_cwd(self, tmp_path): + """Test command with non-git working directory.""" + # Non-git directory should work (validation is skipped) + non_git = tmp_path / "non-git" + non_git.mkdir() + + result = safe_subprocess_run( + ["echo", "test"], + cwd=non_git, + capture_output=True, + text=True, + ) + assert result.returncode == 0 + + def test_run_shell_forbidden(self): + """Test that shell=True is blocked.""" + with pytest.raises(SubprocessSecurityError, match="shell=True is forbidden"): + safe_subprocess_run(["echo test"], shell=True) + + def test_run_output_size_limit_stdout(self): + """Test stdout size limit enforcement.""" + # Create command that outputs too much data + large_output = "x" * (MAX_OUTPUT_SIZE + 1000) + + with patch("subprocess.run") as mock_run: + # Mock successful run with large output + mock_result = MagicMock() + mock_result.returncode = 0 + mock_result.stdout = large_output.encode() + mock_result.stderr = b"" + mock_run.return_value = mock_result + + with pytest.raises(SubprocessSecurityError, match="output too large"): + safe_subprocess_run(["echo", "test"], capture_output=True) + + def test_run_output_size_limit_stderr(self): + """Test stderr size limit enforcement.""" + large_error = "x" * (MAX_OUTPUT_SIZE + 1000) + + with patch("subprocess.run") as mock_run: + mock_result = MagicMock() + mock_result.returncode = 1 + mock_result.stdout = b"" + mock_result.stderr = large_error.encode() + mock_run.return_value = mock_result + + with pytest.raises(SubprocessSecurityError, match="stderr too large"): + safe_subprocess_run(["cmd"], capture_output=True) + + def test_run_output_size_within_limit(self): + """Test output within size limit passes.""" + result = safe_subprocess_run( + ["echo", "small output"], + capture_output=True, + text=True, + ) + assert result.returncode == 0 + assert len(result.stdout) < MAX_OUTPUT_SIZE + + def test_run_captures_called_process_error(self): + """Test CalledProcessError is handled.""" + with pytest.raises(subprocess.CalledProcessError): + safe_subprocess_run(["false"], check=True) + + def test_run_with_additional_kwargs(self): + """Test passing additional subprocess.run kwargs.""" + result = safe_subprocess_run( + ["echo", "test"], + capture_output=True, + text=True, + env={"TEST_VAR": "value"}, + ) + assert result.returncode == 0 + + @patch("subprocess.run") + def test_run_logs_execution(self, mock_run, tmp_path): + """Test that subprocess execution is logged.""" + repo = tmp_path / "repo" + repo.mkdir() + (repo / ".git").mkdir() + + mock_result = MagicMock() + mock_result.returncode = 0 + mock_result.stdout = b"test" + mock_result.stderr = b"" + mock_run.return_value = mock_result + + with patch("agentready.utils.subprocess_utils.logger") as mock_logger: + safe_subprocess_run(["test", "cmd"], cwd=repo) + mock_logger.debug.assert_called() + + def test_run_validates_git_repo_path(self, tmp_path): + """Test that git repo paths are validated.""" + # Create a git repo in forbidden location (simulate) + with patch.object(Path, "resolve") as mock_resolve: + mock_resolve.return_value = Path("/etc/evil-repo") + + repo = tmp_path / "repo" + repo.mkdir() + (repo / ".git").mkdir() + + # Should log debug and continue (validation is skipped on failure) + result = safe_subprocess_run( + ["echo", "test"], + cwd=repo, + capture_output=True, + text=True, + ) + # Should still execute since validation is only logged + assert result.returncode == 0 + + def test_run_timeout_in_kwargs(self): + """Test timeout can be passed in kwargs.""" + result = safe_subprocess_run( + ["echo", "test"], + timeout=5, + capture_output=True, + text=True, + ) + assert result.returncode == 0 + + def test_run_command_not_found(self): + """Test handling of command not found error.""" + with pytest.raises(FileNotFoundError): + safe_subprocess_run(["nonexistent-command-12345"]) + + def test_run_with_bytes_output(self): + """Test command with bytes output.""" + result = safe_subprocess_run(["echo", "test"], capture_output=True) + assert result.returncode == 0 + assert isinstance(result.stdout, bytes) + + def test_run_with_text_output(self): + """Test command with text output.""" + result = safe_subprocess_run( + ["echo", "test"], + capture_output=True, + text=True, + ) + assert result.returncode == 0 + assert isinstance(result.stdout, str) + + +class TestSecurityConstants: + """Test security constants are properly defined.""" + + def test_subprocess_timeout_defined(self): + """Test SUBPROCESS_TIMEOUT is defined.""" + assert SUBPROCESS_TIMEOUT == 120 + + def test_max_output_size_defined(self): + """Test MAX_OUTPUT_SIZE is defined.""" + assert MAX_OUTPUT_SIZE == 10_000_000 + + def test_subprocess_security_error_defined(self): + """Test SubprocessSecurityError exception exists.""" + error = SubprocessSecurityError("test") + assert isinstance(error, Exception) + assert str(error) == "test"