kagamihogeの日記

kagamihogeの日記です。

OracleからLogstashでElasticsearchにデータおくる

OracleからLogstashを経由してElasticsearchにデータを追加する。Oracleのtimestampカラムを使用して、Logstashが定期的に前回以降のデータを取得して、Elasticsearchのインデックスに追加する。KIbanaはデータ確認用としてのみ使用する。

なお、環境構築はdockerで行うのでELK Stackとは直接的に関係無い設定がこのエントリには含まれる。また、ホスト側はWIndows 10な点に注意。

手順

dockerのネットワーク作成

それぞれ異なるコンテナで動かすため、通信設定が必要。方法は幾つかあるけど、ここではあらかじめネットワークを作っておくやり方にした。

docker network create --driver bridge common_link

参考: https://qiita.com/reneice/items/20e981062b093264cd0a

Oracle Database 18c XE

http://kagamihoge.hatenablog.com/entry/2018/12/28/204636 を参考にdockerでOracle Database 18c XEをうごかす。なお、上記リンク先と異なる点として、docker-compose.ymlにネットワーク設定を追加している。修正後のファイルは以下の通り。

version: '3'
services:
  database:
    image: oracle/database:18.4.0-xe
    volumes:
      - C:\mydata\oracle\oradata:/opt/oracle/oradata
    ports:
      - 11521:1521
      - 18080:8080
      - 15500:5500
    environment:
      - ORACLE_PWD=oracle
    networks:
      - common_link
networks:
    common_link:
        external: true

Logstashが参照するテーブル定義は以下の通り。

CREATE TABLE LOGSTASH_SAMPLE (
  "ID"                 NUMBER(10, 0) NOT NULL,
  "TIMESTAMPE_VALUE"   TIMESTAMP(6) default current_timestamp NOT NULL,
  constraint "LOGSTASH_SAMPLE_PK" PRIMARY KEY("ID")
);

ELK Stackのdocker-compose.xml

Elasticsearch, Logstash, Kibanaのdocker-compose.xmlを作成する。

version: '3'
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.0.1
    ports:
      - 9200:9200
      - 9300:9300
    volumes:
      - ./elasticsearch/data:/usr/share/elasticsearch/data
    networks:
      - common_link
    environment:
      - discovery.type=single-node
  logstash:
    image: docker.elastic.co/logstash/logstash-oss:7.0.1
    container_name: logstash
    volumes:
      - ./logstash/pipeline:/usr/share/logstash/pipeline
      - ./logstash/log:/usr/share/logstash/logs
      - ./logstash/jdbc:/opt/jdbc
      - ./logstash/metadata/:/usr/share/logstash/metadata
    depends_on:
      - elasticsearch
    networks:
      - common_link
  kibana:
    image: docker.elastic.co/kibana/kibana-oss:7.0.1
    ports:
      - 5601:5601
    depends_on:
      - elasticsearch
    networks:
      - common_link
networks:
    common_link:
        external: true
  • networks ですべてのコンテナを同一ネットワークに入れておく。また、Logstashのところで後述するがこれによってホスト名をdatabaseとかelasticsearchとかで参照可能になる。

Logstash

Logstashの設定について。まず、上記docker-compose.ymlのlogstashのvolumesでマウントしてるディレクトリについて。

  • logstash
    • jdbc - oracleに接続するためのJDBCドライバを配置。
    • metadata - timestampカラムに対する最終取得時刻を保存するファイルをここに自動保存する
    • pipeline - パイプラインと呼ばれる処理設定ファイルを配置。

パイプライン設定ファイル

logstashでは、パイプラインと呼ばれる処理設定ファイルを記述する。このエントリではlogstash/pipeline/sample.confファイルを作成する。

input {
  jdbc {
    jdbc_driver_library => "/opt/jdbc/ojdbc7.jar"
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    jdbc_connection_string => "jdbc:oracle:thin:@database:1521/XEPDB1"
    jdbc_user => "system"
    jdbc_password => "**********"
    schedule => "* * * * *"
    statement_filepath => "/usr/share/logstash/pipeline/sql/sample.sql"
    tracking_column => timestampe_value
    tracking_column_type  => "timestamp"
    use_column_value => true
    jdbc_default_timezone => "Japan"
    last_run_metadata_path=> "/usr/share/logstash/metadata/last_run_metadata.txt"
    clean_run => true
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "sample_data"
    document_id => "sample_%{id}"
  }
}
  • jdbc_driver_class - かなりハマったが、Java::oracle.jdbc.driver.OracleDriver"と、プレフィクスにJava::と書かなければならない。
  • jdbc_connection_string - 前述の通りdockerのネットワーク設定をしてあるので、ここではdatabaseというホスト名でoracleにアクセスできる。
  • statement_filepath - データ取得に使うSQLファイルを指定。ここではファイルに外出ししているが、直接各ことも可能。ドキュメント参照。
  • tracking_column - データ取得時に参照するカラムを指定する。timestampe以外も使えるのでその辺はドキュメントを参照。
  • last_run_metadata_path - ここでは最終取得時刻を保存するファイルを指定。
  • clean_run - 開発用に毎回クリーン実行するためtrueにしている。

