kagamihogeの日記

kagamihogeの日記です。

RabbitMQチュートリアル3 Publish/Subscribeをテキトーに訳した

RabbitMQはじめて触ったけどよく分からん、ということでチュートリアルを読むついでにテキトーに訳すことにした。

RabbitMQチュートリアル(https://www.rabbitmq.com/getstarted.html)の3 Publish/Subscribe Java(https://www.rabbitmq.com/tutorials/tutorial-three-java.html)をテキトーに訳した。

Publish/Subscribe

(using the Java Client)

二番目のチュートリアルではワークキューを作成しました。ワークキューの暗黙の仮定は個々のタスクは一つのワーカーにだけ配信される、というものです。このパートではそれとは全く異なり、複数のコンシューマーにメッセージを配信します。このパターンは"publish/subscribe"と呼ばれています。

このパターンを説明するために、シンプルなロギングシステムを構築していきます。このシステムは二つのプログラムから構成され、前者はログメッセージを送信し、後者は受信と表示を行います。

このロギングシステムではすべての受信プログラムがメッセージを受け取ります。この方法では、ある受信プログラムはディスクにログを出力し、同時に別の受信プログラムは画面にログを表示します。

基本的に、パブリッシュされたログメッセージはすべての受信プログラムにブロードキャストされます。

Exchanges

前のチュートリアルでは、キューに対してメッセージの送受信をしていました。このパートではRabbitの完全なメッセージングモデル(full messaging model)を使います。

これまでのチュートリアルで解説したことを簡単におさらいします。

  • プロデューサー(producer)はメッセージ送信を行うユーザーアプリケーション。
  • キュー(queue)はメッセージを格納するバッファ。
  • コンシューマ(consumer)はメッセージ受信を行うユーザーアプリケーション。

RabbitMQのメッセージングモデルの基本的な考え方では、プロデューサーはキューへ直接メッセージ送信は行いません。実際、プロデューサーはメッセージがどのキューに配信されるかさえ関知しません。

その代わりに、プロデューサーはエクスチェンジ(exchange)にだけメッセージ送信を行います。エクスチェンジは非常にシンプルです。プロデューサーからメッセージを受信する一方で、他方ではキューにそのメッセージをプッシュします。エクスチェンジは受信メッセージに対して何らかの処理を行うことが必須となります。特定のキューにエクスチェンジを追加するのでしょうか? 多数のキューに追加したり、廃棄するのでしょうか? そのためのルールはエクスチェンジタイプ(exchange type)で定義されています。

exchanges.png

利用可能なエクスチェンジタイプは、direct, topic, headers, fanoutです。このパートでは一番最後のfanoutを使います。このタイプのエクスチェンジを作成し、logsと名付けます。

channel.exchangeDeclare("logs", "fanout");

fanoutは非常にシンプルです。名前からおおむね推測できるとおり*1、既知のすべてのキューにすべての受信メッセージをブロードキャストします。よって、今作ろうとしているロガーにはこれで十分です。

Listing exchanges

サーバ上のエクスチェンジを一覧するにはrabbitmqctlを使います。

$ sudo rabbitmqctl list_exchanges
Listing exchanges ...
        direct
amq.direct      direct
amq.fanout      fanout
amq.headers     headers
amq.match       headers
amq.rabbitmq.log        topic
amq.rabbitmq.trace      topic
amq.topic       topic
logs    fanout
...done.

上記の一覧にはいくつかのamq.*エクスチェンジとデフォルトの(無名な)エクスチェンジがあります。これらはデフォルトで生成されますが、現時点では使う必要はありません。

Nameless exchange

チュートリアルのこれ以前のパートではまだエクスチェンジについては触れていませんでしたが、キューにメッセージ送信することは可能でした。これが可能なのはデフォルトエクスチェンジを使用しているためで、そのエクスチェンジは空文字列("")で識別します。

前のパートではメッセージパブリッシュは以下のようにやっていました。

channel.basicPublish("", "hello", null, message.getBytes());

最初のパラメータがエクスチェンジの名前です。空文字列はデフォルトもしくは無名(nameless)エクスチェンジを指します。routingKeyが存在する場合は、メッセージはroutingKeyが指すキューにルーティングされます。

名前付きのエクスチェンジにパブリッシュするには以下のようにします。

channel.basicPublish( "logs", "", null, message.getBytes());

Temporary queues

これまでのチュートリアルでは特定の名前を持つキューを使用していました(hellotask_queue)。同一のキューをワーカーが指すようにするために、キューに名前を付けられることは重要です。プロデューサーとコンシューマでキューを共有したい場合、キューの名前は重要です。

しかし今回のロガーではその場合に当たりません。今回はすべてのログメッセージを受信したいのであり、そのサブセットではありません。過去のメッセージではなく現在進行で流れてくるメッセージにのみ関心があります。これを解決するには二つの機能が必要です。

まず、Rabbitに接続する際には常に新しい空のキューが必要です。このためにはランダム名でキューを生成しますが、それよりはサーバにランダムのキュー名を作らせる方が手軽です。

次に、コンシューマーを切断したらそのキューは自動的に削除する必要があります。

Javaクライアントでは、パラメータ無しのqueueDeclare()を使います。これにより、非durable・排他的(exclusive)・名前を自動生成してキューを自動削除、になります。

String queueName = channel.queueDeclare().getQueue();

この場合、queueNameはランダムキュー名になります。たとえばamq.gen-JzTY20BRgKO-HjmUJj0wLgのようになります。

Bindings

bindings.png

これまでの説明でfanoutエクスチェンジとキューを作りました。残りはエクスチェンジにキューへメッセージ送信させる設定です。エクスチェンジとキューの関連付けはバインディング(binding)と言います。

channel.queueBind(queueName, "logs", "");

これでlogsエクスチェンジはキューにメッセージを追加するようになります。

Listing bindings

既存のバインディングをリストするにはrabbitmqctl list_bindingsを使います。

Putting it all together

python-three-overall.png

プロデューサーのプログラムはログメッセージを送信するもので、これまでのチュートリアルとほとんど同じです。重要な変更点は、無名エクスチェンジではなくlogsエクスチェンジにメッセージをパブリッシュする点です。送信時にはroutingKeyの指定が必要ですが、fanoutエクスチェンジの場合は無視されます。以下がEmitLog.javaのプログラムになります。

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class EmitLog {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv)
                  throws java.io.IOException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
    //...
}

