|
| 1 | +from time import monotonic |
| 2 | +from typing import Any, override |
| 3 | + |
| 4 | +from splunklib.ai.messages import AgentResponse |
| 5 | +from splunklib.ai.middleware import ( |
| 6 | + AgentMiddleware, |
| 7 | + AgentMiddlewareHandler, |
| 8 | + AgentRequest, |
| 9 | + ModelMiddlewareHandler, |
| 10 | + ModelRequest, |
| 11 | + ModelResponse, |
| 12 | +) |
| 13 | +from splunklib.ai.structured_output import StructuredOutputGenerationException |
| 14 | + |
# Default limit values. The names suggest they are fallbacks for the limit
# middlewares defined in this module; where they are applied is not visible
# here, so confirm at the call site.

# Wall-clock budget in seconds (ten minutes).
DEFAULT_TIMEOUT_SECONDS: float = 10 * 60.0
# Maximum number of agent steps.
DEFAULT_STEP_LIMIT: int = 100
# Maximum token count.
DEFAULT_TOKEN_LIMIT: int = 200_000
# Maximum number of structured output generation retries.
DEFAULT_STRUCTURED_OUTPUT_RETRY_LIMIT: int = 3
| 19 | + |
| 20 | + |
class AgentStopException(Exception):
    """Base class for exceptions signalling that an agent run must stop.

    The limit-specific exceptions in this module derive from it, so catching
    this type covers all of those stop conditions.
    """
| 23 | + |
| 24 | + |
class TokenLimitExceededException(AgentStopException):
    """Raised by `Agent.invoke` when the token limit is exceeded."""

    def __init__(self, token_limit: int) -> None:
        """Build the error message from the configured limit.

        Args:
            token_limit: The token limit that was hit; it is only embedded in
                the message text.
        """
        message = f"Token limit of {token_limit} exceeded."
        super().__init__(message)
| 30 | + |
| 31 | + |
class StepsLimitExceededException(AgentStopException):
    """Raised by `Agent.invoke` when the steps limit is exceeded."""

    def __init__(self, steps_limit: int) -> None:
        """Build the error message from the configured limit.

        Args:
            steps_limit: The step limit that was hit; it is only embedded in
                the message text.
        """
        message = f"Steps limit of {steps_limit} exceeded."
        super().__init__(message)
| 37 | + |
| 38 | + |
class TimeoutExceededException(AgentStopException):
    """Raised by `Agent.invoke` when the timeout is exceeded."""

    def __init__(self, timeout_seconds: float) -> None:
        """Build the error message from the configured timeout.

        Args:
            timeout_seconds: The timeout, in seconds, that elapsed; it is
                only embedded in the message text.
        """
        message = f"Timed out after {timeout_seconds} seconds."
        super().__init__(message)
| 44 | + |
| 45 | + |
class StructuredOutputRetryLimitExceededException(AgentStopException):
    """Raised by `Agent.invoke` when the structured output retry limit is exceeded."""

    def __init__(self, retry_count: int) -> None:
        """Build the error message from the configured retry limit.

        Args:
            retry_count: The retry limit that was hit; it is only embedded in
                the message text.
        """
        message = f"Structured output retry limit of {retry_count} exceeded"
        super().__init__(message)
| 51 | + |
| 52 | + |
class TokenLimitMiddleware(AgentMiddleware):
    """Stops agent execution when the token count reaches or exceeds a limit.

    The check runs before every model call and reads the token count from
    the request state.
    """

    _limit: int

    def __init__(self, limit: int) -> None:
        """Store the token limit.

        Args:
            limit: Token count at or above which model calls are refused.
        """
        self._limit = limit

    @override
    async def model_middleware(
        self,
        request: ModelRequest,
        handler: ModelMiddlewareHandler,
    ) -> ModelResponse:
        """Forward the model call unless the token budget is used up.

        Args:
            request: The model request; `request.state.token_count` is read.
            handler: The next handler in the chain.

        Returns:
            Whatever `handler` returns for this request.

        Raises:
            TokenLimitExceededException: If the token count is greater than
                or equal to the configured limit.
        """
        over_budget = request.state.token_count >= self._limit
        if over_budget:
            raise TokenLimitExceededException(token_limit=self._limit)
        response = await handler(request)
        return response
| 70 | + |
| 71 | + |
class StepLimitMiddleware(AgentMiddleware):
    """Stops agent execution once the number of steps taken reaches a limit.

    The check runs before every model call and reads the step count from
    the request state.
    """

    _limit: int

    def __init__(self, limit: int) -> None:
        """Store the step limit.

        Args:
            limit: Step count at or above which model calls are refused.
        """
        self._limit = limit

    @override
    async def model_middleware(
        self,
        request: ModelRequest,
        handler: ModelMiddlewareHandler,
    ) -> ModelResponse:
        """Forward the model call unless the step budget is used up.

        Args:
            request: The model request; `request.state.total_steps` is read.
            handler: The next handler in the chain.

        Returns:
            Whatever `handler` returns for this request.

        Raises:
            StepsLimitExceededException: If the step count is greater than
                or equal to the configured limit.
        """
        steps_taken = request.state.total_steps
        if steps_taken >= self._limit:
            raise StepsLimitExceededException(steps_limit=self._limit)
        return await handler(request)
