kagamihogeの日記

kagamihogeの日記です。

RabbitMQチュートリアル6 RPCをテキトーに訳した

RabbitMQチュートリアル(https://www.rabbitmq.com/getstarted.html)の6 RPC(https://www.rabbitmq.com/tutorials/tutorial-six-java.html)をテキトーに訳した。

Remote procedure call (RPC)

(using the Java client)

二つ目のチュートリアルでは、複数のワーカーに長時間タスクを配分するワークキューの使い方について解説しました。

いま、リモートコンピュータ上の関数を実行して結果を待つ必要がある、としたらどうでしょうか。ワークキューとは少々事態が異なります。このパターンは一般的にはリモートプロシージャコール(Remote Procedure Call)ないしRPCと呼ばれています。

このチュートリアルでは、クライアントとスケーラブルなRPCサーバーから構成される、RPCシステムの構築にRabbitMQを使用します。分散実行するほどの長時間タスクの準備は面倒なので、フィボナッチ数を返すRPCサービスで代用します。

Client interface

RPCサービスの使用方法の説明に、シンプルなクライアントクラスを用意します。callという公開メソッドを持ち、RPCリクエストを送信して答えが返るまでブロックします。

FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();   
String result = fibonacciRpc.call("4");
System.out.println( "fib(4) is " + result);

A note on RPC

RPCは一般的な計算パターンですが、しばしば批判の的になります。プログラマが関数呼び出しがローカルか低速なRPCなのかに無頓着の場合に問題が発生します。こうした混乱は予測不可能なシステムに陥り、デバッグするのに余計な複雑さが増します。ソフトウェアをシンプルにせずに、RPCを誤用することはメンテナンスしづらいスパゲッティコードを生み出します。

これらを念頭に置き、以下のアドバイスを考慮に入れてください。

  • 関数呼び出しがローカルなのかリモートなのかをハッキリさせる。
  • システムのドキュメントを作る。コンポーネント間の依存関係をハッキリさせる。
  • エラーを処理する。RPCサーバが長時間ダウンする場合クライアントはどのような対処をすべきか?

疑問に思う場合はRPCを避けます。もし可能であれば、RPCライクのブロッキングよりも、非同期パイプラインを使いましょう。これにより、結果は非同期的にプッシュされます。

Callback queue

基本的にはRabbitMQ経由のRPCの実行は簡単です。クライアントはリクエストメッセージを送信し、サーバーはレスポンスメッセージで応答します。レスポンスを受信するには、リクエストを処理する'callback'キューに送信します。このサンプルでは(Javaクライアント内で排他的な)デフォルトキューを使います。

callbackQueueName = channel.queueDeclare().getQueue();

BasicProperties props = new BasicProperties
                            .Builder()
                            .replyTo(callbackQueueName)
                            .build();

channel.basicPublish("", "rpc_queue", props, message.getBytes());

// ... callback_queueからのレスポンスメッセージを読み込むコードが続く

Message properties

AMQPプロトコルにはメッセージ送信に関する定義済みの14個のプロパティがあります。これらのプロパティは、以下の例外を除き、滅多に使われません。

  • deliveryMode: メッセージをpersistentにする(値は2)か、transient(それ以外の値)にする。the second tutorialでこのプロパティを扱いました。
  • contentType: エンコーディングmime-typeを記載するのに使う。たとえば、よく使われるJSONエンコーディングの場合、このプロパティにapplication/jsonを設定するのは良いプラクティスです。
  • replyTo: 基本的にはコールバックキューの名前に使われる。
  • correlationId: リクエストとRPCレスポンスとの関連付けに使用する。

これまでのチュートリアルとの違いとしては以下のインポートが必要です。

import com.rabbitmq.client.AMQP.BasicProperties;

Correlation Id

上記のコードを見るとRPCリクエストのたびにコールバックキューを生成しているように見えます。これは極めて非効率で、より良い方法があります。クライアントごとに一つのコールバックキューを作るようにします。

その場合に新たな課題が発生します。そのキューでレスポンスを受信すると、そのレスポンスが属するリクエストが明確ではありません。そこでcorrelationIdを使います。すべてのリクエストでユニークとなる値をこれに設定します。そして、コールバックキューでのメッセージ受信時にこのプロパティを参照し、この値に基づいてリクエストとレスポンスを対応させます。不明なcorrelationIdの場合、属するリクエストが無いということなので、安全にそのメッセージを破棄出来ます。

エラーで落とすよりも、コールバックキューで不明なメッセージを無視すべきでは? と疑問に思うかもしれませんが、これはサーバーサイドでレースコンディションの可能性があります。滅多に起こりませんが、RPCサーバーが、リクエストの確認メッセージを送信する前だが回答を送信直後で、死ぬ場合に発生する可能性があります。仮にこれが発生したとすると、再開したRPCサーバはリクエストを再度処理します。クライアント側では重複したレスポンスを円満に(gracefully)処理する必要があり、RPCは理想的にはべき等であるべきです。

Summary

python-six.png

このサンプルのRPCは以下のような動作をします。

  • クライアントが起動すると、無名で排他的なコールバックキューを生成します。
  • RPCリクエストでは、クライアントは二つのプロパティと共にメッセージを送信します。replyToにはコールバックキューを設定し、correlationIdにはリクエストごとにユニークな値を設定します。
  • リクエストはrpc_queueキューに送信します。
  • RPCワーカー(サーバの別名)はrpc_queueでリクエストを待機します。リクエストが到着すると、ジョブを実行して、replyToフィールドのキューを使用してクライアントに結果をメッセージで送り返します。
  • クライアントはコールバックキューでデータを待機します。メッセージが到着すると、correlationIdプロパティをチェックします。リクエストの値とマッチする場合、アプリケーションにレスポンスを戻します。

Putting it all together

タスクとしてのフィボナッチは以下のようになります。

private static int fib(int n) throws Exception {
    if (n == 0) return 0;
    if (n == 1) return 1;
    return fib(n-1) + fib(n-2);
}

関数でフィボナッチを宣言します。妥当な正の整数の入力のみ来ると仮定します。(デカイ数は動作しないと思われます。おそらく低速な再帰的実行になります)

RPCサーバのコード RPCServer.java は以下のようになります。

private static final String RPC_QUEUE_NAME = "rpc_queue";

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);

