kagamihogeの日記

kagamihogeの日記です。

RabbitMQチュートリアル4 Routingをテキトーに訳した

RabbitMQはじめて触ったけどよく分からん、ということでチュートリアルを読むついでにテキトーに訳すことにした。

RabbitMQチュートリアル(https://www.rabbitmq.com/getstarted.html)の4 Routing Java(https://www.rabbitmq.com/tutorials/tutorial-four-java.html)をテキトーに訳した。

Routing

(using the Java client)

先のチュートリアルではシンプルなロギングシステムを作りました。これで複数のレシーバにログメッセージをブロードキャスト可能になりました。

このチュートリアルでは更に機能追加を行い、メッセージのサブセットのみサブスクライブ可能にします。たとえば、クリティカルなエラーメッセージのみログファイルに向ける一方で、コンソールにはログメッセージをすべて出力できるようにします。

Bindings

先の例でバインディングの作成方法については解説しました。バインディングのコードは以下のようになります。

channel.queueBind(queueName, EXCHANGE_NAME, "");

バインディングはエクスチェンジとキューとの関連付けです。つまり、キューはエクスチェンジからのメッセージのみ受け付けます。

バインディングはルーティングキー(routingKey)パラメータも取ることが出来ます。basic_publishのパラメータとの混乱を避けるため、バインディングのパラメータはバインディングキー(binding key)と呼ぶことにします*1。キーと一緒にバインディングを作るには以下のようにします。

channel.queueBind(queueName, EXCHANGE_NAME, "black");

バインディングキーの意味付けはエクスチェンジタイプに依存します。fanoutエクスチェンジでは前に使用したとおり無視されます。

Direct exchange

先のチュートリアルのロギングシステムではすべてのコンシューマーにすべてのメッセージをブロードキャストしていました。今回は重要度(severity)に応じてメッセージのフィルタリングが可能なように拡張します。たとえば、警告や情報のログメッセージでディスク容量を浪費しないよう、クリティカルなエラーを受信するときにだけディスクにログメッセージを書き出すようにします。

fanoutエクスチェンジにはそのような柔軟性は無く、単なるブロードキャストの機能しかありません。

その代わりにdirectエクスチェンジを使います。directエクスチェンジのルーティングアルゴリズムはシンプルで、メッセージのルーティングキーにマッチするバインディングキーを持つキューにメッセージを送信します。

これの解説のために以下のような設定を行います。

direct-exchange.png

この設定では、directエクスチェンジXに二つのキューをバインドします。前者のキューはバインディングキーorangeバインドし、後者は二つのバインディングを持ち、一つはバインディングキーblackでもう片方はgreenです。

この設定では、エクスチェンジにパブリッシュされるメッセージのうち、ルーティングキーorangeを持つメッセージはキューQ1にルーティングされます。ルーティングキーがblackないしgreenのメッセージはQ2に行きます。それ以外のすべてのメッセージは破棄されます。

Multiple bindings

direct-exchange-multiple.png

同一のバインディングキーで複数のキューをバインドするのは全く問題ありません。上記のようにXQ1バインディングキーblackバインディングを作成できます。この場合、directエクスチェンジはfanoutのように振る舞い、キーにマッチするすべてのキューにメッセージをブロードキャストします。ルーティングキーblackを持つメッセージはQ1Q2に配信されます。

Emitting logs

チュートリアルのロギングシステムでは上記のモデルを使います。fanoutではなくdirectエクスチェンジにメッセージを送信します。ログの重要度はルーティングキーで指定します。これにより、受信側のプログラムは受信したい重要度のログを選択可能になります。まずログ送信側を見て行きます。

これまで通り、まずエクスチェンジを作成します。

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

次に、メッセージ送信の開始は以下のようにします。

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

説明の簡略化のため、'severity'には'info', 'warning', 'error'のいずれか一つを指定するものとします。

Subscribing

メッセージ受信は以前のチュートリアルとほぼ同じですが、一つ違いがあり、受信したい重要度ごとにバインディングを作成します。

String queueName = channel.queueDeclare().getQueue();

for(String severity : argv){    
  channel.queueBind(queueName, EXCHANGE_NAME, severity);
}

Putting it all together

python-four.png

EmitLogDirect.javaクラスのコードです。

public class EmitLogDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv)
                  throws java.io.IOException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        String severity = getSeverity(argv);
        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
        System.out.println(" [x] Sent '" + severity + "':'" + message + "'");

        channel.close();
        connection.close();
    }
    //..
}

