NOTE(review): line breaks and context-line indentation were reconstructed from a
whitespace-collapsed copy of this patch; verify against the target file (or use
`git apply --ignore-whitespace`) before applying.

diff --git a/src/quant_strategy_plugins/ai_audit.py b/src/quant_strategy_plugins/ai_audit.py
index 59b4e88..886107e 100644
--- a/src/quant_strategy_plugins/ai_audit.py
+++ b/src/quant_strategy_plugins/ai_audit.py
@@ -454,6 +454,20 @@ def _complete_with_endpoint(
     timeout_seconds: float,
 ) -> str:
     endpoint = endpoint.normalized()
+
+    # Route through AiGateway when CODEX_AUDIT_SERVICE_URL is configured.
+    # API keys live on the VPS — no keys in plugin config needed.
+    gateway_url = os.environ.get("CODEX_AUDIT_SERVICE_URL", "").strip()
+    if gateway_url:
+        prompt = "\n\n".join(
+            f"{str(m.get('role') or 'user').upper()}:\n{str(m.get('content') or '').strip()}"
+            for m in messages if str(m.get("content") or "").strip()
+        )
+        if endpoint.provider == PROVIDER_CODEX:
+            return _codex_via_gateway(prompt, endpoint.model, timeout_seconds)
+        return _llm_via_gateway(prompt, endpoint.model, endpoint.provider, timeout_seconds)
+
+    # Fallback: direct API / subprocess calls
     if endpoint.provider == PROVIDER_CODEX:
         return _codex_exec_completion(endpoint, messages, timeout_seconds)
     if endpoint.provider == PROVIDER_ANTHROPIC:
@@ -461,6 +475,97 @@ def _complete_with_endpoint(
     return _openai_compatible_chat_completion(endpoint, messages, timeout_seconds)
 
 
+def _codex_via_gateway(prompt: str, model: str, timeout_seconds: float) -> str:
+    """Execute via AiGateway service — delegates to CodexAdapter on VPS.
+
+    Any gateway failure, including an unsuccessful result, is logged and
+    retried through ``_codex_exec_direct``; a missing ``ai_gateway_client``
+    package falls back without logging.
+    """
+    try:
+        from ai_gateway_client import AiGatewayClient, GatewayConfig
+        config = GatewayConfig.from_env()
+        client = AiGatewayClient(config)
+        result = client.execute(prompt, mode="review_only", model=model, timeout=timeout_seconds)
+        if result.success:
+            return result.output
+        raise AiAuditError(result.error)
+    except ImportError:
+        return _codex_exec_direct(prompt, timeout_seconds)
+    except Exception as exc:
+        _logger.warning("ai_audit gateway codex call failed: %s; falling back to direct", exc)
+        return _codex_exec_direct(prompt, timeout_seconds)
+
+
+def _llm_via_gateway(prompt: str, model: str, provider: str, timeout_seconds: float) -> str:
+    """Analyze via AiGateway service — delegates to LlmAdapter on VPS.
+
+    Any gateway failure, including an unsuccessful result, is logged and
+    retried through ``_llm_direct``; a missing ``ai_gateway_client``
+    package falls back without logging.
+    """
+    try:
+        from ai_gateway_client import AiGatewayClient, GatewayConfig
+        config = GatewayConfig.from_env()
+        client = AiGatewayClient(config)
+        result = client.analyze(prompt, model=model, timeout=timeout_seconds)
+        if result.success:
+            return result.output
+        raise AiAuditError(result.error)
+    except ImportError:
+        return _llm_direct(prompt, model, provider, timeout_seconds)
+    except Exception as exc:
+        _logger.warning("ai_audit gateway analyze call failed: %s; falling back to direct", exc)
+        return _llm_direct(prompt, model, provider, timeout_seconds)
+
+
+def _llm_direct(prompt: str, model: str, provider: str, timeout_seconds: float) -> str:
+    """Direct API call fallback when gateway is unavailable."""
+    # NOTE(review): api_key and base_url are blank here, so this fallback only
+    # works if normalized() or the completion helpers fill them in — confirm.
+    endpoint = AiAuditEndpoint(
+        name="fallback", api_key="", provider=provider,
+        base_url="", model=model,
+    ).normalized()
+    messages: tuple[Mapping[str, str], ...] = ({"role": "user", "content": prompt},)
+    if provider == PROVIDER_ANTHROPIC:
+        return _anthropic_messages_completion(endpoint, messages, timeout_seconds)
+    return _openai_compatible_chat_completion(endpoint, messages, timeout_seconds)
+
+
+def _codex_exec_direct(prompt: str, timeout_seconds: float) -> str:
+    """Direct codex exec fallback when gateway is unavailable.
+
+    Runs ``codex exec`` in a throwaway directory with the prompt on stdin and
+    returns the last-message file, or captured stdout when that file is empty.
+
+    Raises:
+        AiAuditError: If codex is missing, times out, exits non-zero, or
+            produces no output.
+    """
+    with tempfile.TemporaryDirectory(prefix="qsp-ai-audit-") as temp_dir:
+        output_path = Path(temp_dir) / "codex-final-message.md"
+        command = ["codex", "exec", "--cd", temp_dir, "--output-last-message", str(output_path), "-"]
+        try:
+            result = subprocess.run(
+                command, input=prompt, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
+                timeout=float(timeout_seconds), check=False, env=_scrubbed_codex_env(),
+            )
+        except FileNotFoundError as exc:
+            raise AiAuditError("codex command was not found") from exc
+        except subprocess.TimeoutExpired as exc:
+            raise AiAuditError(f"codex command timed out after {timeout_seconds:g}s") from exc
+        if result.returncode != 0:
+            detail = _bounded_text(result.stdout or "", limit=300)
+            raise AiAuditError(f"codex command failed with exit code {result.returncode}: {detail}")
+        text = output_path.read_text(encoding="utf-8").strip() if output_path.exists() else ""
+        if not text:
+            text = str(result.stdout or "").strip()
+        if not text:
+            raise AiAuditError("codex command returned empty output")
+        return text
+
+
 def _scrubbed_codex_env() -> dict[str, str]:
     secret_markers = ("TOKEN", "SECRET", "PASSWORD", "PRIVATE_KEY", "CREDENTIAL", "API_KEY")
     return {
@@ -673,6 +778,83 @@ def _failure_text(exc: BaseException) -> str:
     return _bounded_text(f"{type(exc).__name__}: {exc}", limit=300)
 
 
+def _report_shadow_disagreement(
+    *,
+    audit_kind: str,
+    ai_verdict: str,
+    ai_confidence: float,
+    deterministic_route: str,
+) -> None:
+    """Best-effort report of an AI vs deterministic disagreement to AiGateway.
+
+    When the AI shadow audit disagrees with the deterministic route, report it
+    so the gateway can track cumulative disagreements and auto-escalate.
+
+    Posts one JSON document to ``<CODEX_AUDIT_SERVICE_URL>/v1/ai/feedback/shadow``.
+    Nothing is sent when the service URL is unset, when the verdict is empty
+    or ``"agree"``, or when no bearer token can be found. The call is
+    synchronous (bounded by a 5 second timeout) and swallows every ``Exception``.
+
+    Args:
+        audit_kind: Audit kind; the two ``*_shadow`` kinds in ``plugin_map``
+            are mapped to plugin names, any other value is sent unchanged.
+        ai_verdict: Verdict produced by the AI audit.
+        ai_confidence: Confidence reported by the AI audit.
+        deterministic_route: Route chosen by the deterministic logic.
+    """
+    import logging
+    import urllib.request as _ur
+
+    service_url = os.environ.get("CODEX_AUDIT_SERVICE_URL", "").strip()
+    if not service_url:
+        return
+    # A missing verdict is not a disagreement; reporting it would inflate the
+    # gateway's disagreement count.
+    if not ai_verdict or ai_verdict == "agree":
+        return
+    try:
+        # Map audit kind to plugin name
+        plugin_map = {
+            "crisis_response_shadow": "crisis_response",
+            "taco_rebound_shadow": "taco_rebound",
+        }
+        plugin = plugin_map.get(audit_kind, audit_kind)
+        # NOTE(review): ACTIONS_ID_TOKEN_REQUEST_TOKEN is GitHub's OIDC
+        # *request* token — confirm the gateway really expects it as a bearer.
+        token = _env(
+            "ACTIONS_ID_TOKEN_REQUEST_TOKEN",
+            "CODEX_AUDIT_SERVICE_TOKEN",
+        ) or ""
+        if not token:
+            token = os.environ.get("CODEX_AUDIT_SERVICE_TOKEN", "")
+        if not token:
+            return  # No auth available, skip silently
+        payload = json.dumps({
+            "plugin": plugin,
+            "ai_verdict": ai_verdict,
+            "ai_confidence": ai_confidence,
+            "deterministic_route": deterministic_route,
+            "source_repository": os.environ.get("AI_GATEWAY_SOURCE_REPO", ""),
+        }).encode("utf-8")
+        req = _ur.Request(
+            f"{service_url.rstrip('/')}/v1/ai/feedback/shadow",
+            data=payload, method="POST",
+            headers={
+                "Authorization": f"Bearer {token}",
+                "Content-Type": "application/json",
+                "User-Agent": "quant-strategy-plugins",
+            },
+        )
+        # Close the response explicitly instead of leaving the socket to GC.
+        with _ur.urlopen(req, timeout=5):
+            pass
+    except Exception as exc:
+        # Reporting must never break the main audit flow, but leave a trace.
+        logging.getLogger(__name__).debug(
+            "ai_audit shadow disagreement report failed: %s", exc
+        )
+
+
 def build_disabled_ai_audit(*, audit_kind: str = "strategy_plugin") -> dict[str, Any]:
     return {
         "schema_version": AI_AUDIT_SCHEMA_VERSION,
@@ -754,6 +936,16 @@ def _run_ai_audit(
             raw_response = client(endpoint, messages, float(timeout_seconds))
             audit_response = _normalize_ai_audit_response(_extract_json_object(raw_response))
             attempts.append({**endpoint.report(), "status": "ok"})
+
+            # Phase 3: report AI vs deterministic disagreement to AiGateway
+            _report_shadow_disagreement(
+                audit_kind=audit_kind,
+                ai_verdict=audit_response.get("verdict", ""),
+                ai_confidence=audit_response.get("confidence") or 0.0,
+                deterministic_route=str(deterministic_payload.get("canonical_route") or
+                                        deterministic_payload.get("suggested_action") or ""),
+            )
+
             return {
                 **base_payload,
                 "status": "ok",