diff --git a/Tests/Resgrid.Tests/Web/Tts/S3StorageServiceTests.cs b/Tests/Resgrid.Tests/Web/Tts/S3StorageServiceTests.cs index 7b3f3472..4bb6c000 100644 --- a/Tests/Resgrid.Tests/Web/Tts/S3StorageServiceTests.cs +++ b/Tests/Resgrid.Tests/Web/Tts/S3StorageServiceTests.cs @@ -1,6 +1,8 @@ using System; using System.Collections.Generic; using System.IO; +using System.Net; +using System.Net.Http; using System.Threading; using System.Threading.Tasks; using Amazon.S3; @@ -41,15 +43,7 @@ public async Task upload_async_should_buffer_non_seekable_stream_for_retries() return new PutObjectResponse(); }); - var service = new S3StorageService( - s3Client.Object, - Options.Create(new S3StorageOptions - { - Bucket = "tts-bucket", - AccessKey = "access-key", - SecretKey = "secret-key" - }), - Mock.Of>()); + var service = CreateService(s3Client.Object); await using var content = new NonSeekableReadStream(new byte[] { 1, 2, 3, 4 }); @@ -61,6 +55,147 @@ public async Task upload_async_should_buffer_non_seekable_stream_for_retries() s3Client.Verify(x => x.PutObjectAsync(It.IsAny(), It.IsAny()), Times.Exactly(2)); } + [Test] + public async Task upload_async_should_treat_format_exception_as_success_when_object_exists_after_upload() + { + var s3Client = new Mock(MockBehavior.Strict); + s3Client + .Setup(x => x.PutObjectAsync(It.IsAny(), It.IsAny())) + .ThrowsAsync(new FormatException("bad expiration header")); + s3Client + .Setup(x => x.GetObjectMetadataAsync(It.IsAny(), It.IsAny())) + .ReturnsAsync(new GetObjectMetadataResponse()); + + var handler = new RecordingHttpMessageHandler((_, _) => + Task.FromResult(new HttpResponseMessage(HttpStatusCode.OK))); + var service = CreateService(s3Client.Object, handler); + + await using var content = new MemoryStream(new byte[] { 9, 8, 7, 6 }, writable: false); + + await service.UploadAsync("tts/audio.wav", content, "audio/wav", CancellationToken.None); + + handler.Requests.Should().BeEmpty(); + s3Client.Verify(x => x.GetObjectMetadataAsync(It.Is(request => request.BucketName == "tts-bucket" && request.Key == "tts/audio.wav"), It.IsAny()), Times.Once); + s3Client.Verify(x => x.GetPreSignedURL(It.IsAny()), Times.Never); + } + + [Test] + public async Task upload_async_should_fall_back_to_presigned_put_when_metadata_check_times_out() + { + var s3Client = new Mock(MockBehavior.Strict); + s3Client + .Setup(x => x.PutObjectAsync(It.IsAny(), It.IsAny())) + .ThrowsAsync(new FormatException("bad expiration header")); + s3Client + .Setup(x => x.GetObjectMetadataAsync(It.IsAny(), It.IsAny())) + .ThrowsAsync(new TaskCanceledException("metadata timeout")); + s3Client + .Setup(x => x.GetPreSignedURL(It.IsAny())) + .Returns("https://upload.example.com/tts/audio.wav?signature=timeout"); + + var handler = new RecordingHttpMessageHandler(async (request, cancellationToken) => + { + var body = await request.Content!.ReadAsByteArrayAsync(cancellationToken); + body.Should().Equal(6, 7, 8, 9); + request.Method.Should().Be(HttpMethod.Put); + request.RequestUri.Should().Be(new Uri("https://upload.example.com/tts/audio.wav?signature=timeout")); + request.Content!.Headers.ContentType!.MediaType.Should().Be("audio/wav"); + + return new HttpResponseMessage(HttpStatusCode.OK); + }); + var service = CreateService(s3Client.Object, handler); + + await using var content = new MemoryStream(new byte[] { 6, 7, 8, 9 }, writable: false); + + await service.UploadAsync("tts/audio.wav", content, "audio/wav", CancellationToken.None); + + handler.Requests.Should().HaveCount(1); + s3Client.Verify(x => x.GetObjectMetadataAsync(It.IsAny(), It.IsAny()), Times.Once); + s3Client.Verify(x => x.GetPreSignedURL(It.IsAny()), Times.Once); + } + + [Test] + public async Task upload_async_should_fall_back_to_presigned_put_when_put_response_is_malformed_and_object_is_missing() + { + var s3Client = new Mock(MockBehavior.Strict); + s3Client + .Setup(x => x.PutObjectAsync(It.IsAny(), It.IsAny())) + .ThrowsAsync(new FormatException("bad expiration header")); + s3Client + .Setup(x => x.GetObjectMetadataAsync(It.IsAny(), It.IsAny())) + .ThrowsAsync(new AmazonS3Exception("missing") + { + StatusCode = HttpStatusCode.NotFound, + ErrorCode = "NoSuchKey" + }); + s3Client + .Setup(x => x.GetPreSignedURL(It.IsAny())) + .Returns(request => + { + request.BucketName.Should().Be("tts-bucket"); + request.Key.Should().Be("tts/audio.wav"); + request.Verb.Should().Be(HttpVerb.PUT); + request.ContentType.Should().Be("audio/wav"); + return "https://upload.example.com/tts/audio.wav?signature=123"; + }); + + var handler = new RecordingHttpMessageHandler(async (request, cancellationToken) => + { + var body = await request.Content!.ReadAsByteArrayAsync(cancellationToken); + body.Should().Equal(5, 4, 3, 2); + request.Method.Should().Be(HttpMethod.Put); + request.RequestUri.Should().Be(new Uri("https://upload.example.com/tts/audio.wav?signature=123")); + request.Content!.Headers.ContentType!.MediaType.Should().Be("audio/wav"); + + return new HttpResponseMessage(HttpStatusCode.OK); + }); + var service = CreateService(s3Client.Object, handler); + + await using var content = new MemoryStream(new byte[] { 5, 4, 3, 2 }, writable: false); + + await service.UploadAsync("tts/audio.wav", content, "audio/wav", CancellationToken.None); + + handler.Requests.Should().HaveCount(1); + s3Client.Verify(x => x.GetObjectMetadataAsync(It.IsAny(), It.IsAny()), Times.Once); + s3Client.Verify(x => x.GetPreSignedURL(It.IsAny()), Times.Once); + } + + private static S3StorageService CreateService(IAmazonS3 s3Client, RecordingHttpMessageHandler handler = null) + { + handler ??= new RecordingHttpMessageHandler((_, _) => Task.FromResult(new HttpResponseMessage(HttpStatusCode.OK))); + var httpClientFactory = new Mock(MockBehavior.Strict); + httpClientFactory.Setup(x => x.CreateClient(nameof(S3StorageService))).Returns(new HttpClient(handler, disposeHandler: false)); + + return new S3StorageService( + s3Client, + Options.Create(new S3StorageOptions + { + Bucket = "tts-bucket", + AccessKey = "access-key", + SecretKey = "secret-key" + }), + Mock.Of>(), + httpClientFactory.Object); + } + + private sealed class RecordingHttpMessageHandler : HttpMessageHandler + { + private readonly Func> _handler; + + public RecordingHttpMessageHandler(Func> handler) + { + _handler = handler; + } + + public List Requests { get; } = new(); + + protected override async Task SendAsync(HttpRequestMessage request, CancellationToken cancellationToken) + { + Requests.Add(request); + return await _handler(request, cancellationToken); + } + } + private sealed class NonSeekableReadStream : Stream { private readonly MemoryStream _inner; diff --git a/Web/Resgrid.Web.Tts/Program.cs b/Web/Resgrid.Web.Tts/Program.cs index 0e8afbfa..59b72f56 100644 --- a/Web/Resgrid.Web.Tts/Program.cs +++ b/Web/Resgrid.Web.Tts/Program.cs @@ -29,6 +29,7 @@ }); builder.Services.AddEndpointsApiExplorer(); builder.Services.AddSwaggerGen(); +builder.Services.AddHttpClient(); builder.Services.AddTtsConfiguration(); builder.Services.Configure(TtsRequestIdentity.ConfigureForwardedHeaders); builder.Services.AddHealthChecks() diff --git a/Web/Resgrid.Web.Tts/Services/S3StorageService.cs b/Web/Resgrid.Web.Tts/Services/S3StorageService.cs index 05f8123c..ffde6c40 100644 --- a/Web/Resgrid.Web.Tts/Services/S3StorageService.cs +++ b/Web/Resgrid.Web.Tts/Services/S3StorageService.cs @@ -5,25 +5,30 @@ using Microsoft.Extensions.Options; using Resgrid.Web.Tts.Configuration; using System.Net; +using System.Net.Http; namespace Resgrid.Web.Tts.Services { public sealed class S3StorageService : IStorageService { private const int MaxRetryAttempts = 3; + private const int PresignedPutUrlExpiryMinutes = 5; private readonly IAmazonS3 _s3Client; private readonly S3StorageOptions _options; private readonly ILogger _logger; + private readonly IHttpClientFactory _httpClientFactory; public S3StorageService( IAmazonS3 s3Client, IOptions options, - ILogger logger) + ILogger logger, + IHttpClientFactory httpClientFactory) { _s3Client = s3Client; _options = options.Value; _logger = logger; + _httpClientFactory = httpClientFactory; } public async Task ExistsAsync(string objectKey, CancellationToken cancellationToken) @@ -64,26 +69,17 @@ public async Task UploadAsync(string objectKey, Stream content, string contentTy try { - await ExecuteWithRetryAsync( - () => - { - if (uploadContent.CanSeek) - { - uploadContent.Position = 0; - } - - return _s3Client.PutObjectAsync( - new PutObjectRequest - { - BucketName = _options.Bucket, - Key = objectKey, - InputStream = uploadContent, - ContentType = contentType - }, - cancellationToken); - }, - $"uploading {objectKey}", - cancellationToken); + try + { + await ExecuteWithRetryAsync( + () => UploadWithSdkAsync(objectKey, uploadContent, contentType, cancellationToken), + $"uploading {objectKey}", + cancellationToken); + } + catch (FormatException ex) + { + await HandleMalformedPutResponseAsync(objectKey, uploadContent, contentType, ex, cancellationToken); + } } finally { @@ -94,6 +90,163 @@ await ExecuteWithRetryAsync( } } + private async Task HandleMalformedPutResponseAsync( + string objectKey, + Stream content, + string contentType, + FormatException exception, + CancellationToken cancellationToken) + { + _logger.LogWarning( + exception, + "The S3 client could not parse the PUT response for {ObjectKey}. Checking if the object was stored before falling back to a presigned PUT upload.", + objectKey); + + if (await WasUploadPersistedAsync(objectKey, cancellationToken)) + { + _logger.LogInformation( + "Treating upload of {ObjectKey} as successful because the object exists after the response parsing failure.", + objectKey); + return; + } + + await UploadWithPresignedUrlAsync(objectKey, content, contentType, cancellationToken); + } + + private async Task WasUploadPersistedAsync(string objectKey, CancellationToken cancellationToken) + { + try + { + return await ExistsAsync(objectKey, cancellationToken); + } + catch (AmazonServiceException ex) + { + _logger.LogWarning( + ex, + "Unable to verify whether {ObjectKey} exists after the PUT response parsing failure. Falling back to a presigned PUT upload.", + objectKey); + } + catch (HttpRequestException ex) + { + _logger.LogWarning( + ex, + "Unable to verify whether {ObjectKey} exists after the PUT response parsing failure due to connectivity. Falling back to a presigned PUT upload.", + objectKey); + } + catch (TaskCanceledException ex) when (!cancellationToken.IsCancellationRequested) + { + _logger.LogWarning( + ex, + "Unable to verify whether {ObjectKey} exists after the PUT response parsing failure due to timeout. Falling back to a presigned PUT upload.", + objectKey); + } + catch (IOException ex) + { + _logger.LogWarning( + ex, + "Unable to verify whether {ObjectKey} exists after the PUT response parsing failure due to IO. Falling back to a presigned PUT upload.", + objectKey); + } + + return false; + } + + private Task UploadWithSdkAsync(string objectKey, Stream content, string contentType, CancellationToken cancellationToken) + { + if (content.CanSeek) + { + content.Position = 0; + } + + return _s3Client.PutObjectAsync( + new PutObjectRequest + { + BucketName = _options.Bucket, + Key = objectKey, + InputStream = content, + ContentType = contentType + }, + cancellationToken); + } + + private async Task UploadWithPresignedUrlAsync(string objectKey, Stream content, string contentType, CancellationToken cancellationToken) + { + var client = _httpClientFactory.CreateClient(nameof(S3StorageService)); + var payload = await ReadContentBytesAsync(content, cancellationToken); + + for (var attempt = 1; attempt <= MaxRetryAttempts; attempt++) + { + using var request = new HttpRequestMessage(HttpMethod.Put, CreatePresignedPutUrl(objectKey, contentType)); + request.Content = new ByteArrayContent(payload); + + if (!string.IsNullOrWhiteSpace(contentType)) + { + request.Content.Headers.TryAddWithoutValidation("Content-Type", contentType); + } + + try + { + using var response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken); + + if (response.IsSuccessStatusCode) + { + return; + } + + var exception = new HttpRequestException( + $"Presigned PUT upload for {objectKey} failed with status code {(int)response.StatusCode}.", + null, + response.StatusCode); + + if (attempt < MaxRetryAttempts && IsTransientStatusCode(response.StatusCode)) + { + await DelayRetryAsync($"uploading {objectKey} via presigned PUT", attempt, exception, cancellationToken); + continue; + } + + response.EnsureSuccessStatusCode(); + } + catch (HttpRequestException ex) when (attempt < MaxRetryAttempts && (!ex.StatusCode.HasValue || IsTransientStatusCode(ex.StatusCode.Value))) + { + await DelayRetryAsync($"uploading {objectKey} via presigned PUT", attempt, ex, cancellationToken); + } + catch (IOException ex) when (attempt < MaxRetryAttempts) + { + await DelayRetryAsync($"uploading {objectKey} via presigned PUT", attempt, ex, cancellationToken); + } + catch (TaskCanceledException ex) when (!cancellationToken.IsCancellationRequested && attempt < MaxRetryAttempts) + { + await DelayRetryAsync($"uploading {objectKey} via presigned PUT", attempt, ex, cancellationToken); + } + } + + throw new InvalidOperationException($"Presigned PUT upload retry loop terminated unexpectedly for {objectKey}."); + } + + private string CreatePresignedPutUrl(string objectKey, string contentType) + { + return _s3Client.GetPreSignedURL(new GetPreSignedUrlRequest + { + BucketName = _options.Bucket, + Key = objectKey, + Verb = HttpVerb.PUT, + ContentType = contentType, + Expires = DateTime.UtcNow.AddMinutes(PresignedPutUrlExpiryMinutes) + }); + } + + private static async Task ReadContentBytesAsync(Stream content, CancellationToken cancellationToken) + { + if (content.CanSeek) + { + content.Position = 0; + } + + using var memoryStream = new MemoryStream(); + await content.CopyToAsync(memoryStream, cancellationToken); + return memoryStream.ToArray(); + } + public async Task GetObjectAsync(string objectKey, CancellationToken cancellationToken) { try @@ -199,12 +352,18 @@ private async Task DelayRetryAsync(string operationName, int attempt, Exception private bool IsTransient(AmazonServiceException exception) { - return exception.StatusCode == HttpStatusCode.RequestTimeout - || (int)exception.StatusCode >= 500 + return IsTransientStatusCode(exception.StatusCode) || exception.InnerException is HttpRequestException || exception.InnerException is IOException; } + private static bool IsTransientStatusCode(HttpStatusCode statusCode) + { + return statusCode == HttpStatusCode.RequestTimeout + || statusCode == HttpStatusCode.TooManyRequests + || (int)statusCode >= 500; + } + private static bool IsNotFound(AmazonS3Exception exception) { return exception.StatusCode == HttpStatusCode.NotFound