ReceiveLogsDirect.javaのコードです。

import com.rabbitmq.client.*;

import java.io.IOException;

public class ReceiveLogsDirect {

  private static final String EXCHANGE_NAME = "direct_logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    String queueName = channel.queueDeclare().getQueue();

    if (argv.length < 1){
      System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
      System.exit(1);
    }

    for(String severity : argv){
      channel.queueBind(queueName, EXCHANGE_NAME, severity);
    }
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope,
                                 AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
      }
    };
    channel.basicConsume(queueName, true, consumer);
  }
}

これまで通りコンパイルを行います(コンパイルとクラスパスについてはtutorial one を参照)。説明簡略化のため、サンプル実行時にはクラスパスに環境変数$CP(windowsでは%CP%)を使います。

ファイルへのログメッセージは'warning'と'error'('info'は除く)のみにしたい場合、コンソールで以下のように実行します。

$ java -cp $CP ReceiveLogsDirect warning error > logs_from_rabbit.log

画面上ですべてのログメッセージを参照したい場合、以下のように実行します。

$ java -cp $CP ReceiveLogsDirect info warning error
 [*] Waiting for logs. To exit press CTRL+C

次に、例えば、errorログメッセージの送信は以下のように行います。

$ java -cp $CP EmitLogDirect error "Run. Run. Or it will explode."
 [x] Sent 'error':'Run. Run. Or it will explode.'

(すべてのソースコード(EmitLogDirect.java source)(ReceiveLogsDirect.java source)です)

パターンに応じてメッセージをリッスンする方法についてはtutorial 5に進んで下さい。

*1:Channel#queueBindの引数はroutingKeyで、Channel#basicPublishの引数もroutingKey。両者は同じものを指定するのだけど、説明の際にはメッセージ送信時のキーかバインディング作成時のキーのことなのかは区別する意図があるみたい。なので、このチュートリアルでは前者はルーティングキー(routingKey)、後者はバインディングキー(binding key)という呼称にした、と思われる。

RabbitMQチュートリアル3 Publish/Subscribeをテキトーに訳した

RabbitMQはじめて触ったけどよく分からん、ということでチュートリアルを読むついでにテキトーに訳すことにした。

