JMS是什么意思
小编给大家分享一下JMS是什么意思,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!
JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。
基于之前一篇“一个故事告诉你什么是消息队列”,了解了消息队列的使用场景以及相关的特性。本文主要讲述消息服务在 JAVA 中的使用。
市面上的有关消息队列的技术选型非常多,如果我们的代码框架要支持不同的消息实现,在保证框架具有较高扩展性的前提下,我们势必要进行一定的封装。
在 JAVA 中,大可不必如此。因为 JAVA 已经制定了一套标准的 JMS 规范。该规范定义了一套通用的接口和相关语义,提供了诸如持久、验证和事务的消息服务,其最主要的目的是允许Java应用程序访问现有的消息中间件。就和 JDBC 一样。
基本概念在介绍具体的使用之前,先简单介绍一下 JMS 的一些基本知识。这里我打算分为 3 部分来介绍,即 消息队列(MQ)的连接、消息发送与消息接收。
这里我们的技术选型是 SpringBoot、JMS、ActiveMQ
为了更好的理解 JMS,这里没有使用 SpringBoot 零配置来搭建项目
MQ 的连接使用 MQ 的第一步一定是先连接 MQ。因为这里使用的是 JMS 规范,对于任何遵守 JMS 规范的 MQ 来说,都会实现相应的ConnectionFactory接口,因此我们只需要创建一个ConnectionFactory工厂类,由它来实现 MQ 的连接,以及封装一系列特性的 MQ 参数。
例子:这里我们以 ActiveMQ 为例,
maven 依赖:
org.springframework.bootspring-boot-starter-parent1.5.3.RELEASEorg.springframework.bootspring-boot-starter-activemq
创建 ActiveMQ 连接工厂:
@BeanpublicConnectionFactoryconnectionFactory(){ActiveMQConnectionFactoryconnectionFactory=newActiveMQConnectionFactory();connectionFactory.setBrokerURL(ActiveMQ_URL);connectionFactory.setUserName(ActiveMQ_USER);connectionFactory.setPassword(ActiveMQ_PASSWORD);returnconnectionFactory;}消息发送
关于消息的发送,是通过 JMS 核心包中的JmsTemplate类来实现的,它简化了 JMS 的使用,因为在发送或同步接收消息时它帮我们处理了资源的创建和释放。从它的作用也不难推测出,它需要引用我们上面创建的连接工厂,具体代码如下:
@BeanpublicJmsTemplatejmsQueueTemplate(){returnnewJmsTemplate(connectionFactory());}
JmsTemplate创建完成后,我们就可以调用它的方法来发送消息了。这里有两个概念需要注意:
代码示例:
@AutowiredprivateJmsTemplatejmsQueueTemplate;/***发送原始消息Message*/publicvoidsend(){jmsQueueTemplate.send("queue1",newMessageCreator(){@OverridepublicMessagecreateMessage(Sessionsession)throwsJMSException{returnsession.createTextMessage("我是原始消息");}});}
优化:当然,我们不用每次都通过MessageCreator匿名类的方式来创建Message对象,JmsTemplate类中提供了对象实体自动转换为Message对象的方法,convertAndSend(String destinationName, final Object message)。
优化代码示例:
/***发送消息自动转换成原始消息*/publicvoidconvertAndSend(){jmsQueueTemplate.convertAndSend("queue1","我是自动转换的消息");}
注:关于消息转换,还可以通过实现MessageConverter接口来自定义转换内容
消息接收讲完了消息发送,我们最后来说说消息是如何接收的。消息既然是以Message对象的形式发送到指定的目的地,那么消息的接收势必会去指定的目的地上去接收消息。这里采用的是监听者的方式来监听指定地点的消息,采用注解@JmsListener来设置监听方法。
代码示例:
@ComponentpublicclassListener1{@JmsListener(destination="queue1")publicvoidreceive(Stringmsg){System.out.println("监听到的消息内容为:"+msg);}}
有了监听的目标和方法后,监听器还得和 MQ 关联起来,这样才能运作起来。这里的监听器可能不止一个,如果每个都要和 MQ 建立连接,肯定不太合适。所以需要一个监听容器工厂的概念,即接口JmsListenerContainerFactory,它会引用上面创建好的与 MQ 的连接工厂,由它来负责接收消息以及将消息分发给指定的监听器。当然也包括事务管理、资源获取与释放和异常转换等。
代码示例:
@BeanpublicDefaultJmsListenerContainerFactoryjmsQueueListenerContainerFactory(){DefaultJmsListenerContainerFactoryfactory=newDefaultJmsListenerContainerFactory();factory.setConnectionFactory(connectionFactory());//设置连接数factory.setConcurrency("3-10");//重连间隔时间factory.setRecoveryInterval(1000L);returnfactory;}场景
代码地址:https://github.com/jasonGeng88/springboot-jms
对 JMS 有了基本的理解后,我们就来在具体的场景中使用一下。
首先,我们需要先启动 ActiveMQ,这里我们以 Docker 容器化的方式进行启动。
启动命令:
dockerrun-d-p8161:8161-p61616:61616--nameactivemqwebcenter/activemq
启动成功后,在 ActiveMQ 可视化界面查看效果(http://localhost:8161):
点对点模式(单消费者)
下面介绍消息队列中最常用的一种场景,即点对点模式。基本概念如下:
代码实现(为简化代码,部分代码沿用上面所述): 启动文件(Application.java)
@SpringBootApplication@EnableJmspublicclassApplication{.../***JMS队列的模板类*connectionFactory()为ActiveMQ连接工厂*/@BeanpublicJmsTemplatejmsQueueTemplate(){returnnewJmsTemplate(connectionFactory());}publicstaticvoidmain(String[]args){SpringApplication.run(Application.class,args);}}
注解@EnableJms设置在@Configuration类上,用来声明对 JMS 注解的支持。
消息生产者(PtpProducer.java)@ComponentpublicclassPtpProducer{@AutowiredprivateJmsTemplatejmsQueueTemplate;/***发送消息自动转换成原始消息*/publicvoidconvertAndSend(){jmsQueueTemplate.convertAndSend("ptp","我是自动转换的消息");}}生产者调用类(PtpController.java)
@RestController@RequestMapping(value="/ptp")publicclassPtpController{@AutowiredprivatePtpProducerptpProducer;@RequestMapping(value="/convertAndSend")publicObjectconvertAndSend(){ptpProducer.convertAndSend();return"success";}}消息监听容器工厂
@SpringBootApplication@EnableJmspublicclassApplication{.../***JMS队列的监听容器工厂*/@Bean(name="jmsQueueListenerCF")publicDefaultJmsListenerContainerFactoryjmsQueueListenerContainerFactory(){DefaultJmsListenerContainerFactoryfactory=newDefaultJmsListenerContainerFactory();factory.setConnectionFactory(connectionFactory());//设置连接数factory.setConcurrency("3-10");//重连间隔时间factory.setRecoveryInterval(1000L);returnfactory;}...}消息监听器
@ComponentpublicclassPtpListener1{/***消息队列监听器*destination队列地址*containerFactory监听器容器工厂,若存在2个以上的监听容器工厂,需进行指定*/@JmsListener(destination="ptp",containerFactory="jmsQueueListenerCF")publicvoidreceive(Stringmsg){System.out.println("点对点模式1:"+msg);}}演示
启动项目启动后,通过 REST 接口的方式来调用消息生产者发送消息,请求如下:
curl-XGET127.0.0.1:8080/ptp/convertAndSend
消费者控制台信息:
ActiveMQ 控制台信息:
列表说明:
基于上面一个消费者消费的模式,因为生产者可能会有很多,同时像某个队列发送消息,这时一个消费者可能会成为瓶颈。所以需要多个消费者来分摊消费压力(消费线程池能解决一定压力,但毕竟在单机上,做不到分布式分布,所以多消费者是有必要的),也就产生了下面的场景。
添加新的监听器
@ComponentpublicclassPtpListener2{@JmsListener(destination=Constant.QUEUE_NAME,containerFactory="jmsQueueListenerCF")publicvoidreceive(Stringmsg){System.out.println("点对点模式2:"+msg);}}
演示 这里我们发起 10 次请求,来观察消费者的消费情况:
这里因为监听容器设置了线程池的缘故,在实际消费过程中,监听器消费的顺序会有所差异。
发布订阅模式
除了点对点模式,发布订阅模式也是消息队列中常见的一种使用。试想一下,有一个即时聊天群,你在群里发送一条消息。所有在这个群里的人(即订阅了该群的人),都会收到你发送的信息。
基本概念:
代码实现 修改 JmsTemplate 模板类,使其支持发布订阅功能
@SpringBootApplication@EnableJmspublicclassApplication{...@BeanpublicJmsTemplatejmsTopicTemplate(){JmsTemplatejmsTemplate=newJmsTemplate(connectionFactory());jmsTemplate.setPubSubDomain(true);returnjmsTemplate;}...}
消息生产者(PubSubProducer.java)
@ComponentpublicclassPtpProducer{@AutowiredprivateJmsTemplatejmsTopicTemplate;publicvoidconvertAndSend(){jmsTopicTemplate.convertAndSend("topic","我是自动转换的消息");}}
生产者调用类(PubSubController.java)
@RestController@RequestMapping(value="/pubsub")publicclassPtpController{@AutowiredprivatePubSubProducerpubSubProducer;@RequestMapping(value="/convertAndSend")publicStringconvertAndSend(){pubSubProducer.convertAndSend();return"success";}}
修改 DefaultJmsListenerContainerFactory 类,使其支持发布订阅功能
@SpringBootApplication@EnableJmspublicclassApplication{.../***JMS队列的监听容器工厂*/@Bean(name="jmsTopicListenerCF")publicDefaultJmsListenerContainerFactoryjmsTopicListenerContainerFactory(){DefaultJmsListenerContainerFactoryfactory=newDefaultJmsListenerContainerFactory();factory.setConnectionFactory(connectionFactory());factory.setConcurrency("1");factory.setPubSubDomain(true);returnfactory;}...}
消息监听器(这里设置2个订阅者)
@ComponentpublicclassPubSubListener1{@JmsListener(destination="topic",containerFactory="jmsTopicListenerCF")publicvoidreceive(Stringmsg){System.out.println("订阅者1-"+msg);}}@ComponentpublicclassPubSubListener2{@JmsListener(destination="topic",containerFactory="jmsTopicListenerCF")publicvoidreceive(Stringmsg){System.out.println("订阅者2-"+msg);}}
演示
curl-XGET127.0.0.1:8080/pubSub/convertAndSend
消费者控制台信息:
ActiveMQ 控制台信息:
看完了这篇文章,相信你对“JMS是什么意思”有了一定的了解,如果想了解更多相关知识,欢迎关注亿速云行业资讯频道,感谢各位的阅读!
声明:本站所有文章资源内容,如无特殊说明或标注,均为采集网络资源。如若本站内容侵犯了原著者的合法权益,可联系本站删除。