kagamihogeの日記

kagamihogeの日記です。

Spring Batch 4.1.x - Reference Documentation - Scaling and Parallel Processingのテキトー翻訳

https://docs.spring.io/spring-batch/4.1.x/reference/html/scalability.html#scalability

https://qiita.com/kagamihoge/items/12fbbc2eac5b8a5ac1e0 俺の訳一覧リスト

1. Scaling and Parallel Processing

大半のバッチ処理はシングルスレッド・シングルプロセスジョブで問題無いので、より複雑な実装を検討する前にそれが要求を満たすかどうかチェックする事をおすすめします。実際のジョブのパフォーマンスを計測し、まず単純な実装で要求を満たせないかを確認します。最近の一般的なハードウェアであれば、数分で数百メガバイトのファイルを読み書き出来ます。

パラレル処理でジョブを実装する場合、Spring Batchにはこのチャプターで解説する各種オプションが存在します。いくつかの機能はこのチャプターでは解説していません。高レベルでは、パラレル処理には2つのモードがあります。

  • 単一プロセス、マルチスレッド
  • マルチプロセス

同様にこれをカテゴリに落とし込むと、

  • Multi-threaded Step(単一プロセス)
  • Parallel Steps(単一プロセス)
  • Remote Chunking of Step(マルチプロセス)
  • Partitioning a Step(単一 or マルチプロセス)

まず、単一プロセスについてみていき、それからマルチプロセスの方法について見ていきます。

1.1. Multi-threaded Step

パラレル処理にする最も簡単な方法はstep設定にTaskExecutorを追加します。

java configurationでは以下サンプルのようにstepにTaskExecutorを追加します。

Java Configuration

@Bean
public TaskExecutor taskExecutor(){
    return new SimpleAsyncTaskExecutor("spring_batch");
}

@Bean
public Step sampleStep(TaskExecutor taskExecutor) {
        return this.stepBuilderFactory.get("sampleStep")
                                .<String, String>chunk(10)
                                .reader(itemReader())
                                .writer(itemWriter())
                                .taskExecutor(taskExecutor)
                                .build();
}

このサンプルでは、taskExecutorTaskExecutorインタフェースを実装するbeanです。TaskExecutorはSpringの標準インタフェースなので、利用可能な実装の詳細についてはSpring User Guideを参照してください。最もシンプルなマルチスレッドなTaskExecutorSimpleAsyncTaskExecutorです。

上記設定により、read-process-アイテムのchunkのwrite(コミットインターパル)を実行するStepが、それぞれ別スレッドになります。これは、処理アイテムの順序が固定でなくなり、シングルスレッドとは異なりchunkは非連続なアイテムになりうる、という点に注意が必要です。task executorの制限(スレッドプールに戻すかどうかなど)に加えて、taskletにはthrottle limitがあり、このデフォルトは4です。スレッドプールをフルに使うにはこの値を増やしてください。

java configurationの場合はthrottle limitにビルダーでアクセスします。

Java Configuration

@Bean
public Step sampleStep(TaskExecutor taskExecutor) {
        return this.stepBuilderFactory.get("sampleStep")
                                .<String, String>chunk(10)
                                .reader(itemReader())
                                .writer(itemWriter())
                                .taskExecutor(taskExecutor)
                                .throttleLimit(20)
                                .build();
}

stepで使う何らかのプーリングするリソース、例えばDataSource、起因でコンカレンシーに制限がかかる場合があります。そうしたリソースのプールには少なくともstepの同時実行step数以上を指定してください。

一般的なバッチのユースケースにおけるマルチスレッドStepの使用には実用上の制限がいくつか存在します。Stepの登場人物(readerやwriterなど)はステートフルです。もし状態がスレッドで分割されていない場合、そのコンポーネントはマルチスレッドStepでは使えません。特に、Spring Batchの標準readerとwriterの大半がマルチスレッド用に設計されていません。しかし、ステートレスかスレッドセーフなreaderとwriterとであれば動作可能で、process indicator(Preventing State Persistence参照)を使用するサンプルがSpring Batch Samplesにあります(parallelJob)。process indicatorはDB入力テーブルで処理済みアイテムをトラッキングするのに使用します。

Spring BatchはItemWriterItemReaderの実装をいくつか提供しています。基本的には、Javadocでスレッドセーフかそうでないか、コンカレント環境で問題を避けるにはどうすべきか、の記述があります。Javadocに何も情報が無い場合、実装を見て何らかの状態を保持していないか確認してください。readerが非スレッドセーフなら、SynchronizedItemStreamReaderでデコレートするか、自前のsynchronizing delegatorを作成してください。read()同様processとwrite呼び出しを同期化可能で、これらはchunkのうち最も高コストな部分であり、シングルレスレッドよりも高速に完了する可能性があります。

1.2. Parallel Steps

パラレル化したいアプリケーションロジックを分割して独立したstepに出来るのであれば、単一処理内でその部分をパラレル化できます。Parallel Stepの設定は簡単にできます。

java configurationの場合の以下例では、(step1,step2)step3をパラレルにしています。

Java Configuration

@Bean
public Job job() {
    return jobBuilderFactory.get("job")
        .start(splitFlow())
        .next(step4())
        .build()        //builds FlowJobBuilder instance
        .build();       //builds Job instance
}

@Bean
public Flow splitFlow() {
    return new FlowBuilder<SimpleFlow>("splitFlow")
        .split(taskExecutor())
        .add(flow1(), flow2())
        .build();
}

@Bean
public Flow flow1() {
    return new FlowBuilder<SimpleFlow>("flow1")
        .start(step1())
        .next(step2())
        .build();
}

@Bean
public Flow flow2() {
    return new FlowBuilder<SimpleFlow>("flow2")
        .start(step3())
        .build();
}

@Bean
public TaskExecutor taskExecutor(){
    return new SimpleAsyncTaskExecutor("spring_batch");
}

flowを実行するのに使用するTaskExecutor実装を設定で指定可能です。デフォルトはSyncTaskExecutorですが、パラレルでstepを実行するには非同期のTaskExecutorが必要です。exitステータスとtransitioningの集約前にsplitの各flowが完了する事をjobは保証します。

詳細はSplit Flowsのセクションを参照してください。

1.3. Remote Chunking

remote chunkingでは、Step処理は複数プロセスに分割し、なんらかのミドルウェアを用いて互いに通信します。以下がイメージ図です。

Figure 1. Remote Chunking

マスターコンポーネントは単一プロセスで、スレーブは複数のリモートプロセスです。マスターがボトルネックにならなければこのパターンはベストなので、アイテム読み込みよりも後続処理の方が高コストな構成向きです(実際良くある)

マスターはSpring BatchのStepで、ItemWriterミドルウェアにアイテムのchunkをメッセージとして何らかの方法で送信するもので置き換えます。スレーブはミドルウェアに応じた標準リスナー(例えば、JMSの場合、MesssageListenerの実装になる)*1で、その役割は、ChunkProcessorを介して、標準ItemWriter, ItemProcessor + ItemWriterでアイテムのchunkを処理することです。このパターンの利点はreader, processor, writerにSpring Batchの標準クラスが使える点です(stepをローカルで実行するのと同じように使える)。アイテムは動的に分割されてミドルウェア経由で共有されるので、リスナがすべてeager consumerの場合、負荷分散は自動的に行われます。

ミドルウェアはdurableで、各メッセージに対する単一コンシューマと配信保証が必要です。この候補は明らかにJMSですが、グリッドコンピューティングと共有メモリプロダクトには他の選択肢(JavaSpacesなど)もあります。

詳細はSpring Batch Integration - Remote Chunkingを参照してください。

1.4. Partitioning

Spring BatchはStepパーティション化してリモート実行するSPI(Service Provider Interface)もあります。リモートのStepインスタンスはローカルで設定して実行すると同じように使えます。以下がイメージ図です。

Figure 2. Partitioning

左側のJobStepインスタンスのシーケンスで、一つのStepがマスターとラベル付けされています。図のスレーブはすべて同一のStepインスタンスで、このStepはマスター側に置くことも可能で、結果も同じになります。スレーブは基本的にはリモートサービスですがローカルスレッドにも出来ます。このパターンでは、マスターからスレーブに送信するメッセージのdurableや配信保証は不要です。JobRepositoryメタデータJob実行において各スレーブが一度だけ実行されたことを保証します。

Spring BatchのSPIはStepの実装(PartitionStep)と指定環境用に実装が必要な2つのstrategy interfacesという構成です。strategy interfacesはPartitionHandlerStepExecutionSplitterで、その役割は以下のシーケンス図の通りです。

Figure 3. Partitioning SPI

上図右側のStepは"リモート"スレーブ、基本的には、これは多数のオブジェクトで処理を行い、PartitionStepが処理を駆動します。

以下の図はjava configurationでのPartitionStep設定例です。

Java Configuration

@Bean
public Step step1Master() {
    return stepBuilderFactory.get("step1.master")
        .<String, String>partitioner("step1", partitioner())
        .step(step1())
        .gridSize(10)
        .taskExecutor(taskExecutor())
        .build();
}

マルチスレッドstepのthrottle-limit同様、grid-sizeでtask executorが単一ステップからのリクエストで飽和する事を防止します。

Spring Batch Samplesのテストスイートにコピーして修正可能なサンプルがあります(Partition*Job.xml)。

Spring Batchは"step1:partition0"の形で次々とパーティションのstepを実行します。一貫性のため"step1:master"でマスターstepを呼びたい場合があります。stepにはエイリアスを指定可能です(id属性の代わりにname属性を指定する)。

1.4.1. PartitionHandler

PartitionHandlerはリモーティングやグリッド環境に関するコンポーネントです。リモートStepインスタンスStepExecutionリクエストを、DTOなど環境が指定するフォーマットでラップして、送信します。入力データの分割方法やStep結果の集約方法については関知しません。通常、復帰やフェイルオーバーについても関知せず、これは基本的には各環境がそうした機能を備えているためです。Spring Batchはそうした環境とは独立したリスタート機能を提供します。失敗したJobはリスタート可能で、失敗したStepsのみ再実行します。

PartitionHandlerには各環境用の実装を指定します。これには、RMIEJBwebサービス、JMS、Java Spaces、共有メモリグリッド(TerracottaCoherenceなど)、グリッド実行環境(GridGainなど)、があります。Spring Batchはプロプライエタリなグリッドやリモーティング環境用の実装は含みません。

なお、Spring Batchには、TaskExecutorを使用してローカルの複数スレッド下でStepを実行するPartitionHandlerの実装があります。TaskExecutorPartitionHandlerがそれです。

TaskExecutorPartitionHandlerjava configurationで以下例のように明示的に設定します。

Java Configuration

@Bean
public Step step1Master() {
    return stepBuilderFactory.get("step1.master")
        .partitioner("step1", partitioner())
        .partitionHandler(partitionHandler())
        .build();
}

@Bean
public PartitionHandler partitionHandler() {
    TaskExecutorPartitionHandler retVal = new TaskExecutorPartitionHandler();
    retVal.setTaskExecutor(taskExecutor());
    retVal.setStep(step1());
    retVal.setGridSize(10);
    return retVal;
}

gridSizeはstep数を決定するので、これはTaskExecutorのスレッドプールサイズに合わせます。使用可能なスレッド数より大きい値も設定可能で、これにより作業ブロックが小さくなります(which makes the blocks of work smaller.)。

TaskExecutorPartitionHandlerはIOバウンドのStepインスタンスで有効で、たとえば大規模ファイルのコピーやファイルシステムからコンテンツ管理システムへのレプリケートなど、などです。また、Step実装をリモート実行するプロキシにすることも可能です(Spring Remotingなど)。

1.4.2. Partitioner

Partitionerはシンプルな責務を持ちます。新規step実行(リスタート考慮の必要が無い)の入力パラメータとしてexecution contextを生成します。以下インタフェース定義のように一つのメソッドを持ちます。

public interface Partitioner {
    Map<String, ExecutionContext> partition(int gridSize);
}

このメソッドの戻り値はstep実行のユニーク名とExecutionContext形式の入力パラメータを関連付けます。この名前はパーテーション化したStepExecutionsのstep名としてバッチメタデータに出現します。ExecutionContextはname-valueの保存場所と見なせるので、主キー範囲・行番号・入力ファイルの位置、を持ちます。リモートStepは通常、次セクションで解説するように、#{…​}でコンテキスト入力にバインドします(stepスコープに遅延バインディング)。

step実行の名前(Partitionerが返すMapのキー)はJobの全step実行でユニークにする必要がありますが、他に制限はありません。最も単純な命名(かつユーザが識別しやすい)はプレフィクス+サフィックス命名規約で、プレフィクスは実行step名(Jobでユニーク)で、サフィックスはカウンタにします。Spring BatchのSimplePartitionerはこの規約に沿います。

PartitionNameProviderパーティションとは別のパーティション名を返すためのインタフェースを使うことも出来ます。Partitionerがこれを実装する場合、リスタートすると、その名前だけをクエリします(only the names are queried)。パーティショニングが高コストの場合はこれは最適化に役立ちます。PartitionNameProviderが返す名前はPartitionerと一致する必要があります。

1.4.3. Binding Input Data to Steps

PartitionHandlerで同一の設定でstepを実行し、そのstepにExecutionContextから実行時に入力パラメータをバインドすることは、非常に便利です。これはSpring BatchのStepScope機能で簡単に実現できます(Late Bindingで解説)。例えば、Partitionerが生成するExecutionContextを、キーfileNameでstepごとに異なるファイル(やディレクトリ)を指す場合、Partitionerは以下表のような内容になります。

Table 1. Example step execution name to execution context provided by Partitioner targeting directory processing

