Skip to content

Commit aec1041

Browse files
committed
Fix a few bugs related to non event-loop thread writes.
- Incorrect HTTP server metrics report for non event-loop thread writes #5222 - HTTP/2 push is not flushed when written from a non event-loop thread #5223
1 parent 2c594e5 commit aec1041

File tree

7 files changed

+98
-38
lines changed

7 files changed

+98
-38
lines changed

src/main/java/io/vertx/core/http/impl/Http2ServerStream.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
*/
1111
package io.vertx.core.http.impl;
1212

13+
import io.netty.channel.EventLoop;
1314
import io.netty.handler.codec.http.HttpHeaderNames;
1415
import io.netty.handler.codec.http.HttpHeaderValues;
1516
import io.netty.handler.codec.http2.Http2Headers;
@@ -19,12 +20,10 @@
1920
import io.vertx.core.buffer.Buffer;
2021
import io.vertx.core.http.HttpFrame;
2122
import io.vertx.core.http.HttpMethod;
22-
import io.vertx.core.http.HttpServerRequest;
2323
import io.vertx.core.http.StreamPriority;
2424
import io.vertx.core.http.impl.headers.Http2HeadersAdaptor;
2525
import io.vertx.core.impl.ContextInternal;
2626
import io.vertx.core.net.HostAndPort;
27-
import io.vertx.core.net.impl.HostAndPortImpl;
2827
import io.vertx.core.spi.metrics.HttpServerMetrics;
2928
import io.vertx.core.spi.metrics.Metrics;
3029
import io.vertx.core.spi.observability.HttpRequest;
@@ -242,13 +241,23 @@ public Object metric() {
242241
return metric;
243242
}
244243

245-
public HttpServerRequest routed(String route) {
244+
public void routed(String route) {
246245
if (METRICS_ENABLED) {
247-
HttpServerMetrics metrics = conn.metrics();
248-
if (metrics != null && !responseEnded) {
249-
metrics.requestRouted(metric, route);
246+
EventLoop eventLoop = vertx.getOrCreateContext().nettyEventLoop();
247+
synchronized (this) {
248+
if (shouldQueue(eventLoop)) {
249+
queueForWrite(eventLoop, () -> routedInternal(route));
250+
return;
251+
}
250252
}
253+
routedInternal(route);
254+
}
255+
}
256+
257+
private void routedInternal(String route) {
258+
HttpServerMetrics metrics = conn.metrics();
259+
if (metrics != null && !responseEnded) {
260+
metrics.requestRouted(metric, route);
251261
}
252-
return null;
253262
}
254263
}

src/main/java/io/vertx/core/http/impl/VertxHttp2ConnectionHandler.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -335,14 +335,8 @@ io.netty.util.concurrent.Future<Integer> writePushPromise(int streamId, Http2Hea
335335
future.setFailure(fut.cause());
336336
}
337337
});
338-
EventExecutor executor = chctx.executor();
339-
if (executor.inEventLoop()) {
340-
_writePushPromise(streamId, promisedStreamId, headers, promise);
341-
} else {
342-
executor.execute(() -> {
343-
_writePushPromise(streamId, promisedStreamId, headers, promise);
344-
});
345-
}
338+
_writePushPromise(streamId, promisedStreamId, headers, promise);
339+
checkFlush();
346340
return future;
347341
}
348342

src/main/java/io/vertx/core/http/impl/VertxHttp2Stream.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ abstract class VertxHttp2Stream<C extends Http2ConnectionBase> {
7171
conn.consumeCredits(this.stream, len);
7272
}
7373
});
74-
bytesRead += data.length();
7574
handleData(data);
7675
}
7776
});
@@ -117,6 +116,7 @@ void onHeaders(Http2Headers headers, StreamPriority streamPriority) {
117116
}
118117