データ取得用sql

select id, timestampe_value from logstash_sample where timestampe_value > :sql_last_value
  • :sql_last_value - last_run_metadata_pathに保存してある値をこれで参照できる。これによって、logstashがこのSQLを実行するたびに最終取得時刻以降のデータを差分取得してくる。

last_run_metadata_pathの最終取得時刻保存ファイル

ここでは最終取得時刻が保存される。中身はこんな感じ。

--- !ruby/object:DateTime '2019-05-03 17:10:45.559465000 +09:00'

実行

この状態でELK stackのdockerを起動する。正常動作すれば、logstashが定期的にoracleからelasticsearchにデータを出力するログが流れる。

logstash         | [2019-05-06T02:58:02,743][INFO ][logstash.inputs.jdbc     ] (0.144195s) select id, timestampe_value from logstash_sample where timestampe_value > TIMESTAMP '1970-01-01 09:00:00.000000 +09:00'
logstash         | [2019-05-06T02:59:00,376][INFO ][logstash.inputs.jdbc     ] (0.006701s) select id, timestampe_value from logstash_sample where timestampe_value > TIMESTAMP '2019-05-03 17:10:45.559465 +09:00'
logstash         | [2019-05-06T03:00:00,170][INFO ][logstash.inputs.jdbc     ] (0.000849s) select id, timestampe_value from logstash_sample where timestampe_value > TIMESTAMP '2019-05-03 17:10:45.559465 +09:00'

アトラス作品ファンのオフ会 平成最期の眼鏡祭(2019/04/13)に行ってきた

Twipla - 4/13 平成最期の眼鏡祭!に行ってきました。当日の様子はtwitterハッシュタグ#眼鏡祭0413で追えます。

f:id:kagamihoge:20190417184803j:plain

眼鏡祭とは?

眼鏡祭はペルソナを中心としたアトラス作品ファンの大規模オフ会です。前回で10周年を超え、ここ数年の参加人数は安定的に約200人を集めています。場所は新宿のキリストンカフェ東京をワンフロア貸し切っての開催。天候は快晴で気持ちの良い一日でした。

眼鏡祭はどんなところ?

コスプレ可能なオフ会、が一番目を引く点です。以下はコスプレ参加者全員の集合写真で、眼鏡祭の雰囲気をとても良く伝えています。最近ではコスプレ可能なイベントは増えてきましたが、アトラス作品のみのコスプレでこれだけ集まるのが眼鏡祭の特徴です。なお、コスプレ参加者の割合は7~8割です。

眼鏡祭は何をする?

オフ会なので、唯一のコンテンツはアトラス作品ファン同士の交流です。コスプレは目立ちますが、あくまでも交流のツールに過ぎません。今回の会話ネタとしては、発売から少々時期は経ちましたがPQ2セガフェス2019「大アトラス展」の感想、4/24-25 PERSONA SUPER LIVE P-SOUND STREET 2019 ~Q番シアターへようこそ~に対する期待感などなど。アトラス関連に限らず、ゲーム・マンガ・アニメ、いわゆるオタクな話題で一日中盛り上がるのが眼鏡祭です。ちなみに、私はアニメ荒野のコトブキ飛行隊を推しました。

眼鏡祭ではコスプレを仮装と呼んでいます*1。眼鏡祭はオフ会なので、コスプレは交流をしやすくするツール、という位置付けです。コスプレは推しキャラや好きな作品を自分自身で表現します。初対面同士であっても、コスプレを切欠に話が膨らむ事が眼鏡祭ではそこかしこで見られます。だからなのか、コスプレをした事がなくても、コスプレに挑戦する事を眼鏡祭では尊ぶ空気があります。

ただし、コスプレ撮影には不向きな環境な点は注意が必要です。会場はカフェ・レストランなため照明は控えめですし、更衣室はフロアを暗幕で区切っただけで着替えやメイクはやり辛いし、荷物置き場も限られます。いわゆるコスイベや撮影スタジオに比べると施設・設備はかなり劣ります。そのため、コスプレ撮影だけを目的に眼鏡祭に来ると肩透かしになるでしょう。

眼鏡祭はどんな人が来るの?

年齢構成は、元々がP4のオフ会だったため、やや高めで30歳以上が主です。アルコール提供があるため20歳以上が必須です。男女比は、パっと見た感じでは偏りは無いように感じます。7~8割が常連(3回以上参加)で、1~2割が初参加です。言い換えると、200人居る会場のうち、5人に話しかけると1人は初めての人がいます。

アトラスのファン度合いは人それぞれです。P5だけ知ってます!という若者から、アトラスのゲームならおおむね全部プレイ済みという古強者まで。人によって知識と経験に差があり混沌としているのに、不思議と調和しているのが眼鏡祭の魅力です。つまり、大人の振る舞いが出来る参加者が多い、という事です。

眼鏡祭はひとりで来ても楽しめる?

楽しめます。手前味噌の手法ですが、以下に眼鏡祭の初参加者のtweetをいくつか引用させていただきます。

