spring 集成rabbitmq 使用场景
一、引入spring-rabbit的jar包
<!-- Spring AMQP RabbitMQ module; the rabbit: XML namespace used in application-rabbit.xml comes from this artifact. -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.7.5.RELEASE</version>
</dependency>
二、编写application-rabbit.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
  RabbitMQ wiring for the demo application.

  Declares one connection factory, an admin (so the queues, exchanges and
  bindings below are created on the broker automatically), a JSON-converting
  template for producers, and one demo section per messaging style:
  simple queue, fanout (publish/subscribe), direct (routing), topic and headers.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:mvc="http://www.springframework.org/schema/mvc"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/mvc
           http://www.springframework.org/schema/mvc/spring-mvc.xsd
           http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
           http://www.springframework.org/schema/aop
           http://www.springframework.org/schema/aop/spring-aop.xsd">

    <!-- ===================== Connection and infrastructure ===================== -->

    <!--
      Connection to the RabbitMQ server. Publisher confirms and returns are switched on.
      channel-cache-size is not set, so the default channel cache size (25) applies.
      NOTE(review): host and the guest/guest credentials are hard-coded for the demo;
      move them to a properties file before using this outside a local environment.
    -->
    <rabbit:connection-factory id="connectionFactory"
                               host="localhost" port="5672"
                               username="guest" password="guest"
                               publisher-returns="true" publisher-confirms="true" />

    <!-- With an admin present, the exchanges, queues and bindings declared in this
         file are created on the broker automatically. -->
    <rabbit:admin connection-factory="connectionFactory" />

    <!-- Converts message payloads to and from JSON. -->
    <bean id="jackson2JsonMessageConverter"
          class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />

    <!-- Template injected into producers. The attribute names are hyphenated
         (connection-factory, message-converter) as required by the rabbit schema. -->
    <rabbit:template id="amqpTemplate"
                     connection-factory="connectionFactory"
                     message-converter="jackson2JsonMessageConverter" />

    <!-- ===================== mq.asdf / mq.qwer demo ===================== -->

    <!--
      Queue attributes:
        durable      the queue is persistent
        exclusive    private queue usable only by its creator, removed on disconnect
        auto-delete  whether the queue is removed once all consumers have disconnected
      No listeners are registered for these three queues in this file.
    -->
    <rabbit:queue name="mq.asdf" durable="true" auto-delete="false" exclusive="false" />
    <rabbit:queue name="mq.asdf2" durable="true" auto-delete="false" exclusive="false" />
    <rabbit:queue name="mq.qwer" durable="true" auto-delete="false" exclusive="false" />

    <!-- Topic exchange: mq.asdf.send reaches both queues, mq.asdf2.send only mq.asdf2. -->
    <rabbit:topic-exchange name="mq.asdfExChange" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="mq.asdf" pattern="mq.asdf.send" />
            <rabbit:binding queue="mq.asdf2" pattern="mq.asdf2.send" />
            <rabbit:binding queue="mq.asdf2" pattern="mq.asdf.send" />
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <!-- Direct exchange with a single binding. -->
    <rabbit:direct-exchange name="mq.qwerExChange" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="mq.qwer" key="mq.qwer.send" />
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- ===================== Hello world: 1-to-1 and 1-to-many (no exchange declared) ===================== -->

    <rabbit:queue name="hello" durable="true" auto-delete="false" exclusive="false" />

    <!-- The two consumers. -->
    <bean id="cHelloWorld" class="com.devframe.entity.CHelloWorld" />
    <bean id="cHelloWorld2" class="com.devframe.entity.CHelloWorld2" />

    <!--
      Both consumers listen on the same queue, and each is registered exactly once.
      priority sets the consumer priority; both use the same value so that neither
      consumer is preferred over the other.
    -->
    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener queues="hello" ref="cHelloWorld" priority="1" />
        <rabbit:listener queues="hello" ref="cHelloWorld2" priority="1" />
    </rabbit:listener-container>

    <!-- ===================== Publish/subscribe (fanout exchange, broadcast) ===================== -->

    <rabbit:queue name="pubSubQueue" durable="true" auto-delete="false" exclusive="false" />
    <rabbit:queue name="pubSubQueue2" durable="true" auto-delete="false" exclusive="false" />

    <!-- Four subscribers: two per queue. -->
    <bean id="suber1" class="com.devframe.entity.Suber1" />
    <bean id="suber2" class="com.devframe.entity.Suber2" />
    <bean id="suber3" class="com.devframe.entity.Suber3" />
    <bean id="suber4" class="com.devframe.entity.Suber4" />

    <!-- Fanout exchange: both queues are bound without a routing key. -->
    <rabbit:fanout-exchange name="pubSubExchange" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="pubSubQueue" />
            <rabbit:binding queue="pubSubQueue2" />
        </rabbit:bindings>
    </rabbit:fanout-exchange>

    <!-- One listener container per subscriber. -->
    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener queues="pubSubQueue" ref="suber1" />
    </rabbit:listener-container>
    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener queues="pubSubQueue" ref="suber2" />
    </rabbit:listener-container>
    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener queues="pubSubQueue2" ref="suber3" />
    </rabbit:listener-container>
    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener queues="pubSubQueue2" ref="suber4" />
    </rabbit:listener-container>

    <!-- ===================== Routing (direct exchange) ===================== -->

    <rabbit:queue name="routing1" durable="true" auto-delete="false" exclusive="false" />
    <rabbit:queue name="routing2" durable="true" auto-delete="false" exclusive="false" />

    <!-- routing1 is bound to error only; routing2 is bound to info, warn and error. -->
    <rabbit:direct-exchange name="routingExchange" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="routing1" key="error" />
            <rabbit:binding queue="routing2" key="info" />
            <rabbit:binding queue="routing2" key="warn" />
            <rabbit:binding queue="routing2" key="error" />
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- Consumer for error messages and consumer for all messages. -->
    <bean id="allInfoCustomer" class="com.devframe.entity.AllInfoCustomer" />
    <bean id="errorCustomer" class="com.devframe.entity.ErrorCustomer" />

    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener queues="routing1" ref="errorCustomer" />
        <rabbit:listener queues="routing2" ref="allInfoCustomer" />
    </rabbit:listener-container>

    <!-- ===================== Topic exchange ===================== -->

    <rabbit:queue name="topicQueue1" durable="true" auto-delete="false" exclusive="false" />
    <rabbit:queue name="topicQueue2" durable="true" auto-delete="false" exclusive="false" />

    <!-- topicQueue1 is bound to P.123.# only; topicQueue2 to P.123.#, P.456.# and P.789.#. -->
    <rabbit:topic-exchange name="topicExchange" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="topicQueue1" pattern="P.123.#" />
            <rabbit:binding queue="topicQueue2" pattern="P.123.#" />
            <rabbit:binding queue="topicQueue2" pattern="P.456.#" />
            <rabbit:binding queue="topicQueue2" pattern="P.789.#" />
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <bean id="topicCustomer1" class="com.devframe.entity.TopicCustomer1" />
    <bean id="topicCustomer2" class="com.devframe.entity.TopicCustomer2" />

    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener queues="topicQueue1" ref="topicCustomer1" />
        <rabbit:listener queues="topicQueue2" ref="topicCustomer2" />
    </rabbit:listener-container>

    <!-- ===================== Headers exchange ===================== -->

    <rabbit:queue name="headerQueue1" durable="true" auto-delete="false" exclusive="false" />
    <rabbit:queue name="headerQueue2" durable="true" auto-delete="false" exclusive="false" />

    <!-- Each binding names one header (key) and the value it is bound with. -->
    <rabbit:headers-exchange name="headerExchange" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="headerQueue1" key="abc" value="nb" />
            <rabbit:binding queue="headerQueue2" key="def" value="pl" />
        </rabbit:bindings>
    </rabbit:headers-exchange>

    <bean id="headerCustomer1" class="com.devframe.entity.HeaderCustomer1" />
    <bean id="headerCustomer2" class="com.devframe.entity.HeaderCustomer2" />

    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener ref="headerCustomer1" queues="headerQueue1" />
        <rabbit:listener ref="headerCustomer2" queues="headerQueue2" />
    </rabbit:listener-container>

    <!-- RPC calls: not implemented. -->