| 89 | + |
| 90 | + |
class TimeoutLimitMiddleware(AgentMiddleware):
    """Stops agent execution when an invoke runs longer than the given seconds.

    A new deadline is set at the start of every invoke call, so the time is
    measured per invocation and not from agent construction. The deadline is
    only checked before each model call.

    Do not share instances between agents.
    """

    _seconds: float
    # Deadline (a `monotonic()` timestamp) of the running invoke, per thread id.
    _deadline_per_thread_id: dict[str, float]

    def __init__(self, seconds: float) -> None:
        """Store the timeout and start with no tracked deadlines.

        Args:
            seconds: Time budget, in seconds, for each invoke.
        """
        self._seconds = seconds
        self._deadline_per_thread_id = {}

    @override
    async def agent_middleware(
        self,
        request: AgentRequest,
        handler: AgentMiddlewareHandler,
    ) -> AgentResponse[Any | None]:
        """Record a deadline for this invoke and run the agent loop.

        Args:
            request: The agent request; `request.thread_id` keys the deadline.
            handler: The next handler in the chain.

        Returns:
            Whatever `handler` returns for this request.
        """
        try:
            # The agent loop is starting: compute a fresh deadline.
            deadline = monotonic() + self._seconds
            self._deadline_per_thread_id[request.thread_id] = deadline
            response = await handler(request)
            return response
        finally:
            # Remove the entry so finished invokes do not leak memory.
            del self._deadline_per_thread_id[request.thread_id]

    @override
    async def model_middleware(
        self,
        request: ModelRequest,
        handler: ModelMiddlewareHandler,
    ) -> ModelResponse:
        """Forward the model call unless the invoke's deadline has passed.

        Args:
            request: The model request; `request.state.thread_id` selects the
                deadline recorded by `agent_middleware`.
            handler: The next handler in the chain.

        Returns:
            Whatever `handler` returns for this request.

        Raises:
            TimeoutExceededException: If the current monotonic time is at or
                past the recorded deadline.
            KeyError: If no deadline is recorded for the thread id.
        """
        now = monotonic()
        deadline = self._deadline_per_thread_id[request.state.thread_id]
        if now >= deadline:
            raise TimeoutExceededException(timeout_seconds=self._seconds)
        return await handler(request)
| 131 | + |
| 132 | + |
class StructuredOutputRetryLimitMiddleware(AgentMiddleware):
    """Stops agent execution when the agent exceeds the structured output
    retry limit during a single agent loop invocation.

    Up to `limit` structured output failures are passed through unchanged;
    the failure after that is converted into
    `StructuredOutputRetryLimitExceededException`.
    """

    _limit: int
    # Structured output failures seen in the running invoke, per thread id.
    _retries_per_thread_id: dict[str, int]

    def __init__(self, limit: int) -> None:
        """Store the retry limit and start with no tracked counters.

        Args:
            limit: Number of structured output failures tolerated per invoke.
        """
        self._limit = limit
        self._retries_per_thread_id = {}

    @override
    async def agent_middleware(
        self,
        request: AgentRequest,
        handler: AgentMiddlewareHandler,
    ) -> AgentResponse[Any | None]:
        """Reset the retry counter for this invoke and run the agent loop.

        Args:
            request: The agent request; `request.thread_id` keys the counter.
            handler: The next handler in the chain.

        Returns:
            Whatever `handler` returns for this request.
        """
        try:
            # Agent loop starting.
            self._retries_per_thread_id[request.thread_id] = 0
            return await handler(request)
        finally:
            del self._retries_per_thread_id[request.thread_id]  # don't leak memory

    @override
    async def model_middleware(
        self,
        request: ModelRequest,
        handler: ModelMiddlewareHandler,
    ) -> ModelResponse:
        """Forward the model call and count structured output failures.

        Args:
            request: The model request; `request.state.thread_id` selects the
                counter reset by `agent_middleware`.
            handler: The next handler in the chain.

        Returns:
            Whatever `handler` returns for this request.

        Raises:
            StructuredOutputRetryLimitExceededException: If this failure
                pushes the counter above the limit. The triggering
                `StructuredOutputGenerationException` is attached as its
                cause.
            StructuredOutputGenerationException: Re-raised unchanged while
                the counter is still within the limit.
        """
        try:
            return await handler(request)
        except StructuredOutputGenerationException as exc:
            thread_id = request.state.thread_id
            self._retries_per_thread_id[thread_id] += 1
            if self._retries_per_thread_id[thread_id] > self._limit:
                # Chain explicitly so the traceback shows the generation
                # failure as the direct cause of the stop.
                raise StructuredOutputRetryLimitExceededException(
                    retry_count=self._limit
                ) from exc
            raise  # re-raise, to retry structured output generation
0 commit comments