眼鏡祭は初参加でもひとりでも楽しめるよう様々な工夫を凝らしています。200人規模のオフ会に来るのに大変な度胸が必要なのは運営側も良く承知しています。例えば、不安なので知り合い同士で参加したければテーブル配置が近くなるよう配慮してくれますし、上記ツイートのように名札で名前が分かるようにしたり、望まないSNSのID交換がなるべく起こらないようにする工夫、写真撮影とSNSアップは本人の同意を必ず得るルール、などなど。もちろんコミュニケーションは最終的には自分次第です。しかし、それを後押しする細やかなアイデアが眼鏡祭の魅力を底上げしています。

眼鏡祭はどなたが運営しているの?

眼鏡祭は有志スタッフで運営しています。会場や飲食提供を除き、何らかのイベント会社ではありません。

上記は眼鏡祭の主宰の運営スタッフ募集のツイートです。眼鏡祭は2次会・3次会含めるとほぼ1日200人を動かす、オフ会としては相当に大規模なイベントです。目に見える仕事だけでも、会場設営・誘導・受付・各テーブルのリーダ・更衣室管理……他には、金銭や名簿管理・各種連絡・イラストや注意書や名札等制作、事務系作業も沢山あります。このように、やる事は様々なので、誰でも何かしらのスキルを活かせます。

眼鏡祭に参加して有志スタッフに回る方は沢山います。感謝や感動を伝える方法は色々ですが、スタッフ募集に応じるのもその一つでしょう。

さいごに

眼鏡祭主宰マソーさんに有志スタッフの皆さん、今回も楽しくイベントを過ごせました。一日中オタクの話題を気兼ねなくしていられるオフラインのイベントはとても居心地が良いです。また、俺と会話して頂いた皆さん、このレポートで勝手ながらtweetを引用させて頂いた皆さん*2にも感謝致します。

眼鏡祭は次にいつ開催するの?

リンク

*1:ただし、このレポートでは分かりやすさを優先して「コスプレ」と表記します

*2:もし何らかの支障があればお手数ですが https://twitter.com/kagamihoge にご連絡下さい

Spring Batch 4.1.x - Reference Documentation - Spring Batch Integrationのテキトー翻訳

https://docs.spring.io/spring-batch/4.1.x/reference/html/spring-batch-integration.html#springBatchIntegration

https://qiita.com/kagamihoge/items/12fbbc2eac5b8a5ac1e0 俺の訳一覧リスト

1. Spring Batch Integration

1.1. Spring Batch Integration Introduction

Spring Batchのユーザの多くはそのスコープ外の機能を必要とする事があり、それらはSpring Integrationで効率的かつ簡潔に実装可能な場合があります。逆に、Spring IntegrationのユーザがSpring Batchの機能を必要としたり両フレームワークの効率的な連携を必要とする場合もあります。これについて、いくつかのパターンとユースケースがあり、Spring Batch Integrationで扱います。

Spring BatchとSpring Integrationの境目は常に明瞭なわけではなく、以下2つのアドバイスが役に立ちます。粒度に気を配り、共通パターンを適用する(Think about granularity, and apply common patterns.)。共通パターンのいくつかは本リファレンスで解説します。

バッチ処理にメッセージング追加によりオペレーション自動化・関心事の分離・strategizing*1が可能となります。例えば、メッセージをジョブ実行のトリガにし、メッセージ送信を様々な方法で行います。または、ジョブが完了もしくは失敗時に、そのイベントがメッセージ送信をトリガし、コンシューマはアプリケーション本体とは無関係な運用上の関心事を行います。メッセージングはジョブ内に埋め込むことも可能です(例えば、チャネルから処理対象のアイテムを読み込んだり書き込んだりする)。リモートパーティショニングとリモートチャンキングは多数のワーカーにワークロードを分散する手段を提供します。

このセクションでは以下のキーコンセプトを解説します。

1.1.2. Launching Batch Jobs through Messages

コアSpring Batch APIからバッチジョブを開始するには、基本的には以下2つの選択肢があります。

  • CommandLineJobRunnerCLI経由。
  • JobOperator.start()もしくはJobLauncher.run()のプログラム経由。

たとえば、シェルスクリプトでBatch Jobsを実行する場合はCommandLineJobRunnerを使います。もしくは、直接JobOperatorを使う場合もあります(webアプリケーションの一部としてSpring Batchを使う場合など)。しかし、さらに複雑なユースケースではどうでしょうか? Batch Job用のデータ取得に(S)FTPにポーリングすつとか、アプリケーションで同時に複数データソースを扱う必要がある場合です。たとえば、web経由のデータファイルだけでなくFTPやその他からも受信する場合です。これはSpring Batch実行前に入力ファイル変換が必要になります。

そこで、Spring Integrationと各種アダプターでバッチジョブ実行が柔軟になります。たとえば、File Inbound Channel Adapterによりファイルシステム上のディレクトリを直接モニタし、入力ファイルが到着したらBatch Jobの開始が出来ます。また、Spring Integration flowsで複数の異なるアダプタを使用し、設定のみで同時に複数データソースからバッチジョブで使うデータを簡単に取り込めます。Spring Integrationでこれらを実装するのは簡単で、疎結合が容易であり、JobLauncherのイベント駆動実行が出来ます。

