kagamihogeの日記

kagamihogeの日記です。

spring-batchでCSV出力

最近になって知ったのだけど、spring-batchには原始的ではあるものの良くある処理についてはクラスが用意されている。例えばSQL食わせるとreaderとして実行してくれるヤツとか。既存部品だけで素朴なものCSV出力くらいなら作れるっぽかった。なので、xmlのbean定義だけで出来る範囲でCSV出力を作る練習をした。解説とかは各クラスでぐぐれば出てくるんで特にかかないけど、ぶっちゃけXML定義の嵐なのでなんも前提知識無しに読むのはきつい思われる。

ソースコードは以下のリポジトリ

https://github.com/kagamihoge/springbatchcsvwrite

RabbitMQチュートリアル6 RPCをテキトーに訳した

RabbitMQチュートリアル(https://www.rabbitmq.com/getstarted.html)の6 RPC(https://www.rabbitmq.com/tutorials/tutorial-six-java.html)をテキトーに訳した。

Remote procedure call (RPC)

(using the Java client)

二つ目のチュートリアルでは、複数のワーカーに長時間タスクを配分するワークキューの使い方について解説しました。

いま、リモートコンピュータ上の関数を実行して結果を待つ必要がある、としたらどうでしょうか。ワークキューとは少々事態が異なります。このパターンは一般的にはリモートプロシージャコール(Remote Procedure Call)ないしRPCと呼ばれています。

このチュートリアルでは、クライアントとスケーラブルなRPCサーバーから構成される、RPCシステムの構築にRabbitMQを使用します。分散実行するほどの長時間タスクの準備は面倒なので、フィボナッチ数を返すRPCサービスで代用します。

Client interface

RPCサービスの使用方法の説明に、シンプルなクライアントクラスを用意します。callという公開メソッドを持ち、RPCリクエストを送信して答えが返るまでブロックします。

FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();   
String result = fibonacciRpc.call("4");
System.out.println( "fib(4) is " + result);

A note on RPC

RPCは一般的な計算パターンですが、しばしば批判の的になります。プログラマが関数呼び出しがローカルか低速なRPCなのかに無頓着の場合に問題が発生します。こうした混乱は予測不可能なシステムに陥り、デバッグするのに余計な複雑さが増します。ソフトウェアをシンプルにせずに、RPCを誤用することはメンテナンスしづらいスパゲッティコードを生み出します。

これらを念頭に置き、以下のアドバイスを考慮に入れてください。

  • 関数呼び出しがローカルなのかリモートなのかをハッキリさせる。
  • システムのドキュメントを作る。コンポーネント間の依存関係をハッキリさせる。
  • エラーを処理する。RPCサーバが長時間ダウンする場合クライアントはどのような対処をすべきか?

疑問に思う場合はRPCを避けます。もし可能であれば、RPCライクのブロッキングよりも、非同期パイプラインを使いましょう。これにより、結果は非同期的にプッシュされます。

Callback queue

基本的にはRabbitMQ経由のRPCの実行は簡単です。クライアントはリクエストメッセージを送信し、サーバーはレスポンスメッセージで応答します。レスポンスを受信するには、リクエストを処理する'callback'キューに送信します。このサンプルでは(Javaクライアント内で排他的な)デフォルトキューを使います。

callbackQueueName = channel.queueDeclare().getQueue();

BasicProperties props = new BasicProperties
                            .Builder()
                            .replyTo(callbackQueueName)
                            .build();

channel.basicPublish("", "rpc_queue", props, message.getBytes());

// ... callback_queueからのレスポンスメッセージを読み込むコードが続く

Message properties

AMQPプロトコルにはメッセージ送信に関する定義済みの14個のプロパティがあります。これらのプロパティは、以下の例外を除き、滅多に使われません。

  • deliveryMode: メッセージをpersistentにする(値は2)か、transient(それ以外の値)にする。the second tutorialでこのプロパティを扱いました。
  • contentType: エンコーディングmime-typeを記載するのに使う。たとえば、よく使われるJSONエンコーディングの場合、このプロパティにapplication/jsonを設定するのは良いプラクティスです。
  • replyTo: 基本的にはコールバックキューの名前に使われる。
  • correlationId: リクエストとRPCレスポンスとの関連付けに使用する。

これまでのチュートリアルとの違いとしては以下のインポートが必要です。

import com.rabbitmq.client.AMQP.BasicProperties;

Correlation Id

上記のコードを見るとRPCリクエストのたびにコールバックキューを生成しているように見えます。これは極めて非効率で、より良い方法があります。クライアントごとに一つのコールバックキューを作るようにします。

その場合に新たな課題が発生します。そのキューでレスポンスを受信すると、そのレスポンスが属するリクエストが明確ではありません。そこでcorrelationIdを使います。すべてのリクエストでユニークとなる値をこれに設定します。そして、コールバックキューでのメッセージ受信時にこのプロパティを参照し、この値に基づいてリクエストとレスポンスを対応させます。不明なcorrelationIdの場合、属するリクエストが無いということなので、安全にそのメッセージを破棄出来ます。

エラーで落とすよりも、コールバックキューで不明なメッセージを無視すべきでは? と疑問に思うかもしれませんが、これはサーバーサイドでレースコンディションの可能性があります。滅多に起こりませんが、RPCサーバーが、リクエストの確認メッセージを送信する前だが回答を送信直後で、死ぬ場合に発生する可能性があります。仮にこれが発生したとすると、再開したRPCサーバはリクエストを再度処理します。クライアント側では重複したレスポンスを円満に(gracefully)処理する必要があり、RPCは理想的にはべき等であるべきです。

Summary

python-six.png

このサンプルのRPCは以下のような動作をします。

  • クライアントが起動すると、無名で排他的なコールバックキューを生成します。
  • RPCリクエストでは、クライアントは二つのプロパティと共にメッセージを送信します。replyToにはコールバックキューを設定し、correlationIdにはリクエストごとにユニークな値を設定します。
  • リクエストはrpc_queueキューに送信します。
  • RPCワーカー(サーバの別名)はrpc_queueでリクエストを待機します。リクエストが到着すると、ジョブを実行して、replyToフィールドのキューを使用してクライアントに結果をメッセージで送り返します。
  • クライアントはコールバックキューでデータを待機します。メッセージが到着すると、correlationIdプロパティをチェックします。リクエストの値とマッチする場合、アプリケーションにレスポンスを戻します。

Putting it all together

タスクとしてのフィボナッチは以下のようになります。

private static int fib(int n) throws Exception {
    if (n == 0) return 0;
    if (n == 1) return 1;
    return fib(n-1) + fib(n-2);
}

関数でフィボナッチを宣言します。妥当な正の整数の入力のみ来ると仮定します。(デカイ数は動作しないと思われます。おそらく低速な再帰的実行になります)

RPCサーバのコード RPCServer.java は以下のようになります。

private static final String RPC_QUEUE_NAME = "rpc_queue";

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);