channel.basicQos(1);

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);

System.out.println(" [x] Awaiting RPC requests");

while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();

    BasicProperties props = delivery.getProperties();
    BasicProperties replyProps = new BasicProperties
                                     .Builder()
                                     .correlationId(props.getCorrelationId())
                                     .build();

    String message = new String(delivery.getBody());
    int n = Integer.parseInt(message);

    System.out.println(" [.] fib(" + message + ")");
    String response = "" + fib(n);

    channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes());

    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}

サーバのコードは単純です。

  • これまで通り、コネクションとチャネルを確立し、キューを宣言します。
  • 一つ以上のサーバープロセスを実行したい場合、複数サーバーに負荷を均等に分散させるため、channel.basicQosにprefetchCountを設定します。
  • キューへのアクセスにはbasicConsumeを使います。その後、リクエストメッセージを待機するwhileループに入り、タスク実行後にレスポンスを送り返します。

RPCクライアントとなる RPCClient.java は以下のようになります。

private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
private String replyQueueName;
private QueueingConsumer consumer;

public RPCClient() throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    connection = factory.newConnection();
    channel = connection.createChannel();

    replyQueueName = channel.queueDeclare().getQueue(); 
    consumer = new QueueingConsumer(channel);
    channel.basicConsume(replyQueueName, true, consumer);
}

public String call(String message) throws Exception {     
    String response = null;
    String corrId = java.util.UUID.randomUUID().toString();

    BasicProperties props = new BasicProperties
                                .Builder()
                                .correlationId(corrId)
                                .replyTo(replyQueueName)
                                .build();

    channel.basicPublish("", requestQueueName, props, message.getBytes());

    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        if (delivery.getProperties().getCorrelationId().equals(corrId)) {
            response = new String(delivery.getBody());
            break;
        }
    }

    return response; 
}

public void close() throws Exception {
    connection.close();
}

クライアントはこれまでと比べ若干変更しています。

  • コネクションとチャネルを確立、返信用の排他的な'callback'キューを宣言します。
  • RPCレスポンスを受け取るため、'callback'キューをサブスクライブします。
  • callメソッドが実際のRPCリクエストになります。
  • ここで、ユニークなcorrelationIdを生成して保存します。whileループは適切なレスポンスをキャッチするのにこの値を使います。
  • 次に、リクエストメッセージを二つのパラメータreplyTocorrelationIdを設定してパブリッシュします。
  • 適切なレスポンスが到着するまで待機します。
  • whileループは単純に、レスポンスメッセージに対してcorrelationIdが一致するかチェックします。マッチすればレスポンスを保存します。
  • 最後にユーザに対してレスポンスの結果を戻します。

クライアントリクエストは以下のように構築します。

RPCClient fibonacciRpc = new RPCClient();

System.out.println(" [x] Requesting fib(30)");   
String response = fibonacciRpc.call("30");
System.out.println(" [.] Got '" + response + "'");

fibonacciRpc.close();

のこりはすべてのサンプルソースコードから参照してください。RPCClient.javaRPCServer.javaになります。

コンパイルとセットアップにはこれまで通りのクラスパスを使います。(tutorial oneを参照)

$ javac -cp rabbitmq-client.jar RPCClient.java RPCServer.java

サンプルのRPCサービスは以下のように開始します。

$ java -cp $CP RPCServer
 [x] Awaiting RPC requests

クライアントを実行してフィボナッチ数をリクエストするには以下のようにします。

$ java -cp $CP RPCClient
 [x] Requesting fib(30)

このサンプルで示した設計はRPCサービスの唯一の実装方法ではありませんが、以下の利点があります。

  • RPCサーバがスローダウンしても、別のサーバを実行することでスケールアップが可能。別のコンソールでRPCServerを実行してみてください。
  • クライアント側では、RPCは送信要求と単一メッセージのみの受信を行います。queueDeclareなどの同期的呼び出しは必要ありません。結果としてこのRPCクライアントは、一つのRPCリクエストに一回のネットワークラウンドトリップのみ必要とします。

これらのコードは極めてシンプルですが、より複雑な問題の解決には適しません。

  • 実行中のサーバが無い場合クライアントはどのような振る舞いをすべきか?
  • クライアントはRPCのタイムアウトを持つべきか?
  • サーバが異常動作をして例外を発生する場合、クライアントにどう伝えるべきか?
  • 処理前に不正なメッセージが到着することを防ぐ必要がある(境界や型チェックなど)。

キューの参照にはrabbitmq-management pluginが有用です。