kagamihogeの日記

kagamihogeの日記です。

RabbitMQチュートリアル2 Work Queues Javaをテキトーに訳した

RabbitMQはじめて触ったけどよく分からん、ということでチュートリアルを読むついでにテキトーに訳すことにした。

RabbitMQチュートリアル(https://www.rabbitmq.com/getstarted.html)の2 Work Queues Java(https://www.rabbitmq.com/tutorials/tutorial-two-java.html)をテキトーに訳した。

2 Work queues

python-two.png

一つ目のチュートリアルではキューにメッセージを送信して受信するプログラムを作りました。このパートではワークキュー(Work Queue)を作成します。ワークキューは複数のワーカに時間のかかるタスクを分配します。

ワークキュー(別名:タスクキュー(Task Queues))の基本的な考え方は、リソースを浪費するタスクを即時実行してから完了まで待機することを避ける点にあります。その代わりに、タスクを後で実行するようスケジュールします。タスク(Task)はメッセージにカプセル化してキューに送信します。バックグラウンドで実行中のワーカープロセスがタスクをポップしてジョブを実行します。複数のワーカーを実行する場合、タスクはワーカー間で共有されます。

このやり方は、短時間のHTTPリクエストで複雑なタスク処理をするのが難しいwebアプリケーションで特に有効です。

Preparation

チュートリアルの一つ目のパートでは"Hello World!"というメッセージを送信しました。このパートでは複雑なタスクを表現する文字列を送信します。実際のタスク、たとえばリサイズ対象のイメージやレンダするPDFファイルなどは用意せず、Thread.sleep()のビジーによりフェイクの処理とします。複雑さの表現は文字列の複数のドットで行います。各ドットは1秒の"work"にに相当します。たとえば、Hello...というフェイクのタスクは3秒かかります。

一つ目のサンプルのSend.javaを少々修正し、コマンドラインから任意のメッセージを送信出来るようにします。このプログラムはワークキューにタスクをスケジュールするので、NewTask.javaという名前にします。

String message = getMessage(argv);

channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

コマンドライン引数からメッセージを取得するヘルパ関数を作ります。

private static String getMessage(String[] strings){
    if (strings.length < 1)
        return "Hello World!";
    return joinStrings(strings, " ");
}

private static String joinStrings(String[] strings, String delimiter) {
    int length = strings.length;
    if (length == 0) return "";
    StringBuilder words = new StringBuilder(strings[0]);
    for (int i = 1; i < length; i++) {
        words.append(delimiter).append(strings[i]);
    }
    return words.toString();
}

Recv.javaもまたいくつか修正が必要です。メッセージボディのドットに従ってワークにフェイクの秒数を取る必要があります。配信メッセージを処理してタスクを実行するので、Worker.javaという名前にします。

final Consumer consumer = new DefaultConsumer(channel) {
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String message = new String(body, "UTF-8");

    System.out.println(" [x] Received '" + message + "'");
    try {
      doWork(message);
    } finally {
      System.out.println(" [x] Done");
    }
  }
};
channel.basicConsume(TASK_QUEUE_NAME, true, consumer);

フェイクタスクの実行は以下のようにします。

private static void doWork(String task) throws InterruptedException {
    for (char ch: task.toCharArray()) {
        if (ch == '.') Thread.sleep(1000);
    }
}

一つ目のチュートリアル同様にコンパイルします。

$ javac -cp rabbitmq-client.jar NewTask.java Worker.java

Round-robin dispatching

タスクキューを使用する利点の一つは、ワークを簡単に並行処理できる点です。ワークの未処理が累積してきた場合、簡単にワーカーを追加できるため、スケールがやりやすいです。

まず、同時に二つのワーカーインスタンスで実行してみようと思います。二つのワーカーが一つのキューからメッセージを取得しますが、具体的なやり方を見て行きます。

コンソールを三つ開きます。二つはワーカープログラムを実行します。このコンソールはコンシューマーC1とC2になります。

shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
 [*] Waiting for messages. To exit press CTRL+C
shell2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
 [*] Waiting for messages. To exit press CTRL+C

三つ目のコンソールで新しいタスクをパブリッシュします。コンシューマを開始すればメッセージのパブリッシュが可能となります。

shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask First message.
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Second message..
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Third message...
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Fourth message....
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Fifth message.....

ワーカーに配信されると以下のようになります。

shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'First message.'
 [x] Received 'Third message...'
 [x] Received 'Fifth message.....'
shell2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Second message..'
 [x] Received 'Fourth message....'

デフォルトでは、RabbitMQは順繰りに次のコンシューマーへと個々のメッセージを送信していきます。平均すると、どのコンシューマも同じ数のメッセージを取得します。このようなメッセージ配信方法はラウンドロビン(round-robin)と言います。3つ以上のワーカーでどうなるか試してみて下さい。

Message acknowledgment

タスクの実行には数秒を要します。このとき、コンシューマーの一つが長時間タスクを開始して処理途中で死んだ場合どうなるか、と疑問に思うかもしれません。現状のコードだと、RabbitMQはカスタマーにメッセージを配信するとメモリからすぐに削除します。この場合、ワーカーを殺すとそのワーカーで処理中のメッセージはすべてロストします。また、このワーカーにディスパッチされたすべてのメッセージもロストし、未処理のままとなります。

しかし、普通はタスクのロストは望ましくありません。あるワーカーが死ぬ場合、別のワーカーにタスクが配信されるのが望ましいです。

メッセージを決してロストしないようにするのに、RabbitMQはメッセージ確認(acknowledgments)をサポートしています。ackとは、ある特定のメッセージを受信および処理したことをRabbitMQへ通知するために、コンシューマーから送り返されるものです。そのあとRabbitMQでメッセージの削除が可能になります。

ackを送信せずにコンシューマが死ぬ場合(チャンネルがクローズした・コネクションがクローズした・TCPコネクションがロストした)、RabbitMQはメッセージが完全に処理されなかったことを認識してキューに再度入れます。その段階で他にオンラインのコンシューマがまだ存在する場合、別のコンシューマへ直ちに再配信されます。この方法によりワーカーが時々死んだとしてもメッセージがロストしないことを保証出来ます。

現状のサンプルにはメッセージタイムアウトを設けておらず、コンシューマーが死ぬとRabbitMQはメッセージを再配信します。メッセージ処理に極めて長時間かかる場合でも問題無く動作します。

メッセージ確認はデフォルトで有効です。以前のサンプルではautoAck=trueフラグ*1によって明示的に無効化していました。このフラグを削除してワーカーから適切な確認を送信するようにします。

channel.basicQos(1);

final Consumer consumer = new DefaultConsumer(channel) {
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String message = new String(body, "UTF-8");

    System.out.println(" [x] Received '" + message + "'");
    try {
      doWork(message);
    } finally {
      System.out.println(" [x] Done");
      channel.basicAck(envelope.getDeliveryTag(), false);
    }
  }
};

このコードによって、ワーカーがメッセージ処理中にCTRL+Cで死んだとしても、メッセージはロストしません。ワーカーが死ぬとすぐに未確認メッセージは再配信されます。

Forgotten acknowledgment

よくあるミスはbasicAckの書き忘れです。間違いやすい割りに影響は甚大です。クライアントが停止するとメッセージは再配信されますが、RabbitMQは未確認メッセージの解放が出来ないためにメモリを消費し続けます。

この種のミスをデバッグするにはrabbitmqctlmessages_unacknowledgedフィールドを表示します。

$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello    0       0
...done.

Message durability

コンシューマ死亡時の扱い方について解説し、これでタスクはロストしません。しかしtRabbitMQサーバ停止時には依然としてタスクはロストします。

RabbitMQが停止もしくはクラッシュすると、設定しない限り、キューとメッセージは消滅します。メッセージロストしないようにするためにやるべきことは2つあり、キューとメッセージをdurableに設定します。

まず、RabbitMQがキューを削除しないようにします。そのためには、キューをdurableと宣言します。

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

上記のコマンドそのものは問題無いですが、現状の設定では動作しません。その理由はhelloという名前のキューをnot durableとして定義済みだからです。RabbitMQは既存のキューを異なるパラメータでの再定義を許可しておらず、これを試みるプログラムにはエラーを戻します。ここでは場当たり的な対処として、サンプル用に別の名前のtask_queueというキューを宣言します。

boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

queueDeclareの修正はプロデューサーとコンシューマの両方で必要です。

これでRabbitMQをリスタートしてもtask_queueキューが失われないようになりました。次に、メッセージをpersistentにするため、MessagePropertiesBasicPropertiesの実装)にPERSISTENT_TEXT_PLAINを設定します。

import com.rabbitmq.client.MessageProperties;

channel.basicPublish("", "task_queue",
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());

Note on message persistence

メッセージをpersistentと設定することは、メッセージロストしないことを完全には保証しません。この設定はメッセージをディスクに格納するようRabbitMQに指示しますが、RabbitMQのメッセージ受信時に短時間ですが保存されません*2。また、RabbitMQはすべてのメッセージに対しfsync(2)はしません。キャッシュには保存されるもののディスクには書き出されないかもしれません。persistenceは強力ではありませんが、単純なタスクキューには十分です。より強力な保証が必要な場合はpublisher confirmsを使用してください。

Fair dispatch

ディスパッチが我々の期待通りに動作しないと思うかもしれません。例えば、二つのワーカーがあるとして、奇数メッセージは重くて偶数は軽い場合、片方のワーカーは常にビジーでもう片方の負荷は軽いままです。RabbitMQは負荷については関知しないので均等にメッセージをディスパッチします。

RabbitMQはメッセージがキューに入ると直ちにディスパッチするために上記のような動作になります。RabbitMQはコンシューマの多数の未確認メッセージを参照しません。よって、n番目のメッセージはn番目のコンシューマーに漫然とディスパッチされます。

prefetch-count.png

これに対してはbasicQosメソッドにprefetchCount = 1を指定します。これにより、RabbitMQはあるワーカーに対し一度に一つ以上のメッセージを与えなくなります。言い換えると、一つ前のメッセージが処理されて確認されるまで、新しいメッセージはワーカーに配信されません。すなわち、ディスパッチするワーカーはビジーではない、ということです。

int prefetchCount = 1;
channel.basicQos(prefetchCount);

Note about queue size

すべてのワーカーがビジーだとキューに滞留します。キューを監視してワーカーを追加したり、別の方法を試したいと考えるようになるかもしれません。

Putting it all together

最終的なNewTask.javaクラスのコードになります。

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

public class NewTask {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv)
                      throws java.io.IOException {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

    String message = getMessage(argv);

    channel.basicPublish( "", TASK_QUEUE_NAME,
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");

    channel.close();
    connection.close();
  }      
  //...
}

(NewTask.java source)

Worker.java

import com.rabbitmq.client.*;

import java.io.IOException;

public class Worker {
  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    final Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    channel.basicQos(1);

    final Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");

        System.out.println(" [x] Received '" + message + "'");
        try {
          doWork(message);
        } finally {
          System.out.println(" [x] Done");
          channel.basicAck(envelope.getDeliveryTag(), false);
        }
      }
    };
    channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
  }

  private static void doWork(String task) {
    for (char ch : task.toCharArray()) {
      if (ch == '.') {
        try {
          Thread.sleep(1000);
        } catch (InterruptedException _ignored) {
          Thread.currentThread().interrupt();
        }
      }
    }
  }
}

(Worker.java source)

メッセージ確認とprefetchCountを使うにはワークキューの設定が必要です。durabilityオプションによってRabbitMQリスタート時にもタスクが保存されたままに出来ます。

ChannelメソッドとMessagePropertiesの詳細についてはjavadocsを参照してください。

次のtutorial 3では、複数のコンシューマに同一メッセージを配信する方法を解説します。

*1:com.rabbitmq.client.Channel#basicConsume の第二引数のこと。trueは一度メッセージ配信するとメッセージ確認したものと見なし、falseだと確認が帰ってくるのを待つモードになる https://www.rabbitmq.com/releases/rabbitmq-java-client/v3.6.3/rabbitmq-java-client-javadoc-3.6.3/com/rabbitmq/client/Channel.html#basicConsume(java.lang.String,%20boolean,%20com.rabbitmq.client.Consumer)

*2:there is still a short time window when RabbitMQ has accepted a message and hasn't saved it yet. が原文。ちょっと訳に自身が無い