Spring Batch Integrationにはバッチジョブを起動するJobLaunchingMessageHandlerクラスがあります。JobLaunchingMessageHandlerの入力はSpring Integrationのメッセージで、これはJobLaunchRequestペイロードを持ちます。このクラスは起動するJobとBatch jobの起動に必要なJobParametersのラッパーです。

以下はBatch jobを開始する一般的なSpring Integration message flowのイメージ図です。EIP (Enterprise Integration Patterns) websiteにメッセージングのアイコンとその解説があります。

Figure 1. Launch Batch Job

Transforming a file into a JobLaunchRequest

package io.spring.sbi;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;

import java.io.File;

public class FileMessageToJobRequest {
    private Job job;
    private String fileParameterName;

    public void setFileParameterName(String fileParameterName) {
        this.fileParameterName = fileParameterName;
    }

    public void setJob(Job job) {
        this.job = job;
    }

    @Transformer
    public JobLaunchRequest toRequest(Message<File> message) {
        JobParametersBuilder jobParametersBuilder =
            new JobParametersBuilder();

        jobParametersBuilder.addString(fileParameterName,
            message.getPayload().getAbsolutePath());

        return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
    }
}

The JobExecution Response

バッチジョブを実行すると、JobExecutionを返します。このインスタンスで実行ステータスを確認します。JobExecutionが正常に作成可能であれば、実際の実行が成功するかどうかに関わらず、JobExecutionは常に返されます。

JobExecutionインスタンスが返されるかどうかはTaskExecutor次第です。synchronous(single-threaded) TaskExecutor実装を使う場合、JobExecutionのレスポンスはジョブが完了してから返します。asynchronous TaskExecutorの場合、JobExecutionは即時返します。ユーザはJobExecutionidを取得(JobExecution.getJobId()を使用)してから、JobExplorerでjobの更新後ステータスをJobRepositoryにクエリーします。詳細については、 Spring Batch reference documentationのQuerying the Repositoryを参照してください。

Spring Batch Integration Configuration

以下の設定はfile inbound-channel-adapterを生成して指定ディレクトリのCSVファイルをリッスンし、そのファイルをtransformer(FileMessageToJobRequest)に渡し、Job Launching Gatewayでjobを起動し、logging-channel-adapterJobExecutionのログ出力をします。

Java Configuration

@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
    FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
    fileMessageToJobRequest.setFileParameterName("input.file.name");
    fileMessageToJobRequest.setJob(personJob());
    return fileMessageToJobRequest;
}

@Bean
public JobLaunchingGateway jobLaunchingGateway() {
    SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
    simpleJobLauncher.setJobRepository(jobRepository);
    simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
    JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);

    return jobLaunchingGateway;
}

@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {
    return IntegrationFlows.from(Files.inboundAdapter(new File("/tmp/myfiles")).
                    filter(new SimplePatternFileListFilter("*.csv")),
            c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1))).
            handle(fileMessageToJobRequest()).
            handle(jobLaunchingGateway).
            log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").
            get();
}

Example ItemReader Configuration

いま、ファイルのポーリングしてジョブ起動するとして、Spring BatchのItemReaderでjobパラメータ"input.file.name"のディレクトリを使用する設定を行います。bean設定は以下のようになります。

Java Configuration

@Bean
@StepScope
public ItemReader sampleReader(@Value("#{jobParameters[input.file.name]}") String resource) {
...
    FlatFileItemReader flatFileItemReader = new FlatFileItemReader();
    flatFileItemReader.setResource(new FileSystemResource(resource));
...
    return flatFileItemReader;
}

上記例のポイントはResourceプロパティ用に#{jobParameters['input.file.name']}をインジェクトし、ItemReader beanをStep scopeにしています。beanをStepスコープにすることで遅延バインディングが使用可能となり、これによりjobParameters変数にアクセスできます。

1.2. Available Attributes of the Job-Launching Gateway

job-launching gatewayの以下の属性でjobの制御設定が出来ます。

  • id: 基底Spring bean定義である以下どちらかのインスタンスを識別する。
    • EventDrivenConsumer
    • PollingConsumer(実際の実装はコンポーネントの入力チャネルがSubscribableChannelPollableChannelかどうかに依存)
  • auto-startup: エンドポイントがスタートアップ時に自動開始するかのBooleanフラグ。デフォルトtrue
  • request-channel: エンドポイントの入力MessageChannel
  • reply-channel: 実行結果のJobExecutionペイロードの送り先MessageChannel
  • reply-timeout: リプライチャネルにリプライメッセージが正常に送信されるのをゲートウェイがwaitする時間(ミリ秒)を指定します。超える場合は例外をスローします。この属性はチャネルがブロックする場合にだけ適用します(例、bounded queue channelがフルなど)。また、注意点として、DirectChannelに送信の場合、その実行は送信スレッドで行います。よって、送信オペレーションの失敗はさらに下流コンポーネントが起こしている可能性があります。reply-timeout属性は基底のMessagingTemplateインスタンスsendTimeoutプロパティにマップします。未指定の場合、属性のデフォルトは-1、これの意味は、デフォルトではGatewayは無限にwaitする、になります。
  • job-launcher: オプション。カスタムJobLauncher beanの参照。未指定ではアダプタはjobLauncheridで登録するインスタンスを再利用します。デフォルトインスタンスが無い場合、例外をスローします。
  • order: エンドポイントをサブスクライバとしてSubscribableChannelに接続した時の呼び出し順序を指定。