RabbitMQチュートリアル(https://www.rabbitmq.com/getstarted.html)の3 Publish/Subscribe Java(https://www.rabbitmq.com/tutorials/tutorial-three-java.html)をテキトーに訳した。

Publish/Subscribe

(using the Java Client)

二番目のチュートリアルではワークキューを作成しました。ワークキューの暗黙の仮定は個々のタスクは一つのワーカーにだけ配信される、というものです。このパートではそれとは全く異なり、複数のコンシューマーにメッセージを配信します。このパターンは"publish/subscribe"と呼ばれています。

このパターンを説明するために、シンプルなロギングシステムを構築していきます。このシステムは二つのプログラムから構成され、前者はログメッセージを送信し、後者は受信と表示を行います。

このロギングシステムではすべての受信プログラムがメッセージを受け取ります。この方法では、ある受信プログラムはディスクにログを出力し、同時に別の受信プログラムは画面にログを表示します。

基本的に、パブリッシュされたログメッセージはすべての受信プログラムにブロードキャストされます。

Exchanges

前のチュートリアルでは、キューに対してメッセージの送受信をしていました。このパートではRabbitの完全なメッセージングモデル(full messaging model)を使います。

これまでのチュートリアルで解説したことを簡単におさらいします。

  • プロデューサー(producer)はメッセージ送信を行うユーザーアプリケーション。
  • キュー(queue)はメッセージを格納するバッファ。
  • コンシューマ(consumer)はメッセージ受信を行うユーザーアプリケーション。

RabbitMQのメッセージングモデルの基本的な考え方では、プロデューサーはキューへ直接メッセージ送信は行いません。実際、プロデューサーはメッセージがどのキューに配信されるかさえ関知しません。

その代わりに、プロデューサーはエクスチェンジ(exchange)にだけメッセージ送信を行います。エクスチェンジは非常にシンプルです。プロデューサーからメッセージを受信する一方で、他方ではキューにそのメッセージをプッシュします。エクスチェンジは受信メッセージに対して何らかの処理を行うことが必須となります。特定のキューにエクスチェンジを追加するのでしょうか? 多数のキューに追加したり、廃棄するのでしょうか? そのためのルールはエクスチェンジタイプ(exchange type)で定義されています。

exchanges.png

利用可能なエクスチェンジタイプは、direct, topic, headers, fanoutです。このパートでは一番最後のfanoutを使います。このタイプのエクスチェンジを作成し、logsと名付けます。

channel.exchangeDeclare("logs", "fanout");

fanoutは非常にシンプルです。名前からおおむね推測できるとおり*1、既知のすべてのキューにすべての受信メッセージをブロードキャストします。よって、今作ろうとしているロガーにはこれで十分です。

Listing exchanges

サーバ上のエクスチェンジを一覧するにはrabbitmqctlを使います。

$ sudo rabbitmqctl list_exchanges
Listing exchanges ...
        direct
amq.direct      direct
amq.fanout      fanout
amq.headers     headers
amq.match       headers
amq.rabbitmq.log        topic
amq.rabbitmq.trace      topic
amq.topic       topic
logs    fanout
...done.

上記の一覧にはいくつかのamq.*エクスチェンジとデフォルトの(無名な)エクスチェンジがあります。これらはデフォルトで生成されますが、現時点では使う必要はありません。

Nameless exchange

チュートリアルのこれ以前のパートではまだエクスチェンジについては触れていませんでしたが、キューにメッセージ送信することは可能でした。これが可能なのはデフォルトエクスチェンジを使用しているためで、そのエクスチェンジは空文字列("")で識別します。

前のパートではメッセージパブリッシュは以下のようにやっていました。

channel.basicPublish("", "hello", null, message.getBytes());

最初のパラメータがエクスチェンジの名前です。空文字列はデフォルトもしくは無名(nameless)エクスチェンジを指します。routingKeyが存在する場合は、メッセージはroutingKeyが指すキューにルーティングされます。

名前付きのエクスチェンジにパブリッシュするには以下のようにします。

channel.basicPublish( "logs", "", null, message.getBytes());

Temporary queues

これまでのチュートリアルでは特定の名前を持つキューを使用していました(hellotask_queue)。同一のキューをワーカーが指すようにするために、キューに名前を付けられることは重要です。プロデューサーとコンシューマでキューを共有したい場合、キューの名前は重要です。

しかし今回のロガーではその場合に当たりません。今回はすべてのログメッセージを受信したいのであり、そのサブセットではありません。過去のメッセージではなく現在進行で流れてくるメッセージにのみ関心があります。これを解決するには二つの機能が必要です。

まず、Rabbitに接続する際には常に新しい空のキューが必要です。このためにはランダム名でキューを生成しますが、それよりはサーバにランダムのキュー名を作らせる方が手軽です。

次に、コンシューマーを切断したらそのキューは自動的に削除する必要があります。

Javaクライアントでは、パラメータ無しのqueueDeclare()を使います。これにより、非durable・排他的(exclusive)・名前を自動生成してキューを自動削除、になります。

String queueName = channel.queueDeclare().getQueue();

この場合、queueNameはランダムキュー名になります。たとえばamq.gen-JzTY20BRgKO-HjmUJj0wLgのようになります。

Bindings

bindings.png

これまでの説明でfanoutエクスチェンジとキューを作りました。残りはエクスチェンジにキューへメッセージ送信させる設定です。エクスチェンジとキューの関連付けはバインディング(binding)と言います。

channel.queueBind(queueName, "logs", "");

これでlogsエクスチェンジはキューにメッセージを追加するようになります。

Listing bindings

既存のバインディングをリストするにはrabbitmqctl list_bindingsを使います。

Putting it all together

python-three-overall.png

プロデューサーのプログラムはログメッセージを送信するもので、これまでのチュートリアルとほとんど同じです。重要な変更点は、無名エクスチェンジではなくlogsエクスチェンジにメッセージをパブリッシュする点です。送信時にはroutingKeyの指定が必要ですが、fanoutエクスチェンジの場合は無視されます。以下がEmitLog.javaのプログラムになります。

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class EmitLog {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv)
                  throws java.io.IOException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
    //...
}