Step Execution Name (key) ExecutionContext (value)
filecopy:partition0 fileName=/home/data/one
filecopy:partition1 fileName=/home/data/two
filecopy:partition2 fileName=/home/data/three

Java Configuration

これにより、以下サンプルのように、execution contextにファイル名は遅延バインディングでstepにバインドされます。

@Bean
public MultiResourceItemReader itemReader(
        @Value("#{stepExecutionContext['fileName']}/*") Resource [] resources) {
        return new MultiResourceItemReaderBuilder<String>()
                        .delegate(fileReader())
                        .name("itemReader")
                        .resources(resources)
                        .build();
}

*1:The slaves are standard listeners for whatever middleware is being used

Spring Batch 4.1.x - Reference Documentation - ItemReaders and ItemWriters - 1.7-1.15のテキトー翻訳

https://docs.spring.io/spring-batch/4.1.x/reference/html/readersAndWriters.html#readersAndWriters

https://qiita.com/kagamihoge/items/12fbbc2eac5b8a5ac1e0 俺の訳一覧リスト

1.7. XML Item Readers and Writers

Spring BatchはXMLを読み込みJavaオブジェクトへのマッピングとその逆の書き込みを行うtransactionalな機能を提供します。

Constraints on streaming XML I/OにはStAX APIを使用しますが、これは他の標準XMLパースAPIバッチ処理には向かないためです(DOMはメモリに入力全体をロードするし、SAXはパース処理の制御をコールバックでしか出来ません)

Spring BatchにおけるXML入出力の扱いを知る必要があります。まず、読み込みと書き込みはいくつか異なる概念がありますが、Spring Batch XMLとしては共通です。XML処理では、トークン処理するレコード行(FieldSet)ではなく、レコードに対応する'fragments'のコレクションとしてXMLリソースを捉えます。以下がそのイメージです。

Figure 1. XML Input

ここでは'trade'タグを'root element'として定義しています。'fragment'はすべて''と''で挟みます。Spring BatchはfragmentsをオブジェクトにバインドするのにObject/XML Mapping (OXM)を使います。なお、Spring Batchは特定のバインドライブラリには依存しません。基本的な使い方としてはSpring OXMにデリゲートし、これは汎用的にOXMを抽象化したものです。Spring OXMの依存性は選択可能で、必要であればSpring Batchの特定のインタフェースを実装することも可能です。OXM機能の関連イメージ図は以下です。

Figure 2. OXM Binding

OXMの概要とレコード表現にXML framentsを使うことを簡単に解説し、基礎を確認したところでreaderとwriterの詳細を解説します。

1.7.1. StaxEventItemReader

StaxEventItemReaderの設定ではXML入力ストリームからレコードを処理するための基本的なセットアップを行います。まず、以下のStaxEventItemReaderで処理可能なXMLレコードセットを考えます。

<?xml version="1.0" encoding="UTF-8"?>
<records>
    <trade xmlns="http://springframework.org/batch/sample/io/oxm/domain">
        <isin>XYZ0001</isin>
        <quantity>5</quantity>
        <price>11.39</price>
        <customer>Customer1</customer>
    </trade>
    <trade xmlns="http://springframework.org/batch/sample/io/oxm/domain">
        <isin>XYZ0002</isin>
        <quantity>2</quantity>
        <price>72.99</price>
        <customer>Customer2c</customer>
    </trade>
    <trade xmlns="http://springframework.org/batch/sample/io/oxm/domain">
        <isin>XYZ0003</isin>
        <quantity>9</quantity>
        <price>99.99</price>
        <customer>Customer3</customer>
    </trade>
</records>

XMLレコードの処理には以下が必要です。

  • Root Element Name: マッピング対象オブジェクトとなるfragmentのroot要素の名前。上記設定ではtradeがそれに当たります。
  • Resource: 読み込むファイルのSpringのResource
  • Unmarshaller: XML fragmentからオブジェクトにマッピングするためのSpring OXMが提供するアンマーシャル機能。

以下の例は、tradeというroot要素、org/springframework/batch/item/xml/domain/trades.xmlというリソース、tradeMarshallerというアンマーシャル、を定義しています。

Java Configuration

@Bean
public StaxEventItemReader itemReader() {
        return new StaxEventItemReaderBuilder<Trade>()
                        .name("itemReader")
                        .resource(new FileSystemResource("org/springframework/batch/item/xml/domain/trades.xml"))
                        .addFragmentRootElements("trade")
                        .unmarshaller(tradeMarshaller())
                        .build();

}

上の例では、XStreamMarshallerを使用しています。このクラスは最初のkeyがfragment(root要素)名でvalueがバインド型のmapを渡せます。FieldSet同様、フィールドにマッピングするroot以外の要素はmapのkey/valueで指定します。設定では、このエイリアスを設定するためのSpring configuration utilityを使います。

Java Configuration

@Bean
public XStreamMarshaller tradeMarshaller() {
        Map<String, Class> aliases = new HashMap<>();
        aliases.put("trade", Trade.class);
        aliases.put("price", BigDecimal.class);
        aliases.put("isin", String.class);
        aliases.put("customer", String.class);
        aliases.put("quantity", Long.class);

        XStreamMarshaller marshaller = new XStreamMarshaller();

        marshaller.setAliases(aliases);

        return marshaller;
}

入力では、readerは次のfragmentが現れたと解釈するまでXMLリソースを読みます。デフォルトでは、raaderが次のfragmentが現れたと解釈するのは要素名とのマッチングで行います。readerはfragmentからスタンドアローンXMLドキュメントを生成し、XMLJavaオブジェクトにマッピングするデシリアライザー(基本的にはSpring OXM Unmarshallerのラッパー)に渡します。

まとめると、Spring configurationのインジェクションを使用して上記の流れをJavaコードで書くと以下のようになります。

StaxEventItemReader<Trade> xmlStaxEventItemReader = new StaxEventItemReader<>();
Resource resource = new ByteArrayResource(xmlResource.getBytes());

Map aliases = new HashMap();
aliases.put("trade","org.springframework.batch.sample.domain.trade.Trade");
aliases.put("price","java.math.BigDecimal");
aliases.put("customer","java.lang.String");
aliases.put("isin","java.lang.String");
aliases.put("quantity","java.lang.Long");
XStreamMarshaller unmarshaller = new XStreamMarshaller();
unmarshaller.setAliases(aliases);
xmlStaxEventItemReader.setUnmarshaller(unmarshaller);
xmlStaxEventItemReader.setResource(resource);
xmlStaxEventItemReader.setFragmentRootElementName("trade");
xmlStaxEventItemReader.open(new ExecutionContext());

boolean hasNext = true;

Trade trade = null;

while (hasNext) {
    trade = xmlStaxEventItemReader.read();
    if (trade == null) {
        hasNext = false;
    }
    else {
        System.out.println(trade);
    }
}

1.7.2. StaxEventItemWriter

StaxEventItemWriterResource, マーシャラー, rootTagNameが必要です。マーシャラーに渡すJavaオブジェクト(通常はSpring OXM Marshaller)はカスタムイベントライターでResourceに書き込みます。カスタムイベントライターはOXMツールが個々のfragmentでパブリッシュですStartDocumentEndDocumentのイベントをフィルタします。以下はStaxEventItemWriterを使う例です。

Java Configuration

@Bean
public StaxEventItemWriter itemWriter(Resource outputResource) {
        return new StaxEventItemWriterBuilder<Trade>()
                        .name("tradesWriter")
                        .marshaller(tradeMarshaller())
                        .resource(outputResource)
                        .rootTagName("trade")
                        .overwriteOutput(true)
                        .build();

}

上の設定例は3つの必須プロパティとoverwriteOutput=true属性を設定しており、この属性は本チャプター前半で説明したように、既存ファイルを上書き可能かどうかを指定します。以下のサンプルのwriterで使用しているマーシャラーは上述のreadringのサンプルで使用したものと完全に同じな点に注意してください。

Java Configuration

@Bean
public XStreamMarshaller customerCreditMarshaller() {
        XStreamMarshaller marshaller = new XStreamMarshaller();

        Map<String, Class> aliases = new HashMap<>();
        aliases.put("trade", Trade.class);
        aliases.put("price", BigDecimal.class);
        aliases.put("isin", String.class);
        aliases.put("customer", String.class);
        aliases.put("quantity", Long.class);

        marshaller.setAliases(aliases);

        return marshaller;
}

以上を要約すると、以下のようなJavaコードになります。

FileSystemResource resource = new FileSystemResource("data/outputFile.xml")

Map aliases = new HashMap();
aliases.put("trade","org.springframework.batch.sample.domain.trade.Trade");
aliases.put("price","java.math.BigDecimal");
aliases.put("customer","java.lang.String");
aliases.put("isin","java.lang.String");
aliases.put("quantity","java.lang.Long");
Marshaller marshaller = new XStreamMarshaller();
marshaller.setAliases(aliases);

StaxEventItemWriter staxItemWriter =
        new StaxEventItemWriterBuilder<Trade>()
                                .name("tradesWriter")
                                .marshaller(marshaller)
                                .resource(resource)
                                .rootTagName("trade")
                                .overwriteOutput(true)
                                .build();

staxItemWriter.afterPropertiesSet();

ExecutionContext executionContext = new ExecutionContext();
staxItemWriter.open(executionContext);
Trade trade = new Trade();
trade.setPrice(11.39);
trade.setIsin("XYZ0001");
trade.setQuantity(5L);
trade.setCustomer("Customer1");
staxItemWriter.write(trade);

1.8. JSON Item Readers And Writers

Spring Batchは以下フォーマットのJSONリソースの読み書き機能を提供します。

[
  {
    "isin": "123",
    "quantity": 1,
    "price": 1.2,
    "customer": "foo"
  },
  {
    "isin": "456",
    "quantity": 2,
    "price": 1.4,
    "customer": "bar"
  }
]

JSONリソースとは、個々のアイテムに対応するJSONオブジェクトの配列、という想定です。Spring Batchは特定のJSONライブラリには依存しません。

1.8.1. JsonItemReader

JsonItemReaderJSONのパースとバインディングorg.springframework.batch.item.json.JsonObjectReaderインタフェースの実装にデリゲートします。chunkでJSONオブジェクトを読み込むためにストリーミングAPIを使用して実装するように設計されています。現在は2つの実装が含まれます。

  • Jackson org.springframework.batch.item.json.JacksonJsonObjectReader
  • Gson org.springframework.batch.item.json.GsonJsonObjectReader

JSONレコードを処理するには以下が必要です。

  • Resource: 読み込むJSONファイルのSpring Resource.
  • JsonObjectReader: JSONオブジェクトリーダーでパースしてアイテムをJSONオブジェクトにバインドする。

以下の例は、前述のJSONリソースorg/springframework/batch/item/json/trades.jsonをJacksonベースのJsonObjectReaderで処理するJsonItemReaderの定義例です。

@Bean
public JsonItemReader<Trade> jsonItemReader() {
   return new JsonItemReaderBuilder<Trade>()
                 .jsonObjectReader(new JacksonJsonObjectReader<>(Trade.class))
                 .resource(new ClassPathResource("trades.json"))
                 .name("tradeJsonItemReader")
                 .build();
}

1.8.2. JsonFileItemWriter

JsonFileItemWriterはアイテムのマーシャリングをorg.springframework.batch.item.json.JsonObjectMarshallerにデリゲートします。このインタフェースの仕様はオブジェクトを取りJSON Stringにマーシャリングします。現在2つの実装があります。

  • Jackson org.springframework.batch.item.json.JacksonJsonObjectMarshaller
  • Gson org.springframework.batch.item.json.GsonJsonObjectMarshaller

JSONレコードを書き込むには以下が必要です。

  • Resource: 書き込み先のJSONファイルのSpring Resouce
  • JsonObjectMarshaller: オブジェクトをJSONフォーマットにマーシャリングするオブジェクト。

以下はJsonFileItemWriterの定義例です。

@Bean
public JsonFileItemWriter<Trade> jsonFileItemWriter() {
   return new JsonFileItemWriterBuilder<Trade>()
                 .jsonObjectMarshaller(new JacksonJsonObjectMarshaller<>())
                 .resource(new ClassPathResource("trades.json"))
                 .name("tradeJsonFileItemWriter")
                 .build();
}

1.9. Multi-File Input

単一Stepで複数ファイルを処理したい場合があります。ファイルがすべて同一フォーマットの場合、MultiResourceItemReaderXMLとフラットフファイル処理用のその種の入力をサポートします。ディレクトリに以下ファイルがあるとします。

file-1.txt  file-2.txt  ignored.txt

file-1.txtfile-2.txtは同一フォーマットで、業務上の仕様により、一緒に処理する必要があります。MultiResourceItemReaderでは以下例のようにワイルカードで両ファイルを読み込めます。

Java Configuration

@Bean
public MultiResourceItemReader multiResourceReader() {
        return new MultiResourceItemReaderBuilder<Foo>()
                                        .delegate(flatFileItemReader())
                                        .resources(resources())
                                        .build();
}

デリゲート先は単にFlatFileItemReaderです。上の設定は両ファイルから入力を読み込み、ロールバックとリスタートを処理します。注意点として、ItemReader同様、リスタート時に入力(この場合はファイル)を増やすと何らかの問題を起こす原因となる場合があります。バッチジョブは正常終了するまで固有のディレクトリを持つことを推奨します。

※ 入力リソースの順序はMultiResourceItemReader#setComparator(Comparator)で決められ、is preserved between job runs in restart scenario.

1.10. Database