channel.basicQos(1);

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);

System.out.println(" [x] Awaiting RPC requests");

while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();

    BasicProperties props = delivery.getProperties();
    BasicProperties replyProps = new BasicProperties
                                     .Builder()
                                     .correlationId(props.getCorrelationId())
                                     .build();

    String message = new String(delivery.getBody());
    int n = Integer.parseInt(message);

    System.out.println(" [.] fib(" + message + ")");
    String response = "" + fib(n);

    channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes());

    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}

サーバのコードは単純です。

  • これまで通り、コネクションとチャネルを確立し、キューを宣言します。
  • 一つ以上のサーバープロセスを実行したい場合、複数サーバーに負荷を均等に分散させるため、channel.basicQosにprefetchCountを設定します。
  • キューへのアクセスにはbasicConsumeを使います。その後、リクエストメッセージを待機するwhileループに入り、タスク実行後にレスポンスを送り返します。

RPCクライアントとなる RPCClient.java は以下のようになります。

private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
private String replyQueueName;
private QueueingConsumer consumer;

public RPCClient() throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    connection = factory.newConnection();
    channel = connection.createChannel();

    replyQueueName = channel.queueDeclare().getQueue(); 
    consumer = new QueueingConsumer(channel);
    channel.basicConsume(replyQueueName, true, consumer);
}

public String call(String message) throws Exception {     
    String response = null;
    String corrId = java.util.UUID.randomUUID().toString();

    BasicProperties props = new BasicProperties
                                .Builder()
                                .correlationId(corrId)
                                .replyTo(replyQueueName)
                                .build();

    channel.basicPublish("", requestQueueName, props, message.getBytes());

    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        if (delivery.getProperties().getCorrelationId().equals(corrId)) {
            response = new String(delivery.getBody());
            break;
        }
    }

    return response; 
}

public void close() throws Exception {
    connection.close();
}