(EmitLog.java source)

これまで見てきたように、コネクションを確立したあとでエクスチェンジを宣言しています。存在しないエクスチェンジへのパブリッシュは禁止されているのでこの手順が必要となります。

エクスチェンジにキューが一つもバインドされていない場合メッセージはロストしますが、このサンプルの場合にはそれで問題ありません。リッスン中のコンシューマが無いということはそのメッセージを破棄しても支障は無いからです。

ReceiveLogs.javaは以下です。

import com.rabbitmq.client.*;

import java.io.IOException;

public class ReceiveLogs {
  private static final String EXCHANGE_NAME = "logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, EXCHANGE_NAME, "");

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope,
                                 AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
      }
    };
    channel.basicConsume(queueName, true, consumer);
  }
}

(ReceiveLogs.java source)

実行するにはまずコンパイルします。

$ javac -cp .:rabbitmq-client.jar EmitLog.java ReceiveLogs.java

ファイルにログ出力する場合はコンソールで以下のようにします。

$ java -cp .:rabbitmq-client.jar ReceiveLogs > logs_from_rabbit.log

画面のログ出力する場合は別のターミナルで以下のように実行します。

$ java -cp .:rabbitmq-client.jar ReceiveLogs

ログ送信は以下のようにします。

$ java -cp .:rabbitmq-client.jar EmitLog

rabbitmqctl list_bindingsで、プログラムが実際にキューの生成とバインディングをしているかを確認できます。ReceiveLogs.javaを二つ実行すると以下のような表示になります。

$ sudo rabbitmqctl list_bindings
Listing bindings ...
logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
...done.

実行結果の見方は簡単です。logsエクスチェンジのデータが、サーバが自動生成した名前の二つのキューに流れています。これは意図した通りの動作です。

メッセージのサブセットをリッスンする方法を学ぶには、次のtutorial 4を参照してください。

*1:fan outには扇形に広げるとか展開させるとかそういう意味がある http://eow.alc.co.jp/search?q=fan-out&ref=wl

RabbitMQチュートリアル2 Work Queues Javaをテキトーに訳した

RabbitMQはじめて触ったけどよく分からん、ということでチュートリアルを読むついでにテキトーに訳すことにした。