通常のエンタープライズアプリケーションでは、データベースはバッチにおいて中心となるストレージ機構です。しかし、バッチはそのシステムを動作させるデータセットのサイズが大きくという点で他のアプリケーションとは異なります。SQLが100万行返す場合、全行読み終えるまで結果セットをメモリに保持するでしょう。Spring Batchはこの課題に対して2種類のソリューションを提供します。

  • Cursor-based ItemReader Implementations
  • Paging ItemReader Implementations

1.10.1. Cursor-based ItemReader Implementations

データベースカーソルはバッチ開発者の基本的なアプローチで、これは'ストリームな'リレーショナルデータという問題に対するデータベースの解決策です。Java ResultSetクラスは基本的にはカーソル操作を行うオブジェクト指向の機能です。ResultSetはデータの現在行のカーソルを保持します。ResultSetnextを呼ぶとカーソルは次の行に移ります。Spring BatchのカーソルベースのItemReader実装は、初期化時にカーソルをオープンしてread呼び出しの度に1行カーソルを進めて、処理用にマッピングオブジェクトを返します。closeはすべてのリソースの解放を保証するために呼びます。Spring coreのJdbcTemplateはこの問題を回避するのにResultSetの全行をすべてマッピングするコールバックを使用し、呼び出し側に制御を戻す前にcloseします。ただ、バッチでは、stepが完了するまでwaitすることになります。以下の図はカーソルベースItemReaderの動作概要です。注意点として、この例はSQL(最も一般的なので)を使用していますが、任意のテクノロジでこのベーシックなアプローチを採れます。

Figure 3. Cursor Example

これは基本的なパターンを図示しています。'FOO'テーブルは3つのカラム、ID, NAME, BARがあり、IDが1より大きく7より小さい行をselectしています。カーソルはID 2(1行目)から開始します。この結果行はFooオブジェクトにマッピングします。再度read()を呼ぶとカーソルは次の行に移動し、FooのIDは3になります。読み込み結果はread後に書き込まれ、その後オブジェクトはGC可能となります(このオブジェクトへの参照が無いと仮定)。

JdbcCursorItemReader

JdbcCursorItemReaderはカーソルベースのJDBC実装です。ResultSetを直接処理し、DataSourceから得たコネクションに対してSQLを実行します。サンプルでは以下のデータベーススキーマを使います。

CREATE TABLE CUSTOMER (
   ID BIGINT IDENTITY PRIMARY KEY,
   NAME VARCHAR(45),
   CREDIT FLOAT
);

通常は各行に対するドメインオブジェクトを使うので、以下の例はRowMapper実装によりCustomerCreditマッピングしています。

public class CustomerCreditRowMapper implements RowMapper<CustomerCredit> {

    public static final String ID_COLUMN = "id";
    public static final String NAME_COLUMN = "name";
    public static final String CREDIT_COLUMN = "credit";

    public CustomerCredit mapRow(ResultSet rs, int rowNum) throws SQLException {
        CustomerCredit customerCredit = new CustomerCredit();

        customerCredit.setId(rs.getInt(ID_COLUMN));
        customerCredit.setName(rs.getString(NAME_COLUMN));
        customerCredit.setCredit(rs.getBigDecimal(CREDIT_COLUMN));

        return customerCredit;
    }
}

JdbcCursorItemReaderJdbcTemplateと同じインタフェースを使うため、ItemReaderとの比較のため、JdbcTemplateでのデータ読み込み方法を見ておくと参考になります。例として、CUSTOMERデータベースに1,000行あるとします。最初はJdbcTemplateを使う例です。

//説明簡略化のため、dataSourceは準備済みとする。
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
List customerCredits = jdbcTemplate.query("SELECT ID, NAME, CREDIT from CUSTOMER",
                                          new CustomerCreditRowMapper());

上のコードを実行すると、customerCredits listには1,000CustomerCreditオブジェクトが入ります。queryメソッドの内部では、DataSourceからコネクションを取得し、SQLを実行し、ResultSetの各行に対しmapRowメソッドを呼びます。JdbcCursorItemReaderと比較すると、以下のようなコードになります。

JdbcCursorItemReader itemReader = new JdbcCursorItemReader();
itemReader.setDataSource(dataSource);
itemReader.setSql("SELECT ID, NAME, CREDIT from CUSTOMER");
itemReader.setRowMapper(new CustomerCreditRowMapper());
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
    customerCredit = itemReader.read();
    counter++;
}
itemReader.close();

上のコードを実行すると、counterは1,000になります。上のコードでcustomerCreditをリストに入れる場合、最終的な結果はJdbcTemplateの場合と同じになります。ただし、ItemReaderの大きな利点はアイテムのストリーム処理が可能な点にあります。readメソッドを1度だけ呼んだ後にそのアイテムをItemWriterで書き込み、それから、次のアイテムをreadで取得します。これによりアイテム読み込みと書き込みの'chunk'処理と定期コミットが可能となり、高パフォーマンスバッチ処理の基礎となります。また、以下例のように、Spring Batch Stepへ簡単にインジェクション出来ます。

Java Configuration

@Bean
public JdbcCursorItemReader<CustomerCredit> itemReader() {
        return new JdbcCursorItemReaderBuilder<CustomerCredit>()
                        .dataSource(this.dataSource)
                        .name("creditReader")
                        .sql("select ID, NAME, CREDIT from CUSTOMER")
                        .rowMapper(new CustomerCreditRowMapper())
                        .build();

}
Additional Properties

Javaでカーソルオープンする際は多様なオプションがあり、JdbcCursorItemReaderにも設定可能な多数のプロパティが存在します。

Table 2. JdbcCursorItemReader Properties

ignoreWarnings SQLWarningsログ出力するか例外を発生させるか。デフォルトはtrue(warningsをログ出力)
fetchSize JDBCドライバにDBからフェッチする行数のヒントを指定。ItemReaderで使用するResultSetが行を要求する時の行数。デフォルトは未指定。
maxRows 内部のResultSetが保持可能な最大行数。
queryTimeout ドライバがStatementをwaitする秒数。limitを超える場合、DataAccessExceptionをスローする。(詳細はドライバのドキュメントを参照)
verifyCursorPosition ItemReaderで保持するのと同じResultSetRowMapperに渡すため、そこでResultSet.next()を呼ぶことが可能であり、reader内部のカウントと食い違う可能性があります。このプロパティをtrueにすると、RowMapper呼び出し後にカーソル位置が異なる場合、例外をスローします。
saveState ItemStream#update(ExecutionContext)ExecutionContextにreaderのstateをセーブするかどうかの設定。デフォルト```true````。
driverSupportsAbsolute JDBCドライバがResultSetのabsolute rowをサポートするどうかを指定します。ResultSet.absolute()をサポートするJDBCドライバの場合trueを推奨します。大規模データセットを処理するstepが失敗する場合に特にパフォーマンス改善が見込めます。デフォルトはfalse
setUseSharedExtendedConnection カーソルで使うコネクションを他の処理でも使うかどうか、つまり同一トランザクションを共有するような、を指定します。falseでは、カーソルは固有のコネクションでオープンし、以降のstep処理で開始するトランザクションには参加しません。trueでは、クローズと各コミット後の解放をしないようにDataSourceをExtendedConnectionDataSourceProxyでラップします。trueにする場合、カーソルオープンに使うstatementは'READ_ONLY'および'HOLD_CURSORS_OVER_COMMIT'オプションで生成します。これによりstep処理で実行するトランザクション開始とコミットでカーソルを開いたままに出来ます。この機能を使用するには、データベースが本機能をサポートし、JDBCドライバ 3.0以降の必要があります。デフォルトfalse
HibernateCursorItemReader

SpringユーザがORMを使うかどうか、JdbcTemplateを使うかHibernateTemplateを使うのかに影響するような、重要な決定を下すのと同様、Spring Batchユーザにも同じ選択肢があります。HibernateCursorItemReaderはカーソルのHibernate版の実装です。バッチでHibernateを使うことには賛否があります。Hibernateは元々オンラインアプリケーション用に開発されてきたという点が大きく影響しています。しかし、だからバッチ処理には使えない、という意味にはなりません。問題解決にはスタンダードなsessionではなくStatelessSessionを使うのが手軽です。これはキャッシュとHiberanteが採用するdirty checkingを無くしたもので、バッチで何等かの問題を起こす可能性があります。statelessと通常のhibernate sesseionの詳細な違いについては、各バージョンのhibernateのドキュメントを参照してください。HibernateCursorItemReaderではHQLステートメントSessionFactoryを渡すと、JdbcCursorItemReaderと同じようにreadごとに1アイテムが返されます。以下の例はJDBC readerのと同じ'customer credit'の設定です。

HibernateCursorItemReader itemReader = new HibernateCursorItemReader();
itemReader.setQueryString("from CustomerCredit");
//説明簡略化のため、sessionFactoryは定義済みとする。
itemReader.setSessionFactory(sessionFactory);
itemReader.setUseStatelessSession(true);
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
    customerCredit = itemReader.read();
    counter++;
}
itemReader.close();

上記設定のItemReaderは、Cutomerテーブルに対するhibernateマッピングファイルが正しく作られていれば、JdbcCursorItemReaderで説明したの同様にCustomerCreditを返します。'useStatelessSession'プロパティのデフォルトはtrueですがOn/Offが可能なことを示す説明のために指定しています。また、内部的なカーソルのフェッチサイズをsetFetchSizeプロパティで設定可能な点も重要です。JdbcCursorItemReader同様、以下例のように、設定は単純です。

Java Configuration

@Bean
public HibernateCursorItemReader itemReader(SessionFactory sessionFactory) {
        return new HibernateCursorItemReaderBuilder<CustomerCredit>()
                        .name("creditReader")
                        .sessionFactory(sessionFactory)
                        .queryString("from CustomerCredit")
                        .build();
}
StoredProcedureItemReader

場合によってはストアドプロシージャでカーソルのデータを取得する必要があります。StoredProcedureItemReaderJdbcCursorItemReaderのような挙動をしますが、カーソル取得にクエリではなく、カーソルを返すストアドプロシージャを実行します。このストアドプロシージャは3つの異なる方法でカーソルを返します。

  • ResultSetを返す(SQL Server, Sybase, DB2, Derby, and MySQLを使用する場合)
  • 出力パラメータとしてref-cursorを返す(OraclePostgreSQLを使う場合)
  • ストアド関数呼び出しの戻り値

以下の例は前述の'customer credit'と同じものを使う設定です。

Java Configuration

@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
        StoredProcedureItemReader reader = new StoredProcedureItemReader();

        reader.setDataSource(dataSource);
        reader.setProcedureName("sp_customer_credit");
        reader.setRowMapper(new CustomerCreditRowMapper());

        return reader;
}

上の例は結果を返すためのResultSetを返すためにストアドプロシージャを使用します(3つの方法の内1番目)。

ストアドプロシージャがref-cursor(2番目の方法)を返す場合、ref-cursorを返す出力パラメータのポジションを指定する必要があります。以下の例はref-cursorを最初のパラメータとして設定する方法です。

Java Configuration

@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
        StoredProcedureItemReader reader = new StoredProcedureItemReader();

        reader.setDataSource(dataSource);
        reader.setProcedureName("sp_customer_credit");
        reader.setRowMapper(new CustomerCreditRowMapper());
        reader.setRefCursorPosition(1);

        return reader;
}

ストアドファンクションがカーソルを返す(3番目の方法)場合、"function"プロパティをtrueにします。デフォルトはfalseです。

Java Configuration

@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
        StoredProcedureItemReader reader = new StoredProcedureItemReader();

        reader.setDataSource(dataSource);
        reader.setProcedureName("sp_customer_credit");
        reader.setRowMapper(new CustomerCreditRowMapper());
        reader.setFunction(true);

        return reader;
}

いずれの方法においても、RowMapperDataSourceとプロシージャ名の定義が必要です。

ストアドプロシージャやファンクションがパラメータを取る場合、parametersで宣言と設定をします。以下はOracleの場合で、3つのパラメータを宣言しています。1つ目はref-cusorを返す出力パラメータで、2つ目と3つ目はINTEGERを取るパラメータです。

Java Configuration

