kagamihogeの日記

kagamihogeの日記です。

RabbitMQチュートリアル1 "Hello World!" Javaをテキトーに訳した

RabbitMQはじめて触ったけどよく分からん、ということでチュートリアルを読むついでにテキトーに訳すことにした。

RabbitMQチュートリアル(https://www.rabbitmq.com/getstarted.html)の1 "Hello World!" Java (https://www.rabbitmq.com/tutorials/tutorial-one-java.html)をテキトーに訳した。

1 "Hello World!"

Prerequisites

このチュートリアルはRabbitMQをインストールしてlocalhostの標準ポート(5672)で動作していることを想定しています。異なるホスト・ポート・クレデンシャル・接続設定を使う場合には必要に応じて調整していください。

Introduction

RabbitMQはメッセージブローカーです。基本的な考え方は極めてシンプルで、メッセージを受信もしくは送信します。郵便局のようなものと考えることが出来ます。郵便ポストに手紙を送信すると、郵便屋さんが送り先に送信してくれます。この例えでいうと、RabbitMQはポスト・郵便局・郵便屋となります。

RabbitMQと郵便局の主な違いは紙を扱わないことで、メッセージ(messages)というデータのバイナリを受信・格納・送信します。

RabbitMQと、いわゆるメッセージングは、同じ用語を持ちいます。

プロデュース(Producing)とは、単に送信を意味します。メッセージ送信を行うプログラムはプロデューサー(producer)と言います。このドキュメントでは以下のように"P"で図示します。

producer.png

キュー(キュー)とは、メールボックスの名前で、RabbitMQ内部に存在します。メッセージはRabbitMQとアプリケーションに流れていきますが、キューにだけ保存できます。キューには特に制限は無く、好きなだけメッセージを保存できます。本質的には無限バッファです。複数のプロデューサが一つのキューに向けてメッセージを送信可能で、複数のコンシューマが一つのキューからデータ受信を試行出来ます。このドキュメントでは以下のように上部に名前を持つ図で示します。

queue.png

コンシューム(Consuming)は受信と同様の意味です。コンシューマーはメッセージ受信のために大抵は待ち続けるプログラムです。このドキュメントでは以下のように"C"で図示します。

consumer.png

なお、プロデューサー・コンシューマ・ブローカは同一マシンに存在しなくても構いませんし、実際に大抵のアプリケーションではそうしていません。

"Hello World"

(using the Java Client)

チュートリアルのこのパートでは二つのJavaプログラムを作成します。一つのメッセージを送信するプロデューサーと、メッセージを受信してそれを表示するコンシューマです。Java APIの詳細に立ち入るのは避けて、チュートリアルに必要な最小限にだけ解説します。いわゆる"Hello World"的なことをメッセージングでやります。

以下の図において、"P"がプロデューサーで"C"がコンシューマーです。真ん中の長方形はキューで、コンシューマーのためにRabbitMQがメッセージを保存し続けるバッファです。 python-one.png

The Java client library

RabbitMQは複数のプロトコルをサポートしています。このチュートリアルではAMQP 0-9-1を使用します。AMQP 0-9-1はオープンなメッセージング用の汎用プロトコルです。様々な言語のRabbitMQクライアントが存在します。ここではRabbitMQが提供するJavaクライアントを使います。

クライアントライブラリパッケージをダウンロードし、シグネチャをチェックして下さい。作業ディレクトリにunzipしてそこからJARファイルを取得して下さい。

$ unzip rabbitmq-java-client-bin-*.zip
$ cp rabbitmq-java-client-bin-*/*.jar ./

(なお、RabbitMQ Javaクライアントはcentral Maven repositoryにもあり、グループIDはcom.rabbitmqアーティファクトIDはamqp-clientです)

Javaクライアントと依存性を準備出来たとして、コードを書いていきます。

Sending

sending.png

メッセージの送信側をSend、受信側をRecvとします。送信側はRabbitMQに接続後に単一のメッセージを送信してから終了します。

Send.javaではいくつのクラスをインポートします。

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

クラスとキュー名を定義します。

public class Send {
  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv)
      throws java.io.IOException {
      ...
  }
}

それから、サーバ接続を作成します。

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

connectionはソケット接続を抽象化し、protocol version negotiationと認証などを扱います。ここではローカルマシンlocalhost上のブローカに接続します。異なるマシンのブローカに接続したい場合、名前やIPアドレスを指定します。

次にチャネルを作成しており、ここに各種の操作をするためのAPIの多くが存在します。

送信を行うには、送信を行うためのキューを宣言する必要があり、それからキューにメッセージのパブリッシュが可能となります。

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    String message = "Hello World!";
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");

キューの宣言はべき等(idempotent)で、既に存在しない場合にだけ生成されます。メッセージの中身はバイト配列のため、好きなようにエンコード出来ます。

最後に、チャネルとコネクションをクローズします。

    channel.close();
    connection.close();

Send.javaのソースコード全体はこちら

Sending doesn't work!

RabbitMQを初めて使う場合には"Sent"が出力されず何がおかしいんだと頭を抱えるかもしれません。おそらく、ブローカを空きディスク容量(デフォルトで少なくとも1Gb必要)が足らない状態で開始すると、メッセージの受信を拒否します。ブローカーのログファイルを確認して必要に応じて制限を変更します。設定ファイルのドキュメントdisk_free_limitの設定方法があります。

Receiving

送信側については以上です。受信側はRabbitMQからメッセージがプッシュされるため、一つのメッセージをパブリッシュする送信側とは異なり、メッセージをリッスンし続けます。

receiving.png

コード(Recv.java)もSendと似たようなインポートを行います。

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;

DefaultConsumerConsumerインタフェースの実装クラスで、サーバからプッシュされるメッセージのバッファに使います。

開始処理は送信側と同じです。コネクションとチャネルをオープンして、コンシュームを行うためのキューを宣言します。キュー名はsendがパブリッシュするものと同一にします。

public class Recv {
  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv)
      throws java.io.IOException,
             java.lang.InterruptedException {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    ...
    }
}

同じキューを宣言している点に注意してください。送信側の前に受信側を開始しておくことも可能なので、キューからメッセージを受信し始める前にそのキューが存在することを確認する必要があります。

サーバに対してキューからメッセージを配信するよう指示します。サーバは非同期的にメッセージをプッシュするため、メッセージが使用可能になるまでバッファするオブジェクト形式のコールバックを作成します。これにはDefaultConsumerのサブクラスを作ります。

    Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
          throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
      }
    };
    channel.basicConsume(QUEUE_NAME, true, consumer);

Recv.javaクラスのソースコードはこちら

Putting it all together

これらのクラスのコンパイルにはクラスパスにRabbitMQ javaクライアントを加えます。

$ javac -cp rabbitmq-client.jar Send.java Recv.java

実行するにはrabbitmq-client.jarとその依存関係をクラスパスに追加します。ターミナルで送信側を動かすには、

$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Send

受信側を動かすには、

$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Recv

Windowsでは、クラスパスの区切り文字にコロンではなくセミコロンを使います。

受信側はRabbitMQ経由で送信側のメッセージを取得して表示します。受信側はメッセージ受信のために待ち続けます(Ctrl-Cで停止)。そのため、別のターミナルから送信側を実行してください。

キューをチェックしたい場合はrabbitmqctl list_queuesを使います。

次にパート2に進んでワークキュー(work queue)を作ります。

RabbitMQチュートリアル2 Work Queues Javaをテキトーに訳した

RabbitMQはじめて触ったけどよく分からん、ということでチュートリアルを読むついでにテキトーに訳すことにした。

RabbitMQチュートリアル(https://www.rabbitmq.com/getstarted.html)の2 Work Queues Java(https://www.rabbitmq.com/tutorials/tutorial-two-java.html)をテキトーに訳した。

2 Work queues

python-two.png

一つ目のチュートリアルではキューにメッセージを送信して受信するプログラムを作りました。このパートではワークキュー(Work Queue)を作成します。ワークキューは複数のワーカに時間のかかるタスクを分配します。

ワークキュー(別名:タスクキュー(Task Queues))の基本的な考え方は、リソースを浪費するタスクを即時実行してから完了まで待機することを避ける点にあります。その代わりに、タスクを後で実行するようスケジュールします。タスク(Task)はメッセージにカプセル化してキューに送信します。バックグラウンドで実行中のワーカープロセスがタスクをポップしてジョブを実行します。複数のワーカーを実行する場合、タスクはワーカー間で共有されます。

このやり方は、短時間のHTTPリクエストで複雑なタスク処理をするのが難しいwebアプリケーションで特に有効です。

Preparation

チュートリアルの一つ目のパートでは"Hello World!"というメッセージを送信しました。このパートでは複雑なタスクを表現する文字列を送信します。実際のタスク、たとえばリサイズ対象のイメージやレンダするPDFファイルなどは用意せず、Thread.sleep()のビジーによりフェイクの処理とします。複雑さの表現は文字列の複数のドットで行います。各ドットは1秒の"work"にに相当します。たとえば、Hello...というフェイクのタスクは3秒かかります。

一つ目のサンプルのSend.javaを少々修正し、コマンドラインから任意のメッセージを送信出来るようにします。このプログラムはワークキューにタスクをスケジュールするので、NewTask.javaという名前にします。

String message = getMessage(argv);

channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

コマンドライン引数からメッセージを取得するヘルパ関数を作ります。

private static String getMessage(String[] strings){
    if (strings.length < 1)
        return "Hello World!";
    return joinStrings(strings, " ");
}

private static String joinStrings(String[] strings, String delimiter) {
    int length = strings.length;
    if (length == 0) return "";
    StringBuilder words = new StringBuilder(strings[0]);
    for (int i = 1; i < length; i++) {
        words.append(delimiter).append(strings[i]);
    }
    return words.toString();
}

Recv.javaもまたいくつか修正が必要です。メッセージボディのドットに従ってワークにフェイクの秒数を取る必要があります。配信メッセージを処理してタスクを実行するので、Worker.javaという名前にします。

final Consumer consumer = new DefaultConsumer(channel) {
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String message = new String(body, "UTF-8");

    System.out.println(" [x] Received '" + message + "'");
    try {
      doWork(message);
    } finally {
      System.out.println(" [x] Done");
    }
  }
};
channel.basicConsume(TASK_QUEUE_NAME, true, consumer);

フェイクタスクの実行は以下のようにします。

private static void doWork(String task) throws InterruptedException {
    for (char ch: task.toCharArray()) {
        if (ch == '.') Thread.sleep(1000);
    }
}

一つ目のチュートリアル同様にコンパイルします。

$ javac -cp rabbitmq-client.jar NewTask.java Worker.java

Round-robin dispatching

タスクキューを使用する利点の一つは、ワークを簡単に並行処理できる点です。ワークの未処理が累積してきた場合、簡単にワーカーを追加できるため、スケールがやりやすいです。

まず、同時に二つのワーカーインスタンスで実行してみようと思います。二つのワーカーが一つのキューからメッセージを取得しますが、具体的なやり方を見て行きます。

コンソールを三つ開きます。二つはワーカープログラムを実行します。このコンソールはコンシューマーC1とC2になります。

shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
 [*] Waiting for messages. To exit press CTRL+C
shell2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
 [*] Waiting for messages. To exit press CTRL+C

三つ目のコンソールで新しいタスクをパブリッシュします。コンシューマを開始すればメッセージのパブリッシュが可能となります。

shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask First message.
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Second message..
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Third message...
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Fourth message....
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Fifth message.....

ワーカーに配信されると以下のようになります。

shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'First message.'
 [x] Received 'Third message...'
 [x] Received 'Fifth message.....'
shell2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Second message..'
 [x] Received 'Fourth message....'

デフォルトでは、RabbitMQは順繰りに次のコンシューマーへと個々のメッセージを送信していきます。平均すると、どのコンシューマも同じ数のメッセージを取得します。このようなメッセージ配信方法はラウンドロビン(round-robin)と言います。3つ以上のワーカーでどうなるか試してみて下さい。

Message acknowledgment

タスクの実行には数秒を要します。このとき、コンシューマーの一つが長時間タスクを開始して処理途中で死んだ場合どうなるか、と疑問に思うかもしれません。現状のコードだと、RabbitMQはカスタマーにメッセージを配信するとメモリからすぐに削除します。この場合、ワーカーを殺すとそのワーカーで処理中のメッセージはすべてロストします。また、このワーカーにディスパッチされたすべてのメッセージもロストし、未処理のままとなります。

しかし、普通はタスクのロストは望ましくありません。あるワーカーが死ぬ場合、別のワーカーにタスクが配信されるのが望ましいです。

メッセージを決してロストしないようにするのに、RabbitMQはメッセージ確認(acknowledgments)をサポートしています。ackとは、ある特定のメッセージを受信および処理したことをRabbitMQへ通知するために、コンシューマーから送り返されるものです。そのあとRabbitMQでメッセージの削除が可能になります。

ackを送信せずにコンシューマが死ぬ場合(チャンネルがクローズした・コネクションがクローズした・TCPコネクションがロストした)、RabbitMQはメッセージが完全に処理されなかったことを認識してキューに再度入れます。その段階で他にオンラインのコンシューマがまだ存在する場合、別のコンシューマへ直ちに再配信されます。この方法によりワーカーが時々死んだとしてもメッセージがロストしないことを保証出来ます。

現状のサンプルにはメッセージタイムアウトを設けておらず、コンシューマーが死ぬとRabbitMQはメッセージを再配信します。メッセージ処理に極めて長時間かかる場合でも問題無く動作します。

メッセージ確認はデフォルトで有効です。以前のサンプルではautoAck=trueフラグ*1によって明示的に無効化していました。このフラグを削除してワーカーから適切な確認を送信するようにします。

channel.basicQos(1);

final Consumer consumer = new DefaultConsumer(channel) {
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String message = new String(body, "UTF-8");

    System.out.println(" [x] Received '" + message + "'");
    try {
      doWork(message);
    } finally {
      System.out.println(" [x] Done");
      channel.basicAck(envelope.getDeliveryTag(), false);
    }
  }
};

このコードによって、ワーカーがメッセージ処理中にCTRL+Cで死んだとしても、メッセージはロストしません。ワーカーが死ぬとすぐに未確認メッセージは再配信されます。

Forgotten acknowledgment

よくあるミスはbasicAckの書き忘れです。間違いやすい割りに影響は甚大です。クライアントが停止するとメッセージは再配信されますが、RabbitMQは未確認メッセージの解放が出来ないためにメモリを消費し続けます。

この種のミスをデバッグするにはrabbitmqctlmessages_unacknowledgedフィールドを表示します。

$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello    0       0
...done.

Message durability

コンシューマ死亡時の扱い方について解説し、これでタスクはロストしません。しかしtRabbitMQサーバ停止時には依然としてタスクはロストします。

RabbitMQが停止もしくはクラッシュすると、設定しない限り、キューとメッセージは消滅します。メッセージロストしないようにするためにやるべきことは2つあり、キューとメッセージをdurableに設定します。

まず、RabbitMQがキューを削除しないようにします。そのためには、キューをdurableと宣言します。

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

上記のコマンドそのものは問題無いですが、現状の設定では動作しません。その理由はhelloという名前のキューをnot durableとして定義済みだからです。RabbitMQは既存のキューを異なるパラメータでの再定義を許可しておらず、これを試みるプログラムにはエラーを戻します。ここでは場当たり的な対処として、サンプル用に別の名前のtask_queueというキューを宣言します。

boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

queueDeclareの修正はプロデューサーとコンシューマの両方で必要です。

これでRabbitMQをリスタートしてもtask_queueキューが失われないようになりました。次に、メッセージをpersistentにするため、MessagePropertiesBasicPropertiesの実装)にPERSISTENT_TEXT_PLAINを設定します。

import com.rabbitmq.client.MessageProperties;

channel.basicPublish("", "task_queue",
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());

Note on message persistence

メッセージをpersistentと設定することは、メッセージロストしないことを完全には保証しません。この設定はメッセージをディスクに格納するようRabbitMQに指示しますが、RabbitMQのメッセージ受信時に短時間ですが保存されません*2。また、RabbitMQはすべてのメッセージに対しfsync(2)はしません。キャッシュには保存されるもののディスクには書き出されないかもしれません。persistenceは強力ではありませんが、単純なタスクキューには十分です。より強力な保証が必要な場合はpublisher confirmsを使用してください。

Fair dispatch

ディスパッチが我々の期待通りに動作しないと思うかもしれません。例えば、二つのワーカーがあるとして、奇数メッセージは重くて偶数は軽い場合、片方のワーカーは常にビジーでもう片方の負荷は軽いままです。RabbitMQは負荷については関知しないので均等にメッセージをディスパッチします。

RabbitMQはメッセージがキューに入ると直ちにディスパッチするために上記のような動作になります。RabbitMQはコンシューマの多数の未確認メッセージを参照しません。よって、n番目のメッセージはn番目のコンシューマーに漫然とディスパッチされます。

prefetch-count.png

これに対してはbasicQosメソッドにprefetchCount = 1を指定します。これにより、RabbitMQはあるワーカーに対し一度に一つ以上のメッセージを与えなくなります。言い換えると、一つ前のメッセージが処理されて確認されるまで、新しいメッセージはワーカーに配信されません。すなわち、ディスパッチするワーカーはビジーではない、ということです。

int prefetchCount = 1;
channel.basicQos(prefetchCount);

Note about queue size

すべてのワーカーがビジーだとキューに滞留します。キューを監視してワーカーを追加したり、別の方法を試したいと考えるようになるかもしれません。

Putting it all together

最終的なNewTask.javaクラスのコードになります。

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

public class NewTask {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv)
                      throws java.io.IOException {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

    String message = getMessage(argv);

    channel.basicPublish( "", TASK_QUEUE_NAME,
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");

    channel.close();
    connection.close();
  }      
  //...
}

(NewTask.java source)

Worker.java

import com.rabbitmq.client.*;

import java.io.IOException;

public class Worker {
  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    final Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    channel.basicQos(1);

    final Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");

        System.out.println(" [x] Received '" + message + "'");
        try {
          doWork(message);
        } finally {
          System.out.println(" [x] Done");
          channel.basicAck(envelope.getDeliveryTag(), false);
        }
      }
    };
    channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
  }

  private static void doWork(String task) {
    for (char ch : task.toCharArray()) {
      if (ch == '.') {
        try {
          Thread.sleep(1000);
        } catch (InterruptedException _ignored) {
          Thread.currentThread().interrupt();
        }
      }
    }
  }
}

(Worker.java source)

メッセージ確認とprefetchCountを使うにはワークキューの設定が必要です。durabilityオプションによってRabbitMQリスタート時にもタスクが保存されたままに出来ます。

ChannelメソッドとMessagePropertiesの詳細についてはjavadocsを参照してください。

次のtutorial 3では、複数のコンシューマに同一メッセージを配信する方法を解説します。

*1:com.rabbitmq.client.Channel#basicConsume の第二引数のこと。trueは一度メッセージ配信するとメッセージ確認したものと見なし、falseだと確認が帰ってくるのを待つモードになる https://www.rabbitmq.com/releases/rabbitmq-java-client/v3.6.3/rabbitmq-java-client-javadoc-3.6.3/com/rabbitmq/client/Channel.html#basicConsume(java.lang.String,%20boolean,%20com.rabbitmq.client.Consumer)

*2:there is still a short time window when RabbitMQ has accepted a message and hasn't saved it yet. が原文。ちょっと訳に自身が無い