博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
白话RabbitMQ(四): 建立路由
阅读量:6912 次
发布时间:2019-06-27

本文共 3836 字,大约阅读时间需要 12 分钟。

推广

RabbitMQ专题讲座

CoolMQ开源项目

我们利用消息队列实现了分布式事务的最终一致性解决方案,请大家围观。可以参考源码:…,项目支持网站: ,最新文章或实现会更新在上面

前言

在中我们建立了一个简单的日志系统,从而将log消息广播给一些消费者。这章我们会在此基础上加入一些新的特性-我们将有针对性的进行消息分发,比如,只把错误(error)消息保存到磁盘,与此同时,打印出所有的消息。

绑定

我们在前面的例子中,绑定是这么来做的

channel.queueBind(queueName, EXCHANGE_NAME, "");

绑定是建立交换机和队列之间的一种联系:队列会接受交换机中的消息。绑定可以用一个路由键来指明,为了与basic_publish区分开,我们称之为绑定键(binding key):

channel.queueBind(queueName, EXCHANGE_NAME, "black");

绑定键跟路由器类型也有关系,我们之前用的广播路由器,会忽略掉这个值

直达交换机(Direct Exchange)

之前我们用的是广播交换机,会将消息发送给所有的消费者。这里我们希望通过log的严重程度进行过滤,例如只有严重的错误才会写入到磁盘,而warn和info消息就不用了,以此来节省磁盘空间

而广播交换机没法满足这个需求-它只是无脑的发送消息。所以我们会使用直达交换机(Direct Exchange)- 消息会通过所绑定的键来发送给对应的队列,可以看如下这幅图

clipboard.png

如上图所示,直达交换机X绑定了两个队列,C1是通过orange来绑定,而C2是通过black和green绑定。因此,发送到路由键orange的消息会发送给队列Q1,发送到路由键black或者green的消息会发送给Q2,其它的消息将被丢弃。

多项绑定

clipboard.png

当然,多个队列绑定到一个键上也是合法的,在这种情况下,直达交换机将会将消息发送给所有的队列,就像广播交换机一样,如上图所示,一个键为black的消息将会同时被发送给C1和C2.

我们首先需要创建一个直达路由器

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

并发送消息到这个路由器

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

上面我们是发送给'severity',简单起见,假设有下列几种日志类型'severity' ,'info', 'warning', 'error'.

订阅消息(Subscribing)

接受消息跟之前一样,但有一点不同,我们提供了一个binding key,

String queueName = channel.queueDeclare().getQueue();for(String severity : argv){  channel.queueBind(queueName, EXCHANGE_NAME, severity);}

整合

clipboard.png

将上面的所有代码整合到一起

EmitLogDirect.java

import com.rabbitmq.client.*;import java.io.IOException;public class EmitLogDirect {    private static final String EXCHANGE_NAME = "direct_logs";    public static void main(String[] argv)                  throws java.io.IOException {        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("localhost");        Connection connection = factory.newConnection();        Channel channel = connection.createChannel();        channel.exchangeDeclare(EXCHANGE_NAME, "direct");        String severity = getSeverity(argv);        String message = getMessage(argv);        channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());        System.out.println(" [x] Sent '" + severity + "':'" + message + "'");        channel.close();        connection.close();    }    //..}

ReceiveLogsDirect.java

import com.rabbitmq.client.*;import java.io.IOException;public class ReceiveLogsDirect {  private static final String EXCHANGE_NAME = "direct_logs";  public static void main(String[] argv) throws Exception {    ConnectionFactory factory = new ConnectionFactory();    factory.setHost("localhost");    Connection connection = factory.newConnection();    Channel channel = connection.createChannel();    channel.exchangeDeclare(EXCHANGE_NAME, "direct");    String queueName = channel.queueDeclare().getQueue();    if (argv.length < 1){      System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");      System.exit(1);    }    for(String severity : argv){      channel.queueBind(queueName, EXCHANGE_NAME, severity);    }    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");    Consumer consumer = new DefaultConsumer(channel) {      @Override      public void handleDelivery(String consumerTag, Envelope envelope,                                 AMQP.BasicProperties properties, byte[] body) throws IOException {        String message = new String(body, "UTF-8");        System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");      }    };    channel.basicConsume(queueName, true, consumer);  }}

编译

javac -cp $CP ReceiveLogsDirect.java EmitLogDirect.java

只保存warning和error的消息到磁盘上

java -cp $CP ReceiveLogsDirect warning error > logs_from_rabbit.log

将所有的消息打印到频幕上

java -cp $CP ReceiveLogsDirect info warning error# => [*] Waiting for logs. To exit press CTRL+C

最后,发送error消息

java -cp $CP EmitLogDirect error "Run. Run. Or it will explode."# => [x] Sent 'error':'Run. Run. Or it will explode.'

好了,这一章就到这儿,下一章我们将讲述如何基于特定模式进行监听

转载地址:http://mmicl.baihongyu.com/

你可能感兴趣的文章
LNMP之memcached实现tomcat群集(三)
查看>>
我的友情链接
查看>>
Android IPC进程间通讯机制
查看>>
无损音乐资源
查看>>
对SpringAop的思考之基于cglib的动态代理
查看>>
Linux5.3双网卡绑定虚拟成一块网卡
查看>>
轻松获取格林尼治Linux时间戳
查看>>
java 执行cmd、shell 、exe 返回结果
查看>>
linux之iptables详解及配置(一)
查看>>
struts2 通过action返回json
查看>>
DHCP
查看>>
Ubuntu 升级错误信息:mount: mounting none on /dev fail...
查看>>
symantec Desktop and Laptop Option 桌面备份
查看>>
Centos7安装Docker私服Harbor
查看>>
struts2 404处理
查看>>
从LiveJournal后台发展看大规模网站性能优化方法
查看>>
apache的工作模式
查看>>
Linux 常用命令
查看>>
python 中的if __name__ == 'main':
查看>>
各网站平台API接口整理
查看>>