Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,24 @@
using System.Threading;
using System.Threading.Tasks;

namespace System.Net
namespace System
{
// Static-abstract adapter that lets a single generic implementation serve both
// synchronous and asynchronous code paths. The sync entry point passes
// SyncReadWriteAdapter and asserts the returned ValueTask is already completed;
// the async entry point passes AsyncReadWriteAdapter.
//
// Originally used by SslStream and SmtpClient for networking I/O deduplication.
// Extended for System.Formats.Tar (CopyToAsync, ReadExactlyAsync, DisposeAsync)
// to unify TarReader/TarWriter/GnuSparseStream sync/async pairs.
internal interface IReadWriteAdapter
Comment thread
alinpahontu2912 marked this conversation as resolved.
{
static abstract ValueTask<int> ReadAsync(Stream stream, Memory<byte> buffer, CancellationToken cancellationToken);
Comment thread
alinpahontu2912 marked this conversation as resolved.
static abstract ValueTask<int> ReadAtLeastAsync(Stream stream, Memory<byte> buffer, int minimumBytes, bool throwOnEndOfStream, CancellationToken cancellationToken);
static abstract ValueTask ReadExactlyAsync(Stream stream, Memory<byte> buffer, CancellationToken cancellationToken);
static abstract ValueTask WriteAsync(Stream stream, ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken);
static abstract ValueTask CopyToAsync(Stream source, Stream destination, CancellationToken cancellationToken);
static abstract ValueTask DisposeAsync(Stream stream);
static abstract Task FlushAsync(Stream stream, CancellationToken cancellationToken);
static abstract Task WaitAsync(TaskCompletionSource<bool> waiter);
static abstract Task WaitAsync(Task task);
Expand All @@ -26,9 +37,17 @@ public static ValueTask<int> ReadAsync(Stream stream, Memory<byte> buffer, Cance
public static ValueTask<int> ReadAtLeastAsync(Stream stream, Memory<byte> buffer, int minimumBytes, bool throwOnEndOfStream, CancellationToken cancellationToken) =>
stream.ReadAtLeastAsync(buffer, minimumBytes, throwOnEndOfStream, cancellationToken);

public static ValueTask ReadExactlyAsync(Stream stream, Memory<byte> buffer, CancellationToken cancellationToken) =>
stream.ReadExactlyAsync(buffer, cancellationToken);

public static ValueTask WriteAsync(Stream stream, ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken) =>
stream.WriteAsync(buffer, cancellationToken);

public static ValueTask CopyToAsync(Stream source, Stream destination, CancellationToken cancellationToken) =>
new ValueTask(source.CopyToAsync(destination, cancellationToken));

public static ValueTask DisposeAsync(Stream stream) => stream.DisposeAsync();

public static Task FlushAsync(Stream stream, CancellationToken cancellationToken) => stream.FlushAsync(cancellationToken);

public static Task WaitAsync(TaskCompletionSource<bool> waiter) => waiter.Task;
Expand All @@ -44,12 +63,30 @@ public static ValueTask<int> ReadAsync(Stream stream, Memory<byte> buffer, Cance
public static ValueTask<int> ReadAtLeastAsync(Stream stream, Memory<byte> buffer, int minimumBytes, bool throwOnEndOfStream, CancellationToken cancellationToken) =>
new ValueTask<int>(stream.ReadAtLeast(buffer.Span, minimumBytes, throwOnEndOfStream));

public static ValueTask ReadExactlyAsync(Stream stream, Memory<byte> buffer, CancellationToken cancellationToken)
{
stream.ReadExactly(buffer.Span);
return default;
}

public static ValueTask WriteAsync(Stream stream, ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
{
stream.Write(buffer.Span);
return default;
}

public static ValueTask CopyToAsync(Stream source, Stream destination, CancellationToken cancellationToken)
{
source.CopyTo(destination);
return default;
}

public static ValueTask DisposeAsync(Stream stream)
{
stream.Dispose();
return default;
}

public static Task FlushAsync(Stream stream, CancellationToken cancellationToken)
{
stream.Flush();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
<Compile Include="System\Formats\Tar\TarWriter.cs" />
<Compile Include="System\Formats\Tar\TarWriterOptions.cs" />
<Compile Include="System\Formats\Tar\GnuSparseStream.cs" />
<Compile Include="$(CommonPath)System\ReadWriteAdapter.cs" Link="Common\System\ReadWriteAdapter.cs" />
<Compile Include="$(CommonPath)DisableRuntimeMarshalling.cs" Link="Common\DisableRuntimeMarshalling.cs" />
<Compile Include="$(CommonPath)System\IO\SubReadStream.cs" Link="Common\System\IO\SubReadStream.cs" />
<Compile Include="$(CommonPath)System\IO\Archiving.Utils.cs" Link="Common\System\IO\Archiving.Utils.cs" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,25 +54,15 @@ internal GnuSparseStream(Stream rawStream, long realSize)

// Parses the sparse map on first read. Populates _segments, _packedStartOffsets,
// and _dataStart. Throws InvalidDataException if the sparse map is malformed.
private void EnsureInitialized()
private async ValueTask EnsureInitializedCoreAsync<TAdapter>(CancellationToken cancellationToken)
where TAdapter : IReadWriteAdapter
{
if (_segments is not null)
{
return;
}

var segments = ParseSparseMap(isAsync: false, _rawStream, CancellationToken.None).GetAwaiter().GetResult();
InitializeFromParsedMap(segments);
}

private async ValueTask EnsureInitializedAsync(CancellationToken cancellationToken)
{
if (_segments is not null)
{
return;
}

var segments = await ParseSparseMap(isAsync: true, _rawStream, cancellationToken).ConfigureAwait(false);
var segments = await ParseSparseMapCoreAsync<TAdapter>(_rawStream, cancellationToken).ConfigureAwait(false);
InitializeFromParsedMap(segments);
}

Expand Down Expand Up @@ -176,48 +166,25 @@ public override int Read(byte[] buffer, int offset, int count)
public override int Read(Span<byte> destination)
{
ThrowIfDisposed();
EnsureInitialized();
Debug.Assert(_segments is not null && _packedStartOffsets is not null);

if (destination.IsEmpty || _virtualPosition >= _realSize)
{
return 0;
}

int toRead = (int)Math.Min(destination.Length, _realSize - _virtualPosition);
destination = destination.Slice(0, toRead);

int totalFilled = 0;
while (totalFilled < toRead)
byte[] rented = ArrayPool<byte>.Shared.Rent(toRead);
try
{
long vPos = _virtualPosition + totalFilled;
int segIdx = FindSegmentFromCurrent(vPos);

if (segIdx < 0)
{
// vPos is in a sparse hole — fill with zeros until the next segment or end of file.
long nextSegStart = ~segIdx < _segments.Length ? _segments[~segIdx].Offset : _realSize;
int zeroCount = (int)Math.Min(toRead - totalFilled, nextSegStart - vPos);
destination.Slice(totalFilled, zeroCount).Clear();
totalFilled += zeroCount;
}
else
{
// vPos is within segment segIdx — read from packed data.
var (segOffset, segLength) = _segments[segIdx];
long offsetInSeg = vPos - segOffset;
long remainingInSeg = segLength - offsetInSeg;
int countToRead = (int)Math.Min(toRead - totalFilled, remainingInSeg);

long packedOffset = _packedStartOffsets[segIdx] + offsetInSeg;
int bytesRead = ReadFromPackedData(destination.Slice(totalFilled, countToRead), packedOffset);
totalFilled += bytesRead;
break; // Return after an underlying read; caller can call Read again for more.
}
ValueTask<int> vt = ReadCoreAsync<SyncReadWriteAdapter>(rented.AsMemory(0, toRead), CancellationToken.None);
Debug.Assert(vt.IsCompleted, "Synchronous Read completed asynchronously.");
int bytesRead = vt.GetAwaiter().GetResult();
rented.AsSpan(0, bytesRead).CopyTo(destination);
return bytesRead;
Comment thread
alinpahontu2912 marked this conversation as resolved.
}
finally
{
ArrayPool<byte>.Shared.Return(rented);
}

_virtualPosition += totalFilled;
return totalFilled;
}

public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
Expand All @@ -241,12 +208,13 @@ public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken
{
return ValueTask.FromResult(0);
}
return ReadAsyncCore(buffer, cancellationToken);
return ReadCoreAsync<AsyncReadWriteAdapter>(buffer, cancellationToken);
}

private async ValueTask<int> ReadAsyncCore(Memory<byte> buffer, CancellationToken cancellationToken)
private async ValueTask<int> ReadCoreAsync<TAdapter>(Memory<byte> buffer, CancellationToken cancellationToken)
where TAdapter : IReadWriteAdapter
{
await EnsureInitializedAsync(cancellationToken).ConfigureAwait(false);
await EnsureInitializedCoreAsync<TAdapter>(cancellationToken).ConfigureAwait(false);
Debug.Assert(_segments is not null && _packedStartOffsets is not null);

int toRead = (int)Math.Min(buffer.Length, _realSize - _virtualPosition);
Expand All @@ -273,7 +241,7 @@ private async ValueTask<int> ReadAsyncCore(Memory<byte> buffer, CancellationToke
int countToRead = (int)Math.Min(toRead - totalFilled, remainingInSeg);

long packedOffset = _packedStartOffsets[segIdx] + offsetInSeg;
int bytesRead = await ReadFromPackedDataAsync(buffer.Slice(totalFilled, countToRead), packedOffset, cancellationToken).ConfigureAwait(false);
int bytesRead = await ReadFromPackedDataCoreAsync<TAdapter>(buffer.Slice(totalFilled, countToRead), packedOffset, cancellationToken).ConfigureAwait(false);
totalFilled += bytesRead;
break;
}
Expand All @@ -297,56 +265,27 @@ private async ValueTask<int> ReadAsyncCore(Memory<byte> buffer, CancellationToke
// the entry's real (expanded) size.
internal void CopyPopulatedDataTo(FileStream destination)
{
ThrowIfDisposed();
EnsureInitialized();
Debug.Assert(_segments is not null && _packedStartOffsets is not null);

byte[] buffer = ArrayPool<byte>.Shared.Rent(81920);
try
{
for (int i = 0; i < _segments.Length; i++)
{
(long virtualOffset, long segmentLength) = _segments[i];
if (segmentLength == 0)
{
continue;
}

destination.Position = virtualOffset;
long written = 0;
while (written < segmentLength)
{
int toRead = (int)Math.Min(segmentLength - written, buffer.Length);
int bytesRead = ReadFromPackedData(buffer.AsSpan(0, toRead), _packedStartOffsets[i] + written);
if (bytesRead == 0)
{
throw new EndOfStreamException();
}
destination.Write(buffer, 0, bytesRead);
written += bytesRead;
}
}
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
ValueTask vt = CopyPopulatedDataToCoreAsync<SyncReadWriteAdapter>(destination, CancellationToken.None);
Debug.Assert(vt.IsCompleted, "Synchronous CopyPopulatedDataTo completed asynchronously.");
vt.GetAwaiter().GetResult();
}

// Extend the destination to the full real size so any trailing hole is materialized
// (as an unallocated extent on sparse-capable file systems, or as zeros otherwise).
if (destination.Length < _realSize)
// Async counterpart to CopyPopulatedDataTo.
internal ValueTask CopyPopulatedDataToAsync(FileStream destination, CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
destination.SetLength(_realSize);
return ValueTask.FromCanceled(cancellationToken);
}

_virtualPosition = _realSize;
return CopyPopulatedDataToCoreAsync<AsyncReadWriteAdapter>(destination, cancellationToken);
}

// Async counterpart to CopyPopulatedDataTo.
internal async ValueTask CopyPopulatedDataToAsync(FileStream destination, CancellationToken cancellationToken)
private async ValueTask CopyPopulatedDataToCoreAsync<TAdapter>(FileStream destination, CancellationToken cancellationToken)
where TAdapter : IReadWriteAdapter
{
ThrowIfDisposed();
await EnsureInitializedAsync(cancellationToken).ConfigureAwait(false);
await EnsureInitializedCoreAsync<TAdapter>(cancellationToken).ConfigureAwait(false);
Debug.Assert(_segments is not null && _packedStartOffsets is not null);

byte[] buffer = ArrayPool<byte>.Shared.Rent(81920);
Expand All @@ -365,12 +304,12 @@ internal async ValueTask CopyPopulatedDataToAsync(FileStream destination, Cancel
while (written < segmentLength)
{
int toRead = (int)Math.Min(segmentLength - written, buffer.Length);
int bytesRead = await ReadFromPackedDataAsync(buffer.AsMemory(0, toRead), _packedStartOffsets[i] + written, cancellationToken).ConfigureAwait(false);
int bytesRead = await ReadFromPackedDataCoreAsync<TAdapter>(buffer.AsMemory(0, toRead), _packedStartOffsets[i] + written, cancellationToken).ConfigureAwait(false);
if (bytesRead == 0)
{
throw new EndOfStreamException();
}
await destination.WriteAsync(buffer.AsMemory(0, bytesRead), cancellationToken).ConfigureAwait(false);
await TAdapter.WriteAsync(destination, buffer.AsMemory(0, bytesRead), cancellationToken).ConfigureAwait(false);
written += bytesRead;
}
}
Expand All @@ -380,6 +319,8 @@ internal async ValueTask CopyPopulatedDataToAsync(FileStream destination, Cancel
ArrayPool<byte>.Shared.Return(buffer);
}

// Extend the destination to the full real size so any trailing hole is materialized
// (as an unallocated extent on sparse-capable file systems, or as zeros otherwise).
if (destination.Length < _realSize)
{
destination.SetLength(_realSize);
Expand All @@ -392,23 +333,8 @@ internal async ValueTask CopyPopulatedDataToAsync(FileStream destination, Cancel
// After EnsureInitialized, the raw stream is positioned at _dataStart and
// _nextPackedOffset tracks how far into the packed data we've read.
// Returns the number of bytes actually read (may be less than destination.Length).
private int ReadFromPackedData(Span<byte> destination, long packedOffset)
{
long skipBytes = packedOffset - _nextPackedOffset;
if (skipBytes < 0 && !_rawStream.CanSeek)
{
throw new InvalidOperationException(SR.IO_NotSupported_UnseekableStream);
}
if (skipBytes != 0)
{
TarHelpers.AdvanceStream(_rawStream, skipBytes);
}
int bytesRead = _rawStream.Read(destination);
_nextPackedOffset = packedOffset + bytesRead;
return bytesRead;
}

private async ValueTask<int> ReadFromPackedDataAsync(Memory<byte> destination, long packedOffset, CancellationToken cancellationToken)
private async ValueTask<int> ReadFromPackedDataCoreAsync<TAdapter>(Memory<byte> destination, long packedOffset, CancellationToken cancellationToken)
where TAdapter : IReadWriteAdapter
{
long skipBytes = packedOffset - _nextPackedOffset;
if (skipBytes < 0 && !_rawStream.CanSeek)
Expand All @@ -417,9 +343,9 @@ private async ValueTask<int> ReadFromPackedDataAsync(Memory<byte> destination, l
}
if (skipBytes != 0)
{
await TarHelpers.AdvanceStreamAsync(_rawStream, skipBytes, cancellationToken).ConfigureAwait(false);
await TarHelpers.AdvanceStreamCoreAsync<TAdapter>(_rawStream, skipBytes, cancellationToken).ConfigureAwait(false);
}
int bytesRead = await _rawStream.ReadAsync(destination, cancellationToken).ConfigureAwait(false);
int bytesRead = await TAdapter.ReadAsync(_rawStream, destination, cancellationToken).ConfigureAwait(false);
_nextPackedOffset = packedOffset + bytesRead;
return bytesRead;
}
Expand Down Expand Up @@ -512,8 +438,9 @@ private int BinarySearchSegment(long virtualPosition, int lo, int hi)
// and then the packed data begins.
//
// Returns the parsed segments.
private static async Task<(long Offset, long Length)[]> ParseSparseMap(
bool isAsync, Stream rawStream, CancellationToken cancellationToken)
private static async ValueTask<(long Offset, long Length)[]> ParseSparseMapCoreAsync<TAdapter>(
Stream rawStream, CancellationToken cancellationToken)
where TAdapter : IReadWriteAdapter
{
// The buffer is 2 * RecordSize (1024 bytes) and each fill reads exactly RecordSize (512)
// bytes. This guarantees that the total bytes read is always a multiple of RecordSize,
Expand All @@ -538,9 +465,7 @@ async ValueTask<bool> FillBufferAsync()
activeStart = 0;
availableStart = active;

int newBytes = isAsync
? await rawStream.ReadAtLeastAsync(bytes.AsMemory(availableStart, TarHelpers.RecordSize), TarHelpers.RecordSize, throwOnEndOfStream: false, cancellationToken).ConfigureAwait(false)
: rawStream.ReadAtLeast(bytes.AsSpan(availableStart, TarHelpers.RecordSize), TarHelpers.RecordSize, throwOnEndOfStream: false);
int newBytes = await TAdapter.ReadAtLeastAsync(rawStream, bytes.AsMemory(availableStart, TarHelpers.RecordSize), TarHelpers.RecordSize, throwOnEndOfStream: false, cancellationToken).ConfigureAwait(false);

availableStart += newBytes;
return newBytes > 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,9 @@ public Stream? DataStream
// This entry came from a reader, so if the underlying stream is unseekable, we need to
// manually advance the stream pointer to the next header before doing the substitution
// The original stream will get disposed when the reader gets disposed.
_readerOfOrigin.AdvanceDataStreamIfNeeded();
ValueTask vt = _readerOfOrigin.AdvanceDataStreamIfNeededCoreAsync<SyncReadWriteAdapter>(CancellationToken.None);
Debug.Assert(vt.IsCompleted, "Synchronous AdvanceDataStreamIfNeeded completed asynchronously.");
vt.GetAwaiter().GetResult();
// We only do this once
_readerOfOrigin = null;
}
Expand Down
Loading
Loading