博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Rabbitmq之高级特性——实现消费端限流&NACK重回队列
阅读量:4708 次
发布时间:2019-06-10

本文共 12617 字,大约阅读时间需要 42 分钟。

  如果是高并发下,rabbitmq服务器上收到成千上万条消息,那么当打开消费端时,这些消息必定喷涌而来,导致消费端消费不过来甚至挂掉都有可能。

在非自动确认的模式下,可以采用限流模式,rabbitmq 提供了服务质量保障qos机制来控制一次消费消息数量。

下面直接上代码:

生产端:

1 package com.zxy.demo.rabbitmq; 2  3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5  6 import com.rabbitmq.client.AMQP; 7 import com.rabbitmq.client.Channel; 8 import com.rabbitmq.client.ConfirmListener; 9 import com.rabbitmq.client.Connection;10 import com.rabbitmq.client.ConnectionFactory;11 import com.rabbitmq.client.ReturnListener;12 import com.rabbitmq.client.AMQP.BasicProperties;13 14 public class Producter {15 16     public static void main(String[] args) throws IOException, TimeoutException {17         // TODO Auto-generated method stub18         ConnectionFactory factory = new ConnectionFactory();19         factory.setHost("192.168.10.110");20         factory.setPort(5672);21         factory.setUsername("guest");22         factory.setPassword("guest");23         factory.setVirtualHost("/");24         Connection conn = factory.newConnection();25         Channel channel = conn.createChannel();26         String exchange001 = "exchange_001";27         String queue001 = "queue_001";28         String routingkey = "mq.topic";29         String body = "hello rabbitmq!===============限流策略";30 //        开启确认模式31         channel.confirmSelect();32 //        循环发送多条消息        33         for(int i = 0 ;i<10;i++){34         channel.basicPublish(exchange001, routingkey, null, body.getBytes());35     }36         37 //        添加一个返回监听========消息返回模式重要添加38         channel.addConfirmListener(new ConfirmListener() {39             40             @Override41             public void handleNack(long deliveryTag, boolean multiple) throws IOException {42                 System.out.println("===========NACK============");43                 44             }45             46             @Override47             public void handleAck(long deliveryTag, boolean multiple) throws IOException {48                 System.out.println("===========ACK============");49                 50             }51         });52     }53 54 }

 

消费端:

1 package com.zxy.demo.rabbitmq; 2  3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5  6 import com.rabbitmq.client.Channel; 7 import com.rabbitmq.client.Connection; 8 import com.rabbitmq.client.ConnectionFactory; 9 10 public class Receiver {11 12     public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {13         // TODO Auto-generated method stub14         ConnectionFactory factory = new ConnectionFactory();15         factory.setHost("192.168.10.110");16         factory.setPort(5672);17         factory.setUsername("guest");18         factory.setPassword("guest");19         factory.setVirtualHost("/");20         Connection conn = factory.newConnection();21         Channel channel = conn.createChannel();22         String exchange001 = "exchange_001";23         String queue001 = "queue_001";24         String routingkey = "mq.*";25         channel.exchangeDeclare(exchange001, "topic", true, false, null);26         channel.queueDeclare(queue001, true, false, false, null);27         channel.queueBind(queue001, exchange001, routingkey);28 //        设置限流策略29 //        channel.basicQos(获取消息最大数[0-无限制], 依次获取数量, 作用域[true作用于整个channel,false作用于具体消费者]);30         channel.basicQos(0, 2, false);31 //        自定义消费者32         MyConsumer myConsumer = new MyConsumer(channel);33 //        进行消费,签收模式一定要为手动签收34         Thread.sleep(3000);35         channel.basicConsume(queue001, false, myConsumer);36     }37 38 }

 

自定义消费者:

1 package com.zxy.demo.rabbitmq; 2  3 import java.io.IOException; 4  5 import com.rabbitmq.client.AMQP.BasicProperties; 6 import com.rabbitmq.client.Channel; 7 import com.rabbitmq.client.DefaultConsumer; 8 import com.rabbitmq.client.Envelope; 9 10 /**11  * 可以继承,可以实现,实现的话要覆写的方法比较多,所以这里用了继承12  *13  */14 public class MyConsumer extends DefaultConsumer{15     private Channel channel;16     public MyConsumer(Channel channel) {17         super(channel);18         // TODO Auto-generated constructor stub19         this.channel=channel;20     }21 22     @Override23     public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)24             throws IOException {25         System.out.println("消费标签:"+consumerTag);26         System.out.println("envelope.getDeliveryTag():==="+envelope.getDeliveryTag());27         System.out.println("envelope.getExchange():==="+envelope.getExchange());28         System.out.println("envelope.getRoutingKey():==="+envelope.getRoutingKey());29         System.out.println("body:==="+new String(body));30 //        手动签收,一定要有消费者签收,如果没有如下代码,则限流模式下,仅能打印出来channel.basicQos(0, 2, false);第二参数的2条信息31         channel.basicAck(envelope.getDeliveryTag(), false);32     }33     34 35 }

 重回队列模式,是当投递消息失败时,让该消息重新回到队列的模式,该模式需要手动签收,并需要在消费者中进行判断,调用重回队列的确认模式

代码如下

生产端:

1 package com.zxy.demo.rabbitmq; 2  3 import java.io.IOException; 4 import java.util.HashMap; 5 import java.util.Map; 6 import java.util.concurrent.TimeoutException; 7  8 import org.springframework.amqp.core.Message; 9 10 import com.rabbitmq.client.AMQP.BasicProperties;11 import com.rabbitmq.client.Channel;12 import com.rabbitmq.client.ConfirmListener;13 import com.rabbitmq.client.Connection;14 import com.rabbitmq.client.ConnectionFactory;15 import com.rabbitmq.client.ReturnListener;16 17 public class Producter {18 19     public static void main(String[] args) throws IOException, TimeoutException {20         // TODO Auto-generated method stub21         ConnectionFactory factory = new ConnectionFactory();22         factory.setHost("192.168.10.110");23         factory.setPort(5672);24         factory.setUsername("guest");25         factory.setPassword("guest");26         factory.setVirtualHost("/");27         Connection conn = factory.newConnection();28         Channel channel = conn.createChannel();29         String exchange001 = "exchange_001";30         String queue001 = "queue_001";31         String routingkey = "mq.topic";32         33 //        循环发送多条消息        34         for(int i = 0 ;i<5;i++){35             String body = "hello rabbitmq!===============ACK&重回队列,第"+i+"条";36             Map
head = new HashMap<>();37 head.put("n", i);38 BasicProperties properties = new BasicProperties(null, "utf-8", head, 2, 1, null, null, null, null, null, null, null, null, null);39 40 channel.basicPublish(exchange001, routingkey, properties, body.getBytes());41 }42 43 }44 45 }

 

消费端:

1 package com.zxy.demo.rabbitmq; 2  3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5  6 import com.rabbitmq.client.Channel; 7 import com.rabbitmq.client.Connection; 8 import com.rabbitmq.client.ConnectionFactory; 9 10 public class Receiver {11 12     public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {13         // TODO Auto-generated method stub14         ConnectionFactory factory = new ConnectionFactory();15         factory.setHost("192.168.10.110");16         factory.setPort(5672);17         factory.setUsername("guest");18         factory.setPassword("guest");19         factory.setVirtualHost("/");20         Connection conn = factory.newConnection();21         Channel channel = conn.createChannel();22         String exchange001 = "exchange_001";23         String queue001 = "queue_001";24         String routingkey = "mq.*";25         channel.exchangeDeclare(exchange001, "topic", true, false, null);26         channel.queueDeclare(queue001, true, false, false, null);27         channel.queueBind(queue001, exchange001, routingkey);28 //        自定义消费者29         MyConsumer myConsumer = new MyConsumer(channel);30 //        进行消费,签收模式一定要为手动签收31         channel.basicConsume(queue001, false, myConsumer);32     }33 34 }

 

自定义消费者:

1 package com.zxy.demo.rabbitmq; 2  3 import java.io.IOException; 4  5 import com.rabbitmq.client.AMQP.BasicProperties; 6 import com.rabbitmq.client.Channel; 7 import com.rabbitmq.client.DefaultConsumer; 8 import com.rabbitmq.client.Envelope; 9 10 /**11  * 可以继承,可以实现,实现的话要覆写的方法比较多,所以这里用了继承12  *13  */14 public class MyConsumer extends DefaultConsumer{15     private Channel channel;16     public MyConsumer(Channel channel) {17         super(channel);18         // TODO Auto-generated constructor stub19         this.channel=channel;20     }21 22     @Override23     public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)24             throws IOException {25         System.out.println("消费标签:"+consumerTag);26         System.out.println("envelope.getDeliveryTag():==="+envelope.getDeliveryTag());27         System.out.println("envelope.getExchange():==="+envelope.getExchange());28         System.out.println("envelope.getRoutingKey():==="+envelope.getRoutingKey());29         System.out.println("body:==="+new String(body));30         System.out.println("===================休眠以便查看===============");31         try {32             Thread.sleep(2000);33         } catch (InterruptedException e) {34             // TODO Auto-generated catch block35             e.printStackTrace();36         }37 //        手动签收38         Integer i = (Integer) properties.getHeaders().get("n");39         System.out.println("iiiiiiiiiiiiiiiii======================================================"+i);40         if(i==1) {41             channel.basicNack(envelope.getDeliveryTag(),false, true);//第三个参数为是否重返队列42         }else {43             channel.basicAck(envelope.getDeliveryTag(), false);    44         }45     }46     47 48 }

下面是重回队列执行结果,可以看到当消费完后第一条不断的被扔回队列然后消费再扔回。

1 消费标签:amq.ctag-eeeaa9jgrJ3tDhtQR2XReg 2 envelope.getDeliveryTag():===1 3 envelope.getExchange():===exchange_001 4 envelope.getRoutingKey():===mq.topic 5 body:===hello rabbitmq!===============ACK&重回队列,第0条 6 ===================休眠以便查看=============== 7 iiiiiiiiiiiiiiiii======================================================0 8 消费标签:amq.ctag-eeeaa9jgrJ3tDhtQR2XReg 9 envelope.getDeliveryTag():===210 envelope.getExchange():===exchange_00111 envelope.getRoutingKey():===mq.topic12 body:===hello rabbitmq!===============ACK&重回队列,第1条13 ===================休眠以便查看===============14 iiiiiiiiiiiiiiiii======================================================115 消费标签:amq.ctag-eeeaa9jgrJ3tDhtQR2XReg16 envelope.getDeliveryTag():===317 envelope.getExchange():===exchange_00118 envelope.getRoutingKey():===mq.topic19 body:===hello rabbitmq!===============ACK&重回队列,第2条20 ===================休眠以便查看===============21 iiiiiiiiiiiiiiiii======================================================222 消费标签:amq.ctag-eeeaa9jgrJ3tDhtQR2XReg23 envelope.getDeliveryTag():===424 envelope.getExchange():===exchange_00125 envelope.getRoutingKey():===mq.topic26 body:===hello rabbitmq!===============ACK&重回队列,第3条27 ===================休眠以便查看===============28 iiiiiiiiiiiiiiiii======================================================329 消费标签:amq.ctag-eeeaa9jgrJ3tDhtQR2XReg30 envelope.getDeliveryTag():===531 envelope.getExchange():===exchange_00132 envelope.getRoutingKey():===mq.topic33 body:===hello rabbitmq!===============ACK&重回队列,第4条34 ===================休眠以便查看===============35 iiiiiiiiiiiiiiiii======================================================436 消费标签:amq.ctag-eeeaa9jgrJ3tDhtQR2XReg37 envelope.getDeliveryTag():===638 envelope.getExchange():===exchange_00139 envelope.getRoutingKey():===mq.topic40 body:===hello rabbitmq!===============ACK&重回队列,第1条41 ===================休眠以便查看===============42 iiiiiiiiiiiiiiiii======================================================143 消费标签:amq.ctag-eeeaa9jgrJ3tDhtQR2XReg44 envelope.getDeliveryTag():===745 envelope.getExchange():===exchange_00146 envelope.getRoutingKey():===mq.topic47 body:===hello rabbitmq!===============ACK&重回队列,第1条48 ===================休眠以便查看===============49 iiiiiiiiiiiiiiiii======================================================150 消费标签:amq.ctag-eeeaa9jgrJ3tDhtQR2XReg51 envelope.getDeliveryTag():===852 envelope.getExchange():===exchange_00153 envelope.getRoutingKey():===mq.topic54 body:===hello rabbitmq!===============ACK&重回队列,第1条55 ===================休眠以便查看===============56 iiiiiiiiiiiiiiiii======================================================157 消费标签:amq.ctag-eeeaa9jgrJ3tDhtQR2XReg58 envelope.getDeliveryTag():===959 envelope.getExchange():===exchange_00160 envelope.getRoutingKey():===mq.topic61 body:===hello rabbitmq!===============ACK&重回队列,第1条62 ===================休眠以便查看===============
View Code

 

转载于:https://www.cnblogs.com/xiaoyao-001/p/9608665.html

你可能感兴趣的文章
IT常用单词
查看>>
拓扑排序
查看>>
NYOJ--32--SEARCH--组合数
查看>>
gulpfile 压缩模板
查看>>
【34.14%】【BZOJ 3110】 [Zjoi2013]K大数查询
查看>>
【 henuacm2016级暑期训练-动态规划专题 A 】Cards
查看>>
第五篇:白话tornado源码之褪去模板的外衣
查看>>
设备常用框架framework
查看>>
bootstrap模态框和select2合用时input无法获取焦点(转)
查看>>
MockObject
查看>>
BZOJ4516: [Sdoi2016]生成魔咒(后缀自动机)
查看>>
查看手机已经记住的WIFI密码
查看>>
最新版IntelliJ IDEA2019 破解教程(2019.08.07-情人节更新)
查看>>
C# 两个datatable中的数据快速比较返回交集或差集
查看>>
关于oracle样例数据库emp、dept、salgrade的mysql脚本复杂查询分析
查看>>
adb shell am 的用法
查看>>
iOS10 UI教程视图和子视图的可见性
查看>>
FindChildControl与FindComponent
查看>>
中国城市json
查看>>
android下载手动下载Android SDK
查看>>