spring 集成rabbitmq 使用场景

                                         spring 集成rabbitmq 使用场景 

一、引入spring-rabbit的jar包

       <dependency>
          <groupId>org.springframework.amqp</groupId>
          <artifactId>spring-rabbit</artifactId>
          <version>1.7.5.RELEASE</version>
       </dependency>

二、编写application-rabbit.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
 xmlns:rabbit="http://www.springframework.org/schema/rabbit" xmlns:mvc="http://www.springframework.org/schema/mvc"
 xmlns:aop="http://www.springframework.org/schema/aop"
 xsi:schemaLocation="http://www.springframework.org/schema/beans
 http://www.springframework.org/schema/beans/spring-beans.xsd
 http://www.springframework.org/schema/context
 http://www.springframework.org/schema/context/spring-context.xsd
 http://www.springframework.org/schema/mvc
 http://www.springframework.org/schema/mvc/spring-mvc.xsd
 http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
 http://www.springframework.org/schema/aop
 http://www.springframework.org/schema/aop/spring-aop.xsd">

 

 <!-- <bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
  <constructor-arg value="${rabbitmq.host}" /> <property name="username" value="${rabbitmq.username}"
  /> <property name="password" value="${rabbitmq.password}" /> <property name="port"
  value="${rabbitmq.port}" /> 缓存中要维护的通道数 <property name="channelCacheSize"
  value="5" /> 开启发送确认机制 <property name="publisherConfirms" value="true" />
  开启结果返回机制 <property name="publisherReturns" value="true" /> </bean> -->

 <!--<bean id="propertyPlaceholderConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
  <property name="locations"> <list> dataSource连接池相关属性,代码不在此贴出,会放在打包好的项目里面
  <value>classpath:spring/rabbitmq.properties</value> </list> </property> </bean> -->


 <!--配置connection-factory,指定连接rabbit server参数 -->
 <!-- channel-cache-size="" 默认通道缓存数25 -->
 <rabbit:connection-factory id="connectionFactory"
  host="localhost" port="5672" username="guest" password="guest"
  publisher-returns="true" publisher-confirms="true" />


 <!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
 <rabbit:admin connection-factory="connectionFactory" />

<!-- Template used by producers to publish messages. It uses the connection factory
     declared in this file and converts payloads with the JSON converter bean below.
     The attribute names must be hyphenated (connection-factory, message-converter);
     the un-hyphenated spellings are not defined by the rabbit namespace schema. -->
<rabbit:template id="amqpTemplate"
    connection-factory="connectionFactory"
    message-converter="jackson2JsonMessageConverter" />