@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
        List<SqlParameter> parameters = new ArrayList<>();
        parameters.add(new SqlOutParameter("newId", OracleTypes.CURSOR));
        parameters.add(new SqlParameter("amount", Types.INTEGER);
        parameters.add(new SqlParameter("custId", Types.INTEGER);

        StoredProcedureItemReader reader = new StoredProcedureItemReader();

        reader.setDataSource(dataSource);
        reader.setProcedureName("spring.cursor_func");
        reader.setParameters(parameters);
        reader.setRefCursorPosition(1);
        reader.setRowMapper(rowMapper());
        reader.setPreparedStatementSetter(parameterSetter());

        return reader;
}

パラメータ宣言に加えて、パラメータをセットするPreparedStatementSetter実装の指定も必要です。上記は上述のJdbcCursorItemReaderと同様の動作をします。Additional Propertiesも同様にStoredProcedureItemReaderに適用されます。

1.10.2. Paging ItemReader Implementations

結果グループを返す個々の問い合わせを複数回実行するデータベースカーソルを使う方法もあります。この結果グループをページと呼称します。個々の問い合わせには開始行番号とページの行数を指定します。

JdbcPagingItemReader

ページングするItemReaderの実装の一つがJdbcPagingItemReaderです。JdbcPagingItemReaderPagingQueryProviderを必要とし、このクラスはページ内の行を取得するSQLクエリを作成する責務を持ちます。データベースはそれぞれ固有のページングを持つので、データベースに合わせたPagingQueryProviderを指定します。なお、SqlPagingQueryProviderFactoryBeanは使用するデータベースを自動検出して適切なPagingQueryProvider実装を決定します。これにより設定は簡素化されるので、このクラスを使うことを推奨します。

SqlPagingQueryProviderFactoryBeanにはselectと``fromが必要です。また、オプションでwhereも指定可能です。これらの句とsortKey```をSQLステートメントの構築に使います。

sortKeyがユニーク制約を持つ事は重要で、複数回ジョブ実行時にデータが無くならいことを保証します。

readerがオープンすると、他のItemReader同様にread呼び出しをすると1アイテムが返ります。この裏側では、返す行が必要になるとページングが発生します。

以下の設定例は以前に解説したカーソルベースのItemReaderと同じ'customer credit'を使います。

Java Configuration

@Bean
public JdbcPagingItemReader itemReader(DataSource dataSource, PagingQueryProvider queryProvider) {
        Map<String, Object> parameterValues = new HashMap<>();
        parameterValues.put("status", "NEW");

        return new JdbcPagingItemReaderBuilder<CustomerCredit>()
                                           .name("creditReader")
                                           .dataSource(dataSource)
                                           .queryProvider(queryProvider)
                                           .parameterValues(parameterValues)
                                           .rowMapper(customerCreditMapper())
                                           .pageSize(1000)
                                           .build();
}

@Bean
public SqlPagingQueryProviderFactoryBean queryProvider() {
        SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();

        provider.setSelectClause("select id, name, credit");
        provider.setFromClause("from customer");
        provider.setWhereClause("where status=:status");
        provider.setSortKey("id");

        return provider;
}

ここで設定したItemReaderRowMapper(必須)でCustomerCreditを返します。'pageSize'プロパティは個々のクエリ実行時にデータベースから読み込むエンティティ数を指定します。

'parameterValues'プロパティにはクエリのパラメータのMapを指定します。whereで名前付きパラメータを使う場合、keyをパラメータの名前と一致させます。従来からの'?'パラメータを使う場合、keyは1から始めるプレースホルダー番号と一致させます。

JpaPagingItemReader

ページングするItemReaderの実装のもう一つはJpaPagingItemReaderです。JPAにはHibernateStatelessSessionに類する機能が無いため、JPA仕様が提供する範囲の機能を使う必要があります。JPAはページングをサポートするため、バッチ処理JPAを使うのは自然な選択です。ページが読み込まれると、そのエンティティはdetachしてpersistence contextはクリアし、これによりエンティティはページ処理後にGC可能になります。

JpaPagingItemReaderにはJPQLを宣言してEntityManagerFactoryを渡します。他のItemReader同様にreadで1アイテム返します。この裏側では、返すエンティティが必要になるとページングが発生します。以下の設定例は以前に解説したJDBC readerと同じ'customer credit'を使います。

Java Configuration

@Bean
public JpaPagingItemReader itemReader() {
        return new JpaPagingItemReaderBuilder<CustomerCredit>()
                                           .name("creditReader")
                                           .entityManagerFactory(entityManagerFactory())
                                           .queryString("select c from CustomerCredit c")
                                           .pageSize(1000)
                                           .build();
}

ここで設定したItemReaderは上で解説したJdbcPagingItemReaderと同様にCustomerCreditを返します。なお、CustomerCreditJPAアノテーションかORMマッピングファイルで正しく設定しているとします。'pageSize'プロパティは個々のクエリ実行が返すエンティティ数を指定します。

1.10.3. Database ItemWriters

フラットファイルおよびXMLファイルは固有のItemWriterを持ちますが、これはデータベースの世界とは同等ではありません。トランザクションはすべてのケースで必要な機能を提供します。ファイルのItemWriter実装はトランザクションであるかのように振る舞う必要があり、書き込みアイテムのトラッキングと適切なタイミングでフラッシュかクリアをします。データベースではこれは不要で、これはwriteがトランザクション内で行われるためです。ユーザはItemWriterを実装するDAOを作成するか、汎用書き込み処理をするカスタムItemWriterの一つを使用します。どちらの方法でも問題なく動作します。一つ注意する点としてはパフォーマンスとバッチ出力が提供するエラーハンドリング機能です。ItemWriterにhiberanateを使うのは一般的ですがJDBCのバッチモードと同様の問題が発生する可能性があります。バッチ化したデータベース出力は、フラッシュに注意してデータにエラーが無ければ、固有の欠点はありません。しかし、書き込み中のエラー発生は、以下図のように、例外を引き起こしたアイテムを知る方法が無いし、even if any individual item was responsibleなので、混乱を招く可能性があります。

Figure 4. Error On Flush

書き込み前にバッファするアイテムは、コミット前にバッファがフラッシュするまでエラーはスローされません。例えば、chunkで20アイテム書き込むとして、15番目のアイテムがDataIntegrityViolationExceptionをスローします。実際に書き込むまでエラー発生を知る方法が無いので、Stepでは全20アイテムの書き込みが正常終了と見なします。Session#flush()すると、バッファが空になり例外がヒットします。この時点では`stepに出来ることは何もありません。トランザクションロールバックが必須となります。通常、この例外はアイテムのスキップ(skip/retryポリシーに依る)となり、再度の書き込みはしません。しかし、バッチにおいては、問題を引き起こしたアイテムを知る方法がありません。エラー発生時にバッファ全体を書き込みます。これを解消する唯一の方法は、以下のように、1アイテムごとにフラッシュします。

Figure 5. Error On Write

これも、特にHibernateを使う場合では、一般的なパターンで、ItemWriter実装のシンプルガイドラインではwrite()ごとにフラッシュします。これにより確実にアイテムスキップが可能となります, with Spring Batch internally taking care of the granularity of the calls to ItemWriter after an error.

1.11. Reusing Existing Services

バッチシステムは他のアプリケーションと組み合わせて使う場合があります。よくあるのはオンラインシステムで、 but it may also support integration or even a thick client application by moving necessary bulk data that each application style uses. よって、バッチジョブでも既存DAOやサービスを使いまわしたいことが良くあります。Springコンテナは必要クラスをインジェクションすることでこれを簡単に実装できます。しかし、既存サービスをItemReaderItemWriterとして振る舞わせたいケースがあり、これはSpring Batchクラスの依存性を満たすか、サービスそれ自身をstepのItemReaderにします。各サービスをラップするアダプターを作るのは簡単ですが、これは良くあるパターンなので、Spring BatchはItemReaderAdapterItemWriterAdapterの実装を用意しています。両クラスとも一般的なデリゲートパターンを実装しており設定は簡単です。以下はItemReaderAdapterの例です。

Java Configuration

@Bean
public ItemReaderAdapter itemReader() {
        ItemReaderAdapter reader = new ItemReaderAdapter();

        reader.setTargetObject(fooService());
        reader.setTargetMethod("generateFoo");

        return reader;
}

@Bean
public FooService fooService() {
        return new FooService();
}

重要な点としてtargetMethodreadと同じ仕様にする必要があります。つまり返すデータが無い場合nullを返します。もしくはObjectを返します。ItemWriterの実装次第では、フレームワークが処理終了を検出するのを妨げる何かがあると、無限ループや不正確な失敗が発生します。以下はItemWriterAdapterの例です。

Java Configuration

@Bean
public ItemWriterAdapter itemWriter() {
        ItemWriterAdapter writer = new ItemWriterAdapter();

        writer.setTargetObject(fooService());
        writer.setTargetMethod("processFoo");

        return writer;
}

@Bean
public FooService fooService() {
        return new FooService();
}

1.12. Validating Input

本チャプターでは入力をパースする各種の方法について解説してきました。それぞれ主要の実装は既定の形式('well-formed')でなければ例外をスローします。FixedLengthTokenizerデータ範囲がおかしければ例外をスローします。また、RowMapperFieldSetMapperのインデックスにアクセスが存在していなかったり期待と異なるフォーマットだと例外をスローします。これらの例外はreadを返す前にスローします。しかし、返すアイテムがvalidかどうかは扱いません。たとえば、フィールドの一つが年齢だとして、明らかに負の値はありえません。存在してかつ数値であれば正しくパースは可能ですが、例外はスローしません。validation frameworksはすでに多数存在するのでSpring Batch固有のものは提供しません。代わりに、シンプルなインタフェースValidatorがあり、任意のフレームワークがこれを実装します。

public interface Validator<T> {

    void validate(T value) throws ValidationException;

}

オブジェクトがinvalidなら例外をスローしてそうでなければ通常通り返します。Spring BatchはValidatingItemProcessorを提供します。

Java Configuration

@Bean
public ValidatingItemProcessor itemProcessor() {
        ValidatingItemProcessor processor = new ValidatingItemProcessor();

        processor.setValidator(validator());

        return processor;
}

@Bean
public SpringValidator validator() {
        SpringValidator validator = new SpringValidator();

        validator.setValidator(new TradeValidator());

        return validator;
}

また、Bean Validation API (JSR-303) でアイテムをvalidationするにはBeanValidatingItemProcessorが使えます。

class Person {

    @NotEmpty
    private String name;

    public Person(String name) {
     this.name = name;
    }

    public String getName() {
     return name;
    }

    public void setName(String name) {
     this.name = name;
    }

}

BeanValidatingItemProcessor beanをアプリケーションコンテキスに宣言してchunk stepにprocessorとして登録することでアイテムをvalidateします。

@Bean
public BeanValidatingItemProcessor<Person> beanValidatingItemProcessor() throws Exception {
    BeanValidatingItemProcessor<Person> beanValidatingItemProcessor = new BeanValidatingItemProcessor<>();
    beanValidatingItemProcessor.setFilter(true);

    return beanValidatingItemProcessor;
}

1.13. Preventing State Persistence

デフォルトではすべてのItemReader, ItemWriterはコミット前にExecutionContextに現在の状態を保存しますが、これが望ましい振る舞いではない場合があります。たとえば、process indicatorで再実行可能('rerunnable')なdatabse readerを作成する場合です。これは入力データに処理済みかどうかを示すのカラムを設けます。あるレコードが読み込み(や書き込み)したら処理済みフラグをfalseからtrueにします。SQLステートメントwhere PROCESSED_IND = falseなどを持たせ、リスタートの場合に未処理レコードのみ返すようにします。この場合、現在行番号などはリスタートに無関係なので、状態を持つ必要はありません。このため、readerとwriterには'saveState'プロパティがあります。

Java Configuration

@Bean
public JdbcCursorItemReader playerSummarizationSource(DataSource dataSource) {
        return new JdbcCursorItemReaderBuilder<PlayerSummary>()
                                .dataSource(dataSource)
                                .rowMapper(new PlayerSummaryMapper())
                                .saveState(false)
                                .sql("SELECT games.player_id, games.year_no, SUM(COMPLETES),"
                                  + "SUM(ATTEMPTS), SUM(PASSING_YARDS), SUM(PASSING_TD),"
                                  + "SUM(INTERCEPTIONS), SUM(RUSHES), SUM(RUSH_YARDS),"
                                  + "SUM(RECEPTIONS), SUM(RECEPTIONS_YARDS), SUM(TOTAL_TD)"
                                  + "from games, players where players.player_id ="
                                  + "games.player_id group by games.player_id, games.year_no")
                                .build();

}

上で設定するItemReaderはいずれの実行においてもExecutionContextにエントリを作りません。

1.14. Creating Custom ItemReaders and ItemWriters

これまで、このチャプターではSpring Batchの読み込みと書き込みの基本仕様と汎用の実装クラスを解説してきました。しかし、これらはすべて汎用品であり、そのクラスではカバーできないケースは多数存在します。このセクションでは、シンプルなケースを通して、カスタムのItemReaderItemWriterの作成と仕様の正しい実装方法について解説します。readerやwriterをリスタート可能にするにはItemReaderに加えてItemStreamも実装します。

1.14.1. Custom ItemReader Example

解説のために、指定リストから読み込むシンプルなItemReaderを作ります。ItemReaderreadの基本的な仕様の実装かあ始めます。

public class CustomItemReader<T> implements ItemReader<T>{

    List<T> items;

    public CustomItemReader(List<T> items) {
        this.items = items;
    }

    public T read() throws Exception, UnexpectedInputException,
       NonTransientResourceException, ParseException {

        if (!items.isEmpty()) {
            return items.remove(0);
        }
        return null;
    }
}

上のクラスはアイテムのリストを指定して1アイテムを返し、リストからそのアイテムを削除します。リストが空になると、nullを返すことで、ItemReaderの最も基本的な仕様を満たします。テストコードは以下になります。

List<String> items = new ArrayList<String>();
items.add("1");
items.add("2");
items.add("3");

ItemReader itemReader = new CustomItemReader<String>(items);
assertEquals("1", itemReader.read());
assertEquals("2", itemReader.read());
assertEquals("3", itemReader.read());
assertNull(itemReader.read());

Making the ItemReader Restartable

次にItemReaderをリスタート可能にします。現状では、処理中断して再開すると、ItemReaderは最初から開始します。これが妥当な事も多いですが、場合によっては中断箇所からリスタートするバッチジョブが望ましい事があります。キーポイントはreaderがstatefulかstatelessかす。stateless readerはrestartabilityの懸念は無いですが、statefulはリスタートで最終状態を再構築の必要があります。このため、出来る限り、カスタムreaderをstatelessにする事を推奨します。これによりrestartabilityの懸念が無くなります。

状態を保存する場合、ItemStreamを使います。

public class CustomItemReader<T> implements ItemReader<T>, ItemStream {

    List<T> items;
    int currentIndex = 0;
    private static final String CURRENT_INDEX = "current.index";

    public CustomItemReader(List<T> items) {
        this.items = items;
    }

    public T read() throws Exception, UnexpectedInputException,
        ParseException, NonTransientResourceException {

        if (currentIndex < items.size()) {
            return items.get(currentIndex++);
        }

        return null;
    }

    public void open(ExecutionContext executionContext) throws ItemStreamException {
        if(executionContext.containsKey(CURRENT_INDEX)){
            currentIndex = new Long(executionContext.getLong(CURRENT_INDEX)).intValue();
        }
        else{
            currentIndex = 0;
        }
    }

    public void update(ExecutionContext executionContext) throws ItemStreamException {
        executionContext.putLong(CURRENT_INDEX, new Long(currentIndex).longValue());
    }

    public void close() throws ItemStreamException {}
}

ItemStreamupdateの度にItemReaderの現在インデックスを'current.index'キーでExecutionContextに保存します。ItemStream openでは、ExecutionContextにキーでエントリがあるかどうかをチェックします。もしキーが有れば現在インデックスを移動します。これはかなりシンプルな例ですが、仕様を満たしています。

ExecutionContext executionContext = new ExecutionContext();
((ItemStream)itemReader).open(executionContext);
assertEquals("1", itemReader.read());
((ItemStream)itemReader).update(executionContext);

List<String> items = new ArrayList<String>();
items.add("1");
items.add("2");
items.add("3");
itemReader = new CustomItemReader<String>(items);

((ItemStream)itemReader).open(executionContext);
assertEquals("2", itemReader.read());

ItemReaders実装の多くは固有のリスタートロジックを持っています。例えばJdbcCursorItemReaderはカーソルの最終処理行のrow IDを保存します。

ExecutionContextのキーはちょっとした程度のものでは無い点に注意してください。StepのすべてのItemStreamsで同一のExecutionContextを使うためです。基本的には、単にクラス名がプレフィクスのキーで十分に一意になります。しかし、まれに、同一stepに同一のItemStreamがある場合(出力に2ファイル必要な場合こうなる)があり、ユニークにするにはもう一工夫が必要です。このため、Spring BatchのItemReaderItemWriter実装の多くはキー名をオーバーライドするsetName()を持っています。

1.14.2. Custom ItemWriter Example

カスタムItemWriterの実装は様々な点でItemReaderと同じですが、but differs in enough ways as to warrant its own example. しかし、restartabilityの付与は基本的に変わらないので、ここの例では扱いません。ItemReader同様、サンプルをなるべくシンプルにするためにListを使います。

public class CustomItemWriter<T> implements ItemWriter<T> {

    List<T> output = TransactionAwareProxyFactory.createTransactionalList();

    public void write(List<? extends T> items) throws Exception {
        output.addAll(items);
    }

    public List<T> getOutput() {
        return output;
    }
}

Making the ItemWriter Restartable

ItemWriterをリスタート可能にするには、ItemReaderと同様のプロセスを踏み、execution contextを同期化するためにItemStreamを追加して実装します。サンプルでは、処理アイテム数カウントとフッター行にこれを追加することになるでしょう。これを実装する必要が出た場合ItemWriterItemStreamを実装し、このItemStreamを再オープン時にexecution contextからカウンタを再構築します。

実際には、別のwriterにデリゲートするItemWriters自身もリスタート可能(例えばファイル書き込み時など)だとか、トランザクショナルなリソースに書き込むためにリスタートの必要が無いつまりステートレス、な事があります。ステートフルなwriterを使う場合ItemWriterと同様にItemStreamも注意して実装してくだい。こうしたwriterのクライアントはItemStreamな事にも注意を払う必要があり、設定でstreamとして登録する必要がある場合があります。

1.15. Item Reader and Writer Implementations

ここでは、これまでのセクションで解説していないreaderとwriterの実装について紹介します。

1.15.1. Decorators

既存のItemReaderに特殊な振る舞いを追加したい場合があります。Spring BatchはItemReaderおよびItemWriter実装に振る舞いを追加するデコレータを用意しています。

Spring Batchには以下のデコレータがあります。

SynchronizedItemStreamReader

ItemReaderがスレッドセーフでは無い場合、Spring BatchのSynchronizedItemStreamReaderにより、ItemReaderをスレッドセーフに出来ます。Spring BatchはSynchronizedItemStreamReaderを生成するSynchronizedItemStreamReaderBuilderを用意しています。

SingleItemPeekableItemReader

Spring BatchにはItemReaderにpeekメソッドを追加するデコレータがあります。peekメソッドは1つ先のアイテムを読み込めます。peedを繰り返し読んでも同一アイテムが返され、そのアイテムは次回のreadメソッド呼び出し結果と同一です。Spring BatchはSingleItemPeekableItemReaderを生成するSingleItemPeekableItemReaderBuilderを用意しています。

SingleItemPeekableItemReaderのpeekメソッドは非スレッドセーフで、これはマルチスレッドでpeekを保証できないと考えられるためです。peekしたスレッドのうち1つだけが次回の読み込みでそのアイテムを読み込む可能性があります。

MultiResourceItemWriter

MultiResourceItemWriterResourceAwareItemWriterItemStreamをラップし、現在のリソースに書き込んだアイテム数がitemCountLimitPerResourceを超える場合に次の出力リソースを新規作成します。Spring BatchはMultiResourceItemWriterを生成するMultiResourceItemWriterBuilderを用意しています。

ClassifierCompositeItemWriter

ClassifierCompositeItemWriterは、指定するClassifier実装のルーターパターンに基づき、各アイテムごとにItemWriter実装コレクションのうち1つを呼び出します。すべてのデリゲート先がスレッドセーフであればこの実装はスレッドセーフになります。Spring BatchはClassifierCompositeItemWriterを生成するClassifierCompositeItemWriterBuilderを用意しています。

ClassifierCompositeItemProcessor

ClassifierCompositeItemProcessorは、指定するClassifier実装のルーターパターンに基づき、ItemProcessor実装コレクションのうち1つを呼び出すItemProcessorです。Spring BatchはClassifierCompositeItemProcessorを生成するClassifierCompositeItemProcessorBuilderを用意しています。

1.15.2. Messaging Readers And Writers

Spring Batchではメッセージングシステム用のreaderとwriterを用意しています。

AmqpItemReader

AmqpItemReaderはexchangeからメッセージを受信したり変換するのにAmqpTemplateを使うItemReaderです。Spring BatchはAmqpItemReaderを生成するAmqpItemReaderBuilderを用意しています。

AmqpItemWriter

AmqpItemWriterはAMQP exchangeへメッセージ送信するのにAmqpTemplateを使うItemWriterです。AmqpTemplateにnama未指定の場合はnameless exchangeにメッセージを送信します。Spring BatchはAmqpItemWriterを生成するAmqpItemWriterBuilderを用意しています。

JmsItemReader

JmsItemReaderJmsTemplateを使用するJMS用のItemReaderです。テンプレートにはデフォルトのdestinationを持たせてくてださい。read()メソッドでのアイテム取得に使います。Spring BatchはJmsItemReaderを生成するJmsItemReaderBuilderを用意しています。

JmsItemWriter

JmsItemWriterJmsTemplateを使用するJMS用のItemWriterです。テンプレートにはデフォルトのdestinationを持たせてくてださい。write(List)でのアイテム送信に使います。Spring BatchはJmsItemWriterを生成するJmsItemWriterBuilderを用意しています。

1.15.3. Database Readers

Spring Batchは以下のdatabase readerを提供します。

Neo4jItemReader

Neo4jItemReaderはpaging techniqueを使用してgraph database Neo4jからオブジェクトを読み込むItemReaderです。Spring BatchはNeo4jItemReaderを生成するNeo4jItemReaderBuilderを用意しています。

MongoItemReader

MongoItemReaderはpaging techniqueを使用してMongoDBからドキュメントを読み込むItemReaderです。Spring BatchはMongoItemReaderを生成するMongoItemReaderBuilderを用意しています。

HibernateCursorItemReader

HibernateCursorItemReaderHibernateベースでDBレコードを読み込むItemStreamReaderです。HQLクエリを実行すると、初期化し、read()が呼ばれると結果をイテレートし、現在行に対応するオブジェクトを逐次返します。Spring BatchはHibernateCursorItemReaderを生成するHibernateCursorItemReaderBuilderを用意しています。

HibernatePagingItemReader

HibernatePagingItemReaderHibernateベースでDBレコードを読み込むItemReaderで、一度に固定数アイテム読み込みます。Spring BatchはHibernatePagingItemReaderを生成するHibernatePagingItemReaderBuilderを用意しています。

RepositoryItemReader

RepositoryItemReaderPagingAndSortingRepositoryを使用してレコードを読み込むItemReaderです。Spring BatchはRepositoryItemReaderを生成するRepositoryItemReaderBuilderを用意しています。

1.15.4. Database Writers

Spring Batchは以下のdatabase writerを提供します。

Neo4jItemWriter

Neo4jItemWriterはNeo4jデータベースに書き込むItemWriterの実装です。Spring BatchはNeo4jItemWriterを生成するNeo4jItemWriterBuilderを用意しています。

MongoItemWriter

MongoItemWriterはSpring DataのMongoOperationsの実装を使用してMongoDBに書き込むItemWriterの実装です。Spring BatchはMongoItemWriterを生成するMongoItemWriterBuilderを用意しています。

RepositoryItemWriter

RepositoryItemWriterはSpring DataのCrudRepository用のItemWriterラッパーです。Spring BatchはRepositoryItemWriterを生成するRepositoryItemWriterBuilderを用意しています。

HibernateItemWriter

HibernateItemWriterは現在のHibernate sessionに居ないエンティティのsaveやupdateをするItemWriterです。Spring BatchはHibernateItemWriterを生成するHibernateItemWriterBuilderを用意しています。

JdbcBatchItemWriter

JdbcBatchItemWriterは全アイテムのステートメントをバッチ実行するのにNamedParameterJdbcTemplateの機能を使います。Spring BatchはJdbcBatchItemWriterを生成するJdbcBatchItemWriterBuilderを用意しています。

JpaItemWriter

JpaItemWriterは現在の永続化コンテキストに居ないエンティティをmergeするのにJPAEntityManagerFactoryを使うItemWriterです。Spring BatchはJpaItemWriterを生成するJpaItemWriterBuilderを用意しています。

GemfireItemWriter

GemfireItemWriterはkey/valueでGemFireにアイテムを格納するのにGemfireTemplateを使うItemWriterです。Spring BatchはGemfireItemWriterを生成するGemfireItemWriterBuilderを用意しています。

1.15.5. Specialized Readers

Spring Batchには以下の特化reader(specialized readers)があります。

LdifReader

LdifReaderはLDIF(LDAP Data Interchange Format)をResourceから読み込み・パースしてreadLdapAttributeを返します。Spring BatchはLdifReaderを生成するLdifReaderBuilderを用意しています。

MappingLdifReader

MappingLdifReaderはLDIF (LDAP Data Interchange Format)をResourceから読み込み・パースしてLDIFレコードをPOJOマッピングします。readはPOJOを返します。Spring BatchはMappingLdifReaderを生成するMappingLdifReaderBuilderを用意しています。

1.15.6. Specialized Writers

Spring Batchには以下の特化writerがあります。

SimpleMailMessageItemWriter

SimpleMailMessageItemWriterはメール送信するItemWriterです。実際のメール送信はMailSenderにデリゲートします。Spring BatchはSimpleMailMessageItemWriterを生成するSimpleMailMessageItemWriterBuilderを用意しています。

1.15.7. Specialized Processors

Spring Batchには以下の特化processorがあります。

ScriptItemProcessor

ScriptItemProcessorスクリプトに処理したいアイテムを渡し、processorの戻り値がスクリプトの結果になります。Spring BatchはMappingLdifReaderを生成するMappingLdifReaderBuilderを用意しています。

Spring Batch 4.1.x - Reference Documentation - ItemReaders and ItemWriters - 1.1-1.6のテキトー翻訳

https://docs.spring.io/spring-batch/4.1.x/reference/html/readersAndWriters.html#readersAndWriters

https://qiita.com/kagamihoge/items/12fbbc2eac5b8a5ac1e0 俺の訳一覧リスト

1. ItemReaders and ItemWriters

すべてのバッチ処理は、大規模データの読み込み・何らかの計算あるいは変換処理・結果の書き込み、というシンプルな形で説明が可能です。Spring Batchはバルク読み込みと書き込みを実行する3つの中核インタフェース、ItemReader, ItemProcessor, ItemWriter、を用意しています。

1.1. ItemReader

コンセプトはシンプルですが、ItemReaderは多数の異なる種類の入力からデータを受け取るクラスです。よくある例は以下の通りです。

  • フラットファイル: フラットファイルのitem readerはフラットファイルからデータ行を読み込みます。このファイルは基本的にレコード定義を、各フィールドはファイルの固定位置で定義するか、なんらかの特殊文字(カンマ)による区切り、で行います。
  • XML: XMLItemReadersは、パース処理するテクノロジとは独立してXMLを処理し、オブジェクトのマッピングとvalidateをします。入力データはXSDスキーマに対するXMLファイルvalidationが可能です。
  • Database: DBリソースのresultsetからprocessに回すオブジェクトにマッピングします。デフォルトのSQL ItemReader 実装は戻りオブジェクトにRowMapperを実行し、リスタートする場合はカレントの行をトラッキングし、基本的な統計を保存し、後述するトランザクション機能を提供します。

様々な用途が考えられますが、このチャプターでは基本的な事柄に焦点をあてます。利用可能なすべてのItemReader実装のリストはAppendix Aを参照してください。

ItemReaderは、以下のインタフェース定義のように、汎用入力操作用の基礎的インタフェースです。

public interface ItemReader<T> {

    T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;

}

readメソッドはItemReaderの中核を定義しています。このメソッドは1つのitemを返すか、これ以上読み込みものが無い場合はnullを返します。itemは、ファイルの1行、DBの1行、XMLの1要素、を表します。これらのitemは扱いやすいドメインオブジェクト(Trade, Fooなど)に基本的には変換しますが、必須ではありません。

ItemReaderの実装は基本的には単方向のみです。ただし、基底リソースがtransactional(JMSキューなど)の場合、ロールバック発生時のその次にread呼び出しをすると同一のlogical itemを返す可能性があります。また、ItemReaderで処理するアイテムが無いけど例外をスローしないケースは一考の余地があります。たとえば、DBのItemReaderのクエリが0件の場合、readの初回呼び出しでnullを返します。

1.2. ItemWriter

ItemWriterは機能的にはItemReaderと似ていますがそれとは逆の操作をします。何らかのリソースを、配置・open・closeが必要な点は同じで、読み込みではなく書き込む点が異なります。DBやキューの場合、行う操作は、insert, update, sendになります。出力のシリアライズフォーマットはバッチジョブそれぞれ固有のものになります。

ItemReader同様ItemWriterは、以下のような、極めて汎用的なインタフェース定義です。

public interface ItemWriter<T> {

    void write(List<? extends T> items) throws Exception;

}

ItemReaderread同様、writeItemWriterの振る舞いの基礎を提供します。open状態のwriterに渡されるitemのリストを書き込みます。基本的に、itemは複数まとめて('batched' together)chunkに入れて出力するという想定なので、インタフェースはwriter自身でitemを作成するのではなく、itemのリストを受け取ります。リストの書き込み後、writeメソッドを返す前に、状況に応じたflushを実行します。例えば、Hibernate DAOに書き込む場合、各アイテムごとに複数回書き込みを実行します。その後writerはreturn前にhibernate sessionのflushを呼び出します。

1.3. ItemProcessor

ItemReaderItemWriterはここに書きたいタスクがある場合には有用ですが、書き込む前に実行したいビジネスロジックとは何でしょうか。書き込みと読み込みの両方で可能な一つのやり方にcomposite patternがあります。これは、別のItemWriterを持つItemWriterか、もしくは、別のItemReaderを持つItemReaderです。以下はその例です。

public class CompositeItemWriter<T> implements ItemWriter<T> {

    ItemWriter<T> itemWriter;

    public CompositeItemWriter(ItemWriter<T> itemWriter) {
        this.itemWriter = itemWriter;
    }

    public void write(List<? extends T> items) throws Exception {
        //Add business logic here
       itemWriter.write(items);
    }

    public void setDelegate(ItemWriter<T> itemWriter){
        this.itemWriter = itemWriter;
    }
}

上のクラスは、何らかのビジネスロジック実行後に、別のItemWriterにデリゲートします。このパターンは同様にItemReaderでも使用可能で、メインとなるItemReaderの入力を基にして更に別の参照データを得るような場合に使えます。write呼び出しを制御したい場合にも有用です。ただし、実際の書き込み前にアイテムを変換しておきたい場合、そのクラス自身ではwriteの必要はありません。そこではただ単にアイテムの修正のみ行います。この場合、Spring Batchの、以下インフェース定義を持つItemProcessorを使います。

public interface ItemProcessor<I, O> {

    O process(I item) throws Exception;
}

ItemProcessorはシンプルです。1つのオブジェクトを与え、変換して返します。入出力のオブジェクトは同一型になる場合もならない場合もあります。ビジネスロジックはprocess内で適用する点と、その中身はロジックを作成する開発者に完全に委ねられている、という点が重要です。ItemProcessorはstepに直接ワイヤリングできます。たとえば、ItemReaderFooクラスで読み込んで書き込み前にBarに変換する必要がある、とします。以下は変換を実行するItemProcessorの例です。

public class Foo {}

public class Bar {
    public Bar(Foo foo) {}
}

public class FooProcessor implements ItemProcessor<Foo,Bar>{
    public Bar process(Foo foo) throws Exception {
        // FooからBarへのシンプルな変換
        return new Bar(foo);
    }
}

public class BarWriter implements ItemWriter<Bar>{
    public void write(List<? extends Bar> bars) throws Exception {
        //barsの書き込み
    }
}

上の例では、Foo, Barのクラスがあり、ItemProcessorを実装するFooProcessorがあります。変換はシンプルですが、これ以外の型へ変換することも可能です。BarWriterBarオブジェクトを書き込み、これ以外の型が来る場合は例外をスローします。同じく、FooProcessorFoo以外の場合に例外をスローします。FooProcessorは以下例のようにStepにインジェクションします。

Java Configuration

@Bean
public Job ioSampleJob() {
        return this.jobBuilderFactory.get("ioSampleJOb")
                                .start(step1())
                                .end()
                                .build();
}

@Bean
public Step step1() {
        return this.stepBuilderFactory.get("step1")
                                .<String, String>chunk(2)
                                .reader(fooReader())
                                .processor(fooProcessor())
                                .writer(barWriter())
                                .build();
}

1.3.1. Chaining ItemProcessors

大抵のケースで変換は一つで十分ですが、複数のItemProcessor実装を一緒に実行したい場合はどうでしょうか。前に開設したcomposite patternで実現可能です。FooからBarに変換し、それからFooBarに変換して書き込む例は以下です。

public class Foo {}

public class Bar {
    public Bar(Foo foo) {}
}

public class Foobar {
    public Foobar(Bar bar) {}
}

public class FooProcessor implements ItemProcessor<Foo,Bar>{
    public Bar process(Foo foo) throws Exception {
        //Perform simple transformation, convert a Foo to a Bar
        return new Bar(foo);
    }
}

public class BarProcessor implements ItemProcessor<Bar,Foobar>{
    public Foobar process(Bar bar) throws Exception {
        return new Foobar(bar);
    }
}

public class FoobarWriter implements ItemWriter<Foobar>{
    public void write(List<? extends Foobar> items) throws Exception {
        //write items
    }
}

FooProcessorBarProcessorは、以下例のように、結果としてFoobar```を返します。

CompositeItemProcessor<Foo,Foobar> compositeProcessor =
                                      new CompositeItemProcessor<Foo,Foobar>();
List itemProcessors = new ArrayList();
itemProcessors.add(new FooTransformer());
itemProcessors.add(new BarTransformer());
compositeProcessor.setDelegates(itemProcessors);

先の例同様に、composite processorStepに設定します。

Java ConfigurationStepListener

@Bean
public Job ioSampleJob() {
        return this.jobBuilderFactory.get("ioSampleJob")
                                .start(step1())
                                .end()
                                .build();
}

@Bean
public Step step1() {
        return this.stepBuilderFactory.get("step1")
                                .<String, String>chunk(2)
                                .reader(fooReader())
                                .processor(compositeProcessor())
                                .writer(foobarWriter())
                                .build();
}

@Bean
public CompositeItemProcessor compositeProcessor() {
        List<ItemProcessor> delegates = new ArrayList<>(2);
        delegates.add(new FooProcessor());
        delegates.add(new BarProcessor());

        CompositeItemProcessor processor = new CompositeItemProcessor();

        processor.setDelegates(delegates);

        return processor;
}

1.3.2. Filtering Records

item processorの良くある使い方の一つはItemWriterに渡す前にレコードをフィルタリングします。フィルタはスキップとは異なるアクションです。スキップはレコードがinvalidなことを示し、一方、フィルタは書き込まないレコードを意味します。

例えば、三種の異なるタイプのレコード、挿入・更新・削除、を持つファイルを読むバッチジョブを考えます。対象システムでレコード削除が未サポートの場合、削除レコードはItemWriterには送りたくありません。しかし、レコード自体は不正データでは無いので、スキップではなくフィルタをかけたいと考えます。この結果、ItemWriterは挿入・更新レコードのみ受け取ります。

レコードをフィルタするには、ItemProcessornullを返します。フレームワークは戻り値nullがあるとItemWriterに渡すレコードリストにそのアイテムを追加しません。ItemProcessorの例外スローはスキップになります。

1.3.3. Fault Tolerance

chunkをロールバックすると、読み込みでキャッシュしたアイテムのreprocessが可能です。stepにfault tolerant(skipやretry処理)を設定する場合、ItemProcessorはべき等に実装します。基本的には、ItemProcessorで入力アイテムは一切変更せず、変更は結果となるインスタンスにだけ行います。

1.4. ItemStream

ItemReadersItemWritersはそれぞれ固有の役割を持ちますが、両者に共通な役割のインタフェースがあります。基本的に、これは、バッチジョブのscopeの一部となり、readerとwriterでopen,closeをし、永続化状態のメカニズムを必要とします。ItemStreamは以下のインタフェースを提供します。

public interface ItemStream {

    void open(ExecutionContext executionContext) throws ItemStreamException;

    void update(ExecutionContext executionContext) throws ItemStreamException;

    void close() throws ItemStreamException;
}

各メソッドの説明の前に、ExecutionContextについて触れます。ItemStreamも実装するItemReaderのクライアントはread前にopenを呼び出し、ファイルなど何らかのリソースのopenやコネクション取得を行います。同様の制限がItemWriter実装にも適用されます。Chapter 2で触れたように、ExecutionContextに再開用データがある場合、初期状態ではない位置からItemReaderItemWriterを開始するためにそのデータを用います。逆に、openしたリソースの安全な解放を保証するべくcloseを呼びます。updateは現在保持している状態をExecutionContextにロードするために呼ばれます。現在の状態をコミット前にDBへの永続化を保証するため、コミット前に呼ばれます。

特殊な場合としてItemStreamのクライアントがStepの場合、ExecutionContextが各StepExecutionで作成され、ユーザはexecutitonの状態を格納でき、同一のJobInstanceで再開する場合はその状態を返します。Quartzに詳しい場合は、QuartsのJobDataMapに良く似たセマンティクスになっています。

1.5. The Delegate Pattern and Registering with the Step

CompositeItemWriterは、Spring Batchの共通パターンの一つ、delegation patternの例です。委譲先はStepListenerなどののコールバックインタフェースを実装する場合があります。そのインタフェースを実装し、かつ、JobStepの一部としてSpring Batch Coreと連携させる場合、Stepに明示的な登録が必要となる場合がほとんどです。reader, writer, processorStepに直接ワイヤリングすると、ItemStreamStepListenerを実装している場合、自動登録されます。しかし、Stepは委譲先を関知しないため、以下例のように、これらをlistenerやstreamとしてインジェクションする必要があります。

Java ConfigurationStepListener

@Bean
public Job ioSampleJob() {
        return this.jobBuilderFactory.get("ioSampleJob")
                                .start(step1())
                                .end()
                                .build();
}

@Bean
public Step step1() {
        return this.stepBuilderFactory.get("step1")
                                .<String, String>chunk(2)
                                .reader(fooReader())
                                .processor(fooProcessor())
                                .writer(compositeItemWriter())
                                .stream(barWriter())
                                .build();
}

@Bean
public CustomCompositeItemWriter compositeItemWriter() {

        CustomCompositeItemWriter writer = new CustomCompositeItemWriter();

        writer.setDelegate(barWriter());

        return writer;
}

@Bean
public BarWriter barWriter() {
        return new BarWriter();
}

1.6. Flat Files

バルクデータ連携の最も一般的な方式の一つはフラットファイルです。その構造(XSD)定義に対する合意に基づくXMLとは異なり、フラットファイルを読む側は前もってその構造を把握しておく必要があります。通常、フラットファイルは以下2つのどちらかのタイプ、デリミタか固定長、になります。デリミタファイルはデリミタ、カンマなど、でフィールドを区切ります。固定長ファイルはフィールド長が決まっています。

1.6.1. The FieldSet

Spring Batchでフラットファイルを処理する場合、入力・出力を問わず、最も重要なクラスの一つがFieldSetです。世の中多数のアーキテクチャとライブラリがファイル読み込み機能を提供しますが、これらは通常Stringかその配列を返します。これはやりたい事の半分でしかありません。FieldSetはファイルリソースとフィールドのバインディングを行うためのSpring Batchの機能です。これにより、DB入力と同じ方法でファイル入力も扱えます。FieldSetはコンセプト的にはJDBCResultSetと似ています。FieldSetは単一の引数、String配列のトークン、を取ります。オプションで、フィールド名も設定可能で、このフィールドはインデックスか名前のどちらかでアクセスします。

String[] tokens = new String[]{"foo", "1", "true"};
FieldSet fs = new DefaultFieldSet(tokens);
String name = fs.readString(0);
int value = fs.readInt(1);
boolean booleanValue = fs.readBoolean(2);

FieldSetには、Date, BigDecimalなどのオプションが多数存在します。FieldSetの最大の利点はフラットファイル入力のパースに一貫性を持たせられる点です。バッチジョブのパースをそれぞれ好き勝手に異なる方法でするより、フォーマット例外のハンドリングや、シンプルなデータ変換において、一貫性を持たせられます。

1.6.2. FlatFileItemReader

フラットファイルはおおむね2次元(表形式)データを持ちます。Spring Batchフレームワークでのフラットファイル読み込みはFlatFileItemReaderを呼ぶクラスで設定します。FlatFileItemReaderはフラットファイルの読み込みとパースの基本的な機能を提要します。FlatFileItemReaderの2つの重要な依存性はResourceLineMapperです。LineMapperは次のセクションで解説します。resourceプロパティはSpring CoreのResourceです。この型のbeanの生成方法のドキュメントはSpring Framework, Chapter 5. Resourcesにあります。よって、このガイドではResourceオブジェクトの生成については以下のシンプルな例に留めます。

Resource resource = new FileSystemResource("resources/trades.csv");

複雑なバッチ環境では、ディレクトリ構造はEAIインフラで管理する事が多く、ここでは、外部インターフェース向けのdrop zoneはFTPアップロード先からバッチ処理ディレクトリまたはその逆へのファイル移動で確立します。ファイル移動ユーティリティはSpring Batchのスコープ外ですが、ジョブストリームのステップにファイル移動ユーティリティを使う事はよくあります。バッチアーキテクチャは処理対象のファイルの場所だけは知っている必要があります。Spring Batchは開始ポイントからパイプへのデータフィード処理を開始します。なお、Spring Integrationはこの種のサービスを多数提供します。

FlatFileItemReaderのその他のプロパティには、以下表のような、データ処理方法の指定が可能です。

Table 1. FlatFileItemReader Properties

Property Type Description
comments String[] コメント行を示すline prefixesを指定
encoding String テキストエンコーディングを指定。デフォルトはCharset.defaultCharset()
lineMapper LineMapper StringからアイテムのObjectに変換
linesToSkip int ファイル先頭から無視する行数
recordSeparatorPolicy RecordSeparatorPolicy 行端識別に使用し、クオートで囲む1レコードを複数行にまたがらせたい場合などに使用
resource Resource 読込対象リソース
skippedLinesCallback LineCallbackHandler スキップファイル行を受けるインタフェース。linesToSkipが2の場合このインタフェースは2回呼ばれる。
strict boolean strictモードでは入力リソースが存在しない場合readerはExecutionContextに 例外をスローする。そうでない場合、ログ出力して処理継続する。

LineMapper

RowMapperResultSetなどの低レベル要素を受け取りObjectを返すのと同様、フラットファイルの処理ではStringの行をObjectに変換します。

public interface LineMapper<T> {

    T mapLine(String line, int lineNumber) throws Exception;

}

基本要素として、現在行とその行番号があり、このmapperはドメインオブジェクトを返します。RowMapper同様、行番号とそれに関連付けられたResultSetの各行のように、行番号と各行を持ちます。一意確認用にドメインジェクトと行番号を関連付けたり、ログに行番号を含めたりが出来ます。ただし、RowMapperと異なり、LineMapperには生データの行が渡されるので、前述のように、これだけでは不十分です。本ドキュメント後半で解説するように、オブジェクトにマッピング可能なFieldSetに行をトークンで分割してください。

LineTokenizer

入力行をFieldSet````に変換するインタフェースが必要です。FieldSetに変換したフラットファイルデータのフォーマットは多数考えられるためです。Spring Batchでは、それ用のインタフェースがLineTokenizer```です。

public interface LineTokenizer {

    FieldSet tokenize(String line);

}

LineTokenizerの役割は、入力行(理論上Stringには複数行含める事も可能)を与えると、その行に対するFieldSetを返します。FieldSetFieldSetMapperに渡します。Spring Batchは以下のLineTokenizer実装を提供します。

  • DelimitedLineTokenizer: レコードのフィールドがデリミタで区切られているファイルで使用する。最も一般的なのはカンマで、pipeやセミコロンもよく使います。
  • FixedLengthTokenizer: レコードのフィールドが固定長のファイルで使用する。各フィールド長は個々のレコードタイプごとに定義する。
  • PatternMatchingCompositeLineTokenizer: 行がパターンにマッチすると対応するLineTokenizerを使用する。

FieldSetMapper

FieldSetMapperには単一メソッドmapFieldSetがあり、FieldSetを取りオブジェクトにマッピングします。このオブジェクトは、job仕様に応じて、DTOドメインオブジェクト・配列などになります。FieldSetMapperLineTokenizerと組み合わせて、リソースから適当な型にデータ行を変換するために使います。インタフェース定義は以下の通りです。

public interface FieldSetMapper<T> {

    T mapFieldSet(FieldSet fieldSet) throws BindException;

}

JdbcTemplateRowMapperと同じようなパターンになっています。

DefaultLineMapper

これまでの解説でフラットファイル読み込みのための基本的なインターフェース定義を見てきました。3つの基本的なステップが明確になりました。

  1. ファイルから1行読み込む。
  2. StringLineTokenizer#tokenize()に渡してFieldSetを取得する。
  3. トークン処理が返すFieldSetFieldSetMapperに渡し、ItemReader#read()が結果を返す。

上述の2つのインタフェースが2つのタスク、FieldSet変換とFieldSetからドメインオブジェクトへのマッピング、を分離しています。LineTokenizerの入力はLineMapper(行)の入力と対応関係にあり、FieldSetMapperの出力はLineMapperの出力と対応関係があるので、LineTokenizerFieldSetMapperの両方を使うデフォルト実装を用意しています。DefaultLineMapperは、以下のようなクラス定義で、大抵はこの振る舞いで十分です。

public class DefaultLineMapper<T> implements LineMapper<>, InitializingBean {

    private LineTokenizer tokenizer;

    private FieldSetMapper<T> fieldSetMapper;

    public T mapLine(String line, int lineNumber) throws Exception {
        return fieldSetMapper.mapFieldSet(tokenizer.tokenize(line));
    }

    public void setLineTokenizer(LineTokenizer tokenizer) {
        this.tokenizer = tokenizer;
    }

    public void setFieldSetMapper(FieldSetMapper<T> fieldSetMapper) {
        this.fieldSetMapper = fieldSetMapper;
    }
}

上述のデフォルト実装の機能は、(以前のバージョンのように)reader自体に組み込むのではなく、行を直接処理する場合のパース処理に高い柔軟性をユーザに提供しています。

Simple Delimited File Reading Example

以下は実際のシナリオに沿ってフラットファイルを読み込む方法の例の解説です。このバッチジョブは以下のファイルからフットボールプレイヤーを読み込みます。

ID,lastName,firstName,position,birthYear,debutYear
"AbduKa00,Abdul-Jabbar,Karim,rb,1974,1996",
"AbduRa00,Abdullah,Rabih,rb,1975,1999",
"AberWa00,Abercrombie,Walter,rb,1959,1982",
"AbraDa00,Abramowicz,Danny,wr,1945,1967",
"AdamBo00,Adams,Bob,te,1946,1969",
"AdamCh00,Adams,Charlie,wr,1979,2003"

ファイルの中身は以下のPlayerドメインオブジェクトにマッピングします。

public class Player implements Serializable {

    private String ID;
    private String lastName;
    private String firstName;
    private String position;
    private int birthYear;
    private int debutYear;

    public String toString() {
        return "PLAYER:ID=" + ID + ",Last Name=" + lastName +
            ",First Name=" + firstName + ",Position=" + position +
            ",Birth Year=" + birthYear + ",DebutYear=" +
            debutYear;
    }

    // setters and getters...
}

FieldSetPlayerマッピングするには、以下のように、プレイヤーを返すFieldSetMapperを定義します。

protected static class PlayerFieldSetMapper implements FieldSetMapper<Player> {
    public Player mapFieldSet(FieldSet fieldSet) {
        Player player = new Player();

        player.setID(fieldSet.readString(0));
        player.setLastName(fieldSet.readString(1));
        player.setFirstName(fieldSet.readString(2));
        player.setPosition(fieldSet.readString(3));
        player.setBirthYear(fieldSet.readInt(4));
        player.setDebutYear(fieldSet.readInt(5));

        return player;
    }
}

次に、FlatFileItemReaderを正しく設定してreadを呼ぶことで、ファイルを読み込みます。

FlatFileItemReader<Player> itemReader = new FlatFileItemReader<Player>();
itemReader.setResource(new FileSystemResource("resources/players.csv"));
//DelimitedLineTokenizer defaults to comma as its delimiter
DefaultLineMapper<Player> lineMapper = new DefaultLineMapper<Player>();
lineMapper.setLineTokenizer(new DelimitedLineTokenizer());
lineMapper.setFieldSetMapper(new PlayerFieldSetMapper());
itemReader.setLineMapper(lineMapper);
itemReader.open(new ExecutionContext());
Player player = itemReader.read();

readはファイルの各行を基にPlayerオブジェクトを返します。EOFに達するとnullを返します。

Mapping Fields by Name

DelimitedLineTokenizerFixedLengthTokenizerの双方で使用可能な機能があり、JDBCResultSetと似たような機能を持ちます。フィールド名をLineTokenizerに設定してマッピング関数の可読性を上げられます。まず、フラットファイルの全フィールドのカラム名をtokenizerに設定します。以下がその例です。

tokenizer.setNames(new String[] {"ID", "lastName","firstName","position","birthYear","debutYear"});

FieldSetMapperは以下のように上記のカラム名を使います。

public class PlayerMapper implements FieldSetMapper<Player> {
    public Player mapFieldSet(FieldSet fs) {

       if(fs == null){
           return null;
       }

       Player player = new Player();
       player.setID(fs.readString("ID"));
       player.setLastName(fs.readString("lastName"));
       player.setFirstName(fs.readString("firstName"));
       player.setPosition(fs.readString("position"));
       player.setDebutYear(fs.readInt("debutYear"));
       player.setBirthYear(fs.readInt("birthYear"));

       return player;
   }
}

Automapping FieldSets to Domain Objects

多くの場合、FieldSetMapperを書くことはJdbcTemplateRowMapper書くことと同じくらいに面倒です。名前とマッチするフィールドに、JavaBeanのsetterを用いて、自動マッピングするFieldSetMapperをSpring Batchは提供しています。再度フットボールの例にとると、BeanWrapperFieldSetMapperは以下のようになります。

Java Configuration

@Bean
public FieldSetMapper fieldSetMapper() {
        BeanWrapperFieldSetMapper fieldSetMapper = new BeanWrapperFieldSetMapper();

        fieldSetMapper.setPrototypeBeanName("player");

        return fieldSetMapper;
}

@Bean
@Scope("prototype")
public Player player() {
        return new Player();
}

FieldSetの各エントリに対し、マッパーはPlayer新規インスタンス(このためprototypeスコープが飛鳥。)の対応するsetterを参照します。これはSpringコンテナがプロパティ名にマッチするsetterを参照するのと同様です。FieldSetの使用可能なフィールドをマッピングし、Playerオブジェクトを返します。上記設定以外のコードは不要です。

Fixed Length File Formats

これまではデリミタファイルの詳細のみ解説してきました。しかし、それだけでは片手落ちです。固定長フォーマットのフラットファイルを使用する組織は数多く存在します。固定長の例は以下の通りです。

UK21341EAH4121131.11customer1
UK21341EAH4221232.11customer2
UK21341EAH4321333.11customer3
UK21341EAH4421434.11customer4
UK21341EAH4521535.11customer5

1つの大きなフィールドに見えますが、実際には4つの独立したフィールドです。

  1. ISIN: 注文商品の一意識別子 - 12文字
  2. Quantity: 個数 - 3文字
  3. Price: 価格 - 5文字
  4. Customer: 顧客ID - 9文字

FixedLengthLineTokenizerを設定する場合、以下例のように、長さをレンジ形式で指定する必要があります。

※ レンジ形式は専用のproperty editorのRangeArrayPropertyEditorApplicationContextに入れる必要があります。ただし、このbeanはbatch namespaceを使う場合はApplicationContextに自動設定されます。

Java Configuration

@Bean
public FixedLengthTokenizer fixedLengthTokenizer() {
        FixedLengthTokenizer tokenizer = new FixedLengthTokenizer();

        tokenizer.setNames("ISIN", "Quantity", "Price", "Customer");
        tokenizer.setColumns(new Range(1-12),
                                                new Range(13-15),
                                                new Range(16-20),
                                                new Range(21-29));

        return tokenizer;
}

FixedLengthLineTokenizerはこれまでに説明してきたLineTokenizerの一種なので、デリミタの場合と同様にFieldSetを返します。その出力処理についても同様で、BeanWrapperFieldSetMapperなどが使えます。

Multiple Record Types within a Single File

これまでのファイル読み込みサンプルは説明簡略化のため、ファイルの全レコードが同一フォーマットである、という仮定を置いていました。しかし、そうでない場合もあります。各レコードの異なるフォーマットに異なるトークン処理をして異なるオブジェクトにマッピングするファイルもありえます。以下はそうしたファイルの抜粋です。

USER;Smith;Peter;;T;20014539;F
LINEA;1044391041ABC037.49G201XX1383.12H
LINEB;2134776319DEF422.99M005LI

このファイルには3種類のレコード、USER", "LINEA", and "LINEB"、があります。"USER"はUserに相当し、"LINEA"と"LINEB"は共にLineに相当し、"LINEA"は"LINEB"よりも多くのデータを持ちます。

ItemReaderは個々の行を独立に読みますが、ItemWriterが正しくアイテムを受け取れるように、各行に対して異なるLineTokenizerFieldSetMapperを指定する必要があります。PatternMatchingCompositeLineMapperは、パターンとLineTokenizerマッピングおよびパターンとFieldSetMapperマッピング、を指定することで、これを簡単に設定できます。

Java Configuration

@Bean
public PatternMatchingCompositeLineMapper orderFileLineMapper() {
        PatternMatchingCompositeLineMapper lineMapper =
                new PatternMatchingCompositeLineMapper();

        Map<String, LineTokenizer> tokenizers = new HashMap<>(3);
        tokenizers.put("USER*", userTokenizer());
        tokenizers.put("LINEA*", lineATokenizer());
        tokenizers.put("LINEB*", lineBTokenizer());

        lineMapper.setTokenizers(tokenizers);

        Map<String, FieldSetMapper> mappers = new HashMap<>(2);
        mappers.put("USER*", userFieldSetMapper());
        mappers.put("LINE*", lineFieldSetMapper());

        lineMapper.setFieldSetMappers(mappers);

        return lineMapper;
}

この例では、"LINEA"と"LINEB" はそれぞれ別のLineTokenizerですが、FieldSetMapperは同じものを使います。

PatternMatchingCompositeLineMapperPatternMatcher#matchで各行に対する委譲先の選択を行います。PatternMatcherには2つのワイルドカード特殊文字が使えます。クエスチョンマーク("?")は1文字のみにマッチし、アスタリスク("")はゼロ文字以上にマッチします。上の設定例では、すべてのパターンの最後にアスタリスクを付与しており、各行のプレフィックスとマッチするようにしています。PatternMatcherは、設定順序に関わらず、常に可能な限り最も一致するパターンにマッチします(matches the most specific pattern possible)。このため、"LINE"と"LINEA"がパターンリストにある場合、"LINEA"はパターン"LINEA"にマッチし、"LINEB"はパターン"LINE"にマッチします。なお、アスタリスクのみ("")はデフォルトとして振る舞い、他パターンがマッチしないすべての行にマッチします。

Java Configuration

...
tokenizers.put("*", defaultLineTokenizer());
...

また、トークン処理にPatternMatchingCompositeLineTokenizerを単独で使う事も可能です。

複数行にまたがるレコードを持つフラットファイルもあります。これの対処には、さらに複雑な方法が必要となります。このための一般的なパターンの解説はmultiLineRecordsのサンプルコードにあります。

Exception Handling in Flat Files

トークン処理が例外をスローするケースは多数考えられます。フラットファイルが不完全で不正確なフォーマットのレコードを持つ場合があります。大抵の場合は、ログに問題・オリジナルの行・行番後を出力し、エラー行をスキップします。ログは後々に別のバッチジョブや手動調査に使います。このため、Spring Batchはパース例外処理、FlatFileParseExceptionFlatFileFormatException、の例外の階層を持っています。ファイル読み込み時に何らかのエラーが発生する場合、FlatFileItemReaderFlatFileParseExceptionをスローします。LineTokenizer実装はトークン処理中に発生したエラーを示すFlatFileFormatExceptionをスローします。

IncorrectTokenCountException

DelimitedLineTokenizerFixedLengthLineTokenizerFieldSetの生成にカラム名を指定します。しかし、カラム名の個数がトークン処理時にマッチしない場合、FieldSetは生成できず、トークンの個数と期待値を持つIncorrectTokenCountExceptionをスローします。

tokenizer.setNames(new String[] {"A", "B", "C", "D"});

try {
    tokenizer.tokenize("a,b,c");
}
catch(IncorrectTokenCountException e){
    assertEquals(4, e.getExpectedCount());
    assertEquals(3, e.getActualCount());
}

tokenizerには4つのカラム名を設定していますが、ファイルからは3トークンしか検出出来ないと、IncorrectTokenCountExceptionをスローします。

IncorrectLineLengthException

固定長フォーマットの場合、デリミタとは異なり、パース時に各カラムが厳密に定義した長さに従う必要があります。行の長さが異なる場合、例外をスローします。

tokenizer.setColumns(new Range[] { new Range(1, 5),
                                   new Range(6, 10),
                                   new Range(11, 15) });
try {
    tokenizer.tokenize("12345");
    fail("Expected IncorrectLineLengthException");
}
catch (IncorrectLineLengthException ex) {
    assertEquals(15, ex.getExpectedLength());
    assertEquals(5, ex.getActualLength());
}

上記のtokenizerの設定範囲は1-5, 6-10, and 11-15です。よって、行の合計の長さは15です。しかし、上の例では、長さ5の行が渡され、IncorrectLineLengthExceptionをスローします。最初のカラムのみマッピングするよりも、例外スローによって行処理を早めに失敗させる事が可能となり、FieldSetMapperで2カラム目を読み込む際にエラーにするよりも多くの情報を返せます。しかし、行の長さが常に一定ではないケースも存在します。このため、行の長さのvalidationは'strict'プロパティによりオフにできます。

tokenizer.setColumns(new Range[] { new Range(1, 5), new Range(6, 10) });
tokenizer.setStrict(false);
FieldSet tokens = tokenizer.tokenize("12345");
assertEquals("12345", tokens.readString(0));
assertEquals("", tokens.readString(1));

上の例は、okenizer.setStrict(false)以外、一つ前の例とおおむね同一です。この設定により、tokenizerで行のトークン処理時に長さチェックをしなくなります。これでFieldSetは正しく生成されて返されます。なお、残りの値については空のトークンになります。

1.6.3. FlatFileItemWriter

フラットファイル書き込みには読み込みと同じ解決すべき問題と課題があります。transactionalにデリミタあるいは固定長フォーマットで書き込み可能なstepにする必要があります。

LineAggregator

LineTokenizer同様、アイテムをStringに変換する必要があり、ファイルへ書き込むために複数フィールドを単一の文字列へ集約する必要があります。Spring Batchでは、これはLineAggregatorが担います。インタフェース定義は以下の通りです。

public interface LineAggregator<T> {

    public String aggregate(T item);

}

LineAggregatorLineTokenizerは論理的な対応関係にあります。LineTokenizerStringFieldSetにし、一方、LineAggregatoritemStringにします。

PassThroughLineAggregator

LineAggregatorの一番簡単な実装はPassThroughLineAggregatorで、オブジェクトがすでに文字列であるか、オブジェクトの文字列表現を書き込んでも問題無い、と想定できる場合に使います。

public class PassThroughLineAggregator<T> implements LineAggregator<T> {

    public String aggregate(T item) {
        return item.toString();
    }
}

上の実装は、文字列生成を直接制御する必要はあるものの、FlatFileItemWriterの利点であるトランザクションやリスタート機能などは必要な場合に便利です。

Simplified File Writing Example

LineAggregatorインタフェースとその一番簡単な実装のPassThroughLineAggregatorを見たので書き込みの基本的なフローを解説します。

  1. 書き込むオブジェクトをLineAggregatorに渡してStringを得る。
  2. 返されたStringが設定したファイルに書き込まれる。

以下のFlatFileItemWriterの抜粋がそのコード部分です。

public void write(T item) throws Exception {
    write(lineAggregator.aggregate(item) + LINE_SEPARATOR);
}

簡単な設定例は以下の通りです。

Java Configuration

@Bean
public FlatFileItemWriter itemWriter() {
        return  new FlatFileItemWriterBuilder<Foo>()
                                   .name("itemWriter")
                                   .resource(new FileSystemResource("target/test-outputs/output.txt"))
                                   .lineAggregator(new PassThroughLineAggregator<>())
                                   .build();
}

FieldExtractor

上の例はファイル書き込みの一番簡単な使用法として有用です。しかし、FlatFileItemWriterのユーザは基本的にはドメインオブジェクトを書き込む必要があるため、これを行に変換する必要があります。ファイル読み込みでは、以下が必要でした。

  1. ファイルから1行読み込む。
  2. その行をLineTokenizer#tokenize()に渡してFieldSetを得る。
  3. トークン処理結果のFieldSetFieldSetMapperに渡し、ItemReader#read()が結果を返す。

ファイル書き込みも似た構造になりますが手順は逆になります。

  1. writerに書き込むアイテムを渡す。
  2. アイテムのフィールドを配列に変換。
  3. 配列を行に集約する。

フレームワークは書き込みたいオブジェクトのフィールドを知らないので、アイテムを配列に変換するタスク用のFieldExtractorを指定する必要があります。インタフェース定義は以下になります。

public interface FieldExtractor<T> {

    Object[] extract(T item);

}

FieldExtractorの実装はオブジェクトのフィールドから配列を生成し、デリミタ区切りの要素や固定長の行の一部にこの配列を使用します。

PassThroughFieldExtractor

配列・CollectionFieldSetなどコレクションを書き込む必要があるケースは多くあります。これらのコレクション型から配列を"抽出"するのは極めて単純で、コレクションから配列に変換します。よって、PassThroughFieldExtractorはそういう場合に使います。なお、非コレクション型のオブジェクトを渡す場合、PassThroughFieldExtractorはそのアイテムのみを含む配列を返します。

BeanWrapperFieldExtractor

前述の読み込みセクションのBeanWrapperFieldSetMapper同様、変換を自分で書くのではなく、ドメインオブジェクトを配列に変換したい場合に使います。BeanWrapperFieldExtractorのこの機能は以下の例のように使います。

BeanWrapperFieldExtractor<Name> extractor = new BeanWrapperFieldExtractor<Name>();
extractor.setNames(new String[] { "first", "last", "born" });

String first = "Alan";
String last = "Turing";
int born = 1912;

Name n = new Name(first, last, born);
Object[] values = extractor.extract(n);

assertEquals(first, values[0]);
assertEquals(last, values[1]);
assertEquals(born, values[2]);

extractor実装には必須プロパティが1つだけあり、マッピング用に使うフィールド名を渡しますBeanWrapperFieldSetMapperはオブジェクトのsetterにFieldSetマッピングするのにフィールド名を使います。BeanWrapperFieldExtractorはオブジェクト配列を生成するためにgetterとフィールド名の配列とをマッピングします。フィールド名配列の順序が配列内のフィールドの順序となる点に注意してください。

Delimited File Writing Example

一番簡単なフラットファイルフォーマットはすべてのフィールドがデリミタで区切ったものです。この場合はDelimitedLineAggregatorを使います。以下の例はクレジットと顧客アカウントを表すシンプルなオブジェクトを出力する例です。

public class CustomerCredit {

    private int id;
    private String name;
    private BigDecimal credit;

    //説明簡略化のためgetters,settersは省略
}

ドメインオブジェクトを使うため、FieldExtractor実装をデリミタ指定で必要があります。

Java Configuration

@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
        BeanWrapperFieldExtractor<CustomerCredit> fieldExtractor = new BeanWrapperFieldExtractor<>();
        fieldExtractor.setNames(new String[] {"name", "credit"});
        fieldExtractor.afterPropertiesSet();

        DelimitedLineAggregator<CustomerCredit> lineAggregator = new DelimitedLineAggregator<>();
        lineAggregator.setDelimiter(",");
        lineAggregator.setFieldExtractor(fieldExtractor);

        return new FlatFileItemWriterBuilder<CustomerCredit>()
                                .name("customerCreditWriter")
                                .resource(outputResource)
                                .lineAggregator(lineAggregator)
                                .build();
}