1.3. Sub-Elements

GatewayPollableChannelからメッセージを受信する場合、グローバルデフォルトのPollerを指定するか、Job Launching GatewayPoller sub-elementを指定する必要があります。

Java Configuration

@Bean
@ServiceActivator(inputChannel = "queueChannel", poller = @Poller(fixedRate="1000"))
public JobLaunchingGateway sampleJobLaunchingGateway() {
    JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher());
    jobLaunchingGateway.setOutputChannel(replyChannel());
    return jobLaunchingGateway;
}

1.3.1. Providing Feedback with Informational Messages

Spring Batchのjobは長時間実行が可能なので、進行状況の取得が重要な場合があります。たとえば、バッチジョブの一部または全部を失敗する場合、ステークホルダで通知を受けたい場合があります。Spring Batchは以下を通じてそうした情報を収集する仕組みがあります。

  • Active polling
  • Event-driven listeners

Spring Batchのjobを非同期実行する場合(例えばJob Launching Gatewayを使用)、JobExecutionインスタンスを返します。この場合、継続的にポーリングしてステータス更新するためにJobExecution.getJobId()を使えます。JobExplorerを使用してJobRepositoryからJobExecutionの更新されたインスタンスを取得します。しかし、これは最良とは言えず、イベント駆動がより適しています。

このため、Spring Batchは、3つの汎用リスナーを用意しています。

  • StepListener
  • ChunkListener
  • JobExecutionListener

以下イメージでは、Spring BatchのjobはStepExecutionListenerも設定しています。このとき、Spring Integrationはイベント受信とイベント前後にstepを処理します。たとえば、受信したStepExecutionRouterでチェックします。チェック結果に基づき、各種処理(例:Mail Outbound Channel Adapterにメッセージをルーティング)が可能で、状態に応じたEmail通知を送信できます。

Figure 2. Handling Informational Messages

以下2例は、StepExecutionイベントでGatewayにメッセージ送信するリスナー設定と、その出力をlogging-channel-adapterにログ出力します。

まず、通知インテグレーションのbeanを生成します。

Java Configuration

@Bean
@ServiceActivator(inputChannel = "stepExecutionsChannel")
public LoggingHandler loggingHandler() {
    LoggingHandler adapter = new LoggingHandler(LoggingHandler.Level.WARN);
    adapter.setLoggerName("TEST_LOGGER");
    adapter.setLogExpressionString("headers.id + ': ' + payload");
    return adapter;
}

@MessagingGateway(name = "notificationExecutionsListener", defaultRequestChannel = "stepExecutionsChannel")
public interface NotificationExecutionListener extends StepExecutionListener {}

※ configurationに@IntegrationComponentScanが必要です。

次に、jobにstep-levelリスナーを追加します。

Java Configuration

public Job importPaymentsJob() {
    return jobBuilderFactory.get("importPayments")
        .start(stepBuilderFactory.get("step1")
                .chunk(200)
                .listener(notificationExecutionsListener())
                ...
}

1.3.2. Asynchronous Processors

非同期プロセッサ(Asynchronous Processors)でアイテム処理のスケーリングが出来ます。非同期プロセッサのユースケースでは、AsyncItemProcessorはディスパッチャとして振る舞い、新スレッドでItemProcessorのロジックを実行します。アイテムが完了すると、書き込むためにAsynchItemWriterFutureが渡されます。

このため、基本的にはfork-joinを実装出来るような、非同期アイテム処理でパフォーマンスが向上する可能性があります。AsyncItemWriterは結果を収集してすべての結果が利用可能になり次第chunkにwrite backします。

以下はAsyncItemProcessor設定例です。

Java Configuration

@Bean
public AsyncItemProcessor processor(ItemProcessor itemProcessor, TaskExecutor taskExecutor) {
    AsyncItemProcessor asyncItemProcessor = new AsyncItemProcessor();
    asyncItemProcessor.setTaskExecutor(taskExecutor);
    asyncItemProcessor.setDelegate(itemProcessor);
    return asyncItemProcessor;
}

delegateプロパティはItemProcessor beanを設定し、taskExecutorプロパティは適当なTaskExecutorを設定します。

以下はAsyncItemWriterの設定例です。

Java Configuration

@Bean
public AsyncItemWriter writer(ItemWriter itemWriter) {
    AsyncItemWriter asyncItemWriter = new AsyncItemWriter();
    asyncItemWriter.setDelegate(itemWriter);
    return asyncItemWriter;
}

こちらも、delegateプロパティには実際の処理をするItemWriter beanを設定します。

1.3.3. Externalizing Batch Process Execution

これまでの解説ではSpring IntegrationがSpring Batchをラップするユースケースを見てきました。しかし、Spring Batchが内部的にSpring Integrationを使うことも可能です。この場合、Spring Batchのユーザは外部プロセスにアイテム処理あるいはchunkをデリゲートします。これにより複雑な処理の負荷軽減が可能です。Spring Batch Integrationは以下のための専用機能があります。

  • Remote Chunking
  • Remote Partitioning

Remote Chunking

Figure 3. Remote Chunking

ChunkMessageChannelItemWriterでchunk処理を外部化し、アイテムを外に送信して結果を収集します。送信すると、Spring Batchは読み込みとアイテムのグループ化を継続し、結果待ちをしません。ChunkMessageChannelItemWriterは結果の収集と統合をしてSpring Batchに処理を戻します。

Spring Integrationを使用して、処理のコンカンレンシーを完全に制御します(DirectChannelの代わりにQueueChannelを使用するなど)。 Channel Adapters (JMSやAMQPなど)のSpring Integrationリッチコレクションに依存することで、Batchジョブのchunkを外部システムに分散出来ます。

remote chunkするstepのシンプルなjobは以下のような設定になります。

Java Configuration

public Job chunkJob() {
     return jobBuilderFactory.get("personJob")
             .start(stepBuilderFactory.get("step1")
                     .<Person, Person>chunk(200)
                     .reader(itemReader())
                     .writer(itemWriter())
                     .build())
             .build();
 }

ItemReaderにはマスターでデータ読み込みに使用するbeanを指定します。ItemWriterには上述の専用ItemWriterChunkMessageChannelItemWriter)を指定します。processor(必要であれば設定)はマスターの設定にはありませんが、これはワーカーで設定します。以下の設定は基本的なマスターのセットアップです。必要に応じて、スロットルリミットなど、コンポーネントのプロパティを設定してください。