119118
void onData(Buffer data) {
119+
bytesRead += data.length();
120120
conn.reportBytesRead(data.length());
121121
context.execute(data, pending::write);
122122
}
@@ -191,10 +191,10 @@ final void writeHeaders(Http2Headers headers, boolean end, boolean checkFlush, H
191191

192192
void doWriteHeaders(Http2Headers headers, boolean end, boolean checkFlush, Handler<AsyncResult<Void>> handler) {
193193
FutureListener<Void> promise = handler == null ? null : context.promise(handler);
194-
conn.handler.writeHeaders(stream, headers, end, priority.getDependency(), priority.getWeight(), priority.isExclusive(), checkFlush, promise);
195194
if (end) {
196195
endWritten();
197196
}
197+
conn.handler.writeHeaders(stream, headers, end, priority.getDependency(), priority.getWeight(), priority.isExclusive(), checkFlush, promise);
198198
}
199199

200200
protected void endWritten() {
@@ -241,10 +241,10 @@ void doWriteData(ByteBuf buf, boolean end, Handler<AsyncResult<Void>> handler) {
241241
bytesWritten += numOfBytes;
242242
conn.reportBytesWritten(numOfBytes);
243243
FutureListener<Void> promise = handler == null ? null : context.promise(handler);
244-
conn.handler.writeData(stream, chunk, end, promise);
245244
if (end) {
246245
endWritten();
247246
}
247+
conn.handler.writeData(stream, chunk, end, promise);
248248
}
249249

250250
final void writeReset(long code) {

src/test/java/io/vertx/core/http/Http1xMetricsTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,19 @@
1010
*/
1111
package io.vertx.core.http;
1212

13+
import io.vertx.core.ThreadingModel;
1314
import org.junit.Test;
1415

1516
import java.util.concurrent.CountDownLatch;
1617

1718
public class Http1xMetricsTest extends HttpMetricsTestBase {
1819

1920
public Http1xMetricsTest() {
20-
super(HttpVersion.HTTP_1_1);
21+
this(ThreadingModel.EVENT_LOOP);
22+
}
23+
24+
protected Http1xMetricsTest(ThreadingModel threadingModel) {
25+
super(HttpVersion.HTTP_1_1, threadingModel);
2126
}
2227

2328
@Test
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright (c) 2011-2019 Contributors to the Eclipse Foundation
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Eclipse Public License 2.0 which is available at
6+
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7+
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10+
*/
11+
package io.vertx.core.http;
12+
13+
import io.vertx.core.ThreadingModel;
14+
import org.junit.Test;
15+
16+
import java.util.concurrent.CountDownLatch;
17+
18+
public class Http1xWorkerMetricsTest extends Http1xMetricsTest {
19+
20+
public Http1xWorkerMetricsTest() {
21+
super(ThreadingModel.EVENT_LOOP);
22+
}
23+
}

src/test/java/io/vertx/core/http/Http2MetricsTest.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,10 @@
1010
*/
1111
package io.vertx.core.http;
1212

13+
import io.vertx.core.ThreadingModel;
1314
import io.vertx.test.core.TestUtils;
1415
import io.vertx.test.fakemetrics.*;
16+
import org.junit.Ignore;
1517
import org.junit.Test;
1618
import org.junit.runner.RunWith;
1719
import org.junit.runners.Parameterized;
@@ -27,19 +29,21 @@ public class Http2MetricsTest extends HttpMetricsTestBase {
2729
public static Collection<Object[]> params() {
2830
ArrayList<Object[]> params = new ArrayList<>();
2931
// h2
30-
params.add(new Object[] { Http2TestBase.createHttp2ClientOptions(), Http2TestBase.createHttp2ServerOptions(HttpTestBase.DEFAULT_HTTP_PORT, HttpTestBase.DEFAULT_HTTP_HOST) });
32+
params.add(new Object[] { Http2TestBase.createHttp2ClientOptions(), Http2TestBase.createHttp2ServerOptions(HttpTestBase.DEFAULT_HTTP_PORT, HttpTestBase.DEFAULT_HTTP_HOST), ThreadingModel.EVENT_LOOP });
33+
// h2 + worker
34+
params.add(new Object[] { Http2TestBase.createHttp2ClientOptions(), Http2TestBase.createHttp2ServerOptions(HttpTestBase.DEFAULT_HTTP_PORT, HttpTestBase.DEFAULT_HTTP_HOST), ThreadingModel.WORKER });
3135
// h2c with upgrade
32-
params.add(new Object[] { new HttpClientOptions().setProtocolVersion(HttpVersion.HTTP_2).setHttp2ClearTextUpgrade(true), new HttpServerOptions().setPort(HttpTestBase.DEFAULT_HTTP_PORT).setHost(HttpTestBase.DEFAULT_HTTP_HOST) });
33-
// h2c directq
34-
params.add(new Object[] { new HttpClientOptions().setProtocolVersion(HttpVersion.HTTP_2).setHttp2ClearTextUpgrade(false), new HttpServerOptions().setPort(HttpTestBase.DEFAULT_HTTP_PORT).setHost(HttpTestBase.DEFAULT_HTTP_HOST) });
36+
params.add(new Object[] { new HttpClientOptions().setProtocolVersion(HttpVersion.HTTP_2).setHttp2ClearTextUpgrade(true), new HttpServerOptions().setPort(HttpTestBase.DEFAULT_HTTP_PORT).setHost(HttpTestBase.DEFAULT_HTTP_HOST), ThreadingModel.EVENT_LOOP });
37+
// h2c direct
38+
params.add(new Object[] { new HttpClientOptions().setProtocolVersion(HttpVersion.HTTP_2).setHttp2ClearTextUpgrade(false), new HttpServerOptions().setPort(HttpTestBase.DEFAULT_HTTP_PORT).setHost(HttpTestBase.DEFAULT_HTTP_HOST), ThreadingModel.EVENT_LOOP });
3539
return params;
3640
}
3741

3842
private HttpClientOptions clientOptions;
3943
private HttpServerOptions serverOptions;
4044

41-
public Http2MetricsTest(HttpClientOptions clientOptions, HttpServerOptions serverOptions) {
42-
super(HttpVersion.HTTP_2);
45+
public Http2MetricsTest(HttpClientOptions clientOptions, HttpServerOptions serverOptions, ThreadingModel threadingModel) {
46+
super(HttpVersion.HTTP_2, threadingModel);
4347

4448
this.clientOptions = clientOptions;
4549
this.serverOptions = serverOptions.setHandle100ContinueAutomatically(true);
@@ -70,10 +74,11 @@ public void testPushPromise() throws Exception {
7074
AtomicInteger numBuffer = new AtomicInteger(numBuffers);
7175
vertx.setPeriodic(1, timerID -> {
7276
if (numBuffer.getAndDecrement() == 0) {
73-
pushedResp.end();
74-
assertNull(serverMetrics.getResponseMetric("/wibble"));
77+
pushedResp.end().onComplete(onSuccess(v -> {
78+
assertNull(serverMetrics.getResponseMetric("/wibble"));
79+
complete();
80+
}));
7581
vertx.cancelTimer(timerID);
76-
complete();
7782
} else {
7883
pushedResp.write(TestUtils.randomBuffer(1000));
7984
}

src/test/java/io/vertx/core/http/HttpMetricsTestBase.java

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,12 @@
1313

1414
import io.vertx.core.Context;
1515
import io.vertx.core.Future;
16+
import io.vertx.core.ThreadingModel;
1617
import io.vertx.core.VertxOptions;
1718
import io.vertx.core.buffer.Buffer;
1819
import io.vertx.core.http.impl.HttpClientImpl;
1920
import io.vertx.core.http.impl.HttpServerRequestInternal;
21+
import io.vertx.core.impl.VertxInternal;
2022
import io.vertx.core.metrics.MetricsOptions;
2123
import io.vertx.core.net.NetClient;
2224
import io.vertx.core.net.SocketAddress;
@@ -46,9 +48,19 @@
4648
public abstract class HttpMetricsTestBase extends HttpTestBase {
4749

4850
private final HttpVersion protocol;
51+
private final ThreadingModel threadingModel;
4952

50-
public HttpMetricsTestBase(HttpVersion protocol) {
53+
public HttpMetricsTestBase(HttpVersion protocol, ThreadingModel threadingModel) {
5154
this.protocol = protocol;
55+
this.threadingModel = threadingModel;
56+
}
57+
58+
@Override
59+
protected void startServer(SocketAddress bindAddress, Context context, HttpServer server) throws Exception {
60+
if (threadingModel == ThreadingModel.WORKER) {
61+
context = ((VertxInternal) vertx).createWorkerContext();
62+
}
63+
super.startServer(bindAddress, context, server);
5264
}
5365

5466
@Override
@@ -83,6 +95,8 @@ public void testHttpMetricsLifecycle() throws Exception {
8395
assertTrue(serverMetric.get().socket.connected.get());
8496
assertNull(serverMetric.get().route.get());
8597
req.routed("/route/:param");
98+
// Worker can wait
99+
assertWaitUntil(() -> serverMetric.get().route.get() != null);
86100
assertEquals("/route/:param", serverMetric.get().route.get());
87101
req.bodyHandler(buff -> {
88102
assertEquals(contentLength, buff.length());
@@ -93,14 +107,21 @@ public void testHttpMetricsLifecycle() throws Exception {
93107
vertx.setPeriodic(1, timerID -> {
94108
Buffer chunk = TestUtils.randomBuffer(chunkSize);
95109
if (numBuffer.decrementAndGet() == 0) {
96-
resp.end(chunk);
97-
assertTrue(serverMetric.get().responseEnded.get());
98-
assertEquals(contentLength, serverMetric.get().bytesWritten.get());
99-
assertNull(serverMetrics.getRequestMetric(req));
110+
resp
111+
.end(chunk)
112+
.onComplete(onSuccess(v -> {
113+
assertTrue(serverMetric.get().responseEnded.get());
114+
assertFalse(serverMetric.get().failed.get());
115+
assertEquals(contentLength, serverMetric.get().bytesWritten.get());
116+
assertNull(serverMetrics.getRequestMetric(req));
117+
}));
100118
vertx.cancelTimer(timerID);
101119
} else {
102-
resp.write(chunk);
103-
assertSame(serverMetric.get().response.get(), resp);
120+
resp
121+
.write(chunk)
122+
.onComplete(onSuccess(v -> {
123+
assertSame(serverMetric.get().response.get(), resp);
124+
}));
104125
}
105126
});
106127
});
@@ -205,9 +226,7 @@ public void testHttpClientLifecycle() throws Exception {
205226
});
206227
});
207228
});
208-
CountDownLatch listenLatch = new CountDownLatch(1);
209-
server.listen(HttpTestBase.DEFAULT_HTTP_PORT, "localhost", onSuccess(s -> { listenLatch.countDown(); }));
210-
awaitLatch(listenLatch);
229+
startServer(testAddress);
211230
FakeHttpClientMetrics clientMetrics = FakeMetricsBase.getMetrics(client);
212231
CountDownLatch responseBeginLatch = new CountDownLatch(1);
213232
CountDownLatch responseEndLatch = new CountDownLatch(1);
@@ -301,8 +320,13 @@ public void testRouteMetrics() throws Exception {
301320
HttpServerMetric metric = metrics.getRequestMetric(req);
302321
assertNull(metric.route.get());
303322
req.routed("MyRoute");
323+
// Worker can wait
324+
assertWaitUntil(() -> metric.route.get() != null);
304325
assertEquals("MyRoute", metric.route.get());
326+
metric.route.set(null);
305327
req.routed("MyRoute - rerouted");
328+
// Worker can wait
329+
assertWaitUntil(() -> metric.route.get() != null);
306330
assertEquals("MyRoute - rerouted", metric.route.get());
307331
req.response().end();
308332
testComplete();

0 commit comments

Comments
 (0)