Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MessageConsumer {

private static final Logger logger = LoggerFactory.getLogger(MessageConsumer.class);

@RabbitListener(queues = {SpringAmqpConfig.queueName})
public void receiveMessage(String message) {
logger.info("Received Message: " + message);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package com.baeldung.springamqpsimple;

import org.springframework.amqp.core.*;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
Expand Down Expand Up @@ -32,7 +36,7 @@ Binding binding(Queue queue, DirectExchange exchange) {

@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName);
Expand All @@ -44,5 +48,4 @@ SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter(MessageConsumer messageReceiver) {
return new MessageListenerAdapter(messageReceiver, "receiveMessage");
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package com.baeldung.springamqpsimple.broadcast;

import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Declarable;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;

import java.util.Arrays;
import java.util.List;

@Configuration
@Profile("!test")
public class BroadcastConfig {

public final static String fanoutQueue1Name = "com.baeldung.spring-amqp-simple.fanout.queue1";
public final static String fanoutQueue2Name = "com.baeldung.spring-amqp-simple.fanout.queue2";
public final static String fanoutExchangeName = "com.baeldung.spring-amqp-simple.fanout.exchange";

public final static String topicQueue1Name = "com.baeldung.spring-amqp-simple.topic.queue1";
public final static String topicQueue2Name = "com.baeldung.spring-amqp-simple.topic.queue2";
public final static String topicExchangeName = "com.baeldung.spring-amql-simple.topic.exchange";

@Bean
public List<Declarable> topicBindings() {
Queue topicQueue1 = new Queue(topicQueue1Name, false);
Queue topicQueue2 = new Queue(topicQueue2Name, false);

TopicExchange topicExchange = new TopicExchange(topicExchangeName);

return Arrays.asList(
topicQueue1,
topicQueue2,
topicExchange,
BindingBuilder.bind(topicQueue1).to(topicExchange).with("*.important.*"),
BindingBuilder.bind(topicQueue2).to(topicExchange).with("user.#")
);
}

@Bean
public List<Declarable> fanoutBindings() {
Queue fanoutQueue1 = new Queue(fanoutQueue1Name, false);
Queue fanoutQueue2 = new Queue(fanoutQueue2Name, false);

FanoutExchange fanoutExchange = new FanoutExchange(fanoutExchangeName);

return Arrays.asList(
fanoutQueue1,
fanoutQueue2,
fanoutExchange,
BindingBuilder.bind(fanoutQueue1).to(fanoutExchange),
BindingBuilder.bind(fanoutQueue2).to(fanoutExchange)
);
}

@Bean
public SimpleRabbitListenerContainerFactory container(ConnectionFactory connectionFactory, SimpleRabbitListenerContainerFactoryConfigurer configurer) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
return factory;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.baeldung.springamqpsimple.broadcast;

import com.baeldung.springamqpsimple.MessageConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class BroadcastMessageConsumers {
private static final Logger logger = LoggerFactory.getLogger(MessageConsumer.class);

@RabbitListener(queues = {BroadcastConfig.fanoutQueue1Name})
public void receiveMessageFromFanout1(String message) {
logger.info("Received fanout 1 message: " + message);
}

@RabbitListener(queues = {BroadcastConfig.fanoutQueue2Name})
public void receiveMessageFromFanout2(String message) {
logger.info("Received fanout 2 message: " + message);
}

@RabbitListener(queues = {BroadcastConfig.topicQueue1Name})
public void receiveMessageFromTopic1(String message) {
logger.info("Received topic 1 message: " + message);
}

@RabbitListener(queues = {BroadcastConfig.topicQueue2Name})
public void receiveMessageFromTopic2(String message) {
logger.info("Received topic 2 message: " + message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.baeldung.springamqpsimple.broadcast;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseStatus;

@Controller
public class BroadcastMessageController {

private final BroadcastMessageProducer messageProducer;

@Autowired
public BroadcastMessageController(BroadcastMessageProducer messageProducer) {
this.messageProducer = messageProducer;
}

@RequestMapping(value="/broadcast", method= RequestMethod.POST)
@ResponseStatus(value= HttpStatus.CREATED)
public void sendMessage(@RequestBody String message) {
messageProducer.sendMessages(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.baeldung.springamqpsimple.broadcast;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class BroadcastMessageProducer {

private final RabbitTemplate rabbitTemplate;

@Autowired
public BroadcastMessageProducer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}

public void sendMessages(String message) {
rabbitTemplate.convertAndSend(BroadcastConfig.fanoutExchangeName, "", message);
rabbitTemplate.convertAndSend(BroadcastConfig.topicExchangeName, "user.not-important.info", message);
rabbitTemplate.convertAndSend(BroadcastConfig.topicExchangeName, "user.important.error", message);
}
}
5 changes: 3 additions & 2 deletions spring-amqp-simple/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
spring:
rabbitmq:
username: baeldung
password: baeldung
username: guest
password: guest
host: 10.10.10.105
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package broadcast;

import com.baeldung.springamqpsimple.broadcast.BroadcastConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;

import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT;

@RunWith(SpringRunner.class)
@ActiveProfiles("test")
@SpringBootTest(webEnvironment = RANDOM_PORT)
public class BroadcastMessageControllerIntegrationTest {

@Autowired
private TestRestTemplate restTemplate;

@MockBean
private RabbitTemplate rabbitTemplate;

@Test
public void whenPostingMessage_thenMessageIsCreated() {
final String message = "Hello World!";
ResponseEntity<Void> responseEntity = restTemplate.postForEntity("/broadcast", message, Void.class);

assertEquals(HttpStatus.CREATED, responseEntity.getStatusCode());
}

@Test
public void whenPostingMessage_thenMessageIsSentToBroker() {
final String message = "Hello World!";
restTemplate.postForEntity("/broadcast", message, Void.class);

verify(rabbitTemplate).convertAndSend(BroadcastConfig.fanoutExchangeName, "", message);
verify(rabbitTemplate).convertAndSend(BroadcastConfig.topicExchangeName, "user.not-important.info", message);
verify(rabbitTemplate).convertAndSend(BroadcastConfig.topicExchangeName, "user.important.error", message);
}
}