クライアントはこれまでと比べ若干変更しています。

  • コネクションとチャネルを確立、返信用の排他的な'callback'キューを宣言します。
  • RPCレスポンスを受け取るため、'callback'キューをサブスクライブします。
  • callメソッドが実際のRPCリクエストになります。
  • ここで、ユニークなcorrelationIdを生成して保存します。whileループは適切なレスポンスをキャッチするのにこの値を使います。
  • 次に、リクエストメッセージを二つのパラメータreplyTocorrelationIdを設定してパブリッシュします。
  • 適切なレスポンスが到着するまで待機します。
  • whileループは単純に、レスポンスメッセージに対してcorrelationIdが一致するかチェックします。マッチすればレスポンスを保存します。
  • 最後にユーザに対してレスポンスの結果を戻します。

クライアントリクエストは以下のように構築します。

RPCClient fibonacciRpc = new RPCClient();

System.out.println(" [x] Requesting fib(30)");   
String response = fibonacciRpc.call("30");
System.out.println(" [.] Got '" + response + "'");

fibonacciRpc.close();

のこりはすべてのサンプルソースコードから参照してください。RPCClient.javaRPCServer.javaになります。

コンパイルとセットアップにはこれまで通りのクラスパスを使います。(tutorial oneを参照)

$ javac -cp rabbitmq-client.jar RPCClient.java RPCServer.java

サンプルのRPCサービスは以下のように開始します。

$ java -cp $CP RPCServer
 [x] Awaiting RPC requests

クライアントを実行してフィボナッチ数をリクエストするには以下のようにします。

$ java -cp $CP RPCClient
 [x] Requesting fib(30)

このサンプルで示した設計はRPCサービスの唯一の実装方法ではありませんが、以下の利点があります。

  • RPCサーバがスローダウンしても、別のサーバを実行することでスケールアップが可能。別のコンソールでRPCServerを実行してみてください。
  • クライアント側では、RPCは送信要求と単一メッセージのみの受信を行います。queueDeclareなどの同期的呼び出しは必要ありません。結果としてこのRPCクライアントは、一つのRPCリクエストに一回のネットワークラウンドトリップのみ必要とします。

これらのコードは極めてシンプルですが、より複雑な問題の解決には適しません。

  • 実行中のサーバが無い場合クライアントはどのような振る舞いをすべきか?
  • クライアントはRPCのタイムアウトを持つべきか?
  • サーバが異常動作をして例外を発生する場合、クライアントにどう伝えるべきか?
  • 処理前に不正なメッセージが到着することを防ぐ必要がある(境界や型チェックなど)。

キューの参照にはrabbitmq-management pluginが有用です。

RabbitMQチュートリアル5 Topicsをテキトーに訳した

RabbitMQはじめて触ったけどよく分からん、ということでチュートリアルを読むついでにテキトーに訳すことにした。

RabbitMQチュートリアル(https://www.rabbitmq.com/getstarted.html)の5 Topics Java(https://www.rabbitmq.com/tutorials/tutorial-five-java.html)をテキトーに訳した。

Topics

(using the Java client)

一つ前のチュートリアルではサンプルのロギングシステムを改良しました。ブロードキャストのみ行うfanoutの代わりに、directエクスチェンジを使用し、選択的なログ受信機能を改良しました。

directエクスチェンジによってロギングシステムは改良しましたが、まだ制限があり、複数の条件に基づくルーティングは出来ません。

サンプルのロギングシステムにおいて、重要度だけでなく送信されるログの中身に基づいてサブスクライブを行いたい、とします。重要度(info/warn/crit...)と機能(auth/cron/kern...)に基づいてログをルーティングするsyslogのような考え方です。

これにより柔軟性が増します。cronのクリティカルなエラーとkernのすべてのログをリッスンする、のような柔軟性です。

これをロギングシステムに実装するには、より複雑なtopicエクスチェンジを使う必要があります。

Topic exchange

topicエクスチェンジに送信されるメッセージは任意のrouting_keyではなく、ドット区切りの単語リストが必須です。単語は何でも良いですが、通常は、メッセージに関連する機能を指定します。妥当なルーティングキーの例としては、"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"などです。ルーティングキーには複数の単語を255バイトまで指定可能です。

バインディングキーにも同様の形式が必須となります。topicエクスチェンジのロジックはdirectと似ており、あるルーティングキーで送信したメッセージは、マッチするバインディングキーでバインドしたすべてのキューに配信されます。ただし、バインディングキーに二つの特殊指定が出来ます。

  • * (star) ちょうど一単語に合致するもの。
  • # (hash) ゼロもしくは複数単語に合致するもの。

これを以下の図で簡単に説明しています。

python-five.png

上記の例では、動物を表すメッセージを送信しています。メッセージは三単語(ドットは2つ)から構成されるルーティングキーで送信します。ルーティングキーの最初の単語はスピードで、二番目はカラー、三番目は種です。"<speed>.<colour>.<species>"

上記の例では三つのバインディングを作成しています。Q1はバインディングキー"*.orange.*"で、Q2は"*.*.rabbit""lazy.#"です。

これらのバインディングの要約は以下のようになります。

  • Q1はすべてのオレンジ色の動物。
  • Q2はラビット種のすべての動物と、lazyなすべての動物。

ルーティングキーに"quick.orange.rabbit"をセットしたメッセージは両方のキューに配信されます。"lazy.orange.elephant"のメッセージも同様に両方のキューに配信されます。一方、"quick.orange.fox"は前者のキューにのみ配信され、"lazy.brown.fox"は後者のキューにのみ配信されます。"lazy.pink.rabbit"は、二つのバインディングにマッチしますが、一度だけ後者のキューに配信されます。"quick.brown.fox"はマッチしないので破棄されます。

これらの規則を破り1単語あるいは4単語の"orange""quick.orange.male.rabbit"などのメッセージを送信するとどうなるのでしょうか? これらのメッセージはいずれのバインディングともマッチしないのでロストします。

一方"lazy.orange.male.rabbit"は4単語ですが最後のバインディングにマッチするので後者のキューに配信されます。

Topic exchange

トピックエクスチェンジは強力なので他のエクスチェンジのようにも振舞えます。

キューと"#" (hash)バインディングキーでバインドする場合、すべてのメッセージを受信します。ルーティングキーに依らないのでfanoutエクスチェンジのように振舞います。

特殊文字"*" (star)と"#" (hash)をバインディングに使っていない場合、トピックエクスチェンジはdirectのように振る舞います。

Putting it all together

ロギングシステムでtopicエクスチェンジを使うように変更していきます。ログのルーティングキーが二つの単語"<facility>.<severity>"を持つという想定で始めます。

コードそのものは先のチュートリアルとほとんど同じです。

EmitLogTopic.javaのコードです。

public class EmitLogTopic {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv)
                  throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        String routingKey = getRouting(argv);
        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
        System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");

        connection.close();
    }
    //...
}