Java Configuration

@Bean
public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
    factory.setBrokerURL("tcp://localhost:61616");
    return factory;
}

/*
 * Configure outbound flow (requests going to workers)
 */
@Bean
public DirectChannel requests() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlows
            .from(requests())
            .handle(Jms.outboundAdapter(connectionFactory).destination("requests"))
            .get();
}

/*
 * Configure inbound flow (replies coming from workers)
 */
@Bean
public QueueChannel replies() {
    return new QueueChannel();
}

@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlows
            .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("replies"))
            .channel(replies())
            .get();
}

/*
 * Configure the ChunkMessageChannelItemWriter
 */
@Bean
public ItemWriter<Integer> itemWriter() {
    MessagingTemplate messagingTemplate = new MessagingTemplate();
    messagingTemplate.setDefaultChannel(requests());
    messagingTemplate.setReceiveTimeout(2000);
    ChunkMessageChannelItemWriter<Integer> chunkMessageChannelItemWriter
            = new ChunkMessageChannelItemWriter<>();
    chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate);
    chunkMessageChannelItemWriter.setReplyChannel(replies());
    return chunkMessageChannelItemWriter;
}

上の設定ではいくつかのbeanを設定しています。ActiveMQでメッセージングミドルウェアと、Spring Integrationのinbound/outbound JMSアダプタを設定しています。前述のitemWriter beanはjob stepで参照しており、ChunkMessageChannelItemWriterでメッセージングを介してchunkを書き込みます。

次にワーカーの設定を見ていきます。

Java Configuration

@Bean
public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
    factory.setBrokerURL("tcp://localhost:61616");
    return factory;
}

/*
 * Configure inbound flow (requests coming from the master)
 */
@Bean
public DirectChannel requests() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlows
            .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
            .channel(requests())
            .get();
}

/*
 * Configure outbound flow (replies going to the master)
 */
@Bean
public DirectChannel replies() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlows
            .from(replies())
            .handle(Jms.outboundAdapter(connectionFactory).destination("replies"))
            .get();
}

/*
 * Configure the ChunkProcessorChunkHandler
 */
@Bean
@ServiceActivator(inputChannel = "requests", outputChannel = "replies")
public ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler() {
    ChunkProcessor<Integer> chunkProcessor
            = new SimpleChunkProcessor<>(itemProcessor(), itemWriter());
    ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler
            = new ChunkProcessorChunkHandler<>();
    chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor);
    return chunkProcessorChunkHandler;
}

これら設定項目の多くはマスターのものと似通っています。ワーカーはSpring Batch JobRepositoryにもjob設定にもアクセスしません。中核のbeanはchunkProcessorChunkHandlerです。ChunkProcessorChunkHandlerchunkProcessorプロパティはSimpleChunkProcessorを取り、これはItemWriter(とオプションでItemProcessor)を指定し、これ(ら)がマスターからchunk受信時にワーカーで動作します。

詳細については、Remote Chunkingの"Scalability"チャプターを参照してください。

4.1以降、Spring Batch Integrationはremote chunkingセットアップの簡素化用に@EnableBatchIntegrationを追加しました。これによりアプリケーションコンテキストに2つのbeanを作ります。

  • RemoteChunkingMasterStepBuilderFactory: マスターstepの設定に使用
  • RemoteChunkingWorkerBuilder: リモートワーカのintegration flow設定に使用

