博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ(5)--发布与订阅模式(publish/subscribe)
阅读量:4087 次
发布时间:2019-05-25

本文共 7006 字,大约阅读时间需要 23 分钟。

1)、每个消费者监听自己的队列;

2)、生产者将消息发送给broker,然后由交换机x将消息转发到绑定此交换机的每一个队列,每一个绑定到交换机的队列都将接受到消息。

注:broker就是消息中间件的服务节点,一般情况下可以将一个RabbitMQ Broker看作是一台RabbitMQ服务器。

3)、Exchange:交换机,如图中的X。它一方面接受生产者发送的消息,另一方面知道如何处理消息(如:递交给某个特别队列、递交给所有队列、或是将消息丢弃)至于如何处理消息,取决于Exchange的类型。

常用的Exchange类型如下:

Fanout:广播,将消息交给所有绑定到交换机的队列

Direct:定向,将消息交给符合指定routing key的队列

Topic:通配符,将消息交给符合routing pattern(路由模式)的队列

注:exchange交换机只负责转发消息,而不具备存储消息的能力,因此如果没有任何队列与exchange绑定,或者没有符合规则的队列,那么消息会丢失。

第1步、生产者代码如下:

package com.wzy.product;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.wzy.com.wzy.utils.ConnectionUtil;import com.wzy.com.wzy.utils.Constant;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * 发布与订阅模式队列:生产者 * */public class Sub_Pub_Producer {    public static void main(String[] args) throws IOException, TimeoutException {        //1.创建连接        Connection connection= ConnectionUtil.getConn();        //2.创建频道        Channel channel=connection.createChannel();        /**         * 3.创建交换机         * 参数1:交换机名称         * 参数2:交换机类型,有fanout、topic、direct、headers         * */        channel.exchangeDeclare(Constant.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);        //4.创建队列1        channel.queueDeclare(Constant.SUB_PUB_QUEUE_NAME1,true,false,false,null);        //创建队列2        channel.queueDeclare(Constant.SUB_PUB_QUEUE_NAME2,true,false,false,null);        //5.将队列绑定到交换机上        channel.queueBind(Constant.SUB_PUB_QUEUE_NAME1,Constant.FANOUT_EXCHANGE,"");        channel.queueBind(Constant.SUB_PUB_QUEUE_NAME2,Constant.FANOUT_EXCHANGE,"");        //6.发布消息        for(int i=1;i<10;i++){            //消息            String message="嗨,rabbitMQ,发布订阅模式:"+i;            channel.basicPublish(Constant.FANOUT_EXCHANGE,"",null,message.getBytes());            System.out.println("已发送消息:"+message);        }        //关闭资源        channel.close();        connection.close();    }}

执行后,控制台打印结果如下:

第2步、消费者1

package com.wzy.consumer;import com.rabbitmq.client.*;import com.wzy.com.wzy.utils.ConnectionUtil;import com.wzy.com.wzy.utils.Constant;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * 发布订阅队列:消费者1 * */public class Sub_Pub_Consumer1 {    public static void main(String[] args) throws IOException, TimeoutException {        //1.创建连接        Connection connection= ConnectionUtil.getConn();        //2.创建频道        Channel channel=connection.createChannel();        //3.创建交换机        channel.exchangeDeclare(Constant.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);        //4.创建队列        channel.queueDeclare(Constant.SUB_PUB_QUEUE_NAME1,true,false,false,null);        //5.队列绑定到交换机        channel.queueBind(Constant.SUB_PUB_QUEUE_NAME1,Constant.FANOUT_EXCHANGE,"");        //6.创建消费者,并处理消息        DefaultConsumer consumer=new DefaultConsumer(channel){            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                //路由key                System.out.println("路由key为:"+envelope.getRoutingKey());                //交换机                System.out.println("交换机为:"+envelope.getExchange());                //消息id                System.out.println("消息id为:"+envelope.getDeliveryTag());                //收到的消息                System.out.println("消费者1接收到的消息为:"+new String(body,"utf-8"));            }        };        //监听消息        channel.basicConsume(Constant.SUB_PUB_QUEUE_NAME1,true,consumer);    }}
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".SLF4J: Defaulting to no-operation (NOP) logger implementationSLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.路由key为:交换机为:fanout_exchange消息id为:1消费者1接收到的消息为:嗨,rabbitMQ,发布订阅模式:1路由key为:交换机为:fanout_exchange消息id为:2消费者1接收到的消息为:嗨,rabbitMQ,发布订阅模式:2路由key为:交换机为:fanout_exchange消息id为:3消费者1接收到的消息为:嗨,rabbitMQ,发布订阅模式:3路由key为:交换机为:fanout_exchange消息id为:4消费者1接收到的消息为:嗨,rabbitMQ,发布订阅模式:4路由key为:交换机为:fanout_exchange消息id为:5消费者1接收到的消息为:嗨,rabbitMQ,发布订阅模式:5路由key为:交换机为:fanout_exchange消息id为:6消费者1接收到的消息为:嗨,rabbitMQ,发布订阅模式:6路由key为:交换机为:fanout_exchange消息id为:7消费者1接收到的消息为:嗨,rabbitMQ,发布订阅模式:7路由key为:交换机为:fanout_exchange消息id为:8消费者1接收到的消息为:嗨,rabbitMQ,发布订阅模式:8路由key为:交换机为:fanout_exchange消息id为:9消费者1接收到的消息为:嗨,rabbitMQ,发布订阅模式:9

第3步、消费者2

package com.wzy.consumer;import com.rabbitmq.client.*;import com.wzy.com.wzy.utils.ConnectionUtil;import com.wzy.com.wzy.utils.Constant;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * 发布订阅队列:消费者1 * */public class Sub_Pub_Consumer2 {    public static void main(String[] args) throws IOException, TimeoutException {        //1.创建连接        Connection connection= ConnectionUtil.getConn();        //2.创建频道        Channel channel=connection.createChannel();        //3.创建交换机        channel.exchangeDeclare(Constant.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);        //4.创建队列        channel.queueDeclare(Constant.SUB_PUB_QUEUE_NAME2,true,false,false,null);        //5.队列绑定到交换机        channel.queueBind(Constant.SUB_PUB_QUEUE_NAME2,Constant.FANOUT_EXCHANGE,"");        //6.创建消费者,并处理消息        DefaultConsumer consumer=new DefaultConsumer(channel){            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                //路由key                System.out.println("路由key为:"+envelope.getRoutingKey());                //交换机                System.out.println("交换机为:"+envelope.getExchange());                //消息id                System.out.println("消息id为:"+envelope.getDeliveryTag());                //收到的消息                System.out.println("消费者2接收到的消息为:"+new String(body,"utf-8"));            }        };        //监听消息        channel.basicConsume(Constant.SUB_PUB_QUEUE_NAME2,true,consumer);    }}

执行结果:

com.wzy.consumer.Sub_Pub_Consumer2SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".SLF4J: Defaulting to no-operation (NOP) logger implementationSLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.路由key为:交换机为:fanout_exchange消息id为:1消费者2接收到的消息为:嗨,rabbitMQ,发布订阅模式:1路由key为:交换机为:fanout_exchange消息id为:2消费者2接收到的消息为:嗨,rabbitMQ,发布订阅模式:2路由key为:交换机为:fanout_exchange消息id为:3消费者2接收到的消息为:嗨,rabbitMQ,发布订阅模式:3路由key为:交换机为:fanout_exchange消息id为:4消费者2接收到的消息为:嗨,rabbitMQ,发布订阅模式:4路由key为:交换机为:fanout_exchange消息id为:5消费者2接收到的消息为:嗨,rabbitMQ,发布订阅模式:5路由key为:交换机为:fanout_exchange消息id为:6消费者2接收到的消息为:嗨,rabbitMQ,发布订阅模式:6路由key为:交换机为:fanout_exchange消息id为:7消费者2接收到的消息为:嗨,rabbitMQ,发布订阅模式:7路由key为:交换机为:fanout_exchange消息id为:8消费者2接收到的消息为:嗨,rabbitMQ,发布订阅模式:8路由key为:交换机为:fanout_exchange消息id为:9消费者2接收到的消息为:嗨,rabbitMQ,发布订阅模式:9

第4步、测试

启动所有消费者,然后使用生产者发送消息,在每个消费者对应的控制台可以查看到生产者发送的所有消息。达到广播的效果。

在执行完测试代码后,在RabbitMQ的管理后台找到exchanges选项卡,点击fanout_exchange的交换机,可以查看到如下的绑定:

总结:

发布订阅模式和工作队列模式的区别:

1)、工作队列不用定义交换机

          发布订阅队列需要定义交换机

2)、工作队列模式的生产方使用面向队列发送消息(底层使用默认交换机)

         发布订阅模式的生产方使用面向交换机发送消息

3)、工作队列模式不需要设置队列和交换机的绑定,而实际上工作队列会将队列绑定到默认的交换机上

        发布订阅模式需要设置队列和交换机的绑定

4)、工作队列的一个消息被多个消费者竞争

         发布订阅队列的一个消息被多个消费者接收

转载地址:http://smuii.baihongyu.com/

你可能感兴趣的文章
DirectX11 平行光
查看>>
DirectX11 点光
查看>>
DirectX11 聚光灯
查看>>
DirectX11 HLSL打包(packing)格式和“pad”变量的必要性
查看>>
DirectX11 光照演示示例Demo
查看>>
漫谈一下前端的可视化技术
查看>>
VUe+webpack构建单页router应用(一)
查看>>
Vue+webpack构建单页router应用(二)
查看>>
从头开始讲Node.js——异步与事件驱动
查看>>
Node.js-模块和包
查看>>
Node.js核心模块
查看>>
express的应用
查看>>
NodeJS开发指南——mongoDB、Session
查看>>
Express: Can’t set headers after they are sent.
查看>>
2017年,这一次我们不聊技术
查看>>
实现接口创建线程
查看>>
Java对象序列化与反序列化(1)
查看>>
HTML5的表单验证实例
查看>>
JavaScript入门笔记:全选功能的实现
查看>>
程序设计方法概述:从面相对象到面向功能到面向对象
查看>>