diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/AsyncSubClient.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/AsyncSubClient.java index 3bf8099366..92a2431f1b 100644 --- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/AsyncSubClient.java +++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/AsyncSubClient.java @@ -20,15 +20,10 @@ import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.apache.eventmesh.common.protocol.SubscriptionType; import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; -import org.apache.eventmesh.common.protocol.tcp.Package; import org.apache.eventmesh.runtime.client.common.ClientConstants; import org.apache.eventmesh.runtime.client.common.MessageUtils; -import org.apache.eventmesh.runtime.client.hook.ReceiveMsgHook; import org.apache.eventmesh.runtime.client.impl.SubClientImpl; -import io.netty.channel.ChannelHandlerContext; - - import lombok.extern.slf4j.Slf4j; @Slf4j @@ -40,14 +35,11 @@ public static void main(String[] args) throws Exception { client.init(); client.heartbeat(); client.justSubscribe(ClientConstants.ASYNC_TOPIC, SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC); - client.registerBusiHandler(new ReceiveMsgHook() { - @Override - public void handle(Package msg, ChannelHandlerContext ctx) { - if (msg.getBody() instanceof EventMeshMessage) { - String body = ((EventMeshMessage) msg.getBody()).getBody(); - if (log.isInfoEnabled()) { - log.info("receive message -------------------------------" + body); - } + client.registerBusiHandler((msg, ctx) -> { + if (msg.getBody() instanceof EventMeshMessage) { + String body = ((EventMeshMessage) msg.getBody()).getBody(); + if (log.isInfoEnabled()) { + log.info("receive message : {}", body); } } });