kagamihogeの日記

kagamihogeの日記です。

RabbitMQチュートリアル1 "Hello World!" Javaをテキトーに訳した

RabbitMQはじめて触ったけどよく分からん、ということでチュートリアルを読むついでにテキトーに訳すことにした。

RabbitMQチュートリアル(https://www.rabbitmq.com/getstarted.html)の1 "Hello World!" Java (https://www.rabbitmq.com/tutorials/tutorial-one-java.html)をテキトーに訳した。

1 "Hello World!"

Prerequisites

このチュートリアルはRabbitMQをインストールしてlocalhostの標準ポート(5672)で動作していることを想定しています。異なるホスト・ポート・クレデンシャル・接続設定を使う場合には必要に応じて調整していください。

Introduction

RabbitMQはメッセージブローカーです。基本的な考え方は極めてシンプルで、メッセージを受信もしくは送信します。郵便局のようなものと考えることが出来ます。郵便ポストに手紙を送信すると、郵便屋さんが送り先に送信してくれます。この例えでいうと、RabbitMQはポスト・郵便局・郵便屋となります。

RabbitMQと郵便局の主な違いは紙を扱わないことで、メッセージ(messages)というデータのバイナリを受信・格納・送信します。

RabbitMQと、いわゆるメッセージングは、同じ用語を持ちいます。

プロデュース(Producing)とは、単に送信を意味します。メッセージ送信を行うプログラムはプロデューサー(producer)と言います。このドキュメントでは以下のように"P"で図示します。

producer.png

キュー(キュー)とは、メールボックスの名前で、RabbitMQ内部に存在します。メッセージはRabbitMQとアプリケーションに流れていきますが、キューにだけ保存できます。キューには特に制限は無く、好きなだけメッセージを保存できます。本質的には無限バッファです。複数のプロデューサが一つのキューに向けてメッセージを送信可能で、複数のコンシューマが一つのキューからデータ受信を試行出来ます。このドキュメントでは以下のように上部に名前を持つ図で示します。

queue.png

コンシューム(Consuming)は受信と同様の意味です。コンシューマーはメッセージ受信のために大抵は待ち続けるプログラムです。このドキュメントでは以下のように"C"で図示します。

consumer.png

なお、プロデューサー・コンシューマ・ブローカは同一マシンに存在しなくても構いませんし、実際に大抵のアプリケーションではそうしていません。

"Hello World"

(using the Java Client)

チュートリアルのこのパートでは二つのJavaプログラムを作成します。一つのメッセージを送信するプロデューサーと、メッセージを受信してそれを表示するコンシューマです。Java APIの詳細に立ち入るのは避けて、チュートリアルに必要な最小限にだけ解説します。いわゆる"Hello World"的なことをメッセージングでやります。

以下の図において、"P"がプロデューサーで"C"がコンシューマーです。真ん中の長方形はキューで、コンシューマーのためにRabbitMQがメッセージを保存し続けるバッファです。 python-one.png

The Java client library

RabbitMQは複数のプロトコルをサポートしています。このチュートリアルではAMQP 0-9-1を使用します。AMQP 0-9-1はオープンなメッセージング用の汎用プロトコルです。様々な言語のRabbitMQクライアントが存在します。ここではRabbitMQが提供するJavaクライアントを使います。

クライアントライブラリパッケージをダウンロードし、シグネチャをチェックして下さい。作業ディレクトリにunzipしてそこからJARファイルを取得して下さい。

$ unzip rabbitmq-java-client-bin-*.zip
$ cp rabbitmq-java-client-bin-*/*.jar ./

(なお、RabbitMQ Javaクライアントはcentral Maven repositoryにもあり、グループIDはcom.rabbitmqアーティファクトIDはamqp-clientです)

Javaクライアントと依存性を準備出来たとして、コードを書いていきます。

Sending

sending.png

メッセージの送信側をSend、受信側をRecvとします。送信側はRabbitMQに接続後に単一のメッセージを送信してから終了します。

Send.javaではいくつのクラスをインポートします。

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

クラスとキュー名を定義します。

public class Send {
  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv)
      throws java.io.IOException {
      ...
  }
}

それから、サーバ接続を作成します。

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

connectionはソケット接続を抽象化し、protocol version negotiationと認証などを扱います。ここではローカルマシンlocalhost上のブローカに接続します。異なるマシンのブローカに接続したい場合、名前やIPアドレスを指定します。

次にチャネルを作成しており、ここに各種の操作をするためのAPIの多くが存在します。

送信を行うには、送信を行うためのキューを宣言する必要があり、それからキューにメッセージのパブリッシュが可能となります。

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    String message = "Hello World!";
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");

キューの宣言はべき等(idempotent)で、既に存在しない場合にだけ生成されます。メッセージの中身はバイト配列のため、好きなようにエンコード出来ます。

最後に、チャネルとコネクションをクローズします。

    channel.close();
    connection.close();

Send.javaのソースコード全体はこちら

Sending doesn't work!

RabbitMQを初めて使う場合には"Sent"が出力されず何がおかしいんだと頭を抱えるかもしれません。おそらく、ブローカを空きディスク容量(デフォルトで少なくとも1Gb必要)が足らない状態で開始すると、メッセージの受信を拒否します。ブローカーのログファイルを確認して必要に応じて制限を変更します。設定ファイルのドキュメントdisk_free_limitの設定方法があります。

Receiving

送信側については以上です。受信側はRabbitMQからメッセージがプッシュされるため、一つのメッセージをパブリッシュする送信側とは異なり、メッセージをリッスンし続けます。

receiving.png

コード(Recv.java)もSendと似たようなインポートを行います。

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;

DefaultConsumerConsumerインタフェースの実装クラスで、サーバからプッシュされるメッセージのバッファに使います。

開始処理は送信側と同じです。コネクションとチャネルをオープンして、コンシュームを行うためのキューを宣言します。キュー名はsendがパブリッシュするものと同一にします。

public class Recv {
  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv)
      throws java.io.IOException,
             java.lang.InterruptedException {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    ...
    }
}

同じキューを宣言している点に注意してください。送信側の前に受信側を開始しておくことも可能なので、キューからメッセージを受信し始める前にそのキューが存在することを確認する必要があります。

サーバに対してキューからメッセージを配信するよう指示します。サーバは非同期的にメッセージをプッシュするため、メッセージが使用可能になるまでバッファするオブジェクト形式のコールバックを作成します。これにはDefaultConsumerのサブクラスを作ります。

    Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
          throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
      }
    };
    channel.basicConsume(QUEUE_NAME, true, consumer);

Recv.javaクラスのソースコードはこちら

Putting it all together

これらのクラスのコンパイルにはクラスパスにRabbitMQ javaクライアントを加えます。

$ javac -cp rabbitmq-client.jar Send.java Recv.java

実行するにはrabbitmq-client.jarとその依存関係をクラスパスに追加します。ターミナルで送信側を動かすには、

$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Send

受信側を動かすには、

$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Recv

Windowsでは、クラスパスの区切り文字にコロンではなくセミコロンを使います。

受信側はRabbitMQ経由で送信側のメッセージを取得して表示します。受信側はメッセージ受信のために待ち続けます(Ctrl-Cで停止)。そのため、別のターミナルから送信側を実行してください。

キューをチェックしたい場合はrabbitmqctl list_queuesを使います。

次にパート2に進んでワークキュー(work queue)を作ります。