`
zhchx0827
  • 浏览: 191276 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

RabbitMQ入门学习——Work queues 工作队列

阅读更多

 

http://www.rabbitmq.com/tutorials/tutorial-two-java.html

在第一个教程中,我们通过一个命名队列来发送消息和接受消息。在这一节,我们将创建一个工作队列,在多个工作者之间,分发比较耗时的任务

工作队列主要是为了避免资源密集型任务的立即执行,然后一直等待它执行结束。相反,我们可以安排好任务,然后在执行。我们可以将一个任务封装成一个消息,发送到队列中。由工作者在后台取出任务然后执行。当有多个工作者时,他们共同处理这些任务。

web应用中,当一次http请求需要处理复杂的任务时,工作队列将会变得非常有用

 

1Preparation

     在前面的教程中,我们发送一条消息“Hello World!”,现在我们会发送一条字符串,来模拟一个复杂的任务。我们没有像图片的大小调整或者pdf文件的渲染这类很耗时的任务,所以我们通过Thread.sleep()来模拟耗时任务。我们通过字符串中的点来模拟复杂度,每个点将代表一个耗时1秒的工作。例如:任务“Hello. . .”将耗时3秒。

      我们会略微的修改前面的Send.java代码,使其可以接受控制端输入的随意的字符串。程序会将任务安排到我们的工作队列中,所以我们命名为NewTask.java

String message = getMessage(argv);

 

channel.basicPublish("", "hello", null, message.getBytes());

System.out.println(" [x] Sent '" + message + "'");

从客户端接受消息

private static String getMessage(String[] strings){

    if (strings.length < 1)

        return "Hello World!";

    return joinStrings(strings, " ");

}

 

private static String joinStrings(String[] strings, String delimiter) {

    int length = strings.length;

    if (length == 0) return "";

    StringBuilder words = new StringBuilder(strings[0]);

    for (int i = 1; i < length; i++) {

        words.append(delimiter).append(strings[i]);

    }

    return words.toString();

}

     我们的Recv.java代码也需要一些变化:它需要假设字符串中得每个点,都需要耗时1秒。它会从消息队列中弹出一条消息,然后执行,所以我们叫他Work.java

while (true) {

    QueueingConsumer.Delivery delivery = consumer.nextDelivery();

    String message = new String(delivery.getBody());

 

    System.out.println(" [x] Received '" + message + "'");        

    doWork(message);

    System.out.println(" [x] Done");

}

我们模拟执行的任务

private static void doWork(String task) throws InterruptedException {

    for (char ch: task.toCharArray()) {

        if (ch == '.') Thread.sleep(1000);

    }

}

2Round-robin dispatching(循环调度)

    使用工作队列的一个优点就是可以并行的执行。如果我们需要处理积压的工作,可以通过这种方式添加更多的工作者,并且很容易扩展

首先,同时先运行两个Worker.java代码,它们都将从消息队列获取消息。我们需要开启3个控制端窗口。两个运行Worker.java代码,它们作为消费者

shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar

Worker

 [*] Waiting for messages. To exit press CTRL+C

 

shell2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar

Worker

 [*] Waiting for messages. To exit press CTRL+C

第三个窗口,我们会发布任务。一旦启动了consumer消费者之后,你就可以发布一些消息:

shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar

NewTask First message.

shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar

NewTask Second message..

shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar

NewTask Third message...

shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar

NewTask Fourth message....

shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar

NewTask Fifth message.....

让我们看看我们的工作者接受到了那些信息:

shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar

Worker

 [*] Waiting for messages. To exit press CTRL+C

 [x] Received 'First message.'

 [x] Received 'Third message...'

 [x] Received 'Fifth message.....'

 

java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar

Worker

 [*] Waiting for messages. To exit press CTRL+C

 [x] Received 'Second message..'

 [x] Received 'Fourth message....'

默认情况下,RabbitMQ会以此发送消息给下一个消费者。平均下来,每个消费者都会获得相同数量的消息。这种分发消息的方式就叫做Round-robin dispatching(循环调度)

4Message acknowledgment(消息确认)

     一个任务可能花费几秒种的之间,当一个消费者开始执行一个耗时的任务,并且只执行了任务的一部分就死亡时,你或许希望知道究竟发生了什么。通过我们目前的代码,一旦RabbitMQ向向消费者传递消息,它会立即从内存中将其删除。在这种情况下,如果你杀死了一个worker,我们将会失去它正在处理的消息。我们同样会失去所有交给它,但还没来得及处理的消息。但是我们不希望丢失任何的tasks。如果一个worker死亡,我们希望该task被交给另一个worker来处理。

     为了确保不丢失任何的messageRabbitMQ支持消息的确认。消费者会向RabbitMQ发送ack,来告诉RabbitMQ它已经接收,处理消息,RabbitMQ可以将其删除。

如果消费者没有发送ack就已经死亡,RabbitMQ会将其理解为消息没有被正确的处理,会将其从新发送给另一个消费者。通过这种方式,可以确保即便是worker偶然的死亡,也不会丢失任何消息。

不存在消息的超时。只有在worker的连接被关闭时,RabbitMQ才会将消息从新发送给下一个worker。即便是处理一个任务需要花费很长很长的时间,他也会正常的执行,而不会从新发送消息。

消息确认默认是开启的。在前面的例子中,我们通过autoAck=true,将其关闭。当我们执行一个task时,需要开启该功能,使worker可以发送一个正确的消息确认。

QueueingConsumer consumer = new QueueingConsumer(channel);

boolean autoAck = false;

channel.basicConsume("hello", autoAck, consumer);

 

while (true) {

  QueueingConsumer.Delivery delivery = consumer.nextDelivery();

  //...      

  channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

}

使用这样的代码,可以保证当worker正在处理一条消息时,强行将其关闭,也不会丢失任何的消息。在worker死亡后,所有未被确认的消息都将被从新发送。


5Message durability(消息持久化)

我们已经学习了如何确保消费者死亡,任务也不会丢失。但是一旦我们的RabbitMQ服务停止,任务还是同样会丢失。

如果你不告诉RabbitMQ,当它退出或发生灾难关闭时,它会丢失队列里的消息。我们需要做两件事情来确保消息不会丢失:我们需要同时保证队列和消息时持久的。

    首先:我们需要保证RabbitMQ不会丢失我们的队列。为了实现这个目的,我们需要将队列申明为永久的:

boolean durable = true;

channel.queueDeclare("hello", durable, false, false, null);

    即便命令执行正常,在我们现有的程序中,它也不会起作用。那是因为我们已经存在一个非永久的队列“hello”。RabbitMQ不允许我们使用不同的参数来定义一个已经存在的队列,它会向我们返回一些错误信息。但是,通过使用不同的名字(egtask_queue),来定义一个新的队列,可以快速的解决这个问题。

boolean durable = true;

channel.queueDeclare("task_queue", durable, false, false, null);

queueDeclare方法申明队列的变动,需要保证同时被应用在生产者和消费者两方

通过这种方式,我们可以确保task_queue队列在RabbitMQ服务重启后也不会丢失。现在我们需要把我们的消息也标记为永久的。通过设置MessageProperties(实现了BasicProperties)的值为PERSISTENT_TEXT_PLAIN可以实现这一目的。

channel.basicPublish("","task_queue",

        MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

消息持久化注意事项

即便我们将消息标记为永久的,但这也不能完全保证消息就一定不会丢失。虽然RabbitMQ会将消息保存到硬盘上,但是在RabbitMQ接收到消息并且将其保存到硬盘上之间,仍然有一个短暂的时间片段。

 

6Fair dispatch (公平调度)

     可能你已经注意到了,分发工作并没有按照我们预期的方式来执行。例如:当有两个工作者时,所有的奇数任务都比较繁重,偶数任务都比较简单。这样就导致了一个工作者会一直处于忙碌状态,而另一个工作者几乎没有事情可做。RabbitMQ对于这些情况并不了解,它还会均匀的分配任务。

发生这样的原因是因为一旦有消息进入队列,RabbitMQ就会将消息进行分发。他不会去观察消费者发回的确认信息。它会盲目的将n条消息分发给n个消费者。为了杜绝这种情况,我们可以通过basicQos方法设置prefetchCount = 1。它会限制RabbitMQ在同一时间向一个工作者发送一条以上的消息。换句话说,它会等待一个工作者处理完或者接收到上一条消息的确认信息,才向它发送新的消息。否者,它会将其分发给下一个比较空闲的工作者。

int prefetchCount = 1;

channel.basicQos(prefetchCount);

注意:如果所有的工作者都处于忙碌的状态,你的队列可能会被填满,你需要时刻注意。或许你可以添加更多的工作者或者采用其他的策略。

 

7Putting it all together(完整代码)

NewTask.java

import java.io.IOException;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.MessageProperties;

 

public class NewTask {

 

  private static final String TASK_QUEUE_NAME = "task_queue";

 

  public static void main(String[] argv) 

                      throws java.io.IOException {

 

    ConnectionFactory factory = new ConnectionFactory();

    factory.setHost("localhost");

    Connection connection = factory.newConnection();

    Channel channel = connection.createChannel();

 

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

 

    String message = getMessage(argv);

 

    channel.basicPublish( "", TASK_QUEUE_NAME, 

            MessageProperties.PERSISTENT_TEXT_PLAIN,

            message.getBytes());

    System.out.println(" [x] Sent '" + message + "'");

 

    channel.close();

    connection.close();

  }      

  //...

}

Work.java

import java.io.IOException;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.QueueingConsumer;

 

public class Worker {

 

  private static final String TASK_QUEUE_NAME = "task_queue";

 

  public static void main(String[] argv)

                      throws java.io.IOException,

                      java.lang.InterruptedException {

 

    ConnectionFactory factory = new ConnectionFactory();

    factory.setHost("localhost");

    Connection connection = factory.newConnection();

    Channel channel = connection.createChannel();

 

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

 

    channel.basicQos(1);

 

    QueueingConsumer consumer = new QueueingConsumer(channel);

    channel.basicConsume(TASK_QUEUE_NAME, false, consumer);

 

    while (true) {

      QueueingConsumer.Delivery delivery = consumer.nextDelivery();

      String message = new String(delivery.getBody());

 

      System.out.println(" [x] Received '" + message + "'");   

      doWork(message); 

      System.out.println(" [x] Done" );

 

      channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

    }

  }

  //...

}

 

  • 大小: 4.7 KB
分享到:
评论
2 楼 小灯笼 2018-01-15  
分布式消息队列高效部署及插件集群开发信息数据监控、分析实战(RabbitMQ、分布式、ZooKeeper、集群、监控、rabbitmq)
网盘地址1:https://pan.baidu.com/s/1smhdUH3 密码: w6vq
网盘地址2:https://pan.baidu.com/s/1kVUyLef 密码: 5pmi
1 楼 快乐的小六 2017-12-14  
分布式消息队列高效部署及插件集群开发信息数据监控、分析实战(RabbitMQ、分布式、ZooKeeper、集群、监控、rabbitmq)
网盘地址:https://pan.baidu.com/s/1qY7g9sg 密码: apuk

相关推荐

Global site tag (gtag.js) - Google Analytics