本文共 7006 字,大约阅读时间需要 23 分钟。
1)、每个消费者监听自己的队列;
2)、生产者将消息发送给broker,然后由交换机x将消息转发到绑定此交换机的每一个队列,每一个绑定到交换机的队列都将接受到消息。
注:broker就是消息中间件的服务节点,一般情况下可以将一个RabbitMQ Broker看作是一台RabbitMQ服务器。
3)、Exchange:交换机,如图中的X。它一方面接受生产者发送的消息,另一方面知道如何处理消息(如:递交给某个特别队列、递交给所有队列、或是将消息丢弃)至于如何处理消息,取决于Exchange的类型。
常用的Exchange类型如下:
Fanout:广播,将消息交给所有绑定到交换机的队列
Direct:定向,将消息交给符合指定routing key的队列
Topic:通配符,将消息交给符合routing pattern(路由模式)的队列
注:exchange交换机只负责转发消息,而不具备存储消息的能力,因此如果没有任何队列与exchange绑定,或者没有符合规则的队列,那么消息会丢失。
第1步、生产者代码如下:
package com.wzy.product;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.wzy.com.wzy.utils.ConnectionUtil;import com.wzy.com.wzy.utils.Constant;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * 发布与订阅模式队列:生产者 * */public class Sub_Pub_Producer { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接 Connection connection= ConnectionUtil.getConn(); //2.创建频道 Channel channel=connection.createChannel(); /** * 3.创建交换机 * 参数1:交换机名称 * 参数2:交换机类型,有fanout、topic、direct、headers * */ channel.exchangeDeclare(Constant.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT); //4.创建队列1 channel.queueDeclare(Constant.SUB_PUB_QUEUE_NAME1,true,false,false,null); //创建队列2 channel.queueDeclare(Constant.SUB_PUB_QUEUE_NAME2,true,false,false,null); //5.将队列绑定到交换机上 channel.queueBind(Constant.SUB_PUB_QUEUE_NAME1,Constant.FANOUT_EXCHANGE,""); channel.queueBind(Constant.SUB_PUB_QUEUE_NAME2,Constant.FANOUT_EXCHANGE,""); //6.发布消息 for(int i=1;i<10;i++){ //消息 String message="嗨,rabbitMQ,发布订阅模式:"+i; channel.basicPublish(Constant.FANOUT_EXCHANGE,"",null,message.getBytes()); System.out.println("已发送消息:"+message); } //关闭资源 channel.close(); connection.close(); }}
执行后,控制台打印结果如下:
第2步、消费者1
package com.wzy.consumer;import com.rabbitmq.client.*;import com.wzy.com.wzy.utils.ConnectionUtil;import com.wzy.com.wzy.utils.Constant;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * 发布订阅队列:消费者1 * */public class Sub_Pub_Consumer1 { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接 Connection connection= ConnectionUtil.getConn(); //2.创建频道 Channel channel=connection.createChannel(); //3.创建交换机 channel.exchangeDeclare(Constant.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT); //4.创建队列 channel.queueDeclare(Constant.SUB_PUB_QUEUE_NAME1,true,false,false,null); //5.队列绑定到交换机 channel.queueBind(Constant.SUB_PUB_QUEUE_NAME1,Constant.FANOUT_EXCHANGE,""); //6.创建消费者,并处理消息 DefaultConsumer consumer=new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //路由key System.out.println("路由key为:"+envelope.getRoutingKey()); //交换机 System.out.println("交换机为:"+envelope.getExchange()); //消息id System.out.println("消息id为:"+envelope.getDeliveryTag()); //收到的消息 System.out.println("消费者1接收到的消息为:"+new String(body,"utf-8")); } }; //监听消息 channel.basicConsume(Constant.SUB_PUB_QUEUE_NAME1,true,consumer); }}
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".SLF4J: Defaulting to no-operation (NOP) logger implementationSLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.路由key为:交换机为:fanout_exchange消息id为:1消费者1接收到的消息为:嗨,rabbitMQ,发布订阅模式:1路由key为:交换机为:fanout_exchange消息id为:2消费者1接收到的消息为:嗨,rabbitMQ,发布订阅模式:2路由key为:交换机为:fanout_exchange消息id为:3消费者1接收到的消息为:嗨,rabbitMQ,发布订阅模式:3路由key为:交换机为:fanout_exchange消息id为:4消费者1接收到的消息为:嗨,rabbitMQ,发布订阅模式:4路由key为:交换机为:fanout_exchange消息id为:5消费者1接收到的消息为:嗨,rabbitMQ,发布订阅模式:5路由key为:交换机为:fanout_exchange消息id为:6消费者1接收到的消息为:嗨,rabbitMQ,发布订阅模式:6路由key为:交换机为:fanout_exchange消息id为:7消费者1接收到的消息为:嗨,rabbitMQ,发布订阅模式:7路由key为:交换机为:fanout_exchange消息id为:8消费者1接收到的消息为:嗨,rabbitMQ,发布订阅模式:8路由key为:交换机为:fanout_exchange消息id为:9消费者1接收到的消息为:嗨,rabbitMQ,发布订阅模式:9
第3步、消费者2
package com.wzy.consumer;import com.rabbitmq.client.*;import com.wzy.com.wzy.utils.ConnectionUtil;import com.wzy.com.wzy.utils.Constant;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * 发布订阅队列:消费者1 * */public class Sub_Pub_Consumer2 { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接 Connection connection= ConnectionUtil.getConn(); //2.创建频道 Channel channel=connection.createChannel(); //3.创建交换机 channel.exchangeDeclare(Constant.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT); //4.创建队列 channel.queueDeclare(Constant.SUB_PUB_QUEUE_NAME2,true,false,false,null); //5.队列绑定到交换机 channel.queueBind(Constant.SUB_PUB_QUEUE_NAME2,Constant.FANOUT_EXCHANGE,""); //6.创建消费者,并处理消息 DefaultConsumer consumer=new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //路由key System.out.println("路由key为:"+envelope.getRoutingKey()); //交换机 System.out.println("交换机为:"+envelope.getExchange()); //消息id System.out.println("消息id为:"+envelope.getDeliveryTag()); //收到的消息 System.out.println("消费者2接收到的消息为:"+new String(body,"utf-8")); } }; //监听消息 channel.basicConsume(Constant.SUB_PUB_QUEUE_NAME2,true,consumer); }}
执行结果:
com.wzy.consumer.Sub_Pub_Consumer2SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".SLF4J: Defaulting to no-operation (NOP) logger implementationSLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.路由key为:交换机为:fanout_exchange消息id为:1消费者2接收到的消息为:嗨,rabbitMQ,发布订阅模式:1路由key为:交换机为:fanout_exchange消息id为:2消费者2接收到的消息为:嗨,rabbitMQ,发布订阅模式:2路由key为:交换机为:fanout_exchange消息id为:3消费者2接收到的消息为:嗨,rabbitMQ,发布订阅模式:3路由key为:交换机为:fanout_exchange消息id为:4消费者2接收到的消息为:嗨,rabbitMQ,发布订阅模式:4路由key为:交换机为:fanout_exchange消息id为:5消费者2接收到的消息为:嗨,rabbitMQ,发布订阅模式:5路由key为:交换机为:fanout_exchange消息id为:6消费者2接收到的消息为:嗨,rabbitMQ,发布订阅模式:6路由key为:交换机为:fanout_exchange消息id为:7消费者2接收到的消息为:嗨,rabbitMQ,发布订阅模式:7路由key为:交换机为:fanout_exchange消息id为:8消费者2接收到的消息为:嗨,rabbitMQ,发布订阅模式:8路由key为:交换机为:fanout_exchange消息id为:9消费者2接收到的消息为:嗨,rabbitMQ,发布订阅模式:9
第4步、测试
启动所有消费者,然后使用生产者发送消息,在每个消费者对应的控制台可以查看到生产者发送的所有消息。达到广播的效果。
在执行完测试代码后,在RabbitMQ的管理后台找到exchanges选项卡,点击fanout_exchange的交换机,可以查看到如下的绑定:
总结:
发布订阅模式和工作队列模式的区别:
1)、工作队列不用定义交换机
发布订阅队列需要定义交换机
2)、工作队列模式的生产方使用面向队列发送消息(底层使用默认交换机)
发布订阅模式的生产方使用面向交换机发送消息
3)、工作队列模式不需要设置队列和交换机的绑定,而实际上工作队列会将队列绑定到默认的交换机上
发布订阅模式需要设置队列和交换机的绑定
4)、工作队列的一个消息被多个消费者竞争
发布订阅队列的一个消息被多个消费者接收
转载地址:http://smuii.baihongyu.com/