From 9c8b405f69dde5c58dfec9c44ddd62b00da4afe1 Mon Sep 17 00:00:00 2001 From: karsonto Date: Tue, 9 Apr 2024 13:52:51 +0800 Subject: [PATCH 1/3] bug fix --- .../admin/handler/AdminHandlerManager.java | 2 +- .../admin/handler/v1/EventHandler.java | 2 + .../protocol/http/async/CompleteHandler.java | 1 + .../runtime/util/HttpRequestUtil.java | 2 +- .../admin/handler/EventHandlerTest.java | 143 ++++++++++++++++++ .../runtime/mock/MockCloudEvent.java | 92 +++++++++++ 6 files changed, 240 insertions(+), 2 deletions(-) create mode 100644 eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/admin/handler/EventHandlerTest.java create mode 100644 eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/mock/MockCloudEvent.java diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/AdminHandlerManager.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/AdminHandlerManager.java index 4f81584b9d..36c41820c4 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/AdminHandlerManager.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/AdminHandlerManager.java @@ -117,7 +117,7 @@ public void registerHttpHandler() { eventMeshGrpcServer.getEventMeshGrpcConfiguration())); } - private void initHandler(HttpHandler httpHandler) { + public void initHandler(HttpHandler httpHandler) { EventMeshHttpHandler eventMeshHttpHandler = httpHandler.getClass().getAnnotation(EventMeshHttpHandler.class); httpHandlerMap.putIfAbsent(eventMeshHttpHandler.path(), httpHandler); } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/v1/EventHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/v1/EventHandler.java index 658fcac5fc..4e1fe4a03a 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/v1/EventHandler.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/v1/EventHandler.java @@ -99,6 +99,8 @@ protected void get(HttpRequest httpRequest, ChannelHandlerContext ctx) throws Ex .serialize(event); eventJsonList.add(new String(serializedEvent, StandardCharsets.UTF_8)); } + String result = JsonUtils.toJSONString(eventJsonList); + writeJson(ctx, result); } @Override diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/async/CompleteHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/async/CompleteHandler.java index 98f99744c8..c9ae6e884e 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/async/CompleteHandler.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/async/CompleteHandler.java @@ -22,6 +22,7 @@ * * @param */ +@FunctionalInterface public interface CompleteHandler { void onResponse(T t); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/HttpRequestUtil.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/HttpRequestUtil.java index a8e1288f79..afa43ba6b4 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/HttpRequestUtil.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/HttpRequestUtil.java @@ -52,7 +52,7 @@ public static Map parseHttpRequestBody(final HttpRequest htt } else if (io.netty.handler.codec.http.HttpMethod.POST.equals(httpRequest.method())) { decodeHttpRequestBody(httpRequest, httpRequestBody); } - if (Objects.isNull(t)) { + if (!Objects.isNull(end)) { end.accept(t); } return httpRequestBody; diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/admin/handler/EventHandlerTest.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/admin/handler/EventHandlerTest.java new file mode 100644 index 0000000000..5b26660c8c --- /dev/null +++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/admin/handler/EventHandlerTest.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.admin.handler; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.when; + +import org.apache.eventmesh.runtime.admin.handler.v1.EventHandler; +import org.apache.eventmesh.runtime.boot.EventMeshServer; +import org.apache.eventmesh.runtime.boot.EventMeshTCPServer; +import org.apache.eventmesh.runtime.core.plugin.MQAdminWrapper; +import org.apache.eventmesh.runtime.mock.MockCloudEvent; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.lang.reflect.Field; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.Spy; +import org.mockito.junit.jupiter.MockitoExtension; + +import io.cloudevents.CloudEvent; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.handler.codec.http.DefaultFullHttpRequest; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpRequestDecoder; +import io.netty.handler.codec.http.HttpResponseEncoder; +import io.netty.handler.codec.http.HttpVersion; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; + +@ExtendWith(MockitoExtension.class) +public class EventHandlerTest { + + public static String storagePlugin = "standalone"; + + EmbeddedChannel embeddedChannel; + + AdminHandlerManager adminHandlerManager; + + @Mock + EventMeshServer eventMeshServer; + + @Mock + EventMeshTCPServer eventMeshTCPServer; + + @Spy + EventHandler eventHandler = new EventHandler(storagePlugin);; + + @Mock + MQAdminWrapper mockAdmin; + + List result = new ArrayList<>(); + + @BeforeEach + public void init() throws Exception { + result.add(new MockCloudEvent()); + when(eventMeshServer.getEventMeshTCPServer()).thenReturn(eventMeshTCPServer); + adminHandlerManager = new AdminHandlerManager(eventMeshServer); + Field admin = EventHandler.class.getDeclaredField("admin"); + admin.setAccessible(true); + admin.set(eventHandler,mockAdmin); + embeddedChannel = new EmbeddedChannel( + new HttpRequestDecoder(), + new HttpResponseEncoder(), + new HttpObjectAggregator(Integer.MAX_VALUE), + new SimpleChannelInboundHandler() { + @Override + protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception { + String uriStr = msg.uri(); + URI uri = URI.create(uriStr); + adminHandlerManager.getHttpHandler(uri.getPath()).get().handle(msg,ctx); + } + }); + adminHandlerManager.initHandler(eventHandler); + } + + @Test + public void testGet() throws Exception { + when(mockAdmin.getEvent(anyString(),anyInt(),anyInt())).thenReturn(result); + FullHttpRequest httpRequest = new DefaultFullHttpRequest( + HttpVersion.HTTP_1_1, HttpMethod.GET, "/event?topicName=123&offset=0&length=1"); + embeddedChannel.writeInbound(httpRequest); + boolean finish = embeddedChannel.finish(); + assertTrue(finish); + ByteBuf byteBuf = null ; + ByteArrayOutputStream bOutput = new ByteArrayOutputStream(1024); + while((byteBuf = embeddedChannel.readOutbound())!=null) { + byte[] data = new byte[byteBuf.readableBytes()]; + byteBuf.readBytes(data); + bOutput.write(data); + }; + String response = new String(bOutput.toByteArray(),"UTF-8"); + String responseBody = response.split("\\r?\\n\\r?\\n")[1]; + JsonNode jsonNode = new ObjectMapper().readTree(responseBody); + assertTrue(jsonNode.get(0).asText().contains("mockData")); + } + + @Test + public void testPost() throws IOException { + FullHttpRequest httpRequest = new DefaultFullHttpRequest( + HttpVersion.HTTP_1_1, HttpMethod.POST, "/event", Unpooled.copiedBuffer("specversion=1.0&id=cd7c0d63-6c7c-4300-9f4e-ceb51f46b1b1&source=/&type=cloudevents&datacontenttype=application/cloudevents+json&subject=test&ttl=4000".getBytes())); + embeddedChannel.writeInbound(httpRequest); + ByteBuf byteBuf = embeddedChannel.readOutbound(); + byte[] data = new byte[byteBuf.readableBytes()]; + byteBuf.readBytes(data); + String response = new String(data,"UTF-8"); + String[] requestMessage = response.split("\r\n"); + assertEquals("HTTP/1.1 200 OK",requestMessage[0].toString()); + } +} diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/mock/MockCloudEvent.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/mock/MockCloudEvent.java new file mode 100644 index 0000000000..ff114d18c6 --- /dev/null +++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/mock/MockCloudEvent.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.mock; + +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.time.OffsetDateTime; +import java.util.Collections; +import java.util.Set; + +import io.cloudevents.CloudEvent; +import io.cloudevents.CloudEventData; +import io.cloudevents.SpecVersion; +import io.cloudevents.core.data.BytesCloudEventData; + +public class MockCloudEvent implements CloudEvent { + + @Override + public CloudEventData getData() { + return BytesCloudEventData.wrap("mockData".getBytes(StandardCharsets.UTF_8)); + } + + @Override + public SpecVersion getSpecVersion() { + return SpecVersion.V1; + } + + @Override + public String getId() { + return "mockId"; + } + + @Override + public String getType() { + return "mockType"; + } + + @Override + public URI getSource() { + return URI.create("mockSource"); + } + + @Override + public String getDataContentType() { + return null; + } + + @Override + public URI getDataSchema() { + return URI.create("mockDataSchema"); + } + + @Override + public String getSubject() { + return "mockSubject"; + } + + @Override + public OffsetDateTime getTime() { + return null; + } + + @Override + public Object getAttribute(String attributeName) throws IllegalArgumentException { + return null; + } + + @Override + public Object getExtension(String extensionName) { + return null; + } + + @Override + public Set getExtensionNames() { + return Collections.emptySet(); + } +} From 844a79456f01154962e466cb9481cb19998df456 Mon Sep 17 00:00:00 2001 From: karsonto Date: Tue, 9 Apr 2024 18:13:42 +0800 Subject: [PATCH 2/3] bug fix --- .../admin/handler/EventHandlerTest.java | 31 ++++++++++--------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/admin/handler/EventHandlerTest.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/admin/handler/EventHandlerTest.java index 5b26660c8c..b9cf1fa9dc 100644 --- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/admin/handler/EventHandlerTest.java +++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/admin/handler/EventHandlerTest.java @@ -77,7 +77,7 @@ public class EventHandlerTest { EventMeshTCPServer eventMeshTCPServer; @Spy - EventHandler eventHandler = new EventHandler(storagePlugin);; + EventHandler eventHandler = new EventHandler(storagePlugin); @Mock MQAdminWrapper mockAdmin; @@ -91,7 +91,7 @@ public void init() throws Exception { adminHandlerManager = new AdminHandlerManager(eventMeshServer); Field admin = EventHandler.class.getDeclaredField("admin"); admin.setAccessible(true); - admin.set(eventHandler,mockAdmin); + admin.set(eventHandler, mockAdmin); embeddedChannel = new EmbeddedChannel( new HttpRequestDecoder(), new HttpResponseEncoder(), @@ -101,7 +101,7 @@ public void init() throws Exception { protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception { String uriStr = msg.uri(); URI uri = URI.create(uriStr); - adminHandlerManager.getHttpHandler(uri.getPath()).get().handle(msg,ctx); + adminHandlerManager.getHttpHandler(uri.getPath()).get().handle(msg, ctx); } }); adminHandlerManager.initHandler(eventHandler); @@ -109,20 +109,21 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws E @Test public void testGet() throws Exception { - when(mockAdmin.getEvent(anyString(),anyInt(),anyInt())).thenReturn(result); + when(mockAdmin.getEvent(anyString(), anyInt(), anyInt())).thenReturn(result); FullHttpRequest httpRequest = new DefaultFullHttpRequest( HttpVersion.HTTP_1_1, HttpMethod.GET, "/event?topicName=123&offset=0&length=1"); - embeddedChannel.writeInbound(httpRequest); + embeddedChannel.writeInbound(httpRequest); boolean finish = embeddedChannel.finish(); assertTrue(finish); - ByteBuf byteBuf = null ; - ByteArrayOutputStream bOutput = new ByteArrayOutputStream(1024); - while((byteBuf = embeddedChannel.readOutbound())!=null) { + ByteBuf byteBuf = null; + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(1024); + while ((byteBuf = embeddedChannel.readOutbound()) != null) { byte[] data = new byte[byteBuf.readableBytes()]; byteBuf.readBytes(data); - bOutput.write(data); - }; - String response = new String(bOutput.toByteArray(),"UTF-8"); + byteArrayOutputStream.write(data); + } + ; + String response = new String(byteArrayOutputStream.toByteArray(), "UTF-8"); String responseBody = response.split("\\r?\\n\\r?\\n")[1]; JsonNode jsonNode = new ObjectMapper().readTree(responseBody); assertTrue(jsonNode.get(0).asText().contains("mockData")); @@ -131,13 +132,15 @@ public void testGet() throws Exception { @Test public void testPost() throws IOException { FullHttpRequest httpRequest = new DefaultFullHttpRequest( - HttpVersion.HTTP_1_1, HttpMethod.POST, "/event", Unpooled.copiedBuffer("specversion=1.0&id=cd7c0d63-6c7c-4300-9f4e-ceb51f46b1b1&source=/&type=cloudevents&datacontenttype=application/cloudevents+json&subject=test&ttl=4000".getBytes())); + HttpVersion.HTTP_1_1, HttpMethod.POST, "/event", Unpooled.copiedBuffer( + ("specversion=1.0&id=cd7c0d63-6c7c-4300-9f4e-ceb51f46b1b1&source" + + "=/&type=cloudevents&datacontenttype=application/cloudevents+json&subject=test&ttl=4000").getBytes())); embeddedChannel.writeInbound(httpRequest); ByteBuf byteBuf = embeddedChannel.readOutbound(); byte[] data = new byte[byteBuf.readableBytes()]; byteBuf.readBytes(data); - String response = new String(data,"UTF-8"); + String response = new String(data, "UTF-8"); String[] requestMessage = response.split("\r\n"); - assertEquals("HTTP/1.1 200 OK",requestMessage[0].toString()); + assertEquals("HTTP/1.1 200 OK", requestMessage[0].toString()); } } From 62fa0152d3a5edfae79a60160b4640be7877a9c5 Mon Sep 17 00:00:00 2001 From: karsonto Date: Wed, 10 Apr 2024 14:09:07 +0800 Subject: [PATCH 3/3] bug fix --- .../runtime/admin/handler/AdminHandlerManager.java | 2 +- .../runtime/core/protocol/http/async/CompleteHandler.java | 1 - .../eventmesh/runtime/admin/handler/EventHandlerTest.java | 7 ++++++- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/AdminHandlerManager.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/AdminHandlerManager.java index 36c41820c4..4f81584b9d 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/AdminHandlerManager.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/AdminHandlerManager.java @@ -117,7 +117,7 @@ public void registerHttpHandler() { eventMeshGrpcServer.getEventMeshGrpcConfiguration())); } - public void initHandler(HttpHandler httpHandler) { + private void initHandler(HttpHandler httpHandler) { EventMeshHttpHandler eventMeshHttpHandler = httpHandler.getClass().getAnnotation(EventMeshHttpHandler.class); httpHandlerMap.putIfAbsent(eventMeshHttpHandler.path(), httpHandler); } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/async/CompleteHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/async/CompleteHandler.java index c9ae6e884e..98f99744c8 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/async/CompleteHandler.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/async/CompleteHandler.java @@ -22,7 +22,6 @@ * * @param */ -@FunctionalInterface public interface CompleteHandler { void onResponse(T t); diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/admin/handler/EventHandlerTest.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/admin/handler/EventHandlerTest.java index b9cf1fa9dc..1df3386bea 100644 --- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/admin/handler/EventHandlerTest.java +++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/admin/handler/EventHandlerTest.java @@ -32,6 +32,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.net.URI; import java.util.ArrayList; import java.util.List; @@ -84,6 +85,8 @@ public class EventHandlerTest { List result = new ArrayList<>(); + Method initHandler; + @BeforeEach public void init() throws Exception { result.add(new MockCloudEvent()); @@ -104,7 +107,9 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws E adminHandlerManager.getHttpHandler(uri.getPath()).get().handle(msg, ctx); } }); - adminHandlerManager.initHandler(eventHandler); + initHandler = AdminHandlerManager.class.getDeclaredMethod("initHandler", HttpHandler.class); + initHandler.setAccessible(true); + initHandler.invoke(adminHandlerManager, eventHandler); } @Test