diff --git a/src/Elastic.Documentation.ServiceDefaults/AppDefaultsExtensions.cs b/src/Elastic.Documentation.ServiceDefaults/AppDefaultsExtensions.cs index c0ae33057..701f62dd0 100644 --- a/src/Elastic.Documentation.ServiceDefaults/AppDefaultsExtensions.cs +++ b/src/Elastic.Documentation.ServiceDefaults/AppDefaultsExtensions.cs @@ -13,6 +13,7 @@ using Microsoft.Extensions.DependencyInjection.Extensions; using Microsoft.Extensions.Diagnostics.HealthChecks; using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Http.Resilience; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Console; @@ -36,7 +37,10 @@ public static TBuilder AddDocumentationServiceDefaults( .AddElasticDocumentationLogging(cliOptions.LogLevel) .ConfigureHttpClientDefaults(http => { - _ = http.AddStandardResilienceHandler(); + _ = http.AddStandardResilienceHandler(options => + { + options.Retry.DisableForUnsafeHttpMethods(); + }); }) .AddConfigurationFileProvider(cliOptions.SkipPrivateRepositories, cliOptions.ConfigSource, (s, p) => { diff --git a/src/api/Elastic.Documentation.Mcp.Remote/Program.cs b/src/api/Elastic.Documentation.Mcp.Remote/Program.cs index 50ef52e55..e54103e95 100644 --- a/src/api/Elastic.Documentation.Mcp.Remote/Program.cs +++ b/src/api/Elastic.Documentation.Mcp.Remote/Program.cs @@ -51,17 +51,18 @@ profile.RegisterAllServices(builder.Services); // CreateSlimBuilder disables reflection-based JSON serialization. - // The MCP SDK's legacy SSE handler uses Results.BadRequest(string) which needs - // ASP.NET Core's HTTP JSON options to have type metadata for System.String. + // McpJsonUtilities registers System.String so the SDK's error responses can serialize. _ = builder.Services.ConfigureHttpJsonOptions(options => { options.SerializerOptions.TypeInfoResolverChain.Insert(0, McpJsonUtilities.DefaultOptions.TypeInfoResolver!); }); - // Stateless mode: no Mcp-Session-Id header is issued or expected, which avoids a known - // Cursor bug where it opens the SSE stream without the session header and receives 400. - // Stateless mode is appropriate here because all tools are pure request/response (no - // server-initiated push) and the server runs behind a load balancer without session affinity. + // Stateless Streamable HTTP transport: each request is an independent POST / — no session + // affinity, no Mcp-Session-Id header, no server-initiated push (sampling/elicitation/roots). + // This is the correct posture for a load-balanced service whose tools are pure request/response. + // In SDK 1.4+, stateless and SSE are mutually exclusive; EnableLegacySse (default false) + // cannot be combined with Stateless = true. SSE-only clients should use the mcp-remote bridge: + // npx -y mcp-remote https:///docs/_mcp var mcpBuilder = builder.Services .AddMcpServer(options => options.ServerInstructions = profile.ComposeServerInstructions()) .WithHttpTransport(o => o.Stateless = true); @@ -92,7 +93,6 @@ })); _ = app.UseMiddleware(); - _ = app.UseMiddleware(); var mcpPrefix = SystemEnvironmentVariables.Instance.McpPrefix; var mcp = app.MapGroup(mcpPrefix); diff --git a/src/api/Elastic.Documentation.Mcp.Remote/SseKeepAliveMiddleware.cs b/src/api/Elastic.Documentation.Mcp.Remote/SseKeepAliveMiddleware.cs deleted file mode 100644 index 67d30000f..000000000 --- a/src/api/Elastic.Documentation.Mcp.Remote/SseKeepAliveMiddleware.cs +++ /dev/null @@ -1,284 +0,0 @@ -// Licensed to Elasticsearch B.V under one or more agreements. -// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. -// See the LICENSE file in the project root for more information - -using System.Text; -using Elastic.Documentation.Configuration; - -namespace Elastic.Documentation.Mcp.Remote; - -/// -/// Middleware that sends periodic SSE keepalive comments on text/event-stream responses -/// to prevent clients (notably Cursor) from timing out idle SSE connections. -/// -public class SseKeepAliveMiddleware(RequestDelegate next, ILogger logger) -{ - private static readonly TimeSpan KeepAliveInterval = TimeSpan.FromSeconds(5); - - public async Task InvokeAsync(HttpContext context) - { - if (!context.Request.Path.StartsWithSegments(SystemEnvironmentVariables.Instance.McpPrefix, StringComparison.OrdinalIgnoreCase)) - { - await next(context); - return; - } - - var originalBody = context.Response.Body; - await using var wrapper = new SseKeepAliveStream(originalBody, KeepAliveInterval, logger); - context.Response.Body = wrapper; - - context.Response.OnStarting(() => - { - if (context.Response.ContentType?.Contains("text/event-stream", StringComparison.OrdinalIgnoreCase) == true) - wrapper.StartKeepAlive(context.RequestAborted); - - return Task.CompletedTask; - }); - - try - { - await next(context); - } - finally - { - await wrapper.StopKeepAlive(); - context.Response.Body = originalBody; - } - } -} - -/// -/// Stream wrapper that periodically writes SSE comment lines (: keepalive\n\n) -/// when the underlying stream is idle, preventing between-bytes timeouts. -/// -internal sealed class SseKeepAliveStream(Stream inner, TimeSpan interval, ILogger logger) : Stream -{ - private static readonly byte[] KeepAliveBytes = ": keepalive\n\n"u8.ToArray(); - - // Used as an async mutex to synchronize writes between the MCP SDK and the keepalive timer - private readonly SemaphoreSlim _writeLock = new(1, 1); - - private PeriodicTimer? _timer; - private CancellationTokenSource? _cts; - private Task? _keepAliveTask; - private long _lastWriteTicks = Environment.TickCount64; - - /// Starts the periodic keepalive task, linked to the request's cancellation token. - public void StartKeepAlive(CancellationToken requestAborted) - { - _cts = CancellationTokenSource.CreateLinkedTokenSource(requestAborted); - _timer = new PeriodicTimer(interval); - _keepAliveTask = RunKeepAlive(_cts.Token); - logger.LogDebug("SSE keepalive started with {Interval}s interval", interval.TotalSeconds); - } - - /// Signals the keepalive task to stop and awaits its completion. Safe to call multiple times. - public async Task StopKeepAlive() - { - var cts = Interlocked.Exchange(ref _cts, null); - if (cts is null) - return; - - await cts.CancelAsync(); - - if (_keepAliveTask is not null) - { - try - { - await _keepAliveTask; - } - catch (OperationCanceledException) - { - // Expected on cancellation - } - } - - _timer?.Dispose(); - cts.Dispose(); - - logger.LogDebug("SSE keepalive stopped"); - } - - private bool IsKeepAliveActive => _keepAliveTask is not null; - - private async Task RunKeepAlive(CancellationToken ct) - { - try - { - while (await _timer!.WaitForNextTickAsync(ct)) - { - var elapsed = Environment.TickCount64 - Interlocked.Read(ref _lastWriteTicks); - if (elapsed < interval.TotalMilliseconds) - continue; - - await _writeLock.WaitAsync(ct); - try - { - await inner.WriteAsync(KeepAliveBytes, ct); - await inner.FlushAsync(ct); - _ = Interlocked.Exchange(ref _lastWriteTicks, Environment.TickCount64); - } - catch (ObjectDisposedException) - { - break; - } - catch (IOException) - { - break; - } - finally - { - _ = _writeLock.Release(); - } - } - } - catch (OperationCanceledException) when (ct.IsCancellationRequested) - { - // Normal shutdown - } - } - - public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) - { - if (!IsKeepAliveActive) - { - await inner.WriteAsync(buffer.AsMemory(offset, count), cancellationToken); - return; - } - - await _writeLock.WaitAsync(cancellationToken); - try - { - await inner.WriteAsync(buffer.AsMemory(offset, count), cancellationToken); - _ = Interlocked.Exchange(ref _lastWriteTicks, Environment.TickCount64); - } - finally - { - _ = _writeLock.Release(); - } - } - - public override async ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) - { - if (!IsKeepAliveActive) - { - await inner.WriteAsync(buffer, cancellationToken); - return; - } - - await _writeLock.WaitAsync(cancellationToken); - try - { - await inner.WriteAsync(buffer, cancellationToken); - _ = Interlocked.Exchange(ref _lastWriteTicks, Environment.TickCount64); - } - finally - { - _ = _writeLock.Release(); - } - } - - public override void Write(byte[] buffer, int offset, int count) - { - if (!IsKeepAliveActive) - { - inner.Write(buffer, offset, count); - return; - } - - // SSE streams should always use async writes; blocking here risks threadpool starvation - throw new NotSupportedException("Synchronous writes are not supported on active SSE keepalive streams."); - } - - public override async Task FlushAsync(CancellationToken cancellationToken) - { - if (!IsKeepAliveActive) - { - await inner.FlushAsync(cancellationToken); - return; - } - - await _writeLock.WaitAsync(cancellationToken); - try - { - await inner.FlushAsync(cancellationToken); - } - finally - { - _ = _writeLock.Release(); - } - } - - public override void Flush() - { - if (!IsKeepAliveActive) - { - inner.Flush(); - return; - } - - throw new NotSupportedException("Synchronous flush is not supported on active SSE keepalive streams."); - } - - public override int Read(byte[] buffer, int offset, int count) => inner.Read(buffer, offset, count); - - public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => - inner.ReadAsync(buffer, offset, count, cancellationToken); - - public override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) => - inner.ReadAsync(buffer, cancellationToken); - - public override long Seek(long offset, SeekOrigin origin) => inner.Seek(offset, origin); - - public override void SetLength(long value) => inner.SetLength(value); - - public override bool CanRead => inner.CanRead; - - public override bool CanSeek => inner.CanSeek; - - public override bool CanWrite => inner.CanWrite; - - public override long Length => inner.Length; - - public override long Position - { - get => inner.Position; - set => inner.Position = value; - } - - public override async ValueTask DisposeAsync() - { - await StopKeepAlive(); - _writeLock.Dispose(); - - await base.DisposeAsync(); - } - - protected override void Dispose(bool disposing) - { - if (disposing) - { - // Signal the background task to stop before disposing resources - var cts = Interlocked.Exchange(ref _cts, null); - if (cts is not null) - { - cts.Cancel(); - try - { - _keepAliveTask?.GetAwaiter().GetResult(); - } - catch (OperationCanceledException) - { - // Expected on cancellation - } - - _timer?.Dispose(); - cts.Dispose(); - } - - _writeLock.Dispose(); - } - - base.Dispose(disposing); - } -}