RabbitMQはじめて触ったけどよく分からん、ということでチュートリアルを読むついでにテキトーに訳すことにした。
RabbitMQチュートリアル(https://www.rabbitmq.com/getstarted.html)の3 Publish/Subscribe Java(https://www.rabbitmq.com/tutorials/tutorial-three-java.html)をテキトーに訳した。
Publish/Subscribe
(using the Java Client)
二番目のチュートリアルではワークキューを作成しました。ワークキューの暗黙の仮定は個々のタスクは一つのワーカーにだけ配信される、というものです。このパートではそれとは全く異なり、複数のコンシューマーにメッセージを配信します。このパターンは"publish/subscribe"と呼ばれています。
このパターンを説明するために、シンプルなロギングシステムを構築していきます。このシステムは二つのプログラムから構成され、前者はログメッセージを送信し、後者は受信と表示を行います。
このロギングシステムではすべての受信プログラムがメッセージを受け取ります。この方法では、ある受信プログラムはディスクにログを出力し、同時に別の受信プログラムは画面にログを表示します。
基本的に、パブリッシュされたログメッセージはすべての受信プログラムにブロードキャストされます。
Exchanges
前のチュートリアルでは、キューに対してメッセージの送受信をしていました。このパートではRabbitの完全なメッセージングモデル(full messaging model)を使います。
これまでのチュートリアルで解説したことを簡単におさらいします。
- プロデューサー(producer)はメッセージ送信を行うユーザーアプリケーション。
- キュー(queue)はメッセージを格納するバッファ。
- コンシューマ(consumer)はメッセージ受信を行うユーザーアプリケーション。
RabbitMQのメッセージングモデルの基本的な考え方では、プロデューサーはキューへ直接メッセージ送信は行いません。実際、プロデューサーはメッセージがどのキューに配信されるかさえ関知しません。
その代わりに、プロデューサーはエクスチェンジ(exchange)にだけメッセージ送信を行います。エクスチェンジは非常にシンプルです。プロデューサーからメッセージを受信する一方で、他方ではキューにそのメッセージをプッシュします。エクスチェンジは受信メッセージに対して何らかの処理を行うことが必須となります。特定のキューにエクスチェンジを追加するのでしょうか? 多数のキューに追加したり、廃棄するのでしょうか? そのためのルールはエクスチェンジタイプ(exchange type)で定義されています。
利用可能なエクスチェンジタイプは、direct
, topic
, headers
, fanout
です。このパートでは一番最後のfanout
を使います。このタイプのエクスチェンジを作成し、logs
と名付けます。
channel.exchangeDeclare("logs", "fanout");
fanoutは非常にシンプルです。名前からおおむね推測できるとおり*1、既知のすべてのキューにすべての受信メッセージをブロードキャストします。よって、今作ろうとしているロガーにはこれで十分です。
Listing exchanges
サーバ上のエクスチェンジを一覧するにはrabbitmqctl
を使います。
$ sudo rabbitmqctl list_exchanges Listing exchanges ... direct amq.direct direct amq.fanout fanout amq.headers headers amq.match headers amq.rabbitmq.log topic amq.rabbitmq.trace topic amq.topic topic logs fanout ...done.
上記の一覧にはいくつかのamq.*
エクスチェンジとデフォルトの(無名な)エクスチェンジがあります。これらはデフォルトで生成されますが、現時点では使う必要はありません。
Nameless exchange
チュートリアルのこれ以前のパートではまだエクスチェンジについては触れていませんでしたが、キューにメッセージ送信することは可能でした。これが可能なのはデフォルトエクスチェンジを使用しているためで、そのエクスチェンジは空文字列(""
)で識別します。
前のパートではメッセージパブリッシュは以下のようにやっていました。
channel.basicPublish("", "hello", null, message.getBytes());
最初のパラメータがエクスチェンジの名前です。空文字列はデフォルトもしくは無名(nameless)エクスチェンジを指します。routingKey
が存在する場合は、メッセージはroutingKey
が指すキューにルーティングされます。
名前付きのエクスチェンジにパブリッシュするには以下のようにします。
channel.basicPublish( "logs", "", null, message.getBytes());
Temporary queues
これまでのチュートリアルでは特定の名前を持つキューを使用していました(hello
とtask_queue
)。同一のキューをワーカーが指すようにするために、キューに名前を付けられることは重要です。プロデューサーとコンシューマでキューを共有したい場合、キューの名前は重要です。
しかし今回のロガーではその場合に当たりません。今回はすべてのログメッセージを受信したいのであり、そのサブセットではありません。過去のメッセージではなく現在進行で流れてくるメッセージにのみ関心があります。これを解決するには二つの機能が必要です。
まず、Rabbitに接続する際には常に新しい空のキューが必要です。このためにはランダム名でキューを生成しますが、それよりはサーバにランダムのキュー名を作らせる方が手軽です。
次に、コンシューマーを切断したらそのキューは自動的に削除する必要があります。
Javaクライアントでは、パラメータ無しのqueueDeclare()
を使います。これにより、非durable・排他的(exclusive)・名前を自動生成してキューを自動削除、になります。
String queueName = channel.queueDeclare().getQueue();
この場合、queueName
はランダムキュー名になります。たとえばamq.gen-JzTY20BRgKO-HjmUJj0wLg
のようになります。
Bindings
これまでの説明でfanoutエクスチェンジとキューを作りました。残りはエクスチェンジにキューへメッセージ送信させる設定です。エクスチェンジとキューの関連付けはバインディング(binding)と言います。
channel.queueBind(queueName, "logs", "");
これでlogs
エクスチェンジはキューにメッセージを追加するようになります。
Listing bindings
既存のバインディングをリストするにはrabbitmqctl list_bindings
を使います。
Putting it all together
プロデューサーのプログラムはログメッセージを送信するもので、これまでのチュートリアルとほとんど同じです。重要な変更点は、無名エクスチェンジではなくlogs
エクスチェンジにメッセージをパブリッシュする点です。送信時にはroutingKey
の指定が必要ですが、fanout
エクスチェンジの場合は無視されます。以下がEmitLog.java
のプログラムになります。
import java.io.IOException; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; public class EmitLog { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws java.io.IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } //... }
これまで見てきたように、コネクションを確立したあとでエクスチェンジを宣言しています。存在しないエクスチェンジへのパブリッシュは禁止されているのでこの手順が必要となります。
エクスチェンジにキューが一つもバインドされていない場合メッセージはロストしますが、このサンプルの場合にはそれで問題ありません。リッスン中のコンシューマが無いということはそのメッセージを破棄しても支障は無いからです。
ReceiveLogs.java
は以下です。
import com.rabbitmq.client.*; import java.io.IOException; public class ReceiveLogs { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } }
実行するにはまずコンパイルします。
$ javac -cp .:rabbitmq-client.jar EmitLog.java ReceiveLogs.java
ファイルにログ出力する場合はコンソールで以下のようにします。
$ java -cp .:rabbitmq-client.jar ReceiveLogs > logs_from_rabbit.log
画面のログ出力する場合は別のターミナルで以下のように実行します。
$ java -cp .:rabbitmq-client.jar ReceiveLogs
ログ送信は以下のようにします。
$ java -cp .:rabbitmq-client.jar EmitLog
rabbitmqctl list_bindings
で、プログラムが実際にキューの生成とバインディングをしているかを確認できます。ReceiveLogs.java
を二つ実行すると以下のような表示になります。
$ sudo rabbitmqctl list_bindings Listing bindings ... logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue [] logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue [] ...done.
実行結果の見方は簡単です。logs
エクスチェンジのデータが、サーバが自動生成した名前の二つのキューに流れています。これは意図した通りの動作です。
メッセージのサブセットをリッスンする方法を学ぶには、次のtutorial 4を参照してください。
*1:fan outには扇形に広げるとか展開させるとかそういう意味がある http://eow.alc.co.jp/search?q=fan-out&ref=wl