Conversation
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces significant architectural and functional upgrades to the Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Changelog
Activity
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request introduces significant new features, including multimodal model support, a comprehensive FastAPI-based API server, and a completely new Next.js frontend. However, the new backend API implementation contains critical security vulnerabilities, specifically multiple path traversal vulnerabilities in the file management API and a remote code execution (RCE) vector via the MCP server configuration endpoint, exacerbated by a lack of authentication. Other issues include an insecure CORS configuration and architectural patterns that may limit scalability. Immediate remediation of these security and architectural concerns is required for a robust and secure application.
| def resolve_root_dir(root_dir: Optional[str], session_id: Optional[str] = None) -> str: | ||
| """ | ||
| Resolve optional root_dir to an absolute normalized path within allowed roots. | ||
| Default: output_dir | ||
| Supports: | ||
| - None/"" => output_dir | ||
| - "output", "projects", "projects/xxx" | ||
| - absolute path (must still be under allowed roots) | ||
| """ | ||
| if session_id: | ||
| session_root = get_session_root(session_id) | ||
| return str(session_root.resolve()) | ||
|
|
||
| _, output_dir, projects_dir = get_allowed_roots() | ||
|
|
||
| if not root_dir or root_dir.strip() == '': | ||
| resolved = output_dir | ||
| else: | ||
| rd = root_dir.strip() | ||
|
|
||
| if os.path.isabs(rd): | ||
| resolved = rd | ||
| else: | ||
| # Allow explicit "output"/"projects" | ||
| if rd in ('output', 'output/'): | ||
| resolved = output_dir | ||
| elif rd in ('projects', 'projects/'): | ||
| resolved = projects_dir | ||
| else: | ||
| cand1 = os.path.join(output_dir, rd) | ||
| cand2 = os.path.join(projects_dir, rd) | ||
| # choose existing one if possible, otherwise default to cand1 | ||
| resolved = cand1 if os.path.exists(cand1) else ( | ||
| cand2 if os.path.exists(cand2) else cand1) | ||
|
|
||
| resolved = os.path.normpath(os.path.abspath(resolved)) | ||
|
|
||
| # TODO: Security check: ensure `resolved` is within configured allowed roots. | ||
|
|
||
| return resolved | ||
|
|
||
|
|
||
| def resolve_file_path(root_dir_abs: str, file_path: str) -> str: | ||
| """ | ||
| Resolve file_path against root_dir_abs. | ||
| - if file_path starts with 'projects/', resolve from ms-agent base dir | ||
| - if file_path is absolute, use as-is | ||
| - if relative, join(root_dir_abs, file_path) | ||
| """ | ||
| root_dir_abs = os.path.normpath(os.path.abspath(root_dir_abs)) | ||
|
|
||
| if os.path.isabs(file_path): | ||
| full_path = os.path.normpath(os.path.abspath(file_path)) | ||
| elif file_path.startswith('projects/'): | ||
| # Special case: if path starts with 'projects/', resolve from base_dir | ||
| base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) | ||
| full_path = os.path.normpath( | ||
| os.path.abspath(os.path.join(base_dir, file_path))) | ||
| else: | ||
| # Try multiple locations | ||
| base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) | ||
|
|
||
| candidates = [ | ||
| # First try with root_dir_abs (for session-based access) | ||
| os.path.join(root_dir_abs, file_path), | ||
| ] | ||
|
|
||
| # Search in project output directories | ||
| projects_dir = os.path.join(base_dir, 'projects') | ||
| if os.path.exists(projects_dir): | ||
| try: | ||
| for project_name in os.listdir(projects_dir): | ||
| project_path = os.path.join(projects_dir, project_name) | ||
| if os.path.isdir(project_path): | ||
| candidates.append( | ||
| os.path.join(project_path, 'output', file_path)) | ||
| except (OSError, PermissionError): | ||
| pass | ||
|
|
||
| # Find first existing file | ||
| full_path = None | ||
| for candidate in candidates: | ||
| candidate = os.path.normpath(candidate) | ||
| if os.path.exists(candidate) and os.path.isfile(candidate): | ||
| full_path = candidate | ||
| break | ||
|
|
||
| if not full_path: | ||
| # Default to first candidate if none found | ||
| full_path = os.path.normpath(candidates[0]) | ||
|
|
||
| # TODO: Security check: ensure `full_path` is within configured allowed roots. | ||
|
|
||
| return full_path |
There was a problem hiding this comment.
The resolve_root_dir and resolve_file_path functions in api/files.py are critically vulnerable to path traversal. The current implementation allows absolute paths, enabling an attacker to access arbitrary files on the system. os.path.abspath is insufficient for prevention. To remediate, disallow absolute paths in user input and strictly validate that the final resolved path is within the intended base directory, for example, by using os.path.realpath with startswith or os.path.commonpath.
| @router.post("/mcp/servers") | ||
| async def add_mcp_server(server: MCPServerConfig): | ||
| """Add a new MCP server""" | ||
| try: | ||
| success = config_manager.add_mcp_server( | ||
| server.name, | ||
| server.dict(exclude={'name'}) | ||
| ) | ||
| if not success: | ||
| raise HTTPException(status_code=500, detail="Failed to add server") | ||
|
|
||
| return APIResponse( | ||
| success=True, | ||
| message="MCP server added successfully" | ||
| ) | ||
| except Exception as e: | ||
| raise HTTPException(status_code=500, detail=str(e)) | ||
|
|
There was a problem hiding this comment.
The POST /api/v1/config/mcp/servers endpoint allows users to add arbitrary MCP server configurations. For servers of type stdio, this includes specifying a command and args that will be executed by the system when an agent is started. Since the API lacks authentication by default, an unauthenticated attacker can achieve Remote Code Execution (RCE) by configuring a malicious command (e.g., rm -rf / or a reverse shell).
To remediate this, you must:
- Implement strong authentication and authorization for all configuration endpoints.
- Restrict the
commandfield to a strict allow-list of safe executables if possible.
| def get_session_root(session_id: str) -> Path: | ||
| """Get the work directory for a session""" | ||
| if not session_id or not str(session_id).strip(): | ||
| raise HTTPException(status_code=400, detail='session_id is required') | ||
|
|
||
| # Get the API root directory | ||
| api_root = Path(__file__).resolve().parent | ||
| work_dir = (api_root / 'work_dir' / str(session_id)).resolve() | ||
| work_dir.mkdir(parents=True, exist_ok=True) | ||
| return work_dir |
There was a problem hiding this comment.
The get_session_root function uses the user-supplied session_id to construct a file path without sanitization. An attacker could provide a session_id like ../../ to escape the work_dir and access or create directories elsewhere on the filesystem.
To remediate this, sanitize the session_id to ensure it only contains alphanumeric characters or use a UUID validation check.
| router = APIRouter(prefix="/api/v1/agent", tags=["agent"]) | ||
|
|
||
| # Store running agent executors | ||
| running_agents: Dict[str, AgentExecutor] = {} |
There was a problem hiding this comment.
The use of a global in-memory dictionary running_agents to store the state of active agent executors introduces a significant architectural limitation. This approach will not work correctly in a multi-worker production environment (e.g., when using Gunicorn with multiple workers), as each worker process will have its own separate memory space.
This will lead to issues such as:
- Requests for the same session being routed to different workers that don't have the agent's state.
- Inability to scale horizontally by adding more workers or nodes.
- Loss of all running agent states if a worker process restarts.
To address this, consider using a shared state management solution like Redis or a database to store and coordinate the state of running agents across all workers.
| # CORS configuration | ||
| app.add_middleware( | ||
| CORSMiddleware, | ||
| allow_origins=['*'], # In production, specify actual origins | ||
| allow_credentials=True, | ||
| allow_methods=['*'], | ||
| allow_headers=['*'], | ||
| ) |
There was a problem hiding this comment.
The CORS configuration in api/main.py is insecure. Allowing all origins (*) while also enabling allow_credentials=True poses a security risk, as it permits any website to make authenticated requests to your API. This configuration may also be rejected by modern browsers. It is critical to restrict allow_origins to a specific list of trusted domains or use a regular expression for remediation before deployment.
allow_origins=['https://your-frontend-domain.com'], # TODO: Replace with your actual frontend domain(s) in production| except Exception: | ||
| pass |
There was a problem hiding this comment.
Using a bare except Exception: pass is generally discouraged as it can silently swallow all exceptions, including unexpected ones like KeyboardInterrupt or SystemExit. This makes debugging very difficult if an error occurs while loading the mcp_servers.json file (e.g., due to a JSON syntax error or file permission issue).
It's better to either catch a more specific exception (like json.JSONDecodeError or IOError) or at least log the exception that was caught.
| except Exception: | |
| pass | |
| except Exception as e: | |
| # It's better to log this error to help with debugging configuration issues. | |
| # Consider using the logging module here. | |
| print(f"Warning: Failed to load or parse MCP server config from {self.mcp_file}: {e}") |
| if not os.path.abspath(full_path).startswith(os.path.abspath(project['path'])): | ||
| raise HTTPException(status_code=403, detail="Access denied") |
There was a problem hiding this comment.
The security check to prevent path traversal is a good addition. However, using os.path.abspath is not fully secure against attacks involving symbolic links. A malicious user could create a symlink within the project directory that points to a sensitive location outside of it (e.g., symlink -> /).
To make this check more robust, you should use os.path.realpath to resolve any symbolic links in the path before comparing it with the base project path. This ensures you are always comparing canonical paths.
| if not os.path.abspath(full_path).startswith(os.path.abspath(project['path'])): | |
| raise HTTPException(status_code=403, detail="Access denied") | |
| if not os.path.realpath(full_path).startswith(os.path.realpath(project['path'])): |
No description provided.