</beans>
三、编写消费者实体类和 controller 测试代码
package com.devframe.controller;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;

/**
 * Test controller that publishes demo messages to the exchanges and queues
 * declared in {@code application-rabbit.xml}, one endpoint per messaging style.
 *
 * <p>Every endpoint accepts a {@code msg} request parameter (with a default value)
 * and returns the literal string {@code "success"}.
 */
@Controller
@Api(tags = "RabbitServerController rabbitmq测试controller")
public class RabbitServerController {

    private static final String ASDF_EXCHANGE = "mq.asdfExChange";
    private static final String QWER_EXCHANGE = "mq.qwerExChange";
    private static final String HELLO_QUEUE = "hello";
    private static final String PUB_SUB_EXCHANGE = "pubSubExchange";
    private static final String ROUTING_EXCHANGE = "routingExchange";
    private static final String TOPIC_EXCHANGE = "topicExchange";
    private static final String HEADER_EXCHANGE = "headerExchange";
    private static final String SUCCESS = "success";

    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Publishes {@code msg} to {@code mq.asdfExChange} with routing key
     * {@code mq.asdf.send}; falls back to {@code "hello world"} when the guard fails.
     *
     * @param model MVC model; only checked for non-null
     * @param msg   payload to publish
     * @return {@code "success"}
     */
    @RequestMapping(value = "/sendMsg", method = RequestMethod.POST)
    @ResponseBody
    public String sendAmqbMsg(Model model, @RequestParam(value = "msg", defaultValue = "hello world!!!") String msg) {
        String payload = shouldSend(model, msg) ? msg : "hello world";
        amqpTemplate.convertAndSend(ASDF_EXCHANGE, "mq.asdf.send", payload);
        return SUCCESS;
    }