RabbitMQチュートリアル(https://www.rabbitmq.com/getstarted.html)の2 Work Queues Java(https://www.rabbitmq.com/tutorials/tutorial-two-java.html)をテキトーに訳した。

2 Work queues

python-two.png

一つ目のチュートリアルではキューにメッセージを送信して受信するプログラムを作りました。このパートではワークキュー(Work Queue)を作成します。ワークキューは複数のワーカに時間のかかるタスクを分配します。

ワークキュー(別名:タスクキュー(Task Queues))の基本的な考え方は、リソースを浪費するタスクを即時実行してから完了まで待機することを避ける点にあります。その代わりに、タスクを後で実行するようスケジュールします。タスク(Task)はメッセージにカプセル化してキューに送信します。バックグラウンドで実行中のワーカープロセスがタスクをポップしてジョブを実行します。複数のワーカーを実行する場合、タスクはワーカー間で共有されます。

このやり方は、短時間のHTTPリクエストで複雑なタスク処理をするのが難しいwebアプリケーションで特に有効です。

Preparation

チュートリアルの一つ目のパートでは"Hello World!"というメッセージを送信しました。このパートでは複雑なタスクを表現する文字列を送信します。実際のタスク、たとえばリサイズ対象のイメージやレンダするPDFファイルなどは用意せず、Thread.sleep()のビジーによりフェイクの処理とします。複雑さの表現は文字列の複数のドットで行います。各ドットは1秒の"work"にに相当します。たとえば、Hello...というフェイクのタスクは3秒かかります。

一つ目のサンプルのSend.javaを少々修正し、コマンドラインから任意のメッセージを送信出来るようにします。このプログラムはワークキューにタスクをスケジュールするので、NewTask.javaという名前にします。

String message = getMessage(argv);

channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

コマンドライン引数からメッセージを取得するヘルパ関数を作ります。

private static String getMessage(String[] strings){
    if (strings.length < 1)
        return "Hello World!";
    return joinStrings(strings, " ");
}

private static String joinStrings(String[] strings, String delimiter) {
    int length = strings.length;
    if (length == 0) return "";
    StringBuilder words = new StringBuilder(strings[0]);
    for (int i = 1; i < length; i++) {
        words.append(delimiter).append(strings[i]);
    }
    return words.toString();
}

Recv.javaもまたいくつか修正が必要です。メッセージボディのドットに従ってワークにフェイクの秒数を取る必要があります。配信メッセージを処理してタスクを実行するので、Worker.javaという名前にします。

final Consumer consumer = new DefaultConsumer(channel) {
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String message = new String(body, "UTF-8");

    System.out.println(" [x] Received '" + message + "'");
    try {
      doWork(message);
    } finally {
      System.out.println(" [x] Done");
    }
  }
};
channel.basicConsume(TASK_QUEUE_NAME, true, consumer);

フェイクタスクの実行は以下のようにします。

private static void doWork(String task) throws InterruptedException {
    for (char ch: task.toCharArray()) {
        if (ch == '.') Thread.sleep(1000);
    }
}

一つ目のチュートリアル同様にコンパイルします。

$ javac -cp rabbitmq-client.jar NewTask.java Worker.java

Round-robin dispatching

タスクキューを使用する利点の一つは、ワークを簡単に並行処理できる点です。ワークの未処理が累積してきた場合、簡単にワーカーを追加できるため、スケールがやりやすいです。

まず、同時に二つのワーカーインスタンスで実行してみようと思います。二つのワーカーが一つのキューからメッセージを取得しますが、具体的なやり方を見て行きます。

コンソールを三つ開きます。二つはワーカープログラムを実行します。このコンソールはコンシューマーC1とC2になります。

shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
 [*] Waiting for messages. To exit press CTRL+C
shell2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
 [*] Waiting for messages. To exit press CTRL+C

三つ目のコンソールで新しいタスクをパブリッシュします。コンシューマを開始すればメッセージのパブリッシュが可能となります。

shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask First message.
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Second message..
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Third message...
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Fourth message....
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Fifth message.....

ワーカーに配信されると以下のようになります。

shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'First message.'
 [x] Received 'Third message...'
 [x] Received 'Fifth message.....'
shell2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Second message..'
 [x] Received 'Fourth message....'

デフォルトでは、RabbitMQは順繰りに次のコンシューマーへと個々のメッセージを送信していきます。平均すると、どのコンシューマも同じ数のメッセージを取得します。このようなメッセージ配信方法はラウンドロビン(round-robin)と言います。3つ以上のワーカーでどうなるか試してみて下さい。

