kagamihogeの日記

kagamihogeの日記です。

RabbitMQチュートリアル4 Routingをテキトーに訳した

RabbitMQはじめて触ったけどよく分からん、ということでチュートリアルを読むついでにテキトーに訳すことにした。

RabbitMQチュートリアル(https://www.rabbitmq.com/getstarted.html)の4 Routing Java(https://www.rabbitmq.com/tutorials/tutorial-four-java.html)をテキトーに訳した。

Routing

(using the Java client)

先のチュートリアルではシンプルなロギングシステムを作りました。これで複数のレシーバにログメッセージをブロードキャスト可能になりました。

このチュートリアルでは更に機能追加を行い、メッセージのサブセットのみサブスクライブ可能にします。たとえば、クリティカルなエラーメッセージのみログファイルに向ける一方で、コンソールにはログメッセージをすべて出力できるようにします。

Bindings

先の例でバインディングの作成方法については解説しました。バインディングのコードは以下のようになります。

channel.queueBind(queueName, EXCHANGE_NAME, "");

バインディングはエクスチェンジとキューとの関連付けです。つまり、キューはエクスチェンジからのメッセージのみ受け付けます。

バインディングはルーティングキー(routingKey)パラメータも取ることが出来ます。basic_publishのパラメータとの混乱を避けるため、バインディングのパラメータはバインディングキー(binding key)と呼ぶことにします*1。キーと一緒にバインディングを作るには以下のようにします。

channel.queueBind(queueName, EXCHANGE_NAME, "black");

バインディングキーの意味付けはエクスチェンジタイプに依存します。fanoutエクスチェンジでは前に使用したとおり無視されます。

Direct exchange

先のチュートリアルのロギングシステムではすべてのコンシューマーにすべてのメッセージをブロードキャストしていました。今回は重要度(severity)に応じてメッセージのフィルタリングが可能なように拡張します。たとえば、警告や情報のログメッセージでディスク容量を浪費しないよう、クリティカルなエラーを受信するときにだけディスクにログメッセージを書き出すようにします。

fanoutエクスチェンジにはそのような柔軟性は無く、単なるブロードキャストの機能しかありません。

その代わりにdirectエクスチェンジを使います。directエクスチェンジのルーティングアルゴリズムはシンプルで、メッセージのルーティングキーにマッチするバインディングキーを持つキューにメッセージを送信します。

これの解説のために以下のような設定を行います。

direct-exchange.png

この設定では、directエクスチェンジXに二つのキューをバインドします。前者のキューはバインディングキーorangeバインドし、後者は二つのバインディングを持ち、一つはバインディングキーblackでもう片方はgreenです。

この設定では、エクスチェンジにパブリッシュされるメッセージのうち、ルーティングキーorangeを持つメッセージはキューQ1にルーティングされます。ルーティングキーがblackないしgreenのメッセージはQ2に行きます。それ以外のすべてのメッセージは破棄されます。

Multiple bindings

direct-exchange-multiple.png

同一のバインディングキーで複数のキューをバインドするのは全く問題ありません。上記のようにXQ1バインディングキーblackバインディングを作成できます。この場合、directエクスチェンジはfanoutのように振る舞い、キーにマッチするすべてのキューにメッセージをブロードキャストします。ルーティングキーblackを持つメッセージはQ1Q2に配信されます。

Emitting logs

チュートリアルのロギングシステムでは上記のモデルを使います。fanoutではなくdirectエクスチェンジにメッセージを送信します。ログの重要度はルーティングキーで指定します。これにより、受信側のプログラムは受信したい重要度のログを選択可能になります。まずログ送信側を見て行きます。

これまで通り、まずエクスチェンジを作成します。

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

次に、メッセージ送信の開始は以下のようにします。

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

説明の簡略化のため、'severity'には'info', 'warning', 'error'のいずれか一つを指定するものとします。

Subscribing

メッセージ受信は以前のチュートリアルとほぼ同じですが、一つ違いがあり、受信したい重要度ごとにバインディングを作成します。

String queueName = channel.queueDeclare().getQueue();

for(String severity : argv){    
  channel.queueBind(queueName, EXCHANGE_NAME, severity);
}

Putting it all together

python-four.png

EmitLogDirect.javaクラスのコードです。

public class EmitLogDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv)
                  throws java.io.IOException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        String severity = getSeverity(argv);
        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
        System.out.println(" [x] Sent '" + severity + "':'" + message + "'");

        channel.close();
        connection.close();
    }
    //..
}

ReceiveLogsDirect.javaのコードです。

import com.rabbitmq.client.*;

import java.io.IOException;

public class ReceiveLogsDirect {

  private static final String EXCHANGE_NAME = "direct_logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    String queueName = channel.queueDeclare().getQueue();

    if (argv.length < 1){
      System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
      System.exit(1);
    }

    for(String severity : argv){
      channel.queueBind(queueName, EXCHANGE_NAME, severity);
    }
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope,
                                 AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
      }
    };
    channel.basicConsume(queueName, true, consumer);
  }
}

これまで通りコンパイルを行います(コンパイルとクラスパスについてはtutorial one を参照)。説明簡略化のため、サンプル実行時にはクラスパスに環境変数$CP(windowsでは%CP%)を使います。

ファイルへのログメッセージは'warning'と'error'('info'は除く)のみにしたい場合、コンソールで以下のように実行します。

$ java -cp $CP ReceiveLogsDirect warning error > logs_from_rabbit.log

画面上ですべてのログメッセージを参照したい場合、以下のように実行します。

$ java -cp $CP ReceiveLogsDirect info warning error
 [*] Waiting for logs. To exit press CTRL+C

次に、例えば、errorログメッセージの送信は以下のように行います。

$ java -cp $CP EmitLogDirect error "Run. Run. Or it will explode."
 [x] Sent 'error':'Run. Run. Or it will explode.'

(すべてのソースコード(EmitLogDirect.java source)(ReceiveLogsDirect.java source)です)

パターンに応じてメッセージをリッスンする方法についてはtutorial 5に進んで下さい。

*1:Channel#queueBindの引数はroutingKeyで、Channel#basicPublishの引数もroutingKey。両者は同じものを指定するのだけど、説明の際にはメッセージ送信時のキーかバインディング作成時のキーのことなのかは区別する意図があるみたい。なので、このチュートリアルでは前者はルーティングキー(routingKey)、後者はバインディングキー(binding key)という呼称にした、と思われる。