    /**
     * Publishes a fixed text to {@code mq.asdfExChange} with routing key
     * {@code mq.asdf2.send}.
     *
     * <p>NOTE(review): {@code msg} only selects which of the two fixed texts is sent;
     * its content is never published. Confirm whether that is intended.
     *
     * @param model MVC model; only checked for non-null
     * @param msg   request parameter, used only by the guard
     * @return {@code "success"}
     */
    @RequestMapping(value = "/sendMsg2", method = RequestMethod.POST)
    @ResponseBody
    public String sendAmqbMsg2(Model model, @RequestParam(value = "msg", defaultValue = "hello world!!!") String msg) {
        String payload = shouldSend(model, msg) ? "这个世界很奇妙!!!" : "这个世界很奇妙";
        amqpTemplate.convertAndSend(ASDF_EXCHANGE, "mq.asdf2.send", payload);
        return SUCCESS;
    }

    /**
     * Publishes a fixed text to {@code mq.qwerExChange} with routing key
     * {@code mq.qwer.send}.
     *
     * <p>NOTE(review): {@code msg} only selects which of the two fixed texts is sent;
     * its content is never published. Confirm whether that is intended.
     *
     * @param model MVC model; only checked for non-null
     * @param msg   request parameter, used only by the guard
     * @return {@code "success"}
     */
    @RequestMapping(value = "/sendMsg3", method = RequestMethod.POST)
    @ResponseBody
    public String sendAmqbMsg3(Model model, @RequestParam(value = "msg", defaultValue = "hello world!!!") String msg) {
        String payload = shouldSend(model, msg) ? "神奇的世界!!!" : "神奇的世界";
        amqpTemplate.convertAndSend(QWER_EXCHANGE, "mq.qwer.send", payload);
        return SUCCESS;
    }

    /**
     * Publishes {@code msg} using {@code "hello"} as the routing key, without naming
     * an exchange (simple 1-to-1 / 1-to-many demo).
     *
     * @param model MVC model; only checked for non-null
     * @param msg   payload to publish
     * @return {@code "success"}
     */
    @RequestMapping(value = "/helloWorld", method = RequestMethod.POST)
    @ResponseBody
    @ApiOperation("1对1 or 1对多 无交换机模式")
    public String helloWorld(Model model, @RequestParam(value = "msg", defaultValue = "1对1,无交换机模式!") String msg) {
        if (shouldSend(model, msg)) {
            amqpTemplate.convertAndSend(HELLO_QUEUE, msg);
        }
        return SUCCESS;
    }

    /**
     * Publishes {@code msg} to the fanout exchange {@code pubSubExchange}.
     *
     * @param model MVC model; only checked for non-null
     * @param msg   payload to publish
     * @return {@code "success"}
     */
    @RequestMapping(value = "/pubSub", method = RequestMethod.POST)
    @ResponseBody
    @ApiOperation("广播发布/订阅模式")
    public String pubSub(Model model, @RequestParam(value = "msg", defaultValue = "广播发布/订阅模式") String msg) {
        if (shouldSend(model, msg)) {
            // The routing key has no effect in broadcast mode: every consumer can receive the message.
            amqpTemplate.convertAndSend(PUB_SUB_EXCHANGE, "", msg);
        }
        return SUCCESS;
    }

