Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Release Notes.
* Grpc plugin support trace client async generic call(without grpc stubs), support Method type: `UNARY`、`SERVER_STREAMING`.
* Enhance Apache ShenYu (incubating) plugin: support trace `grpc`,`sofarpc`,`motan`,`tars` rpc proxy.
* Add primary endpoint name to log events.
* Fix Span not finished in gateway plugin when the gateway request timeout.

#### Documentation

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,46 +19,74 @@

import java.lang.reflect.Method;
import java.util.function.BiFunction;

import io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.define.EnhanceObjectCache;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;
import reactor.netty.Connection;
import reactor.netty.http.client.HttpClientResponse;

public class HttpClientFinalizerResponseConnectionInterceptor implements InstanceMethodsAroundInterceptor {
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
MethodInterceptResult result) {
BiFunction<? super HttpClientResponse, ? super Connection, ? extends Publisher> finalReceiver = (BiFunction<? super HttpClientResponse, ? super Connection, ? extends Publisher>) allArguments[0];
EnhanceObjectCache cache = (EnhanceObjectCache) objInst.getSkyWalkingDynamicField();
allArguments[0] = new BiFunction<HttpClientResponse, Connection, Publisher>() {
@Override
public Publisher apply(final HttpClientResponse response, final Connection connection) {
Publisher publisher = finalReceiver.apply(response, connection);
// receive the response. Stop the span.
if (cache.getSpan() != null) {
if (response.status().code() >= 400) {
cache.getSpan().errorOccurred();
}
Tags.HTTP_RESPONSE_STATUS_CODE.set(cache.getSpan(), response.status().code());
cache.getSpan().asyncFinish();
}

if (cache.getSpan1() != null) {
cache.getSpan1().asyncFinish();
allArguments[0] = (BiFunction<HttpClientResponse, Connection, Publisher>) (response, connection) -> {
Publisher publisher = finalReceiver.apply(response, connection);
// receive the response.
if (cache.getSpan() != null) {
if (response.status().code() >= HttpResponseStatus.BAD_REQUEST.code()) {
cache.getSpan().errorOccurred();
}
return publisher;
Tags.HTTP_RESPONSE_STATUS_CODE.set(cache.getSpan(), response.status().code());
}
return publisher;
};
}

@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
return ret;
Object ret) {
Flux<?> responseFlux = (Flux<?>) ret;

responseFlux = responseFlux
.doOnError(e -> {
EnhanceObjectCache cache = (EnhanceObjectCache) objInst.getSkyWalkingDynamicField();
if (cache == null) {
return;
}

if (cache.getSpan() != null) {
cache.getSpan().errorOccurred();
cache.getSpan().log(e);
}
})
.doFinally(signalType -> {
EnhanceObjectCache cache = (EnhanceObjectCache) objInst.getSkyWalkingDynamicField();
if (cache == null) {
return;
}
// do finally. Finish the span.
if (cache.getSpan() != null) {
if (signalType == SignalType.CANCEL) {
cache.getSpan().errorOccurred();
}
cache.getSpan().asyncFinish();
}

if (cache.getSpan1() != null) {
cache.getSpan1().asyncFinish();
}
});

return responseFlux;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.plugin.spring.cloud.gateway.v3x.define.EnhanceObjectCache;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;
import reactor.netty.Connection;
import reactor.netty.http.client.HttpClientResponse;

Expand All @@ -39,34 +41,62 @@ public class HttpClientFinalizerResponseConnectionInterceptor implements Instanc

@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
MethodInterceptResult result) {
BiFunction<? super HttpClientResponse, ? super Connection, ? extends Publisher> finalReceiver = (BiFunction<? super HttpClientResponse, ? super Connection, ? extends Publisher>) allArguments[0];
EnhanceObjectCache cache = (EnhanceObjectCache) objInst.getSkyWalkingDynamicField();
allArguments[0] = (BiFunction<HttpClientResponse, Connection, Publisher>) (response, connection) -> {
Publisher publisher = finalReceiver.apply(response, connection);
if (cache == null) {
return publisher;
}
// receive the response. Stop the span.
// receive the response.
if (cache.getSpan() != null) {
if (response.status().code() >= HttpResponseStatus.BAD_REQUEST.code()) {
cache.getSpan().errorOccurred();
}
Tags.HTTP_RESPONSE_STATUS_CODE.set(cache.getSpan(), response.status().code());
cache.getSpan().asyncFinish();
}

if (cache.getSpan1() != null) {
cache.getSpan1().asyncFinish();
}

return publisher;
};
}

@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
return ret;
Object ret) {
Flux<?> responseFlux = (Flux<?>) ret;

responseFlux = responseFlux
.doOnError(e -> {
EnhanceObjectCache cache = (EnhanceObjectCache) objInst.getSkyWalkingDynamicField();
if (cache == null) {
return;
}

if (cache.getSpan() != null) {
cache.getSpan().errorOccurred();
cache.getSpan().log(e);
}
})
.doFinally(signalType -> {
EnhanceObjectCache cache = (EnhanceObjectCache) objInst.getSkyWalkingDynamicField();
if (cache == null) {
return;
}
// do finally. Finish the span.
if (cache.getSpan() != null) {
if (signalType == SignalType.CANCEL) {
cache.getSpan().errorOccurred();
}
cache.getSpan().asyncFinish();
}

if (cache.getSpan1() != null) {
cache.getSpan1().asyncFinish();
}
});

return responseFlux;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.modules.junit4.PowerMockRunnerDelegate;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.netty.Connection;
import reactor.netty.NettyOutbound;
import reactor.netty.http.client.HttpClientRequest;
Expand Down Expand Up @@ -151,9 +152,13 @@ private void executeSendRequest() throws Throwable {
Object[] responseConnectionArguments = new Object[]{originalResponseConnectionBiFunction};
responseConnectionInterceptor
.beforeMethod(enhancedInstance, null, responseConnectionArguments, null, null);
responseConnectionInterceptor.afterMethod(enhancedInstance, null, new Object[0], null, enhancedInstance);
Flux flux = Flux.just(0);

flux = (Flux) responseConnectionInterceptor.afterMethod(enhancedInstance, null, new Object[0], null, flux);

((BiFunction<? super HttpClientResponse, ? super Connection, ? extends Publisher<Void>>) responseConnectionArguments[0])
.apply(mockResponse, null);
flux.blockFirst();
}

private void assertUpstreamSpan(AbstractSpan span) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,26 @@ segmentItems:
segments:
- segmentId: not null
spans:
- operationName: /provider/timeout/error
parentSpanId: 0
spanId: 1
isError: true
spanType: Exit
tags:
- { key: url, value: not null }
- { key: http.method, value: GET }
- { key: http.status_code, value: '500' }
logs:
- logEvent:
- { key: event, value: error }
- { key: error.kind, value: not null }
- { key: message, value: not null }
- { key: stack, value: not null }
- logEvent:
- { key: event, value: error }
- { key: error.kind, value: not null }
- { key: message, value: not null }
- { key: stack, value: not null }
- operationName: GET:/provider/b/testcase
parentSpanId: -1
spanId: 0
Expand All @@ -42,21 +62,49 @@ segmentItems:
segments:
- segmentId: not null
spans:
- operationName: /provider/b/testcase
- operationName: /provider/timeout/error
parentSpanId: -1
spanId: 0
spanLayer: Http
startTime: nq 0
endTime: nq 0
componentId: 67
isError: false
isError: true
spanType: Entry
peer: not null
tags:
- {key: url, value: 'http://localhost:8080/provider/b/testcase'}
- {key: url, value: 'http://localhost:8080/provider/timeout/error' }
- {key: http.method, value: GET}
- {key: http.status_code, value: '200'}
skipAnalysis: 'false'
- {key: http.status_code, value: '500'}
- segmentId: not null
spans:
- operationName: SpringCloudGateway/sendRequest
parentSpanId: 0
spanId: 1
spanLayer: Http
startTime: nq 0
endTime: nq 0
componentId: 61
isError: true
spanType: Exit
peer: 1.2.3.4:18070
skipAnalysis: false
tags:
- { key: url, value: not null }
logs:
- logEvent:
- { key: event, value: error }
- { key: error.kind, value: not null }
- { key: message, value: not null }
- { key: stack, value: not null }
- operationName: SpringCloudGateway/RoutingFilter
parentSpanId: -1
spanId: 0
startTime: nq 0
endTime: nq 0
componentId: 61
spanType: Local
refs:
- { parentEndpoint: '/provider/timeout/error', networkAddress: '', refType: CrossThread,
parentSpanId: 0, parentTraceSegmentId: not null, parentServiceInstance: not
null, parentService: not null, traceId: not null }
skipAnalysis: 'false'
- segmentId: not null
spans:
- operationName: SpringCloudGateway/sendRequest
Expand Down Expand Up @@ -87,3 +135,20 @@ segmentItems:
parentSpanId: 0, parentTraceSegmentId: not null, parentServiceInstance: not
null, parentService: not null, traceId: not null}
skipAnalysis: 'false'
- segmentId: not null
spans:
- operationName: /provider/b/testcase
parentSpanId: -1
spanId: 0
spanLayer: Http
startTime: nq 0
endTime: nq 0
componentId: 67
isError: false
spanType: Entry
peer: not null
tags:
- { key: url, value: 'http://localhost:8080/provider/b/testcase' }
- { key: http.method, value: GET }
- { key: http.status_code, value: '200' }
skipAnalysis: 'false'
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,14 @@ server:
spring:
cloud:
gateway:
httpclient:
connect-timeout: 2000
routes:
- id: provider_route
uri: http://localhost:18070
predicates:
- Path=/provider/b/*
- id: provider_timeout
uri: http://1.2.3.4:18070
predicates:
- Path=/provider/timeout/*
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,17 @@

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;

@RestController
public class TestController {

@RequestMapping("/provider/b/testcase")
public String testcase() {
try {
new RestTemplate().getForEntity("http://localhost:8080/provider/timeout/error", String.class);
} catch (Exception e) {
}
return "1";
}

Expand Down
Loading