<!-- Jackson-based JSON message converter referenced by amqpTemplate. -->
<bean id="jackson2JsonMessageConverter"
    class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
 


 <!--定义queue 说明:durable:是否持久化 exclusive: 仅创建者可以使用的私有队列,断开后自动删除 auto_delete:
  当所有消费客户端连接断开后,是否自动删除队列 -->
 <rabbit:queue name="mq.asdf" durable="true" auto-delete="false"
  exclusive="false" />
 <rabbit:queue name="mq.asdf2" durable="true" auto-delete="false"
  exclusive="false" />
 <rabbit:queue name="mq.qwer" durable="true" auto-delete="false"
  exclusive="false" />


 <!--定义topic-exchange -->
 <rabbit:topic-exchange name="mq.asdfExChange"
  durable="true" auto-delete="false">
  <rabbit:bindings>
   <rabbit:binding queue="mq.asdf" pattern="mq.asdf.send"></rabbit:binding>
   <rabbit:binding queue="mq.asdf2" pattern="mq.asdf2.send"></rabbit:binding>
   <rabbit:binding queue="mq.asdf2" pattern="mq.asdf.send"></rabbit:binding>
  </rabbit:bindings>
 </rabbit:topic-exchange>

 <!--定义direct-exchange -->
 <rabbit:direct-exchange name="mq.qwerExChange"
  durable="true" auto-delete="false">
  <rabbit:bindings>
   <rabbit:binding queue="mq.qwer" key="mq.qwer.send"></rabbit:binding>
  </rabbit:bindings>
 </rabbit:direct-exchange>


 <!--定义direct-exchange -->
 <!-- <rabbit:direct-exchange name="mq.helloWorld" durable="true" auto-delete="false">
  <rabbit:bindings> <rabbit:binding queue="hello"></rabbit:binding> </rabbit:bindings>
  </rabbit:direct-exchange> -->

 <!-- 消息接收者 -->
 <!-- <bean id="asdfConsumer" class="com.devframe.entity.AsdfConsumer"></bean>
  <bean id="asdfConsumer2" class="com.devframe.entity.AsdfConsumer2"></bean>
  <bean id="qwerConsumer" class="com.devframe.entity.QwerConsumer"></bean> -->


 <!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->
 <!-- <rabbit:listener-container connection-factory="connectionFactory">
  <rabbit:listener queues="mq.asdf" ref="asdfConsumer" /> </rabbit:listener-container>
  <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener
  queues="mq.asdf2" ref="asdfConsumer2" /> </rabbit:listener-container> <rabbit:listener-container
  connection-factory="connectionFactory"> <rabbit:listener queues="mq.qwer"
  ref="qwerConsumer" /> </rabbit:listener-container> -->


 <rabbit:listener-container
  connection-factory="connectionFactory">
  <rabbit:listener queues="hello" ref="cHelloWorld2" />
 </rabbit:listener-container>


 <!-- hello world 1对1 模式 和 1对多模式 (无需交换机) -->
 <rabbit:queue name="hello" durable="true" auto-delete="false"
  exclusive="false" />
 <!-- 两个消费者 -->
 <bean id="cHelloWorld" class="com.devframe.entity.CHelloWorld"></bean>
 <bean id="cHelloWorld2" class="com.devframe.entity.CHelloWorld2"></bean>
 <!-- 两个消费者监听配置 -->
 <rabbit:listener-container
  connection-factory="connectionFactory">
  <!-- priority属性用来配置消费者优先级,达到负载均衡的作用 -->
  <rabbit:listener queues="hello" ref="cHelloWorld"
   priority="1" />
  <rabbit:listener queues="hello" ref="cHelloWorld2"
   priority="1" />
 </rabbit:listener-container>

 <!-- 发布订阅模式(需交换机)(广播模式) -->
 <rabbit:queue name="pubSubQueue" durable="true"
  auto-delete="false" exclusive="false" />
 <rabbit:queue name="pubSubQueue2" durable="true"
  auto-delete="false" exclusive="false" />
 <!-- 配置三个订阅者 -->
 <bean id="suber1" class="com.devframe.entity.Suber1"></bean>
 <bean id="suber2" class="com.devframe.entity.Suber2"></bean>
 <bean id="suber3" class="com.devframe.entity.Suber3"></bean>
 <bean id="suber4" class="com.devframe.entity.Suber4"></bean>
 <!-- 配置广播模式交换机 -->
 <rabbit:fanout-exchange name="pubSubExchange"
  durable="true" auto-delete="false">
  <rabbit:bindings>
   <rabbit:binding queue="pubSubQueue"></rabbit:binding>
   <rabbit:binding queue="pubSubQueue2"></rabbit:binding>
  </rabbit:bindings>
 </rabbit:fanout-exchange>
 <!-- 配置监听 -->
 <rabbit:listener-container
  connection-factory="connectionFactory">
  <rabbit:listener queues="pubSubQueue" ref="suber1" />
 </rabbit:listener-container>

 <rabbit:listener-container
  connection-factory="connectionFactory">
  <rabbit:listener queues="pubSubQueue" ref="suber2" />
 </rabbit:listener-container>

 <rabbit:listener-container
  connection-factory="connectionFactory">
  <rabbit:listener queues="pubSubQueue2" ref="suber3" />
 </rabbit:listener-container>

 <rabbit:listener-container
  connection-factory="connectionFactory">
  <rabbit:listener queues="pubSubQueue2" ref="suber4" />
 </rabbit:listener-container>


 <!-- 路由模式 -->
 <rabbit:queue name="routing1" durable="true" auto-delete="false"
  exclusive="false" />
 <rabbit:queue name="routing2" durable="true" auto-delete="false"
  exclusive="false" />
 <!-- 配置路由交换机 -->
 <rabbit:direct-exchange name="routingExchange"
  durable="true" auto-delete="false">
  <rabbit:bindings>
   <rabbit:binding queue="routing1" key="error"></rabbit:binding>
   <rabbit:binding queue="routing2" key="info"></rabbit:binding>
   <rabbit:binding queue="routing2" key="warn"></rabbit:binding>
   <rabbit:binding queue="routing2" key="error"></rabbit:binding>
  </rabbit:bindings>
 </rabbit:direct-exchange>
 <!-- 配置error信息消费者和 所有信息消费者 -->
 <bean id="allInfoCustomer" class="com.devframe.entity.AllInfoCustomer" />
 <bean id="errorCustomer" class="com.devframe.entity.ErrorCustomer" />
 <!-- 配置监听 -->
 <rabbit:listener-container
  connection-factory="connectionFactory">
  <rabbit:listener queues="routing1" ref="errorCustomer" />
  <rabbit:listener queues="routing2" ref="allInfoCustomer" />
 </rabbit:listener-container>


 <!-- topic模式 -->
 <rabbit:queue name="topicQueue1" durable="true" exclusive="false" />
 <rabbit:queue name="topicQueue2" durable="true" exclusive="false" />

 <rabbit:topic-exchange name="topicExchange"
  durable="true" auto-delete="false">
  <rabbit:bindings>
   <rabbit:binding queue="topicQueue1" pattern="P.123.#"></rabbit:binding>
   <rabbit:binding queue="topicQueue2" pattern="P.123.#"></rabbit:binding>
   <rabbit:binding queue="topicQueue2" pattern="P.456.#"></rabbit:binding>
   <rabbit:binding queue="topicQueue2" pattern="P.789.#"></rabbit:binding>
  </rabbit:bindings>
 </rabbit:topic-exchange>

 <bean id="topicCustomer1" class="com.devframe.entity.TopicCustomer1"></bean>
 <bean id="topicCustomer2" class="com.devframe.entity.TopicCustomer2"></bean>

 <rabbit:listener-container
  connection-factory="connectionFactory">
  <rabbit:listener queues="topicQueue1" ref="topicCustomer1" />
  <rabbit:listener queues="topicQueue2" ref="topicCustomer2" />
 </rabbit:listener-container>


 <!-- header模式 -->
 <rabbit:queue name="headerQueue1" durable="true"
  auto-delete="false" exclusive="false" />
 <rabbit:queue name="headerQueue2" durable="true"
  auto-delete="false" exclusive="false" />

 <rabbit:headers-exchange name="headerExchange"
  durable="true" auto-delete="false">
  <rabbit:bindings>
   <rabbit:binding queue="headerQueue1" key="abc" value="nb"></rabbit:binding>
   <rabbit:binding queue="headerQueue2" key="def" value="pl"></rabbit:binding>
  </rabbit:bindings>
 </rabbit:headers-exchange>

 <bean id="headerCustomer1" class="com.devframe.entity.HeaderCustomer1"></bean>
 <bean id="headerCustomer2" class="com.devframe.entity.HeaderCustomer2"></bean>


 <rabbit:listener-container
  connection-factory="connectionFactory">
  <rabbit:listener ref="headerCustomer1" queues="headerQueue1" />
  <rabbit:listener ref="headerCustomer2" queues="headerQueue2" />
 </rabbit:listener-container>
 <!-- rpc调用 未实现 -->