上の例では、このチャプター前半で解説したBeanWrapperFieldExtractorCustomerCreditのnameとcreditフィールドを配列に変換し、各フィールドをカンマで書き込みます。

なお、以下例のように、FlatFileItemWriterBuilder.DelimitedBuilderBeanWrapperFieldExtractorDelimitedLineAggregatorを内部的に自動生成する使い方も可能です。

Java Configuration

@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
        return new FlatFileItemWriterBuilder<CustomerCredit>()
                                .name("customerCreditWriter")
                                .resource(outputResource)
                                .delimited()
                                .delimiter("|")
                                .names(new String[] {"name", "credit"})
                                .build();
}

Fixed Width File Writing Example

デリミタがフラットファイルの唯一のフォーマットではありません。固定長という、フィールドの区切りに各カラムが長さを持つ場合もあります。Spring BatchはFormatterLineAggregatorで固定長を扱います。上の説明で使用したCustomerCreditの場合、以下のように使います。

Java Configuration

@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
        BeanWrapperFieldExtractor<CustomerCredit> fieldExtractor = new BeanWrapperFieldExtractor<>();
        fieldExtractor.setNames(new String[] {"name", "credit"});
        fieldExtractor.afterPropertiesSet();

        FormatterLineAggregator<CustomerCredit> lineAggregator = new FormatterLineAggregator<>();
        lineAggregator.setFormat("%-9s%-2.0f");
        lineAggregator.setFieldExtractor(fieldExtractor);

        return new FlatFileItemWriterBuilder<CustomerCredit>()
                                .name("customerCreditWriter")
                                .resource(outputResource)
                                .lineAggregator(lineAggregator)
                                .build();
}

