实战Spring4+ActiveMQ整合实现消息队列(生产者+消费者)
最近公司做了一个以信息安全为主的项目,其中有一个业务需求就是,项目定时监控操作用户的行为,对于一些违规操作严重的行为,以发送邮件(FoxMail)的形式进行邮件告警,可能是多人,也可能是一个人,第一次是以单人的形式,,直接在业务层需要告警的地方发送邮件即可,可是后边需求变更了,对于某些告警邮件可能会发送多人,这其中可能就会有阻塞发邮件的可能,直到把所有邮件发送完毕后再继续做下边的业务,领导说这样会影响用户体验,发邮件的时候用户一直处于等待状态,不能干别的事情。最后研究说用消息队列,当有需要发送邮件告警的时候,就向队列中添加一个标识消息,ActiveMQ通过监听器的形式,实时监听队列里边的小时,收到消息后,判断是不是需要发送告警的标识,是的话就自行就行发送邮件!这是就研究的消息队列ActiveMQ,下边就是具体内容:
一、ActiveMQ1.1). ActiveMQActiveMQ是Apache所提供的一个开源的消息系统,完全采用Java来实现,因此,它能很好地支持J2EE提出的JMS(Java Message Service,即Java消息服务)规范。JMS是一组Java应用程序接口,它提供消息的创建、发送、读取等一系列服务。JMS提供了一组公共应用程序接口和响应的语法,类似于Java数据库的统一访问接口JDBC,它是一种与厂商无关的API,使得Java程序能够与不同厂商的消息组件很好地进行通信。
1. 2). Java Message Service(JMS)JMS支持两种消息发送和接收模型。
一种称为P2P(Ponit to Point)模型(点对点一对一),即采用点对点的方式发送消息。P2P模型是基于队列的,消息生产者发送消息到队列,消息消费者从队列中接收消息,队列的存在使得消息的异步传输称为可能,P2P模型在点对点的情况下进行消息传递时采用。另一种称为Pub/Sub(Publish/Subscribe,即发布-订阅)模型,发布-订阅模型定义了如何向一个内容节点发布和订阅消息,这个内容节点称为topic(主题)。主题可以认为是消息传递的中介,消息发布这将消息发布到某个主题,而消息订阅者则从主题订阅消息。主题使得消息的订阅者与消息的发布者互相保持独立,不需要进行接触即可保证消息的传递,发布-订阅模型在消息的一对多广播时采用。1.3). JMS术语Provider/MessageProvider:生产者Consumer/MessageConsumer:消费者PTP:Point To Point,点对点通信消息模型Pub/Sub:Publish/Subscribe,发布订阅消息模型Queue:队列,目标类型之一,和PTP结合Topic:主题,目标类型之一,和Pub/Sub结合ConnectionFactory:连接工厂,JMS用它创建连接Connnection:JMS Client到JMS Provider的连接Destination:消息目的地,由Session创建Session:会话,由Connection创建,实质上就是发送、接受消息的一个线程,因此生产者、消费者都是 Session创建的1.4). ActiveMQ应用场景类似送快递,快递员(producer)将快递(Message)放到指定地点(destination)后,就可以离开了,拿快递的人(customer)在接收到通知后,到指定地点(destination)去取快递(Message)就可以了。当然,取快递时可能要进行身份验证,这就涉及到创建连接(connection)时,需要指定用户名和密码了。还有就是,实际生活中,当快递员把快递放好之后,照理说应该通知客户去哪里取快递,而ActiveMq帮我们做好了一切,通知的工作Activemq会帮我们实现,而无需我们亲自编码通知消费者,生产者只需要将Message放到Mq中即可,通知消费者的工作,mq会帮我们处理
用途就是用来处理消息,也就是处理JMS在大型电子商务类网站,如京东、淘宝、去哪儿等网站有着深入的应用,队列的主要作用是消除高并发访问高峰,加快网站的响应速度。
在不使用消息队列的情况下,用户的请求数据直接写入数据库,高发的情况下,会对数据库造成巨大的压力,同时也使得系统响应延迟加剧,但使用队列后,用户的请求发给队列后立即返回。
1.5). ActiveMQ下载1.6). 启动/apache-activemq-5.15.3/bin/win64/目录下双击activemq.bat文件,在浏览器中输入http://localhost:8161/admin/, 用户名和密码输入admin即可
二、Srping+ActiveMQ应用实例2,1). 项目结构 2,2). 导入maven依赖,pom.xml文件 1 <?xml version="1.0" encoding="UTF-8"?> 2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 3 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4 <modelVersion>4.0.0</modelVersion> 5 6 <groupId>www.cnblogs.com.hongmoshu</groupId> 7 <artifactId>test_actmq</artifactId> 8 <version>0.0.1-SNAPSHOT</version> 9 <packaging>war</packaging> 10 <name>test_actmq Maven Webapp</name> 11 <url>http://www.example.com</url> 12 13 <!-- 版本管理 --> 14 <properties> 15 <springframework>4.1.8.RELEASE</springframework> 16 </properties> 17 18 19 <dependencies> 20 21 <!-- junit单元测试 --> 22 <dependency> 23 <groupId>junit</groupId> 24 <artifactId>junit</artifactId> 25 <version>4.11</version> 26 <scope>test</scope> 27 </dependency> 28 29 <!-- JSP相关 --> 30 <dependency> 31 <groupId>jstl</groupId> 32 <artifactId>jstl</artifactId> 33 <version>1.2</version> 34 </dependency> 35 <dependency> 36 <groupId>javax.servlet</groupId> 37 <artifactId>servlet-api</artifactId> 38 <scope>provided</scope> 39 <version>2.5</version> 40 </dependency> 41 42 <!-- spring --> 43 <dependency> 44 <groupId>org.springframework</groupId> 45 <artifactId>spring-core</artifactId> 46 <version>${springframework}</version> 47 </dependency> 48 <dependency> 49 <groupId>org.springframework</groupId> 50 <artifactId>spring-context</artifactId> 51 <version>${springframework}</version> 52 </dependency> 53 <dependency> 54 <groupId>org.springframework</groupId> 55 <artifactId>spring-tx</artifactId> 56 <version>${springframework}</version> 57 </dependency> 58 <dependency> 59 <groupId>org.springframework</groupId> 60 <artifactId>spring-webmvc</artifactId> 61 <version>${springframework}</version> 62 </dependency> 63 <dependency> 64 <groupId>org.springframework</groupId> 65 <artifactId>spring-jms</artifactId> 66 <version>${springframework}</version> 67 </dependency> 68 69 <!-- xbean 如<amq:connectionFactory /> --> 70 <dependency> 71 <groupId>org.apache.xbean</groupId> 72 <artifactId>xbean-spring</artifactId> 73 <version>3.16</version> 74 </dependency> 75 76 <!-- activemq --> 77 <dependency> 78 <groupId>org.apache.activemq</groupId> 79 <artifactId>activemq-core</artifactId> 80 <version>5.7.0</version> 81 </dependency> 82 <dependency> 83 <groupId>org.apache.activemq</groupId> 84 <artifactId>activemq-pool</artifactId> 85 <version>5.12.1</version> 86 </dependency> 87 88 <!-- gson --> 89 <dependency> 90 <groupId>com.google.code.gson</groupId> 91 <artifactId>gson</artifactId> 92 <version>1.7.1</version> 93 </dependency> 94 95 <!-- JSON --> 96 <dependency> 97 <groupId>net.sf.json-lib</groupId> 98 <artifactId>json-lib</artifactId> 99 <version>2.4</version>100 <classifier>jdk15</classifier>101 </dependency>102 103 </dependencies>104 105 <build>106 <finalName>test_actmq</finalName>107 108 </build>109 </project>
2,3). ActiveMQ的配置文件ActiveMQ.xml
1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xmlns:amq="http://activemq.apache.org/schema/core" 5 xmlns:jms="http://www.springframework.org/schema/jms" 6 xmlns:context="http://www.springframework.org/schema/context" 7 xmlns:mvc="http://www.springframework.org/schema/mvc" 8 xsi:schemaLocation=" 9 http://www.springframework.org/schema/beans10 http://www.springframework.org/schema/beans/spring-beans-4.1.xsd11 http://www.springframework.org/schema/context12 http://www.springframework.org/schema/context/spring-context-4.1.xsd13 http://www.springframework.org/schema/mvc14 http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd15 http://www.springframework.org/schema/jms16 http://www.springframework.org/schema/jms/spring-jms-4.1.xsd17 http://activemq.apache.org/schema/core18 http://activemq.apache.org/schema/core/activemq-core-5.12.1.xsd"19 >20 21 <context:component-scan base-package="com.svse.service" />22 <mvc:annotation-driven />23 24 <!-- jms.useAsyncSend=true 允许异步接收消息 -->25 <amq:connectionFactory id="amqConnectionFactory"26 brokerURL="tcp://192.168.6.111:61616?jms.useAsyncSend=true"27 userName="admin"28 password="admin" />29 30 <!-- 配置JMS连接工 厂 -->31 <bean id="connectionFactory"32 class="org.springframework.jms.connection.CachingConnectionFactory">33 <constructor-arg ref="amqConnectionFactory" />34 <property name="sessionCacheSize" value="100" />35 </bean>36 37 <!-- 定义消息队列名称(Queue) -->38 <bean id="demoQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">39 <!-- 设置消息队列的名字 -->40 <constructor-arg>41 <value>Jaycekon</value>42 </constructor-arg>43 </bean>44 45 <!-- 配置JMS模板(Queue),Spring提供的JMS工具类,它发送、接收消息。 -->46 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">47 <property name="connectionFactory" ref="connectionFactory" />48 <property name="defaultDestination" ref="demoQueueDestination" />49 <property name="receiveTimeout" value="10000" />50 <!-- true是topic,false是queue,默认是false,此处显示写出false -->51 <property name="pubSubDomain" value="false" />52 <!-- 消息转换器 --> 53 <property name="messageConverter" ref="userMessageConverter"/> 54 </bean>55 56 <!-- 类型转换器 --> 57 <bean id="userMessageConverter" class="com.svse.util.ObjectMessageConverter"/> 58 59 60 <!-- 配置消息队列监听者(Queue) -->61 <bean id="queueMessageListener" class="com.svse.util.QueueMessageListener" /> 62 63 <!-- 显示注入消息监听容器(Queue),配置连接工厂,监听的目标是demoQueueDestination,监听器是上面定义的监听器 -->64 <bean id="queueListenerContainer"65 class="org.springframework.jms.listener.DefaultMessageListenerContainer">66 <property name="connectionFactory" ref="connectionFactory" />67 <property name="destination" ref="demoQueueDestination" />68 <property name="messageListener" ref="queueMessageListener" />69 </bean> 70 71 </beans>
2,4). Spring的配置文件 spring-mvc.xml
1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xmlns:context="http://www.springframework.org/schema/context" 5 xmlns:mvc="http://www.springframework.org/schema/mvc" 6 xsi:schemaLocation="http://www.springframework.org/schema/beans 7 http://www.springframework.org/schema/beans/spring-beans.xsd 8 http://www.springframework.org/schema/context 9 http://www.springframework.org/schema/context/spring-context-4.1.xsd10 http://www.springframework.org/schema/mvc 11 http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd">12 13 <context:component-scan base-package="com.svse.controller" />14 <mvc:annotation-driven />15 <bean id="viewResolver" class="org.springframework.web.servlet.view.UrlBasedViewResolver">16 <property name="viewClass"17 value="org.springframework.web.servlet.view.JstlView" />18 <property name="prefix" value="/WEB-INF/views/" />19 <property name="suffix" value=".jsp" />20 </bean>21 22 </beans>
2,5). web.xml
1 <?xml version="1.0" encoding="UTF-8"?> 2 <web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 3 xmlns="http://xmlns.jcp.org/xml/ns/javaee" 4 xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd" 5 id="WebApp_ID" version="3.1"> 6 <display-name>mydemo</display-name> 7 8 <welcome-file-list> 9 <welcome-file>index.jsp</welcome-file>10 </welcome-file-list>11 12 <!-- 加载spring及active的配置文件,classpath为项目src下的路径 -->13 <context-param>14 <param-name>contextConfigLocation</param-name>15 <param-value>16 classpath:spring-mvc.xml;17 classpath:ActiveMQ.xml;18 </param-value>19 </context-param>20 21 22 <listener>23 <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>24 </listener>25 26 <servlet>27 <servlet-name>springMVC</servlet-name>28 <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>29 <init-param>30 <param-name>contextConfigLocation</param-name>31 <param-value>classpath:spring-mvc.xml</param-value>32 </init-param>33 <load-on-startup>1</load-on-startup>34 </servlet>35 <servlet-mapping>36 <servlet-name>springMVC</servlet-name>37 <url-pattern>/</url-pattern>38 </servlet-mapping>39 40 <!-- 处理编码格式 -->41 <filter>42 <filter-name>characterEncodingFilter</filter-name>43 <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>44 <init-param>45 <param-name>encoding</param-name>46 <param-value>UTF-8</param-value>47 </init-param>48 <init-param>49 <param-name>forceEncoding</param-name>50 <param-value>true</param-value>51 </init-param>52 </filter>53 <filter-mapping>54 <filter-name>characterEncodingFilter</filter-name>55 <url-pattern>/*</url-pattern>56 </filter-mapping>57 58 </web-app>
2,6). 实体类Users对象
1 package com.svse.entity; 2 import java.io.Serializable; 3 4 public class Users implements Serializable{ 5 6 private String userId; 7 private String userName; 8 private String sex; 9 private String age;10 private String type;11 12 13 public Users() {14 super();15 }16 public Users(String userId, String userName, String sex, String age,17 String type) {18 super();19 this.userId = userId;20 this.userName = userName;21 this.sex = sex;22 this.age = age;23 this.type = type;24 }25 public String getUserId() {26 return userId;27 }28 public void setUserId(String userId) {29 this.userId = userId;30 }31 public String getUserName() {32 return userName;33 }34 public void setUserName(String userName) {35 this.userName = userName;36 }37 public String getSex() {38 return sex;39 }40 public void setSex(String sex) {41 this.sex = sex;42 }43 public String getAge() {44 return age;45 }46 public void setAge(String age) {47 this.age = age;48 }49 public String getType() {50 return type;51 }52 public void setType(String type) {53 this.type = type;54 }55 @Override56 public String toString() {57 return "Users [userId=" + userId + ", userName=" + userName + ", sex="58 + sex + ", age=" + age + ", type=" + type + "]";59 }60 61 62 }
2,7). 核心代码(生产者ProducerService)
1 package com.svse.service; 2 3 import javax.annotation.Resource; 4 import javax.jms.Destination; 5 import javax.jms.JMSException; 6 import javax.jms.Message; 7 import javax.jms.Session; 8 9 import org.springframework.jms.core.JmsTemplate;10 import org.springframework.jms.core.MessageCreator;11 import org.springframework.stereotype.Service;12 13 import com.svse.entity.Users;14 15 @Service16 public class ProducerService {17 18 @Resource(name="jmsTemplate")19 private JmsTemplate jmsTemplate;20 21 22 /**23 * 向指定队列发送消息 (发送文本消息)24 */25 public void sendMessage(Destination destination,final String msg){26 27 jmsTemplate.setDeliveryPersistent(true);28 29 System.out.println(Thread.currentThread().getName()+" 向队列"+destination.toString()+"发送消息---------------------->"+msg);30 jmsTemplate.send(destination, new MessageCreator() {31 public Message createMessage(Session session) throws JMSException {32 return session.createTextMessage(msg);33 }34 });35 }36 37 /**38 * 向指定队列发送消息以对象的方式 (发送对象消息)39 */40 public void sendMessageNew(Destination destination,Users user){41 System.out.println(Thread.currentThread().getName()+" 向队列"+destination.toString()+"发送消息---------------------->"+user);42 jmsTemplate.convertAndSend(user);43 }44 45 /**46 * 向默认队列发送消息47 */48 public void sendMessage(final String msg){49 String destination = jmsTemplate.getDefaultDestinationName();50 System.out.println(Thread.currentThread().getName()+" 向队列"+destination+"发送消息---------------------->"+msg);51 jmsTemplate.send(new MessageCreator() {52 public Message createMessage(Session session) throws JMSException {53 return session.createTextMessage(msg);54 }55 });56 }57 }
2,8). 核心代码(消费产者ConsumerService)
1 package com.svse.service; 2 3 import javax.annotation.Resource; 4 import javax.jms.Destination; 5 import javax.jms.JMSException; 6 import javax.jms.ObjectMessage; 7 import javax.jms.TextMessage; 8 9 import net.sf.json.JSONObject;10 11 import org.springframework.jms.core.JmsTemplate;12 import org.springframework.stereotype.Service;13 14 import com.svse.entity.Users;15 16 @Service17 public class ConsumerService {18 19 @Resource(name="jmsTemplate")20 private JmsTemplate jmsTemplate;21 //接收文本消息22 public TextMessage receive(Destination destination){23 TextMessage textMessage = (TextMessage) jmsTemplate.receive(destination);24 try{25 JSONObject json=JSONObject.fromObject(textMessage.getText());26 System.out.println("name:"+json.getString("userName"));27 System.out.println("从队列" + destination.toString() + "收到了消息:\t"28 + textMessage.getText());29 } catch (JMSException e) {30 e.printStackTrace();31 }32 return textMessage;33 }34 //接收对象消息35 public ObjectMessage receiveNew(Destination destination){36 ObjectMessage objMsg=(ObjectMessage) jmsTemplate.receive(destination);38 try{39 Users users= (Users) objMsg.getObject();44 System.out.println("name:"+users.getUserName());47 System.out.println("从队列" + destination.toString() + "收到了消息:\t"48 + users);49 } catch (JMSException e) {50 e.printStackTrace();51 }52 return objMsg;53 }54 }
2,9). 核心代码(控制器ConsumerService)
1 package com.svse.controller.mq; 2 3 import java.io.IOException; 4 import java.text.SimpleDateFormat; 5 import java.util.Date; 7 import javax.annotation.Resource; 8 import javax.jms.DeliveryMode; 9 import javax.jms.Destination; 10 import javax.jms.JMSException; 11 import javax.jms.ObjectMessage; 12 import javax.jms.TextMessage; 13 import javax.management.MBeanServerConnection; 14 import javax.management.remote.JMXConnector; 15 import javax.management.remote.JMXConnectorFactory; 16 import javax.management.remote.JMXServiceURL; 18 import org.springframework.stereotype.Controller; 19 import org.springframework.web.bind.annotation.RequestMapping; 20 import org.springframework.web.bind.annotation.RequestMethod; 21 import org.springframework.web.bind.annotation.RequestParam; 22 import org.springframework.web.servlet.ModelAndView; 24 import com.google.gson.Gson; 25 import com.svse.entity.Users; 26 import com.svse.service.ConsumerService; 27 import com.svse.service.ProducerService; 28 29 @Controller 30 public class DemoController { 35 36 //队列名Jaycekon (ActiveMQ中设置的队列的名称) 37 @Resource(name="demoQueueDestination") 38 private Destination demoQueueDestination; 39 40 //队列消息生产者 41 @Resource(name="producerService") 42 private ProducerService producer; 43 44 //队列消息消费者 45 @Resource(name="consumerService") 46 private ConsumerService consumer; 47 48 /* 49 * 准备发消息 50 */ 51 @RequestMapping(value="/producer",method=RequestMethod.GET) 52 public ModelAndView producer(){ 53 System.out.println("------------go producer"); 54 55 Date now = new Date(); 56 SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); 57 String time = dateFormat.format( now ); 58 System.out.println(time); 59 60 ModelAndView mv = new ModelAndView(); 61 mv.addObject("time", time); 62 mv.setViewName("producer"); 63 return mv; 64 } 65 66 /* 67 * 发消息 68 */ 69 @RequestMapping(value="/onsend",method=RequestMethod.POST) 70 public ModelAndView producer(@RequestParam("message") String message) { 71 System.out.println("------------send to jms"); 72 ModelAndView mv = new ModelAndView(); 73 for(int i=0;i<5;i++){ 74 try { 75 Users users=new Users("10"+(i+1),"赵媛媛"+(i+1),"女","27","影视演员"); 76 Gson gson=new Gson(); 77 String sendMessage=gson.toJson(users); 78 System.out.println("发送的消息sendMessage:"+sendMessage.toString()); 79 // producer.sendMessage(demoQueueDestination,sendMessage.toString());//以文本的形式 80 producer.sendMessageNew(demoQueueDestination, users);//以对象的方式 81 82 } catch (Exception e) { 83 e.printStackTrace(); 84 } 85 } 86 mv.setViewName("index"); 87 return mv; 88 } 89 /* 90 * 手动接收消息 91 */ 92 @RequestMapping(value="/receive",method=RequestMethod.GET) 93 public ModelAndView queue_receive() throws JMSException { 94 System.out.println("------------receive message"); 95 ModelAndView mv = new ModelAndView(); 96 97 //TextMessage tm = consumer.receive(demoQueueDestination);//接收文本消息 98 99 ObjectMessage objMsg=consumer.receiveNew(demoQueueDestination);//接收对象消息100 Users users= (Users) objMsg.getObject();101 //mv.addObject("textMessage", tm.getText());102 mv.addObject("textMessage", users.getUserId()+" || "+users.getUserName());103 mv.setViewName("receive");104 return mv;105 }106 107 /*108 * ActiveMQ Manager Test109 */110 @RequestMapping(value="/jms",method=RequestMethod.GET)111 public ModelAndView jmsManager() throws IOException {112 System.out.println("------------jms manager");113 ModelAndView mv = new ModelAndView();114 mv.setViewName("index");115 116 JMXServiceURL url = new JMXServiceURL("");117 JMXConnector connector = JMXConnectorFactory.connect(url);118 connector.connect();119 MBeanServerConnection connection = connector.getMBeanServerConnection();120 121 return mv;122 }123 124 }
三、.对象转换器MessageConverter和消息监听器MessageListener
在上边的ProducerService和ConsumerService中,不论是发送消息还是接收消息,都可以以文本TextMessage的方式和ObjectMessage的方式.如果是简单的文本消息可以以TextMessage,但是如果需要发送的内容比较多,结构比较复杂,这时候就建议用对象文本ObjectMessage的方式向队列queue中发送消息了.但是这时候就需要用到对象消息转换器MessageConverter.
3,1). 消息转换器MessageageConverteMessageConverter的作用主要有两方面,一方面它可以把我们的非标准化Message对象转换成我们的目标Message对象,这主要是用在发送消息的时候;另一方面它又可以把我们的Message对象
转换成对应的目标对象,这主要是用在接收消息的时候。
1 package com.svse.util; 2 3 import java.io.Serializable; 4 5 import javax.jms.JMSException; 6 import javax.jms.Message; 7 import javax.jms.ObjectMessage; 8 import javax.jms.Session; 9 10 import org.springframework.jms.support.converter.MessageConversionException;11 import org.springframework.jms.support.converter.MessageConverter;12 13 /**14 *功能说明:通用的消息对象转换类15 *@author:zsq16 *create date:2019年7月12日 上午9:28:3117 *修改人 修改时间 修改描述18 *Copyright (c)2019北京智华天成科技有限公司-版权所有19 */20 public class ObjectMessageConverter implements MessageConverter {21 22 23 //把一个Java对象转换成对应的JMS Message (生产消息的时候)24 public Message toMessage(Object object, Session session)25 throws JMSException, MessageConversionException {26 27 return session.createObjectMessage((Serializable) object); 28 }29 30 //把一个JMS Message转换成对应的Java对象 (消费消息的时候)31 public Object fromMessage(Message message) throws JMSException,32 MessageConversionException {33 ObjectMessage objMessage = (ObjectMessage) message; 34 return objMessage.getObject(); 35 }36 37 }
注意:写了消息转化器之后还需要的ActiveMQ.xml中进行配置
MessageageListe作用就是动态的自行监听消息队列的生产者发送的消息,不需要人工手动接收!
1 package com.svse.util; 2 import javax.jms.JMSException; 3 import javax.jms.Message; 4 import javax.jms.MessageListener; 5 import javax.jms.ObjectMessage; 6 import javax.jms.TextMessage; 7 8 import com.svse.entity.Users; 9 10 11 public class QueueMessageListener implements MessageListener {12 13 //添加了监听器,只要生产者发布了消息,监听器监听到有消息消费者就会自动消费(获取消息)14 public void onMessage(Message message) {15 //(第1种方式)没加转换器之前接收到的是文本消息16 //TextMessage tm = (TextMessage) message;17 18 //(第2种方式)加了转换器之后接收到的ObjectMessage对象消息19 ObjectMessage objMsg=(ObjectMessage) message;20 Users users;21 try {22 users = (Users) objMsg.getObject();23 //System.out.println("QueueMessageListener监听到了文本消息:\t" + tm.getText());24 System.out.println("QueueMessageListener监听到了文本消息:\t" + users); 25 //do something ...26 } catch (JMSException e1) {27 // TODO Auto-generated catch block28 e1.printStackTrace();29 }30 }31 32 }
同样写好监听器后也是需在ActiveMQ.xml中进行配置注册的
(1)注册JmsTemplate时,pubSubDomain这个属性的值要特别注意。默认值是false,也就是说默认只是支持queue模式,不支持topic模式。但是,如果将它改为true,则不支持queue模式。因此如果项目需要同时支持queue和topic模式,那么需要注册2个JmsTemplate,同时监听容器也需要注册2个
(2)使用Queue时,生产者只要将Message发送到MQ服务器端,消费者就可以进行消费,而无需生产者程序一直运行;
(3)消息是按照先入先出的顺序,一旦有消费者将Message消费,该Message就会从MQ服务器队列中删去;
(4)有文章说,“生产者”<-->"消费者"是一对一的关系,其实并不准确,从应用中可以看出,一个生产者产生的消息,可以被多个消费者进行消费,只不过多个消费者在消费消息时是竞争的关系,先得到的先消费,一旦消费完成,该消息就会出队列,
就不能被其他消费者再消费了,即“一次性消费”。就是我们熟悉的“点对点”通信了;
声明:本站所有文章资源内容,如无特殊说明或标注,均为采集网络资源。如若本站内容侵犯了原著者的合法权益,可联系本站删除。