</beans>

三、编写消费者实体类和 controller 测试代码

package com.devframe.controller;

import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;

@Controller
@Api(tags = "RabbitServerController rabbitmq测试controller")
public class RabbitServerController {

 @Autowired
 private AmqpTemplate amqpTemplate;
 

 @RequestMapping(value = "/sendMsg", method = RequestMethod.POST)
 @ResponseBody
 public String sendAmqbMsg(Model model, @RequestParam(value = "msg", defaultValue = "hello world!!!") String msg) {
  if (model != null && !"".equals(msg)) {
   amqpTemplate.convertAndSend("mq.asdfExChange", "mq.asdf.send", msg);
  } else {
   amqpTemplate.convertAndSend("mq.asdfExChange", "mq.asdf.send", "hello world");
  }
  return "success";
 }

 @RequestMapping(value = "/sendMsg2", method = RequestMethod.POST)
 @ResponseBody
 public String sendAmqbMsg2(Model model, @RequestParam(value = "msg", defaultValue = "hello world!!!") String msg) {
  if (model != null && !"".equals(msg)) {
   amqpTemplate.convertAndSend("mq.asdfExChange", "mq.asdf2.send", "这个世界很奇妙!!!");
  } else {
   amqpTemplate.convertAndSend("mq.asdfExChange", "mq.asdf2.send", "这个世界很奇妙");
  }
  return "success";
 }