ReceiveLogsTopic.javaのコードです。

import com.rabbitmq.client.*;

import java.io.IOException;

public class ReceiveLogsTopic {
  private static final String EXCHANGE_NAME = "topic_logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    String queueName = channel.queueDeclare().getQueue();

    if (argv.length < 1) {
      System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
      System.exit(1);
    }

    for (String bindingKey : argv) {
      channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
    }

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope,
                                 AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
      }
    };
    channel.basicConsume(queueName, true, consumer);
  }
}

Tutorial 1で示したクラスパスを、Windowsでは%CP%で設定し、以下のようなサンプルを実行します。

すべてのログを取得するには、

$ java -cp $CP ReceiveLogsTopic "#"

"kern"のすべてのログを取得するには、

$ java -cp $CP ReceiveLogsTopic "kern.*"

"critical"に関するログを取得するには、

$ java -cp $CP ReceiveLogsTopic "*.critical"

複数のバインディングを作成するには、

$ java -cp $CP ReceiveLogsTopic "kern.*" "*.critical"

ルーティングキー"kern.critical"でログを送信するには以下のようにします。

$ java -cp $CP EmitLogTopic "kern.critical" "A critical kernel error"

色々なパターンを試してみて下さい。Note that the code doesn't make any assumption about the routing or binding keys, you may want to play with more than two routing key parameters.*1

以下はちょっとしたクイズです。

"*"のバインディングは空のルーティングキーで送信されたメッセージをキャッチするか?

No.

       ./ReceiveLogsTopic "*"
       ./EmitLogTopic ""

キーに".."を用いるメッセージは"#.*"でキャッチできるか? また、1単語のメッセージはキャッチできるか?

No. (but I don't know why!)*2

       ./ReceiveLogsTopic "#.*"
       ./EmitLogTopic ".."

Yes

       ./ReceiveLogsTopic "#.*"
       ./EmitLogTopic "a"

"a.*.#"と"a.#"の違いは?

'a.*.#'は最初の単語が'a'かつ2単語以上の場合にマッチする。'a.#'は最初の単語が'a'かつ1単語以上の場合にマッチする。

       ./ReceiveLogsTopic "a.*.#"
       ./EmitLogTopic "a.b"
       ./ReceiveLogsTopic "a.#"
       ./EmitLogTopic "a.b"

(すべてのソースコードEmitLogTopic.javaReceiveLogsTopic.java) 次に、RPCとしてラウンドトリップメッセージを行うやり方をtutorial 6で見て行きます。

*1:よくわからん

*2:えっ何それは