これらAPIは以下図のように多数のコンポーネントの設定に使用します。

Figure 4. Remote Chunking Configuration

マスター側では、以下の宣言によりマスターstepを設定します。

  • アイテム読み込みとワーカー送信用のreader
  • ワーカーにリクエストを送信する出力チャネル(リクエスト送信("Outgoing requests"))
  • ワーカーから返信を受信する入力チャネル(返信受信("Incoming replies"))

ChunkMessageChannelItemWriterMessagingTemplateの明示的な設定は必要ありません(必要であれば明示的な設定を行う)。

ワーカー側では、RemoteChunkingWorkerBuilderでワーカーに以下を設定します。

  • 入力チャネルでマスターが送信したリクエストをリッスン(リクエスト受信("Incoming requests"))
  • 各リクエストに対しChunkProcessorChunkHandlerhandleChunkメソッドを介してItemProcessorItemWriterを呼び出す。
  • 出力チャネルでマスターに返信を送信(返信送信"Outgoing replies")

SimpleChunkProcessorChunkProcessorChunkHandlerの明示的な設定は必要ありません(必要であれば明示的な設定を行う)。

以下はこれらAPIの設定例です。

@EnableBatchIntegration
@EnableBatchProcessing
public class RemoteChunkingJobConfiguration {

    @Configuration
    public static class MasterConfiguration {

        @Autowired
        private RemoteChunkingMasterStepBuilderFactory masterStepBuilderFactory;

        @Bean
        public TaskletStep masterStep() {
            return this.masterStepBuilderFactory.get("masterStep")
                       .chunk(100)
                       .reader(itemReader())
                       .outputChannel(requests()) // リクエストをワーカーに送信
                       .inputChannel(replies())   // ワーカーから返信を受信
                       .build();
        }

        // ミドルウェアのbeanをここでセットアップ(詳細省略)

    }

    @Configuration
    public static class WorkerConfiguration {

        @Autowired
        private RemoteChunkingWorkerBuilder workerBuilder;

        @Bean
        public IntegrationFlow workerFlow() {
            return this.workerBuilder
                       .itemProcessor(itemProcessor())
                       .itemWriter(itemWriter())
                       .inputChannel(requests()) // リクエストをマスターから受信
                       .outputChannel(replies()) // マスターに返信を送信
                       .build();
        }

        // ミドルウェアのbeanをここでセットアップ(詳細省略)

    }

}

remote chunking jobのより複雑な例についてはこちらを参照してください。

Remote Partitioning

Figure 5. Remote Partitioning

Remote Partitioningはアイテム処理ではなくボトルネックを引き起こすI/O関連で有効です。Remote Partitioningの場合、Spring Batchのstepを実行するワーカーに処理を外出しします。このため、ワーカーはItemReader, ItemProcessor, ItemWriterを持ちます。これ用に、Spring Batch IntegrationにはMessageChannelPartitionHandlerがあります

PartitionHandlerの実装はMessageChannelを使用してリモートワーカに指示を出してそのレスポンスを受け取ります。これらはリモートワーカと通信するのに使う(JMSとAMQPなど)トランポートのクラス群です。

"Scalability"チャプターのセクションのremote partitioningでは、リモートパーティショニングの概念と設定に必要なコンポーネントの概要と、ジョブ実行を複数のローカルスレッドにパーティショニングするデフォルトのTaskExecutorPartitionHandlerを使うサンプルについて、解説しています。複数JVMにリモートパーティショニングするには、追加で以下2つのコンポーネントが必要です。

  • A remoting fabric or grid environment
  • remoting fabricあるいはgrid environmentを使用するPartitionHandler実装。

remote chunking同様、JMSを"remoting fabric"に使えます。その場合、上述の通り、PartitionHandler実装にMessageChannelPartitionHandlerを使います。以下のサンプルはpartitioned jobが設定済みとして、JMS設定とMessageChannelPartitionHandlerの例です。

Java Configuration

/*
 * Configuration of the master side
 */
@Bean
public PartitionHandler partitionHandler() {
    MessageChannelPartitionHandler partitionHandler = new MessageChannelPartitionHandler();
    partitionHandler.setStepName("step1");
    partitionHandler.setGridSize(3);
    partitionHandler.setReplyChannel(outboundReplies());
    MessagingTemplate template = new MessagingTemplate();
    template.setDefaultChannel(outboundRequests());
    template.setReceiveTimeout(100000);
    partitionHandler.setMessagingOperations(template);
    return partitionHandler;
}

@Bean
public QueueChannel outboundReplies() {
    return new QueueChannel();
}

@Bean
public DirectChannel outboundRequests() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundJmsRequests() {
    return IntegrationFlows.from("outboundRequests")
            .handle(Jms.outboundGateway(connectionFactory())
                    .requestDestination("requestsQueue"))
            .get();
}