 @RequestMapping(value = "/sendMsg3", method = RequestMethod.POST)
 @ResponseBody
 public String sendAmqbMsg3(Model model, @RequestParam(value = "msg", defaultValue = "hello world!!!") String msg) {
  if (model != null && !"".equals(msg)) {
   amqpTemplate.convertAndSend("mq.qwerExChange", "mq.qwer.send", "神奇的世界!!!");
  } else {
   amqpTemplate.convertAndSend("mq.qwerExChange", "mq.qwer.send", "神奇的世界");
  }
  return "success";
 }

 @RequestMapping(value = "/helloWorld", method = RequestMethod.POST)
 @ResponseBody
 @ApiOperation("1对1 or 1对多 无交换机模式")
 public String helloWorld(Model model, @RequestParam(value = "msg", defaultValue = "1对1,无交换机模式!") String msg) {
  if (model != null && !"".equals(msg)) {
   amqpTemplate.convertAndSend("hello", msg);
  }
  return "success";
 }

 @RequestMapping(value = "/pubSub", method = RequestMethod.POST)
 @ResponseBody
 @ApiOperation("广播发布/订阅模式")
 public String pubSub(Model model, @RequestParam(value = "msg", defaultValue = "广播发布/订阅模式") String msg) {
  if (model != null && !"".equals(msg)) {
   // 广播模式对于路由无效,所有的消费者都可以获取都消息
   amqpTemplate.convertAndSend("pubSubExchange", "", msg);
  }
  return "success";
 }

 @RequestMapping(value = "/routing", method = RequestMethod.POST)
 @ResponseBody
 @ApiOperation("路由消息模式")
 public String routing(Model model, @RequestParam(value = "msg", defaultValue = "路由消息模式") String msg) {
  if (model != null && !"".equals(msg)) {
   String[] infoTyp = new String[] { "info", "warn", "error" };
   for (String routing : infoTyp) {
    amqpTemplate.convertAndSend("routingExchange", routing, msg);
   }
  }
  return "success";
 }

 @RequestMapping(value = "/topicMatch", method = RequestMethod.POST)
 @ResponseBody
 @ApiOperation("主题模式")
 public String topicModal(Model model, @RequestParam(value = "msg", defaultValue = "主题模式") String msg) {
  if (model != null && !"".equals(msg)) {
   String[] infoTyp = new String[] { "P.123.asdasd", "P.456.JQBE", "P.789.WBD", "P.ASBDJBAS" };
   for (String routing : infoTyp) {
    amqpTemplate.convertAndSend("topicExchange", routing, msg);
   }
  }
  return "success";
 }

 @RequestMapping(value = "/headerModal", method = RequestMethod.POST)
 @ResponseBody
 @ApiOperation("header模式")
 public String headerModal(Model model, @RequestParam(value = "msg", defaultValue = "主题模式") String msg) throws UnsupportedEncodingException {
  if (model != null && !"".equals(msg)) {
   Map<String, Object> map = new HashMap<String, Object>();
   map.put("abc", "nb");
   map.put("def", "pl");
   map.put("jabs", "aksd");
   for (Entry<String, Object> entry : map.entrySet()) {
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setHeader(entry.getKey(), entry.getValue());
    Message message = new Message(msg.getBytes("utf-8"), messageProperties);
    amqpTemplate.convertAndSend("headerExchange", null,message);
   }
  }
  return "success";
 }
   //rpc调用未实现
}

 


public class TopicCustomer1 implements MessageListener {
 @Override
 public void onMessage(Message message) {
  // TODO Auto-generated method stub
  try {
   System.out.println(
     String.format("TopicCustomer1 receive msg ------%s%s%s", "【", new String(message.getBody(), "UTF-8"), "】"));
  } catch (UnsupportedEncodingException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }
 }
}

每个消费者都是同样的代码,均只需实现MessageListener接口,重写 onMessage方法即可。

或者不需要实现 MessageListener 接口,而是在配置消费者监听时指定自定义方法,如下:


  
  <rabbit:listener-container
      connection-factory="connectionFactory">
    <rabbit:listener ref="headerCustomer1" queues="headerQueue1" method="listen" />
    <rabbit:listener ref="headerCustomer2" queues="headerQueue2" method="listen" />
  </rabbit:listener-container>

method 属性指定的就是两个消费者中自定义的方法名称。

文章来源: blog.csdn.net,作者:血煞风雨城2018,版权归原作者所有,如需转载,请联系作者。

原文链接:blog.csdn.net/qq_31905135/article/details/82285735

(完)