如果是高并发下,rabbitmq服务器上收到成千上万条消息,那么当打开消费端时,这些消息必定喷涌而来,导致消费端消费不过来甚至挂掉都有可能。
在非自动确认的模式下,可以采用限流模式,rabbitmq 提供了服务质量保障qos机制来控制一次消费消息数量。
下面直接上代码:
生产端:
1 package com.zxy.demo.rabbitmq; 2 3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5 6 import com.rabbitmq.client.AMQP; 7 import com.rabbitmq.client.Channel; 8 import com.rabbitmq.client.ConfirmListener; 9 import com.rabbitmq.client.Connection;10 import com.rabbitmq.client.ConnectionFactory;11 import com.rabbitmq.client.ReturnListener;12 import com.rabbitmq.client.AMQP.BasicProperties;13 14 public class Producter {15 16 public static void main(String[] args) throws IOException, TimeoutException {17 // TODO Auto-generated method stub18 ConnectionFactory factory = new ConnectionFactory();19 factory.setHost("192.168.10.110");20 factory.setPort(5672);21 factory.setUsername("guest");22 factory.setPassword("guest");23 factory.setVirtualHost("/");24 Connection conn = factory.newConnection();25 Channel channel = conn.createChannel();26 String exchange001 = "exchange_001";27 String queue001 = "queue_001";28 String routingkey = "mq.topic";29 String body = "hello rabbitmq!===============限流策略";30 // 开启确认模式31 channel.confirmSelect();32 // 循环发送多条消息 33 for(int i = 0 ;i<10;i++){34 channel.basicPublish(exchange001, routingkey, null, body.getBytes());35 }36 37 // 添加一个返回监听========消息返回模式重要添加38 channel.addConfirmListener(new ConfirmListener() {39 40 @Override41 public void handleNack(long deliveryTag, boolean multiple) throws IOException {42 System.out.println("===========NACK============");43 44 }45 46 @Override47 public void handleAck(long deliveryTag, boolean multiple) throws IOException {48 System.out.println("===========ACK============");49 50 }51 });52 }53 54 }
消费端:
1 package com.zxy.demo.rabbitmq; 2 3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5 6 import com.rabbitmq.client.Channel; 7 import com.rabbitmq.client.Connection; 8 import com.rabbitmq.client.ConnectionFactory; 9 10 public class Receiver {11 12 public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {13 // TODO Auto-generated method stub14 ConnectionFactory factory = new ConnectionFactory();15 factory.setHost("192.168.10.110");16 factory.setPort(5672);17 factory.setUsername("guest");18 factory.setPassword("guest");19 factory.setVirtualHost("/");20 Connection conn = factory.newConnection();21 Channel channel = conn.createChannel();22 String exchange001 = "exchange_001";23 String queue001 = "queue_001";24 String routingkey = "mq.*";25 channel.exchangeDeclare(exchange001, "topic", true, false, null);26 channel.queueDeclare(queue001, true, false, false, null);27 channel.queueBind(queue001, exchange001, routingkey);28 // 设置限流策略29 // channel.basicQos(获取消息最大数[0-无限制], 依次获取数量, 作用域[true作用于整个channel,false作用于具体消费者]);30 channel.basicQos(0, 2, false);31 // 自定义消费者32 MyConsumer myConsumer = new MyConsumer(channel);33 // 进行消费,签收模式一定要为手动签收34 Thread.sleep(3000);35 channel.basicConsume(queue001, false, myConsumer);36 }37 38 }
自定义消费者:
1 package com.zxy.demo.rabbitmq; 2 3 import java.io.IOException; 4 5 import com.rabbitmq.client.AMQP.BasicProperties; 6 import com.rabbitmq.client.Channel; 7 import com.rabbitmq.client.DefaultConsumer; 8 import com.rabbitmq.client.Envelope; 9 10 /**11 * 可以继承,可以实现,实现的话要覆写的方法比较多,所以这里用了继承12 *13 */14 public class MyConsumer extends DefaultConsumer{15 private Channel channel;16 public MyConsumer(Channel channel) {17 super(channel);18 // TODO Auto-generated constructor stub19 this.channel=channel;20 }21 22 @Override23 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)24 throws IOException {25 System.out.println("消费标签:"+consumerTag);26 System.out.println("envelope.getDeliveryTag():==="+envelope.getDeliveryTag());27 System.out.println("envelope.getExchange():==="+envelope.getExchange());28 System.out.println("envelope.getRoutingKey():==="+envelope.getRoutingKey());29 System.out.println("body:==="+new String(body));30 // 手动签收,一定要有消费者签收,如果没有如下代码,则限流模式下,仅能打印出来channel.basicQos(0, 2, false);第二参数的2条信息31 channel.basicAck(envelope.getDeliveryTag(), false);32 }33 34 35 }
重回队列模式,是当投递消息失败时,让该消息重新回到队列的模式,该模式需要手动签收,并需要在消费者中进行判断,调用重回队列的确认模式
代码如下
生产端:
1 package com.zxy.demo.rabbitmq; 2 3 import java.io.IOException; 4 import java.util.HashMap; 5 import java.util.Map; 6 import java.util.concurrent.TimeoutException; 7 8 import org.springframework.amqp.core.Message; 9 10 import com.rabbitmq.client.AMQP.BasicProperties;11 import com.rabbitmq.client.Channel;12 import com.rabbitmq.client.ConfirmListener;13 import com.rabbitmq.client.Connection;14 import com.rabbitmq.client.ConnectionFactory;15 import com.rabbitmq.client.ReturnListener;16 17 public class Producter {18 19 public static void main(String[] args) throws IOException, TimeoutException {20 // TODO Auto-generated method stub21 ConnectionFactory factory = new ConnectionFactory();22 factory.setHost("192.168.10.110");23 factory.setPort(5672);24 factory.setUsername("guest");25 factory.setPassword("guest");26 factory.setVirtualHost("/");27 Connection conn = factory.newConnection();28 Channel channel = conn.createChannel();29 String exchange001 = "exchange_001";30 String queue001 = "queue_001";31 String routingkey = "mq.topic";32 33 // 循环发送多条消息 34 for(int i = 0 ;i<5;i++){35 String body = "hello rabbitmq!===============ACK&重回队列,第"+i+"条";36 Maphead = new HashMap<>();37 head.put("n", i);38 BasicProperties properties = new BasicProperties(null, "utf-8", head, 2, 1, null, null, null, null, null, null, null, null, null);39 40 channel.basicPublish(exchange001, routingkey, properties, body.getBytes());41 }42 43 }44 45 }
消费端:
1 package com.zxy.demo.rabbitmq; 2 3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5 6 import com.rabbitmq.client.Channel; 7 import com.rabbitmq.client.Connection; 8 import com.rabbitmq.client.ConnectionFactory; 9 10 public class Receiver {11 12 public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {13 // TODO Auto-generated method stub14 ConnectionFactory factory = new ConnectionFactory();15 factory.setHost("192.168.10.110");16 factory.setPort(5672);17 factory.setUsername("guest");18 factory.setPassword("guest");19 factory.setVirtualHost("/");20 Connection conn = factory.newConnection();21 Channel channel = conn.createChannel();22 String exchange001 = "exchange_001";23 String queue001 = "queue_001";24 String routingkey = "mq.*";25 channel.exchangeDeclare(exchange001, "topic", true, false, null);26 channel.queueDeclare(queue001, true, false, false, null);27 channel.queueBind(queue001, exchange001, routingkey);28 // 自定义消费者29 MyConsumer myConsumer = new MyConsumer(channel);30 // 进行消费,签收模式一定要为手动签收31 channel.basicConsume(queue001, false, myConsumer);32 }33 34 }
自定义消费者:
1 package com.zxy.demo.rabbitmq; 2 3 import java.io.IOException; 4 5 import com.rabbitmq.client.AMQP.BasicProperties; 6 import com.rabbitmq.client.Channel; 7 import com.rabbitmq.client.DefaultConsumer; 8 import com.rabbitmq.client.Envelope; 9 10 /**11 * 可以继承,可以实现,实现的话要覆写的方法比较多,所以这里用了继承12 *13 */14 public class MyConsumer extends DefaultConsumer{15 private Channel channel;16 public MyConsumer(Channel channel) {17 super(channel);18 // TODO Auto-generated constructor stub19 this.channel=channel;20 }21 22 @Override23 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)24 throws IOException {25 System.out.println("消费标签:"+consumerTag);26 System.out.println("envelope.getDeliveryTag():==="+envelope.getDeliveryTag());27 System.out.println("envelope.getExchange():==="+envelope.getExchange());28 System.out.println("envelope.getRoutingKey():==="+envelope.getRoutingKey());29 System.out.println("body:==="+new String(body));30 System.out.println("===================休眠以便查看===============");31 try {32 Thread.sleep(2000);33 } catch (InterruptedException e) {34 // TODO Auto-generated catch block35 e.printStackTrace();36 }37 // 手动签收38 Integer i = (Integer) properties.getHeaders().get("n");39 System.out.println("iiiiiiiiiiiiiiiii======================================================"+i);40 if(i==1) {41 channel.basicNack(envelope.getDeliveryTag(),false, true);//第三个参数为是否重返队列42 }else {43 channel.basicAck(envelope.getDeliveryTag(), false); 44 }45 }46 47 48 }
下面是重回队列执行结果,可以看到当消费完后第一条不断的被扔回队列然后消费再扔回。
1 消费标签:amq.ctag-eeeaa9jgrJ3tDhtQR2XReg 2 envelope.getDeliveryTag():===1 3 envelope.getExchange():===exchange_001 4 envelope.getRoutingKey():===mq.topic 5 body:===hello rabbitmq!===============ACK&重回队列,第0条 6 ===================休眠以便查看=============== 7 iiiiiiiiiiiiiiiii======================================================0 8 消费标签:amq.ctag-eeeaa9jgrJ3tDhtQR2XReg 9 envelope.getDeliveryTag():===210 envelope.getExchange():===exchange_00111 envelope.getRoutingKey():===mq.topic12 body:===hello rabbitmq!===============ACK&重回队列,第1条13 ===================休眠以便查看===============14 iiiiiiiiiiiiiiiii======================================================115 消费标签:amq.ctag-eeeaa9jgrJ3tDhtQR2XReg16 envelope.getDeliveryTag():===317 envelope.getExchange():===exchange_00118 envelope.getRoutingKey():===mq.topic19 body:===hello rabbitmq!===============ACK&重回队列,第2条20 ===================休眠以便查看===============21 iiiiiiiiiiiiiiiii======================================================222 消费标签:amq.ctag-eeeaa9jgrJ3tDhtQR2XReg23 envelope.getDeliveryTag():===424 envelope.getExchange():===exchange_00125 envelope.getRoutingKey():===mq.topic26 body:===hello rabbitmq!===============ACK&重回队列,第3条27 ===================休眠以便查看===============28 iiiiiiiiiiiiiiiii======================================================329 消费标签:amq.ctag-eeeaa9jgrJ3tDhtQR2XReg30 envelope.getDeliveryTag():===531 envelope.getExchange():===exchange_00132 envelope.getRoutingKey():===mq.topic33 body:===hello rabbitmq!===============ACK&重回队列,第4条34 ===================休眠以便查看===============35 iiiiiiiiiiiiiiiii======================================================436 消费标签:amq.ctag-eeeaa9jgrJ3tDhtQR2XReg37 envelope.getDeliveryTag():===638 envelope.getExchange():===exchange_00139 envelope.getRoutingKey():===mq.topic40 body:===hello rabbitmq!===============ACK&重回队列,第1条41 ===================休眠以便查看===============42 iiiiiiiiiiiiiiiii======================================================143 消费标签:amq.ctag-eeeaa9jgrJ3tDhtQR2XReg44 envelope.getDeliveryTag():===745 envelope.getExchange():===exchange_00146 envelope.getRoutingKey():===mq.topic47 body:===hello rabbitmq!===============ACK&重回队列,第1条48 ===================休眠以便查看===============49 iiiiiiiiiiiiiiiii======================================================150 消费标签:amq.ctag-eeeaa9jgrJ3tDhtQR2XReg51 envelope.getDeliveryTag():===852 envelope.getExchange():===exchange_00153 envelope.getRoutingKey():===mq.topic54 body:===hello rabbitmq!===============ACK&重回队列,第1条55 ===================休眠以便查看===============56 iiiiiiiiiiiiiiiii======================================================157 消费标签:amq.ctag-eeeaa9jgrJ3tDhtQR2XReg58 envelope.getDeliveryTag():===959 envelope.getExchange():===exchange_00160 envelope.getRoutingKey():===mq.topic61 body:===hello rabbitmq!===============ACK&重回队列,第1条62 ===================休眠以便查看===============