diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/v1/EventHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/v1/EventHandler.java index 658fcac5fc..4e1fe4a03a 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/v1/EventHandler.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/v1/EventHandler.java @@ -99,6 +99,8 @@ protected void get(HttpRequest httpRequest, ChannelHandlerContext ctx) throws Ex .serialize(event); eventJsonList.add(new String(serializedEvent, StandardCharsets.UTF_8)); } + String result = JsonUtils.toJSONString(eventJsonList); + writeJson(ctx, result); } @Override diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/HttpRequestUtil.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/HttpRequestUtil.java index a8e1288f79..afa43ba6b4 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/HttpRequestUtil.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/HttpRequestUtil.java @@ -52,7 +52,7 @@ public static Map parseHttpRequestBody(final HttpRequest htt } else if (io.netty.handler.codec.http.HttpMethod.POST.equals(httpRequest.method())) { decodeHttpRequestBody(httpRequest, httpRequestBody); } - if (Objects.isNull(t)) { + if (!Objects.isNull(end)) { end.accept(t); } return httpRequestBody; diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/admin/handler/EventHandlerTest.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/admin/handler/EventHandlerTest.java new file mode 100644 index 0000000000..1df3386bea --- /dev/null +++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/admin/handler/EventHandlerTest.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.admin.handler; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.when; + +import org.apache.eventmesh.runtime.admin.handler.v1.EventHandler; +import org.apache.eventmesh.runtime.boot.EventMeshServer; +import org.apache.eventmesh.runtime.boot.EventMeshTCPServer; +import org.apache.eventmesh.runtime.core.plugin.MQAdminWrapper; +import org.apache.eventmesh.runtime.mock.MockCloudEvent; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.Spy; +import org.mockito.junit.jupiter.MockitoExtension; + +import io.cloudevents.CloudEvent; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.handler.codec.http.DefaultFullHttpRequest; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpRequestDecoder; +import io.netty.handler.codec.http.HttpResponseEncoder; +import io.netty.handler.codec.http.HttpVersion; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; + +@ExtendWith(MockitoExtension.class) +public class EventHandlerTest { + + public static String storagePlugin = "standalone"; + + EmbeddedChannel embeddedChannel; + + AdminHandlerManager adminHandlerManager; + + @Mock + EventMeshServer eventMeshServer; + + @Mock + EventMeshTCPServer eventMeshTCPServer; + + @Spy + EventHandler eventHandler = new EventHandler(storagePlugin); + + @Mock + MQAdminWrapper mockAdmin; + + List result = new ArrayList<>(); + + Method initHandler; + + @BeforeEach + public void init() throws Exception { + result.add(new MockCloudEvent()); + when(eventMeshServer.getEventMeshTCPServer()).thenReturn(eventMeshTCPServer); + adminHandlerManager = new AdminHandlerManager(eventMeshServer); + Field admin = EventHandler.class.getDeclaredField("admin"); + admin.setAccessible(true); + admin.set(eventHandler, mockAdmin); + embeddedChannel = new EmbeddedChannel( + new HttpRequestDecoder(), + new HttpResponseEncoder(), + new HttpObjectAggregator(Integer.MAX_VALUE), + new SimpleChannelInboundHandler() { + @Override + protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception { + String uriStr = msg.uri(); + URI uri = URI.create(uriStr); + adminHandlerManager.getHttpHandler(uri.getPath()).get().handle(msg, ctx); + } + }); + initHandler = AdminHandlerManager.class.getDeclaredMethod("initHandler", HttpHandler.class); + initHandler.setAccessible(true); + initHandler.invoke(adminHandlerManager, eventHandler); + } + + @Test + public void testGet() throws Exception { + when(mockAdmin.getEvent(anyString(), anyInt(), anyInt())).thenReturn(result); + FullHttpRequest httpRequest = new DefaultFullHttpRequest( + HttpVersion.HTTP_1_1, HttpMethod.GET, "/event?topicName=123&offset=0&length=1"); + embeddedChannel.writeInbound(httpRequest); + boolean finish = embeddedChannel.finish(); + assertTrue(finish); + ByteBuf byteBuf = null; + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(1024); + while ((byteBuf = embeddedChannel.readOutbound()) != null) { + byte[] data = new byte[byteBuf.readableBytes()]; + byteBuf.readBytes(data); + byteArrayOutputStream.write(data); + } + ; + String response = new String(byteArrayOutputStream.toByteArray(), "UTF-8"); + String responseBody = response.split("\\r?\\n\\r?\\n")[1]; + JsonNode jsonNode = new ObjectMapper().readTree(responseBody); + assertTrue(jsonNode.get(0).asText().contains("mockData")); + } + + @Test + public void testPost() throws IOException { + FullHttpRequest httpRequest = new DefaultFullHttpRequest( + HttpVersion.HTTP_1_1, HttpMethod.POST, "/event", Unpooled.copiedBuffer( + ("specversion=1.0&id=cd7c0d63-6c7c-4300-9f4e-ceb51f46b1b1&source" + + "=/&type=cloudevents&datacontenttype=application/cloudevents+json&subject=test&ttl=4000").getBytes())); + embeddedChannel.writeInbound(httpRequest); + ByteBuf byteBuf = embeddedChannel.readOutbound(); + byte[] data = new byte[byteBuf.readableBytes()]; + byteBuf.readBytes(data); + String response = new String(data, "UTF-8"); + String[] requestMessage = response.split("\r\n"); + assertEquals("HTTP/1.1 200 OK", requestMessage[0].toString()); + } +} diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/mock/MockCloudEvent.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/mock/MockCloudEvent.java new file mode 100644 index 0000000000..ff114d18c6 --- /dev/null +++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/mock/MockCloudEvent.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.mock; + +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.time.OffsetDateTime; +import java.util.Collections; +import java.util.Set; + +import io.cloudevents.CloudEvent; +import io.cloudevents.CloudEventData; +import io.cloudevents.SpecVersion; +import io.cloudevents.core.data.BytesCloudEventData; + +public class MockCloudEvent implements CloudEvent { + + @Override + public CloudEventData getData() { + return BytesCloudEventData.wrap("mockData".getBytes(StandardCharsets.UTF_8)); + } + + @Override + public SpecVersion getSpecVersion() { + return SpecVersion.V1; + } + + @Override + public String getId() { + return "mockId"; + } + + @Override + public String getType() { + return "mockType"; + } + + @Override + public URI getSource() { + return URI.create("mockSource"); + } + + @Override + public String getDataContentType() { + return null; + } + + @Override + public URI getDataSchema() { + return URI.create("mockDataSchema"); + } + + @Override + public String getSubject() { + return "mockSubject"; + } + + @Override + public OffsetDateTime getTime() { + return null; + } + + @Override + public Object getAttribute(String attributeName) throws IllegalArgumentException { + return null; + } + + @Override + public Object getExtension(String extensionName) { + return null; + } + + @Override + public Set getExtensionNames() { + return Collections.emptySet(); + } +}