@Bean
@ServiceActivator(inputChannel = "inboundStaging")
public AggregatorFactoryBean partitioningMessageHandler() throws Exception {
    AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
    aggregatorFactoryBean.setProcessorBean(partitionHandler());
    aggregatorFactoryBean.setOutputChannel(outboundReplies());
    // configure other propeties of the aggregatorFactoryBean
    return aggregatorFactoryBean;
}

@Bean
public DirectChannel inboundStaging() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow inboundJmsStaging() {
    return IntegrationFlows
            .from(Jms.messageDrivenChannelAdapter(connectionFactory())
                    .configureListenerContainer(c -> c.subscriptionDurable(false))
                    .destination("stagingQueue"))
            .channel(inboundStaging())
            .get();
}

/*
 * Configuration of the worker side
 */
@Bean
public StepExecutionRequestHandler stepExecutionRequestHandler() {
    StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();
    stepExecutionRequestHandler.setJobExplorer(jobExplorer);
    stepExecutionRequestHandler.setStepLocator(stepLocator());
    return stepExecutionRequestHandler;
}

@Bean
@ServiceActivator(inputChannel = "inboundRequests", outputChannel = "outboundStaging")
public StepExecutionRequestHandler serviceActivator() throws Exception {
    return stepExecutionRequestHandler();
}

@Bean
public DirectChannel inboundRequests() {
    return new DirectChannel();
}

public IntegrationFlow inboundJmsRequests() {
    return IntegrationFlows
            .from(Jms.messageDrivenChannelAdapter(connectionFactory())
                    .configureListenerContainer(c -> c.subscriptionDurable(false))
                    .destination("requestsQueue"))
            .channel(inboundRequests())
            .get();
}

@Bean
public DirectChannel outboundStaging() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundJmsStaging() {
    return IntegrationFlows.from("outboundStaging")
            .handle(Jms.outboundGateway(connectionFactory())
                    .requestDestination("stagingQueue"))
            .get();
}

また、以下サンプルのようにhandler属性にpartitionHandler beanを設定をしてください。

Java Configuration

        public Job personJob() {
                return jobBuilderFactory.get("personJob")
                                .start(stepBuilderFactory.get("step1.master")
                                                .partitioner("step1.worker", partitioner())
                                                .partitionHandler(partitionHandler())
                                                .build())
                                .build();
        }

リモートパーティショニングのjobのより複雑な例についてはこちらを参照してください。

リモートパーティショニングのセットアップを簡易化するための@EnableBatchIntegrationがあります。このアノテーションはリモートパーティショニングの2つのbeanを提供します。

  • RemotePartitioningMasterStepBuilderFactory: マスターのstepを設定するのに使用。
  • RemotePartitioningWorkerStepBuilderFactory: ワーカーのstepを設定するのに使用。

これらAPIは以下図のように多数のコンポーネントの設定に使用します。

Figure 6. Remote Partitioning Configuration (with job repository polling)

Figure 7. Remote Partitioning Configuration (with replies aggregation)

マスター側では、RemotePartitioningMasterStepBuilderFactoryを使用して以下を宣言することでマスターstepを設定します。

  • データをパーティション化するのに使用するPartitioner
  • ワーカーにリクエスト送信するための出力チャネル ("Outgoing requests")
  • ワーカーからの返信を受信するための入力チャネル("Incoming replies")(replies aggregation設定時)
  • ポーリングのインターバルとタイムアウト(job repository polling設定時)

MessageChannelPartitionHandlerMessagingTemplateの明示的な設定は必要ありません(必要な場合は設定する)。

反対のワーカー側では、RemotePartitioningWorkerStepBuilderFactoryを使用してワーカーを設定します。

  • 入力チャネルでマスターからのリクエスト送信をリッスンする("Incoming requests")
  • リクエストに対してStepExecutionRequestHandlerhandleメソッドを呼ぶ。
  • マスターに出力チャネルを通じて返信を送信する("Outgoing replies")

StepExecutionRequestHandlerの明示的な設定は必要ありません(必要な場合は設定する)。

これらのAPIの使用例は以下の通りです。

@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
public class RemotePartitioningJobConfiguration {

    @Configuration
    public static class MasterConfiguration {

        @Autowired
        private RemotePartitioningMasterStepBuilderFactory masterStepBuilderFactory;

        @Bean
        public Step masterStep() {
                 return this.masterStepBuilderFactory
                    .get("masterStep")
                    .partitioner("workerStep", partitioner())
                    .gridSize(10)
                    .outputChannel(outgoingRequestsToWorkers())
                    .inputChannel(incomingRepliesFromWorkers())
                    .build();
        }

        // Middleware beans setup omitted

    }

    @Configuration
    public static class WorkerConfiguration {

        @Autowired
        private RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory;

        @Bean
        public Step workerStep() {
                 return this.workerStepBuilderFactory
                    .get("workerStep")
                    .inputChannel(incomingRequestsFromMaster())
                    .outputChannel(outgoingRepliesToMaster())
                    .chunk(100)
                    .reader(itemReader())
                    .processor(itemProcessor())
                    .writer(itemWriter())
                    .build();
        }

        // Middleware beans setup omitted

    }

}

*1:strategyパターンのようにバッチ処理を独立した関数的な単位と見なせるようになる、といった意味合いと思われ