Message acknowledgment

タスクの実行には数秒を要します。このとき、コンシューマーの一つが長時間タスクを開始して処理途中で死んだ場合どうなるか、と疑問に思うかもしれません。現状のコードだと、RabbitMQはカスタマーにメッセージを配信するとメモリからすぐに削除します。この場合、ワーカーを殺すとそのワーカーで処理中のメッセージはすべてロストします。また、このワーカーにディスパッチされたすべてのメッセージもロストし、未処理のままとなります。

しかし、普通はタスクのロストは望ましくありません。あるワーカーが死ぬ場合、別のワーカーにタスクが配信されるのが望ましいです。

メッセージを決してロストしないようにするのに、RabbitMQはメッセージ確認(acknowledgments)をサポートしています。ackとは、ある特定のメッセージを受信および処理したことをRabbitMQへ通知するために、コンシューマーから送り返されるものです。そのあとRabbitMQでメッセージの削除が可能になります。

ackを送信せずにコンシューマが死ぬ場合(チャンネルがクローズした・コネクションがクローズした・TCPコネクションがロストした)、RabbitMQはメッセージが完全に処理されなかったことを認識してキューに再度入れます。その段階で他にオンラインのコンシューマがまだ存在する場合、別のコンシューマへ直ちに再配信されます。この方法によりワーカーが時々死んだとしてもメッセージがロストしないことを保証出来ます。

現状のサンプルにはメッセージタイムアウトを設けておらず、コンシューマーが死ぬとRabbitMQはメッセージを再配信します。メッセージ処理に極めて長時間かかる場合でも問題無く動作します。

メッセージ確認はデフォルトで有効です。以前のサンプルではautoAck=trueフラグ*1によって明示的に無効化していました。このフラグを削除してワーカーから適切な確認を送信するようにします。

channel.basicQos(1);

final Consumer consumer = new DefaultConsumer(channel) {
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String message = new String(body, "UTF-8");

    System.out.println(" [x] Received '" + message + "'");
    try {
      doWork(message);
    } finally {
      System.out.println(" [x] Done");
      channel.basicAck(envelope.getDeliveryTag(), false);
    }
  }
};

このコードによって、ワーカーがメッセージ処理中にCTRL+Cで死んだとしても、メッセージはロストしません。ワーカーが死ぬとすぐに未確認メッセージは再配信されます。

Forgotten acknowledgment

よくあるミスはbasicAckの書き忘れです。間違いやすい割りに影響は甚大です。クライアントが停止するとメッセージは再配信されますが、RabbitMQは未確認メッセージの解放が出来ないためにメモリを消費し続けます。

この種のミスをデバッグするにはrabbitmqctlmessages_unacknowledgedフィールドを表示します。

$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello    0       0
...done.

Message durability

コンシューマ死亡時の扱い方について解説し、これでタスクはロストしません。しかしtRabbitMQサーバ停止時には依然としてタスクはロストします。

RabbitMQが停止もしくはクラッシュすると、設定しない限り、キューとメッセージは消滅します。メッセージロストしないようにするためにやるべきことは2つあり、キューとメッセージをdurableに設定します。

まず、RabbitMQがキューを削除しないようにします。そのためには、キューをdurableと宣言します。

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

上記のコマンドそのものは問題無いですが、現状の設定では動作しません。その理由はhelloという名前のキューをnot durableとして定義済みだからです。RabbitMQは既存のキューを異なるパラメータでの再定義を許可しておらず、これを試みるプログラムにはエラーを戻します。ここでは場当たり的な対処として、サンプル用に別の名前のtask_queueというキューを宣言します。

boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

queueDeclareの修正はプロデューサーとコンシューマの両方で必要です。

これでRabbitMQをリスタートしてもtask_queueキューが失われないようになりました。次に、メッセージをpersistentにするため、MessagePropertiesBasicPropertiesの実装)にPERSISTENT_TEXT_PLAINを設定します。

