RabbitMQはじめて触ったけどよく分からん、ということでチュートリアルを読むついでにテキトーに訳すことにした。
RabbitMQチュートリアル(https://www.rabbitmq.com/getstarted.html)の4 Routing Java(https://www.rabbitmq.com/tutorials/tutorial-four-java.html)をテキトーに訳した。
Routing
(using the Java client)
先のチュートリアルではシンプルなロギングシステムを作りました。これで複数のレシーバにログメッセージをブロードキャスト可能になりました。
このチュートリアルでは更に機能追加を行い、メッセージのサブセットのみサブスクライブ可能にします。たとえば、クリティカルなエラーメッセージのみログファイルに向ける一方で、コンソールにはログメッセージをすべて出力できるようにします。
Bindings
先の例でバインディングの作成方法については解説しました。バインディングのコードは以下のようになります。
channel.queueBind(queueName, EXCHANGE_NAME, "");
バインディングはエクスチェンジとキューとの関連付けです。つまり、キューはエクスチェンジからのメッセージのみ受け付けます。
バインディングはルーティングキー(routingKey
)パラメータも取ることが出来ます。basic_publish
のパラメータとの混乱を避けるため、バインディングのパラメータはバインディングキー(binding key
)と呼ぶことにします*1。キーと一緒にバインディングを作るには以下のようにします。
channel.queueBind(queueName, EXCHANGE_NAME, "black");
バインディングキーの意味付けはエクスチェンジタイプに依存します。fanout
エクスチェンジでは前に使用したとおり無視されます。
Direct exchange
先のチュートリアルのロギングシステムではすべてのコンシューマーにすべてのメッセージをブロードキャストしていました。今回は重要度(severity)に応じてメッセージのフィルタリングが可能なように拡張します。たとえば、警告や情報のログメッセージでディスク容量を浪費しないよう、クリティカルなエラーを受信するときにだけディスクにログメッセージを書き出すようにします。
fanout
エクスチェンジにはそのような柔軟性は無く、単なるブロードキャストの機能しかありません。
その代わりにdirect
エクスチェンジを使います。direct
エクスチェンジのルーティングアルゴリズムはシンプルで、メッセージのルーティングキーにマッチするバインディングキーを持つキューにメッセージを送信します。
これの解説のために以下のような設定を行います。
この設定では、direct
エクスチェンジX
に二つのキューをバインドします。前者のキューはバインディングキーorange
でバインドし、後者は二つのバインディングを持ち、一つはバインディングキーblack
でもう片方はgreen
です。
この設定では、エクスチェンジにパブリッシュされるメッセージのうち、ルーティングキーorange
を持つメッセージはキューQ1
にルーティングされます。ルーティングキーがblack
ないしgreen
のメッセージはQ2
に行きます。それ以外のすべてのメッセージは破棄されます。
Multiple bindings
同一のバインディングキーで複数のキューをバインドするのは全く問題ありません。上記のようにX
とQ1
はバインディングキーblack
でバインディングを作成できます。この場合、direct
エクスチェンジはfanout
のように振る舞い、キーにマッチするすべてのキューにメッセージをブロードキャストします。ルーティングキーblack
を持つメッセージはQ1
とQ2
に配信されます。
Emitting logs
チュートリアルのロギングシステムでは上記のモデルを使います。fanout
ではなくdirect
エクスチェンジにメッセージを送信します。ログの重要度はルーティングキーで指定します。これにより、受信側のプログラムは受信したい重要度のログを選択可能になります。まずログ送信側を見て行きます。
これまで通り、まずエクスチェンジを作成します。
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
次に、メッセージ送信の開始は以下のようにします。
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
説明の簡略化のため、'severity'には'info', 'warning', 'error'のいずれか一つを指定するものとします。
Subscribing
メッセージ受信は以前のチュートリアルとほぼ同じですが、一つ違いがあり、受信したい重要度ごとにバインディングを作成します。
String queueName = channel.queueDeclare().getQueue();
for(String severity : argv){
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
Putting it all together
EmitLogDirect.java
クラスのコードです。
public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws java.io.IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String severity = getSeverity(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes()); System.out.println(" [x] Sent '" + severity + "':'" + message + "'"); channel.close(); connection.close(); } //.. }
ReceiveLogsDirect.java
のコードです。
import com.rabbitmq.client.*; import java.io.IOException; public class ReceiveLogsDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1){ System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]"); System.exit(1); } for(String severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, severity); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } }
これまで通りコンパイルを行います(コンパイルとクラスパスについてはtutorial one を参照)。説明簡略化のため、サンプル実行時にはクラスパスに環境変数$CP(windowsでは%CP%)を使います。
ファイルへのログメッセージは'warning'と'error'('info'は除く)のみにしたい場合、コンソールで以下のように実行します。
$ java -cp $CP ReceiveLogsDirect warning error > logs_from_rabbit.log
画面上ですべてのログメッセージを参照したい場合、以下のように実行します。
$ java -cp $CP ReceiveLogsDirect info warning error [*] Waiting for logs. To exit press CTRL+C
次に、例えば、error
ログメッセージの送信は以下のように行います。
$ java -cp $CP EmitLogDirect error "Run. Run. Or it will explode." [x] Sent 'error':'Run. Run. Or it will explode.'
(すべてのソースコードは(EmitLogDirect.java source)と(ReceiveLogsDirect.java source)です)
パターンに応じてメッセージをリッスンする方法についてはtutorial 5に進んで下さい。