(EmitLog.java source)

これまで見てきたように、コネクションを確立したあとでエクスチェンジを宣言しています。存在しないエクスチェンジへのパブリッシュは禁止されているのでこの手順が必要となります。

エクスチェンジにキューが一つもバインドされていない場合メッセージはロストしますが、このサンプルの場合にはそれで問題ありません。リッスン中のコンシューマが無いということはそのメッセージを破棄しても支障は無いからです。

ReceiveLogs.javaは以下です。

import com.rabbitmq.client.*;

import java.io.IOException;

public class ReceiveLogs {
  private static final String EXCHANGE_NAME = "logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, EXCHANGE_NAME, "");

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope,
                                 AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
      }
    };
    channel.basicConsume(queueName, true, consumer);
  }
}

(ReceiveLogs.java source)

実行するにはまずコンパイルします。

$ javac -cp .:rabbitmq-client.jar EmitLog.java ReceiveLogs.java

ファイルにログ出力する場合はコンソールで以下のようにします。

$ java -cp .:rabbitmq-client.jar ReceiveLogs > logs_from_rabbit.log

画面のログ出力する場合は別のターミナルで以下のように実行します。

$ java -cp .:rabbitmq-client.jar ReceiveLogs

ログ送信は以下のようにします。

$ java -cp .:rabbitmq-client.jar EmitLog

rabbitmqctl list_bindingsで、プログラムが実際にキューの生成とバインディングをしているかを確認できます。ReceiveLogs.javaを二つ実行すると以下のような表示になります。

$ sudo rabbitmqctl list_bindings
Listing bindings ...
logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
...done.

実行結果の見方は簡単です。logsエクスチェンジのデータが、サーバが自動生成した名前の二つのキューに流れています。これは意図した通りの動作です。

メッセージのサブセットをリッスンする方法を学ぶには、次のtutorial 4を参照してください。

*1:fan outには扇形に広げるとか展開させるとかそういう意味がある http://eow.alc.co.jp/search?q=fan-out&ref=wl