Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1608,16 +1608,13 @@ static final class OtelDecoratedReadChannel implements ReadChannel {
@VisibleForTesting final ReadChannel reader;
private final Span span;

private volatile Scope scope;

private OtelDecoratedReadChannel(ReadChannel reader, Span span) {
this.reader = reader;
this.span = span;
}

@Override
public void seek(long position) throws IOException {
clearScope();
reader.seek(position);
}

Expand All @@ -1633,7 +1630,6 @@ public RestorableState<ReadChannel> capture() {

@Override
public ReadChannel limit(long limit) {
clearScope();
return reader.limit(limit);
}

Expand All @@ -1644,8 +1640,9 @@ public long limit() {

@Override
public int read(ByteBuffer dst) throws IOException {
setScope();
return reader.read(dst);
try (Scope ignore = span.makeCurrent()) {
return reader.read(dst);
}
}

@Override
Expand All @@ -1655,27 +1652,12 @@ public boolean isOpen() {

@Override
public void close() {
setScope();
try {
try (Scope ignore = span.makeCurrent()) {
reader.close();
} finally {
span.end();
clearScope();
}
}

private void clearScope() {
try (Scope ignore = scope) {
scope = null;
}
}

public void setScope() {
if (scope != null) {
clearScope();
}
scope = span.makeCurrent();
}
}

private final class OtelDecoratedBlobWriteSession implements BlobWriteSession {
Expand Down Expand Up @@ -1718,17 +1700,16 @@ private class OtelDecoratingWritableByteChannel implements WritableByteChannel {
private final WritableByteChannel delegate;
private final Span openSpan;

private Scope scope;

private OtelDecoratingWritableByteChannel(WritableByteChannel delegate, Span openSpan) {
this.delegate = delegate;
this.openSpan = openSpan;
}

@Override
public int write(ByteBuffer src) throws IOException {
setScope();
return delegate.write(src);
try (Scope ignore = openSpan.makeCurrent()) {
return delegate.write(src);
}
}

@Override
Expand All @@ -1738,8 +1719,7 @@ public boolean isOpen() {

@Override
public void close() throws IOException {
setScope();
try {
try (Scope ignore = openSpan.makeCurrent()) {
delegate.close();
} catch (IOException | RuntimeException e) {
openSpan.recordException(e);
Expand All @@ -1750,21 +1730,7 @@ public void close() throws IOException {
} finally {
openSpan.end();
sessionSpan.end();
clearScope();
}
}

private void clearScope() {
try (Scope ignore = scope) {
scope = null;
}
}

public void setScope() {
if (scope != null) {
clearScope();
}
scope = openSpan.makeCurrent();
}
}
}
Expand All @@ -1774,8 +1740,6 @@ static final class OtelDecoratedWriteChannel implements WriteChannel {
@VisibleForTesting final WriteChannel delegate;
private final Span openSpan;

private Scope scope;

private OtelDecoratedWriteChannel(WriteChannel delegate, Span openSpan) {
this.delegate = delegate;
this.openSpan = openSpan;
Expand All @@ -1793,8 +1757,9 @@ public RestorableState<WriteChannel> capture() {

@Override
public int write(ByteBuffer src) throws IOException {
setScope();
return delegate.write(src);
try (Scope ignore = openSpan.makeCurrent()) {
return delegate.write(src);
}
}

@Override
Expand All @@ -1804,31 +1769,16 @@ public boolean isOpen() {

@Override
public void close() throws IOException {
setScope();
try {
try (Scope ignore = openSpan.makeCurrent()) {
delegate.close();
} catch (IOException | RuntimeException e) {
openSpan.recordException(e);
openSpan.setStatus(StatusCode.ERROR, e.getClass().getSimpleName());
throw e;
} finally {
openSpan.end();
clearScope();
}
}

private void clearScope() {
try (Scope ignore = scope) {
scope = null;
}
}

public void setScope() {
if (scope != null) {
clearScope();
}
scope = openSpan.makeCurrent();
}
}

private final class OtelDecoratedCopyWriter extends CopyWriter {
Expand Down Expand Up @@ -2205,21 +2155,34 @@ private final class OtelDecoratingAppendableUploadWriteableByteChannel
implements AppendableUploadWriteableByteChannel {
private final AppendableUploadWriteableByteChannel delegate;
private final Span openSpan;

private volatile Scope scope;
private final Tracer tracer;

private OtelDecoratingAppendableUploadWriteableByteChannel(
AppendableUploadWriteableByteChannel delegate, Span openSpan) {
this.delegate = delegate;
this.openSpan = openSpan;
this.tracer =
TracerDecorator.decorate(
Context.current(),
otel,
OtelStorageDecorator.this.baseAttributes,
AppendableUploadWriteableByteChannel.class.getName() + "/");
}

@Override
@BetaApi
public void finalizeAndClose() throws IOException {
setScope();
try {
delegate.finalizeAndClose();
try (Scope ignore = openSpan.makeCurrent()) {
Span span = tracer.spanBuilder("finalizeAndClose").startSpan();
try (Scope ignore2 = span.makeCurrent()) {
delegate.finalizeAndClose();
} catch (Throwable t) {
span.recordException(t);
span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName());
throw t;
} finally {
span.end();
}
} catch (IOException | RuntimeException e) {
openSpan.recordException(e);
openSpan.setStatus(StatusCode.ERROR, e.getClass().getSimpleName());
Expand All @@ -2229,16 +2192,23 @@ public void finalizeAndClose() throws IOException {
} finally {
openSpan.end();
uploadSpan.end();
clearScope();
}
}

@Override
@BetaApi
public void closeWithoutFinalizing() throws IOException {
setScope();
try {
delegate.closeWithoutFinalizing();
try (Scope ignore = openSpan.makeCurrent()) {
Span span = tracer.spanBuilder("closeWithoutFinalizing").startSpan();
try (Scope ignore2 = span.makeCurrent()) {
delegate.closeWithoutFinalizing();
} catch (Throwable t) {
span.recordException(t);
span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName());
throw t;
} finally {
span.end();
}
} catch (IOException | RuntimeException e) {
openSpan.recordException(e);
openSpan.setStatus(StatusCode.ERROR, e.getClass().getSimpleName());
Expand All @@ -2248,16 +2218,23 @@ public void closeWithoutFinalizing() throws IOException {
} finally {
openSpan.end();
uploadSpan.end();
clearScope();
}
}

@Override
@BetaApi
public void close() throws IOException {
setScope();
try {
delegate.close();
try (Scope ignore = openSpan.makeCurrent()) {
Span span = tracer.spanBuilder("close").startSpan();
try (Scope ignore2 = span.makeCurrent()) {
delegate.close();
} catch (Throwable t) {
span.recordException(t);
span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName());
throw t;
} finally {
span.end();
}
} catch (IOException | RuntimeException e) {
openSpan.recordException(e);
openSpan.setStatus(StatusCode.ERROR, e.getClass().getSimpleName());
Expand All @@ -2267,39 +2244,36 @@ public void close() throws IOException {
} finally {
openSpan.end();
uploadSpan.end();
clearScope();
}
}

@Override
public void flush() throws IOException {
setScope();
delegate.flush();
try (Scope ignore = openSpan.makeCurrent()) {
Span span = tracer.spanBuilder("flush").startSpan();
try (Scope ignore2 = span.makeCurrent()) {
delegate.flush();
} catch (Throwable t) {
span.recordException(t);
span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName());
throw t;
} finally {
span.end();
}
}
}

@Override
public int write(ByteBuffer src) throws IOException {
setScope();
return delegate.write(src);
try (Scope ignore = openSpan.makeCurrent()) {
return delegate.write(src);
}
}

@Override
public boolean isOpen() {
return delegate.isOpen();
}

private void clearScope() {
try (Scope ignore = scope) {
scope = null;
}
}

public void setScope() {
if (scope != null) {
clearScope();
}
scope = openSpan.makeCurrent();
}
}
}
}
Loading