    /**
     * Publishes {@code msg} to {@code routingExchange} three times, once each with the
     * routing keys {@code info}, {@code warn} and {@code error}.
     *
     * @param model MVC model; only checked for non-null
     * @param msg   payload to publish
     * @return {@code "success"}
     */
    @RequestMapping(value = "/routing", method = RequestMethod.POST)
    @ResponseBody
    @ApiOperation("路由消息模式")
    public String routing(Model model, @RequestParam(value = "msg", defaultValue = "路由消息模式") String msg) {
        if (shouldSend(model, msg)) {
            String[] levels = { "info", "warn", "error" };
            for (String level : levels) {
                amqpTemplate.convertAndSend(ROUTING_EXCHANGE, level, msg);
            }
        }
        return SUCCESS;
    }

    /**
     * Publishes {@code msg} to {@code topicExchange} once per sample routing key.
     * The last key, {@code P.ASBDJBAS}, matches none of the patterns bound in
     * {@code application-rabbit.xml}.
     *
     * @param model MVC model; only checked for non-null
     * @param msg   payload to publish
     * @return {@code "success"}
     */
    @RequestMapping(value = "/topicMatch", method = RequestMethod.POST)
    @ResponseBody
    @ApiOperation("主题模式")
    public String topicModal(Model model, @RequestParam(value = "msg", defaultValue = "主题模式") String msg) {
        if (shouldSend(model, msg)) {
            String[] routingKeys = { "P.123.asdasd", "P.456.JQBE", "P.789.WBD", "P.ASBDJBAS" };
            for (String routingKey : routingKeys) {
                amqpTemplate.convertAndSend(TOPIC_EXCHANGE, routingKey, msg);
            }
        }
        return SUCCESS;
    }

    /**
     * Publishes {@code msg} to {@code headerExchange} three times, each message
     * carrying exactly one of the headers {@code abc=nb}, {@code def=pl},
     * {@code jabs=aksd}. The last pair has no binding in {@code application-rabbit.xml}.
     *
     * <p>The body is the UTF-8 encoding of {@code msg}; the message is sent as-is,
     * with an empty routing key.
     *
     * @param model MVC model; only checked for non-null
     * @param msg   payload to publish
     * @return {@code "success"}
     */
    @RequestMapping(value = "/headerModal", method = RequestMethod.POST)
    @ResponseBody
    @ApiOperation("header模式")
    public String headerModal(Model model, @RequestParam(value = "msg", defaultValue = "header模式") String msg) {
        if (shouldSend(model, msg)) {
            Map<String, Object> headers = new HashMap<String, Object>();
            headers.put("abc", "nb");
            headers.put("def", "pl");
            headers.put("jabs", "aksd");
            byte[] body = msg.getBytes(StandardCharsets.UTF_8);
            for (Entry<String, Object> header : headers.entrySet()) {
                MessageProperties properties = new MessageProperties();
                properties.setHeader(header.getKey(), header.getValue());
                // The Message is already built, so it is sent directly rather than converted.
                amqpTemplate.send(HEADER_EXCHANGE, "", new Message(body, properties));
            }
        }
        return SUCCESS;
    }

    // RPC calls: not implemented.

    /** Guard shared by all endpoints: the model is present and msg is not the empty string. */
    private static boolean shouldSend(Model model, String msg) {
        return model != null && !"".equals(msg);
    }
}
/**
 * Message listener that prints the body of each received message to standard output.
 * In {@code application-rabbit.xml} it is registered on queue {@code topicQueue1}.
 */
public class TopicCustomer1 implements MessageListener {

    /**
     * Decodes the message body as UTF-8 and prints it between full-width brackets.
     *
     * @param message the delivered message
     */
    @Override
    public void onMessage(Message message) {
        // A Charset constant cannot be "unsupported", so no checked exception has to be handled.
        String body = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println("TopicCustomer1 receive msg ------【" + body + "】");
    }
}
每个消费者的代码都相同,只需实现 MessageListener 接口并重写 onMessage 方法即可。
或者不实现 MessageListener 接口,而是在配置消费者监听时通过 method 属性指定自定义的处理方法,如下:
<rabbit:listener-container connection-factory="connectionFactory">
    <rabbit:listener ref="headerCustomer1" queues="headerQueue1" method="listen" />
    <rabbit:listener ref="headerCustomer2" queues="headerQueue2" method="listen" />
</rabbit:listener-container>
method 属性 指定 的就是 两个消费者自定义的方法名称。
文章来源: blog.csdn.net,作者:血煞风雨城2018,版权归原作者所有,如需转载,请联系作者。
原文链接:blog.csdn.net/qq_31905135/article/details/82285735