这里记录官方的第2个demo,这个demo中主要讲的MQ的队列和消费者在处理消息的同时挂掉了的情况消息怎么去处理,以及任务的公平调度
[TOC]
0-概要
- 在官方的第一个Demo中只有一个消费者,而且任务简单并没有在真实情况下进行模拟,比如遇到处理复杂的任务需要处理几秒钟的那种,在这个Demo中将使用Thread.sleep()进行模拟复杂任务的等待情况,并且启动第二个消费者
1-循环调度简单实现
任务队列是要是避免立即执行复杂型任务,并且立即执行,我们需要把它封装成一个任务放到任务队列.后台将执行任务.当运行多个工作进程(消费者)时它们之间可以共享任务
循环调度的有点就是任务可以并行执行,假如任务数量巨大,我们可以添加更多的消费者,扩展起来非常轻松
如下图
1.1-生产者Java端编写
1 |
|
1.2-消费者Java端编写
1 |
|
1.3-效果展示
我们复制一份Work.java的代码,把它当做第二个消费者去共同处理任务,生产者发送消息过后,第一个被调度的消费者处理消息需要等待10秒,再次发送消息不会发送到第一个消费者去,而是发送到第二个消费者去处理
效果如下图
- 可以看出任务是被轮训调度给消费者的
2-消息确认
- 当任务调度到消费者过后,开始处理任务的过程当中消费者因为各种原因下线了会发生什么.如果使用上面的Demo进行测试的话,一旦MQ向消费者发送了消息,消息将立刻标记为删除,在这种情况下如果一个消费者在处理任务的过程中下线了,会丢失刚刚处理的任务,还将丢失发送给这个消费者但是还没开始处理的所有消息
- 我们希望的是当消费者意外下线的情况发送,任务将会交给另外一个消费者进行处理
- 为了保证消息永不丢失,MQ支持 message acknowledgments(消息确认),消费者发回ack(nowledgement) 告诉MQ已收到,处理了消息MQ可以自由删除它
- 如果消费者意外下线,channel(通道)关闭, connection(连接)关闭,或者TCP连接丢失 而没有发送确认,MQ将理解为消息未完成处理并且将重新放入队列中.如果同时还有其它消费者在线,则会迅速将其发送给其它消费者.这样就可以确保即使消费者下线,消息也不会丢失
- 在默认情况下,Manual message acknowledgments(手动消息确认) 是已经打开的,但是在上面的代码中我们已经关闭了它,使用的autoAck=true ,在真实情况下一旦我们完成的任务,就应该将此标志设置为false并从消费者发送确认
2.1-消息确认Java消费者端代码
- 修改过的消费者代码如下
1 |
|
2.2-效果演示
- 生产者发送消息
- 消费者1接收到消息,但是处理过程中下线
- 任务被重新调度到消费者2进行处理
3-消息持久化(或者叫队列和消息的持久化)
- 上面的例子中确保了即使消费者下线,任务也不会丢失.但是MQ服务停止,我们的任务任然会丢失
- 队列持久化可以通过在声明队列的时候进行持久化队列,如下图(修改生产者代码中的队列声明部分,消费者一样需要修改)
1 | //持久化队列声明,参数 durable 为 true |
- 消息持久化只需要修改生产者的消息发送部分,如下图
1 | //持久化消息声明,修改了BasicProperties参数 |
4-公平派遣
MQ的默认消息调度是轮训的,就像上面的例子,第一个任务给A如果有第二个消息,任务就给B,第三个消息任务就又给A.这样的调度会造成消费者的任务堆积假如在处理复杂任务的情况下,这种情况下可以在消费者中的队列中设置每次只接受一个消息,只有处理完过后才接受第二个消息
在消费者中设置如下
1 | int prefetchCount = 1 ; |
- 消费者如下图:
- 这样的话在任务过多加上任务复杂的情况下可能会造成队列里面任务堆积太多,所以要及时的添加消费者