import com.rabbitmq.client.MessageProperties;

channel.basicPublish("", "task_queue",
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());

Note on message persistence

メッセージをpersistentと設定することは、メッセージロストしないことを完全には保証しません。この設定はメッセージをディスクに格納するようRabbitMQに指示しますが、RabbitMQのメッセージ受信時に短時間ですが保存されません*2。また、RabbitMQはすべてのメッセージに対しfsync(2)はしません。キャッシュには保存されるもののディスクには書き出されないかもしれません。persistenceは強力ではありませんが、単純なタスクキューには十分です。より強力な保証が必要な場合はpublisher confirmsを使用してください。

Fair dispatch

ディスパッチが我々の期待通りに動作しないと思うかもしれません。例えば、二つのワーカーがあるとして、奇数メッセージは重くて偶数は軽い場合、片方のワーカーは常にビジーでもう片方の負荷は軽いままです。RabbitMQは負荷については関知しないので均等にメッセージをディスパッチします。

RabbitMQはメッセージがキューに入ると直ちにディスパッチするために上記のような動作になります。RabbitMQはコンシューマの多数の未確認メッセージを参照しません。よって、n番目のメッセージはn番目のコンシューマーに漫然とディスパッチされます。

prefetch-count.png

これに対してはbasicQosメソッドにprefetchCount = 1を指定します。これにより、RabbitMQはあるワーカーに対し一度に一つ以上のメッセージを与えなくなります。言い換えると、一つ前のメッセージが処理されて確認されるまで、新しいメッセージはワーカーに配信されません。すなわち、ディスパッチするワーカーはビジーではない、ということです。

int prefetchCount = 1;
channel.basicQos(prefetchCount);

Note about queue size

すべてのワーカーがビジーだとキューに滞留します。キューを監視してワーカーを追加したり、別の方法を試したいと考えるようになるかもしれません。

Putting it all together

最終的なNewTask.javaクラスのコードになります。

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

public class NewTask {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv)
                      throws java.io.IOException {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

    String message = getMessage(argv);

    channel.basicPublish( "", TASK_QUEUE_NAME,
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");

    channel.close();
    connection.close();
  }      
  //...
}

(NewTask.java source)

Worker.java

import com.rabbitmq.client.*;

import java.io.IOException;

public class Worker {
  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    final Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    channel.basicQos(1);

    final Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");

        System.out.println(" [x] Received '" + message + "'");
        try {
          doWork(message);
        } finally {
          System.out.println(" [x] Done");
          channel.basicAck(envelope.getDeliveryTag(), false);
        }
      }
    };
    channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
  }

  private static void doWork(String task) {
    for (char ch : task.toCharArray()) {
      if (ch == '.') {
        try {
          Thread.sleep(1000);
        } catch (InterruptedException _ignored) {
          Thread.currentThread().interrupt();
        }
      }
    }
  }
}

(Worker.java source)

メッセージ確認とprefetchCountを使うにはワークキューの設定が必要です。durabilityオプションによってRabbitMQリスタート時にもタスクが保存されたままに出来ます。

ChannelメソッドとMessagePropertiesの詳細についてはjavadocsを参照してください。

次のtutorial 3では、複数のコンシューマに同一メッセージを配信する方法を解説します。

*1:com.rabbitmq.client.Channel#basicConsume の第二引数のこと。trueは一度メッセージ配信するとメッセージ確認したものと見なし、falseだと確認が帰ってくるのを待つモードになる https://www.rabbitmq.com/releases/rabbitmq-java-client/v3.6.3/rabbitmq-java-client-javadoc-3.6.3/com/rabbitmq/client/Channel.html#basicConsume(java.lang.String,%20boolean,%20com.rabbitmq.client.Consumer)

*2:there is still a short time window when RabbitMQ has accepted a message and hasn't saved it yet. が原文。ちょっと訳に自身が無い