`
bartholomew4
  • 浏览: 11212 次
社区版块
存档分类
最新评论

ActiveMQ(五)

阅读更多

今天本篇为ActiveMQQueue基础使用

 

 

    在我看来ActvieMQQueue是其常用的消息发送模式,其应用性比topic远要来的广(大牛勿喷,公司业务、公司行业决定了topic方式在我接触到的项目中使用并不广泛)。

 

 

1.Topicqueue的技术特点对比

 

Topic

Queue

中文全称

发布订阅消息

点对点

有无状态

topic是无状态的并且数据默认不落地。

queue数据默认会在服务器上以文件形式保存,比如Active MQ默认储存在$AMQ_HOME\data\kr-store\data下,亦可配置成DB存储。

完整性保障

不保证发布者发布的每条数据,订阅者都能接受到。

保证每条数据都能被接收者接收。

消息是否会丢失

一般来说发布者发布消息到某一个订阅消息时,只有正在监听该topic地址的订阅者能够接收到消息;如果没有订阅者在监听,该topic就丢失了。

消息发起人发送消息到目标Queue,接收者可以异步接收这个Queue上的消息。Queue上的消息如果暂时没有接收者来取,也不会丢失,直到消息被接收。

消息发布接收策略

一对多的消息发布接收策略,监听同一个topic地址的多个订阅者都能收到发布者发送的消息。订阅者接收完通知服务器

点对点的消息发布接收策略,一个消息发起人发送的消息,只能有一个接受者接收。接收者接收完后,通知服务器已接收,服务器对queue里的消息采取删除或其他操作。

 Topicqueue的最大区别在于topic是以广播的形式,通知所有在线监听的客户端有新的消息,没有监听的客户端将收不到消息;而queue则是以点对点的形式通知多个处于监听状态的客户端中的一个。

从应用场景上来说topic更适合与电商中的广告推送,广撒传单不必关心是否有人拿到。而Queue更适合用来处理严谨事务,如客户邮件,重要消息发布等需要确认消息抵达的场景。

 

2.(转)效率对比(为之前个人学习抄录并未对来源摘入)

    通过增加监听客户端的并发数来验证,topic的消息推送,是否会因为监听客户端的并发上升而出现明显的下降,测试环境的服务器为ci环境的ActiveMQ,客户端为我的本机。

        从实测的结果来看,topic方式发送的消息,发送和接收的效率,在一个订阅者和100个订阅者的前提下没有明显差异,但在500个订阅者(线程)并发的前提下,效率差异很明显(由于500线程并发的情况下,我本机的cpu占用率已高达70-90%,所以无法确认是我本机测试造成的性能瓶颈还是topic消息发送方式存在性能瓶颈,造成效率下降如此明显)。

        Topic方式发送的消息与queue方式发送的消息,发送和接收的效率,在一个订阅者和100个订阅者的前提下没有明显差异,但在500个订阅者并发的前提下,topic方式的效率明显低于queue

        Queue方式发送的消息,在一个订阅者、100个订阅者和500个订阅者的前提下,发送和接收的效率没有明显变化。

Topic实测数据: 

 

发送者发送的消息总数

所有订阅者接收到消息的总数

消息发送和接收平均耗时

单订阅者

100

100

101ms

100订阅者

100

10000

103ms

500订阅者

100

50000

14162ms

 

Queue实测数据: 

 

发送者发送的消息总数

所有订阅者接收到消息的总数

消息发送和接收平均耗时

单订阅者

100

100

96ms

100订阅者

100

100

96ms

500订阅者

100

100

100ms

 

PS:这份仅供参考吧,在个人看来这份数据并没有太大的说服力,首先对queue来说无论有多少个消息接收者,MQ的消息发送总条数都是以消息发起人发起的条数为准,而topic不同的MQ发送总条数是发布者发布的条数与订阅者个数的乘积。

 

下面就是代码了:

首先是消息发起人(Sender):

public class Sender {
	private static ConnectionFactory connectionFactory;
	private static Connection connection;
	private static Session session;
	private static int total=100;

	public Sender() throws JMSException {
		connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
		connection = connectionFactory.createConnection();
		connection.start();
		session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
	}

	public void close() throws JMSException {
		if (connection != null) {
			connection.close();
		}
	}

	public MessageProducer getMessageProducer(String stock, int dMode) throws JMSException {
		MessageProducer producer = session.createProducer(session.createQueue("myQueue"));
		producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
		return producer;
	}

	
	public TextMessage setMessageByText(String text) throws JMSException {
		TextMessage message = session.createTextMessage();
		message.setText(text);
		return message;
	}
	public TextMessage setMessageByMap(Map<String, Object> map) throws JMSException {
		TextMessage message = session.createTextMessage();
		for (String key : map.keySet()) {
			Object o=map.get(key);
			if(o instanceof Integer){
				message.setIntProperty(key, (Integer)o);
			}else if(o instanceof Boolean){
				message.setBooleanProperty(key, (Boolean)o);
			}else if(o instanceof Long){
				message.setLongProperty(key, (Long)o);
			}else if(o instanceof String){
				message.setStringProperty(key, (String)o);
			}else if(o instanceof Double){
				message.setDoubleProperty(key, (Double)o);
			}else if(o instanceof Short){
				message.setShortProperty(key, (Short)o);
			}else if(o instanceof Short){
				message.setShortProperty(key, (Short)o);
			}else if(o instanceof Float){
				message.setFloatProperty(key, (Float)o);
			}
		}
		return message;
	}

	public void sendMessage(MessageProducer producer,TextMessage message) throws JMSException{
		producer.send(message);
	}
	public static void main(String[] args) throws JMSException {
		Sender sender = new Sender();
		MessageProducer producer = sender.getMessageProducer("test", DeliveryMode.NON_PERSISTENT);
		int count=0;
		while (true) {
			TextMessage message;
			if(count%2==0){
				Map<String,Object> map=new HashMap<String,Object>();
				map.put("name", "My message");
				map.put("writer", "Bartholomew");
				map.put("content", "this is ActiveMQ!"+count);
				message= sender.setMessageByMap(map);
				sender.sendMessage(producer,message);
				System.out.println("发送第"+ (++count)+"条信息: " + message.toString());
			}else{
				message=sender.setMessageByText("hello world!"+count);
				sender.sendMessage(producer,message);
				System.out.println("发送第"+ (++count)+"条信息: " + message.getText());
			}
			if(total<=count){
				break;
			}
			
		}
	}
}

 接着是消息接收人的监听类:监听类是根据消息的发送情况来写的,请大家自己修改

public class MyMessageListener implements MessageListener{

	@Override
	public void onMessage(Message message) {
		TextMessage tm = (TextMessage) message;
		String content;
		try {
			content = tm.getText();
			if(content!=null){
				System.out.println("Received message: " + content);
			}else{
				Enumeration<String> pnames =tm.getPropertyNames();
				while(pnames.hasMoreElements()){
					String o = (String)pnames.nextElement();
//					message.getObjectProperty(o);
					System.out.print(o+":"+message.getObjectProperty(o)+",");
				}
				System.out.println();
			}
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

}

 最后是消息接收人

public class Receiver {
	public static void main(String[] args) throws JMSException {
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
		Connection connection = connectionFactory.createConnection();
		connection.start();
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		Destination destination = session.createQueue("myQueue");
		MessageConsumer consumer = session.createConsumer(destination);
		consumer.setMessageListener(new MyMessageListener());
	}
}

 

关于运行,对queue来说并没有谁先跑谁后跑的规定。而且一条消息仅有一个接收人,大家可以随意决定执行顺序。

分享到:
评论

相关推荐

    activemq新手大全

    一、JMS基本概念 二、activemq介绍及安装 1、消息中间件简介 2、activemq 2.1、activemq简介 ...五、activemq常见问题 5.1 activemq 消息传递 5.2 activemq 消息确认机制 5.3 activemq 持久化机制

    activeMQ5.9.1jar包

    activeMAjar包,包含所有 。怎么这么长,还要五十个字节

    ActiveMQ.rar

    n 五:ActiveMQ的Transport 包括:多种传输协议的功能、配置和使用 六: ActiveMQ的消息存储 包括:队列和topic、KahaDB、AMQ、JDBC、MMS等 n 七: ActiveMQ的Network 包括:在一台服务器启动多个Broker;静态网络...

    Spring+activemq

    JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。 • StreamMessage -- Java原始值的数据流 • MapMessage--一套名称-值对 • ...

    MuleESB集成webservice+restful(sprintboot+mybatis+mysql)+activeMQ+javamail

    MuleESB集成webservice+restful(sprintboot+mybatis+mysql)+activeMQ+javamail,五天的研究成果,集成了我所关注的点,希望有更多的朋友一起学习进步。

    ActiveMQ+Websoket异步信息处理

    先启动activeMQ和redis再启动项目,记得计算机名把横杆去掉,不然activemq启动会失败

    ActiveMQ---知识点整理

    本文来自于csdn,文章通过介绍ActiveMQ的安装,使用,搭建等等,简单整理了ActiveMQ。不同系统之间的信息交换,是我们开发中比较常见的场景,比如系统A要把数据发送给系统B,这个问题我们应该如何去处理?1999年,...

    互联网项目练习,使用ssm,fastDFS,activemq,freemarke

    资源简介:SSM Java ...五、结语 通过这一系列SSM Java项目的下载和学习,您将能够深入了解SSM框架的核心技术,提升自己的编程能力,并在实际业务场景中灵活应用。我们期待您能够通过这些项目获得更多的成长和进步!

    面试宝典 ActiveMQ消息中间件面试专题 JAVA面试重点话术 Linux面试专题及答案 面试必备之乐观锁与悲观锁

    面试宝典 ActiveMQ消息中间件面试专题 JAVA面试重点话术 Linux面试专题及答案 面试必备之乐观锁与悲观锁 程序员面试宝典(pdf清晰版) 多线程面试专题及答案 SQL优化面试专题及答案 SpringBoot面试专题及答案 Nginx...

    西安电子科技大学计算机分布式计算五次上机代码及报告

    西安电子科技大学 计算机学院 分布式计算 五次上机代码及报告 UDP RMI 消息队列 mapreduce activemq

    Java面试MQ(Message Queue)消息队列.pdf

    一、MQ介绍 ...五、如何保证MQ的高可用? ActiveMQ: RabbitMQ: RocketMQ: Kafka: 六、如何保证消息不被重复消费? 七、如何保证消息不丢失? 八、如何保证消息的顺序性? 九、消息大量积压怎么解决?

    java+大数据.pdf

    )(10天) MQ(ActiveMQ/RabbitMQ)(5天) spring(springcloud/springMVC/spring)(10天) Dubbo RPC框架 (五天) 以下项⽬任选⼀个,也可以三个都讲解 javaEE项⽬: 1) ⽀付宝内部OA系统 (⼤概15天) 2)阿⾥菜鸟物流...

    SpringCloud+vue+uniapp智慧物联/物业/巡检/停车管理系统源码

    ActiveMq订阅消息队列,让订单更快流转。 二、项目应用多端 管理系统后端,小区管理系统前端,小区管理系统业主手机版、小区管理系统物业手机版,适用小程序,对接公众号。 三、项目系统构成 物业管理系统:管理后台...

    java+大数据(1).pdf

    )(10天) MQ(ActiveMQ/RabbitMQ)(5天) spring(springcloud/springMVC/spring)(10天) Dubbo RPC框架 (五天) 以下项⽬任选⼀个,也可以三个都讲解 javaEE项⽬: 1) ⽀付宝内部OA系统 (⼤概15天) 2)阿⾥菜鸟物流...

    SpringBoot实现分布式微服务电商项目第五季(含配套资料)

    架构: SpringBoot + Dubbo+ Redis + ES + Nginx + FDFS + ActiveMQ 主流分布式微服务架构。 本系列教程共计15季,本套教程为第五季。 场景: 商品首页、商品详情、购物车、订单、支付、库存管理、全文搜索、秒杀...

    自主SNS毕业设计

    三月份到五月份在国美实习(岗位被人资故意分到了测试,相当郁闷的职位!),白天工作,晚上进行毕设的编码与设计聊以慰藉。由于无人讨论,于是需求经常改动,于是表的设计经过多次修改,完全是从无到有,页面不会做...

    java面试笔试资料包括JAVA基础核心知识点深度学习Spring面试题等资料合集.zip

    java面试笔试资料包括JAVA基础核心知识点深度学习Spring面试题等资料合集: JAVA核心知识点整理-282页 Java与哈希算法.docx Java中Lambda表达式的使用....面试题:Kafka、ActiveMQ、RabbitMQ、RocketMQ 有什么优缺点.doc

    蚂蚁课堂(每特学院)第一期-Java高端培训视频教程

    0014--Spring基础知识.zip ├─0015--深入理解Spring事物.zip ├─0016--SSM整合+视频网站开发.zip ├─0017--SpringBoot.zip ├─0018--Redis.zip5 ├─0019--ActiveMQ.zip ├─0020--Nginx.zip ├─0021--Nginx与...

    java面试题,180多页,绝对良心制作,欢迎点评,涵盖各种知识点,排版优美,阅读舒心

    【消息队列】ActiveMQ中的消息重发时间间隔和重发次数吗? 164 【Dubbo】dubbo介绍 166 Dubbo 是什么 166 Dubbo 架构流程图 167 调用流程 167 注册中心 168 Dubbo常识 168 【Dubbo】dubbo运行时,突然所有的zookeeper...

Global site tag (gtag.js) - Google Analytics