今までに見てきた例とほぼ同じですが、formatプロパティ値はここで初めて登場します。

...
FormatterLineAggregator<CustomerCredit> lineAggregator = new FormatterLineAggregator<>();
lineAggregator.setFormat("%-9s%-2.0f");
...

これの内部実装はJava 5で導入されたFormatterをベースにしています。JavaFormatterC言語printfをベースにしています。formatter設定の詳細についてはFormatterを参照してください。

なお、以下例のように、FlatFileItemWriterBuilder.FormattedBuilderBeanWrapperFieldExtractorFormatterLineAggregatorを内部的に自動生成する使い方も可能です。

Java Configuration

@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
        return new FlatFileItemWriterBuilder<CustomerCredit>()
                                .name("customerCreditWriter")
                                .resource(outputResource)
                                .formatted()
                                .format("%-9s%-2.0f")
                                .names(new String[] {"name", "credit"})
                                .build();
}

Handling File Creation

FlatFileItemReaderとファイルリソースとの関係は極めてシンプルです。readerを初期化すると、(存在する)ファイルをオープンし、無ければ例外をスローします。ファイル書き込みはこのようにシンプルではありません。一見、FlatFileItemWriterにも同様なシンプルな関係があるように思えます。ファイルがあれば例外をスローし、無ければ生成して書き込みを開始する。ただし、Jobリスタートがあり得る場合は問題を起こす可能性があります。通常のリスタートでは、ファイルが存在する場合、最終ポジションから書き込みを開始し、存在しなければ例外をスローします。しかし、このjobのファイル名が常に同一の場合どうなるでしょうか? この場合、リスタートで無ければ、存在するファイルを削除したいと考えるはずです。こういった課題があるため、FlatFileItemWriterにはshouldDeleteIfExistsプロパティがあります。このプロパティをtrueにするとwriterオープン時に同一名のファイルが存在すれば削除します。