kagamihogeの日記

kagamihogeの日記です。

RabbitMQチュートリアル5 Topicsをテキトーに訳した

RabbitMQはじめて触ったけどよく分からん、ということでチュートリアルを読むついでにテキトーに訳すことにした。

RabbitMQチュートリアル(https://www.rabbitmq.com/getstarted.html)の5 Topics Java(https://www.rabbitmq.com/tutorials/tutorial-five-java.html)をテキトーに訳した。

Topics

(using the Java client)

一つ前のチュートリアルではサンプルのロギングシステムを改良しました。ブロードキャストのみ行うfanoutの代わりに、directエクスチェンジを使用し、選択的なログ受信機能を改良しました。

directエクスチェンジによってロギングシステムは改良しましたが、まだ制限があり、複数の条件に基づくルーティングは出来ません。

サンプルのロギングシステムにおいて、重要度だけでなく送信されるログの中身に基づいてサブスクライブを行いたい、とします。重要度(info/warn/crit...)と機能(auth/cron/kern...)に基づいてログをルーティングするsyslogのような考え方です。

これにより柔軟性が増します。cronのクリティカルなエラーとkernのすべてのログをリッスンする、のような柔軟性です。

これをロギングシステムに実装するには、より複雑なtopicエクスチェンジを使う必要があります。

Topic exchange

topicエクスチェンジに送信されるメッセージは任意のrouting_keyではなく、ドット区切りの単語リストが必須です。単語は何でも良いですが、通常は、メッセージに関連する機能を指定します。妥当なルーティングキーの例としては、"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"などです。ルーティングキーには複数の単語を255バイトまで指定可能です。

バインディングキーにも同様の形式が必須となります。topicエクスチェンジのロジックはdirectと似ており、あるルーティングキーで送信したメッセージは、マッチするバインディングキーでバインドしたすべてのキューに配信されます。ただし、バインディングキーに二つの特殊指定が出来ます。

  • * (star) ちょうど一単語に合致するもの。
  • # (hash) ゼロもしくは複数単語に合致するもの。

これを以下の図で簡単に説明しています。

python-five.png

上記の例では、動物を表すメッセージを送信しています。メッセージは三単語(ドットは2つ)から構成されるルーティングキーで送信します。ルーティングキーの最初の単語はスピードで、二番目はカラー、三番目は種です。"<speed>.<colour>.<species>"

上記の例では三つのバインディングを作成しています。Q1はバインディングキー"*.orange.*"で、Q2は"*.*.rabbit""lazy.#"です。

これらのバインディングの要約は以下のようになります。

  • Q1はすべてのオレンジ色の動物。
  • Q2はラビット種のすべての動物と、lazyなすべての動物。

ルーティングキーに"quick.orange.rabbit"をセットしたメッセージは両方のキューに配信されます。"lazy.orange.elephant"のメッセージも同様に両方のキューに配信されます。一方、"quick.orange.fox"は前者のキューにのみ配信され、"lazy.brown.fox"は後者のキューにのみ配信されます。"lazy.pink.rabbit"は、二つのバインディングにマッチしますが、一度だけ後者のキューに配信されます。"quick.brown.fox"はマッチしないので破棄されます。

これらの規則を破り1単語あるいは4単語の"orange""quick.orange.male.rabbit"などのメッセージを送信するとどうなるのでしょうか? これらのメッセージはいずれのバインディングともマッチしないのでロストします。

一方"lazy.orange.male.rabbit"は4単語ですが最後のバインディングにマッチするので後者のキューに配信されます。

Topic exchange

トピックエクスチェンジは強力なので他のエクスチェンジのようにも振舞えます。

キューと"#" (hash)バインディングキーでバインドする場合、すべてのメッセージを受信します。ルーティングキーに依らないのでfanoutエクスチェンジのように振舞います。

特殊文字"*" (star)と"#" (hash)をバインディングに使っていない場合、トピックエクスチェンジはdirectのように振る舞います。

Putting it all together

ロギングシステムでtopicエクスチェンジを使うように変更していきます。ログのルーティングキーが二つの単語"<facility>.<severity>"を持つという想定で始めます。

コードそのものは先のチュートリアルとほとんど同じです。

EmitLogTopic.javaのコードです。

public class EmitLogTopic {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv)
                  throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        String routingKey = getRouting(argv);
        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
        System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");

        connection.close();
    }
    //...
}

ReceiveLogsTopic.javaのコードです。

import com.rabbitmq.client.*;

import java.io.IOException;

public class ReceiveLogsTopic {
  private static final String EXCHANGE_NAME = "topic_logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    String queueName = channel.queueDeclare().getQueue();

    if (argv.length < 1) {
      System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
      System.exit(1);
    }

    for (String bindingKey : argv) {
      channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
    }

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope,
                                 AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
      }
    };
    channel.basicConsume(queueName, true, consumer);
  }
}

Tutorial 1で示したクラスパスを、Windowsでは%CP%で設定し、以下のようなサンプルを実行します。

すべてのログを取得するには、

$ java -cp $CP ReceiveLogsTopic "#"

"kern"のすべてのログを取得するには、

$ java -cp $CP ReceiveLogsTopic "kern.*"

"critical"に関するログを取得するには、

$ java -cp $CP ReceiveLogsTopic "*.critical"

複数のバインディングを作成するには、

$ java -cp $CP ReceiveLogsTopic "kern.*" "*.critical"

ルーティングキー"kern.critical"でログを送信するには以下のようにします。

$ java -cp $CP EmitLogTopic "kern.critical" "A critical kernel error"

色々なパターンを試してみて下さい。Note that the code doesn't make any assumption about the routing or binding keys, you may want to play with more than two routing key parameters.*1

以下はちょっとしたクイズです。

"*"のバインディングは空のルーティングキーで送信されたメッセージをキャッチするか?

No.

       ./ReceiveLogsTopic "*"
       ./EmitLogTopic ""

キーに".."を用いるメッセージは"#.*"でキャッチできるか? また、1単語のメッセージはキャッチできるか?

No. (but I don't know why!)*2

       ./ReceiveLogsTopic "#.*"
       ./EmitLogTopic ".."

Yes

       ./ReceiveLogsTopic "#.*"
       ./EmitLogTopic "a"

"a.*.#"と"a.#"の違いは?

'a.*.#'は最初の単語が'a'かつ2単語以上の場合にマッチする。'a.#'は最初の単語が'a'かつ1単語以上の場合にマッチする。

       ./ReceiveLogsTopic "a.*.#"
       ./EmitLogTopic "a.b"
       ./ReceiveLogsTopic "a.#"
       ./EmitLogTopic "a.b"

(すべてのソースコードEmitLogTopic.javaReceiveLogsTopic.java) 次に、RPCとしてラウンドトリップメッセージを行うやり方をtutorial 6で見て行きます。

*1:よくわからん

*2:えっ何それは