`
zhchx0827
  • 浏览: 191545 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

RabbitMQ入门学习——Topics(主题)

 
阅读更多

 

在前面的章节中,我们改善了我们的日志代码。我们使用direct直播代替fanout广播,并且可以选择性恶接受日志。尽管使用direct直播改善了我们的日志代码,但是它还有许多局限性。比如:不能给予多种标准来路由。

在我们的日志系统中,我们也许即希望按照日志的严重程度来订阅,也希望按照日志的来源定于。你也许知道unix syslog工具的概念,它是给予严重程度和设备来路由日志的。这将给我们许多的灵活性——我们也许既希望监听严重的错误,同时也不希望错过来自kern的日志。为了实现这种目的,我们需要学习一种更复杂的交换区——topic

1Topic exchange(主题交换区)

发送到topic交换区的消息不能使用随意的routing_key,它必须一串通过点分隔的单词。可以使任意的单词,但是它们通常都与消息有一定的关联。比如一些有效的routing key"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。在routing key中可以包含任意多个单词,只要它们不超过255个字节。

   Binding key必须要有相同的形式。Topic交换区和direct有些类似——它们都通过一个特定的routing key传递给多个binding key相匹配的队列。对于binding key有两点非常重要:

*可以代替一个单词

#可以代替0个或多个单词

下图是一个最简单的例子

在这个例子中,我们要发送的消息全部是用来描述动物的。该消息将会被路由到一个包含三个单词的routing key(两个点分隔)Routing key中的第一个单词表示速度,第二个表示颜色,第三个种类:“<speed>.<color>.<species>”。

我们创建了三个bindingQ1通过“*.orange.*”绑定,Q2通过“*.*.rabbit”和“lazy.#”绑定。

这三种绑定可以描述为:

Q1对所有黄颜色的动物感兴趣

Q2对所有的兔子和比较懒的动物感兴趣

通过使用routing keyquick.orange.rabbit”的消息,将会被发送到Q1Q2两个队列上,使用“lazy.orange.elephant”的消息也将被发送到两个队列上。但是,“quick.orange.fox”只会被发送到Q1队列上,“lazy.brown.fox”只会发送到Q2队列上。虽然“lazy.pink.rabbit”匹配了两个binding,但是它只会被发送到Q2上一次。"quick.brown.fox"没有匹配任何binding,它将被丢弃。

如果我们没有按照约定来发送消息(比如“orange”,“quick.orange.male.rabbit),那将会怎么样呢?这些消息由于没有匹配到任何的binding,所以他们将会被丢弃。虽然“lazy.orange.male.rabbit”有四个单词,单是匹配到了最后一个binding,所以会发送给Q2

Topic交换区功能非常强大,他可以实现其他交换区的功能。

当一个队列通过binding key #”进行绑定,它会接受所有的消息,忽略routing key。类似fanout交换区

当特殊字符“*”和“#”没有在binding中使用,topic交换区就和direct交换区类似。

2Putting it all together

public class EmitLogTopic {

 

    private static final String EXCHANGE_NAME = "topic_logs";

 

    public static void main(String[] argv)

                  throws Exception {

 

        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("localhost");

        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

 

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

 

        String routingKey = getRouting(argv);

        String message = getMessage(argv);

 

        channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());

        System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");

 

        connection.close();

    }

    //...

}

 

public class ReceiveLogsTopic {

 

    private static final String EXCHANGE_NAME = "topic_logs";

 

    public static void main(String[] argv)

                  throws Exception {

 

        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("localhost");

        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

 

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        String queueName = channel.queueDeclare().getQueue();

 

        if (argv.length < 1){

            System.err.println("Usage: ReceiveLogsTopic [binding_key]...");

            System.exit(1);

        }

 

        for(String bindingKey : argv){

            channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);

        }

 

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

 

        QueueingConsumer consumer = new QueueingConsumer(channel);

        channel.basicConsume(queueName, true, consumer);

 

        while (true) {

            QueueingConsumer.Delivery delivery = consumer.nextDelivery();

            String message = new String(delivery.getBody());

            String routingKey = delivery.getEnvelope().getRoutingKey();

 

            System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");

        }

    }

}

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics