kagamihogeの日記

kagamihogeの日記です。

Spring Batch 4.1.x - Reference Documentation - Configuring a Stepのテキトー翻訳

https://docs.spring.io/spring-batch/4.1.x/reference/html/step.html#configureStep

https://qiita.com/kagamihoge/items/12fbbc2eac5b8a5ac1e0 俺の訳一覧リスト

1. Configuring a Step

ドメインのチャプターで解説したように、Stepは、バッチjobのシーケンシャルで独立したフェーズをカプセル化したもので、バッチ処理の制御と定義に必要なすべての情報を持ちます。Stepの中身はJobを書く開発者次第なため、やや曖昧な説明になっています。Stepは開発者次第でシンプルにも複雑にもなります。シンプルなStepはファイルからDBにロードし、要求されるコードは少しかゼロです(実装次第)。複雑なStepは、以下イメージ図のような、複雑なビジネスルール処理の一部になるものがあります。

Figure 1. Step

1.1. Chunk-oriented Processing

Spring Batchは一般的な実装スタイルのうちチャンク指向処理スタイルを使います。チャンク指向の処理は、1度に1データ読み取ってトランザクション境界内で書き込むチャンクを作成します。ItemReaderから1アイテム読み、ItemProcessorに渡し、集約します。読み込んだアイテム数がコミット間隔に達すると、ItemWriterがチャンクを書き込み、トランザクションをコミットします。以下のイメージが一連の処理です。

Figure 2. Chunk-oriented Processing

以下のコードは概念を示すコードです。

List items = new Arraylist();
for(int i = 0; i < commitInterval; i++){
    Object item = itemReader.read()
    Object processedItem = itemProcessor.process(item);
    items.add(processedItem);
}
itemWriter.write(items);

1.1.1. Configuring a Step

Stepの依存性のリストは比較的短いですが、多くの関連クラスを持つ場合があるので非常に複雑です。

java設定の場合、以下例のように、Spring Batchのビルダーが使えます。

Java Configuration

/**
 * JobRepositoryは基本的にはautowiredされるので明示的な設定は不要です。
 */
@Bean
public Job sampleJob(JobRepository jobRepository, Step sampleStep) {
    return this.jobBuilderFactory.get("sampleJob")
                            .repository(jobRepository)
                .start(sampleStep)
                .build();
}

/**
 * TransactionManagerは基本的にはautowiredされるので明示的な設定は不要です。
 */
@Bean
public Step sampleStep(PlatformTransactionManager transactionManager) {
        return this.stepBuilderFactory.get("sampleStep")
                                .transactionManager(transactionManager)
                                .<String, String>chunk(10)
                                .reader(itemReader())
                                .writer(itemWriter())
                                .build();
}

上の設定はアイテム指向stepを作るのに必要な依存性だけを入れています。

  • reader: processorにアイテムを与えるItemReader
  • writer: ItemReaderからのアイテムを処理するItemWriter
  • transactionManager: トランザクションの開始とコミットをするSpringのPlatformTransactionManager
  • repository: JobRepositoryは定期的にStepExecutionExecutionContextを保存する(コミットの直前)。
  • chunk: ここではアイテムベースのstepを意味し、トランザクションコミットの処理アイテム数を指定している。

なお、repositoryのデフォルトのbeanはjobRepositorytransactionManagertransactionManger(どちらも@EnableBatchProcessingで自動設定されます)。また、ItemProcessorは無くても良く、これによりアイテムをreaderからwriterに直接渡せます。

1.1.3. The Commit Interval

前述の通り、stepはアイテムを読んで書き込み、設定したPlatformTransactionManagerで定期的にコミットします。commit-intervalが1の場合、1アイテム書き込むたびにコミットします。ただ、トランザクションの開始とコミットは高コストなので、多くの場合にこれは理想的とは言えません。理想的には、各トランザクションでなるべく多数のアイテム処理するのが好ましく、これはstepが相互作用するリソースと処理データの特性に完全に依存します。このため、あるコミットで処理するアイテム数は変更可能です。以下の例はcommit-intervalが10のtaskletstepです。

Java Configuration

@Bean
public Job sampleJob() {
    return this.jobBuilderFactory.get("sampleJob")
                     .start(step1())
                     .end()
                     .build();
}

@Bean
public Step step1() {
        return this.stepBuilderFactory.get("step1")
                                .<String, String>chunk(10)
                                .reader(itemReader())
                                .writer(itemWriter())
                                .build();
}

上の例では、各トランザクションで10アイテム処理します。処理開始時にトランザクションが開始します。そして、各アイテムをItemReaderreadで読むたびにカウンターがインクリメントします。10に達すると、集約アイテムのリストをItemWriterに渡し、トランザクションをコミットします。

Configuring a Step for Restart

Configuring and Running a JobではJobのリスタートについて解説しました。リスタートは様々な影響をstepに与えるため、従って、何らかの設定が必要となります。

Setting a Start Limit

Stepの最大実行回数を制御したい場合があります。たとえば、ある特定のStepがリソースを無効化して再実行前に手動で戻す必要があるので、一度だけ実行するようにしたい、などです。設定はstepレベルで行い、異なるstepで異なる設定が可能です。一度しか実行出来ないStepは、制限無しのStepがあるJobと一緒に入れられます。以下のコード例はstart limit設定の例です。

Java Configuration

@Bean
public Step step1() {
        return this.stepBuilderFactory.get("step1")
                                .<String, String>chunk(10)
                                .reader(itemReader())
                                .writer(itemWriter())
                                .startLimit(1)
                                .build();
}

上のstepは一度だけ実行可能です。再実行するとStartLimitExceededExceptionをスローします。なお、start-limitのデフォルト値はInteger.MAX_VALUEです。

Restarting a Completed Step

restartable jobの場合、初めは成功したかどうかに関わらず、常に実行したいstepがある場合があります。validation stepや処理前にリソースのクリーンアップをするStepなどです。restarted jobの通常処理では、正常終了したことを示す'COMPLETED'ステータスとなり、これはスキップします。allow-start-if-completeを"true"に設定するとその挙動をオーバーライドして常に実行するようになります。以下が例です。

Java Configuration

@Bean
public Step step1() {
        return this.stepBuilderFactory.get("step1")
                                .<String, String>chunk(10)
                                .reader(itemReader())
                                .writer(itemWriter())
                                .allowStartIfComplete(true)
                                .build();
}

Step Restart Configuration Example

以下はリスタート可能なstepを持つjobの設定例です。

Java Configuration

@Bean
public Job footballJob() {
        return this.jobBuilderFactory.get("footballJob")
                                .start(playerLoad())
                                .next(gameLoad())
                                .next(playerSummarization())
                                .end()
                                .build();
}

@Bean
public Step playerLoad() {
        return this.stepBuilderFactory.get("playerLoad")
                        .<String, String>chunk(10)
                        .reader(playerFileItemReader())
                        .writer(playerWriter())
                        .build();
}

@Bean
public Step gameLoad() {
        return this.stepBuilderFactory.get("gameLoad")
                        .allowStartIfComplete(true)
                        .<String, String>chunk(10)
                        .reader(gameFileItemReader())
                        .writer(gameWriter())
                        .build();
}

@Bean
public Step playerSummarization() {
        return this.stepBuilderFactor.get("playerSummarization")
                        .startLimit(2)
                        .<String, String>chunk(10)
                        .reader(playerSummarizationSource())
                        .writer(summaryWriter())
                        .build();
}

上の設定例のjobは、フットボールのデータロードしてサマリをします。3つのstep、playerLoad, gameLoad, playerSummarizationがあります。playerLoadはフラットファイルからプレイヤーデータをロードし、gameLoadは同様にゲームデータをロードします。最後に、playerSummarizationは、ゲームをベースに各プレイヤーの統計を出力します。ここでの想定は、playerLoadのファイルロードは一度だけにしたいが、gameLoadはゲームデータをディレクトリからファイルで取得し、DBに正常ロード後はそのファイルを削除する、とします。この場合、playerLoadは特に設定を必要としません。何度でも実行可能ですが、completeになると、以降はスキップします。しかしgameLoadは最終実行以降にファイルが追加された場合は都度実行の必要があります。毎回実行するために'allow-start-if-complete'を'true'にしています。(DBのgamesテーブルはprocess indicatorを持ち、次のsummarization stepでその新規ゲームデータを参照可能になっている、という想定)。summarization stepはこのjobで最も重要で、start limitは2にしています。stepが連続して失敗し、job実行を制御するオペレータにexit codeを返します。そして、手動変更しない限り再開は出来なくなります。

※ 本ドキュメントのjob例はサンプルプロジェクトのfootballJobとは異なります。

このセクションのまとめとしてfootballJobの例を3回実行したときに起きることを解説します。

Run 1:

  1. playerLoadを実行すると正常終了し、'PLAYERS'テーブルに400プレイヤー追加する。
  2. gameLoadを実行するとゲームデータの11ファイルを処理し、'GAMES'テーブルにロードする。
  3. playerSummarizationを開始すると5分後に失敗する。

Run 2:

  1. playerLoadは既に正常終了済みなので実行しない。allow-start-if-completeは'false'(デフォルト)
  2. gameLoadは再実行して更に別の2ファイルを処理し、前回様に'GAMES'テーブルにロードする(このデータについてはprocess indicatorが未処理となる)。
  3. playerSummarizationはすべての残ゲームデータ(process indicatorでフィルタリング)を処理開始して30分後に失敗する。

Run 3:

  1. playerLoadは既に正常終了済みなので実行しない。allow-start-if-completeは'false'(デフォルト)
  2. gameLoadは再実行して更に別の2ファイルを処理し、前回様に'GAMES'テーブルにロードする(このデータについてはprocess indicatorが未処理となる)。
  3. playerSummarizationは開始せずjobが即時killされる。この時点でplayerSummarizationは3回目で、リミットが2なため。リミットを上げるか、そのJobで新規のJobInstanceを作る必要がある。

1.1.5. Configuring Skip Logic

エラーが発生してもStepを失敗にはせず、代わりにスキップしたい場合があります。基本的には、そこでのデータや事象の意味を理解する人が決定します。たとえば、金融データでは、送金は完全に実行する必要あるので、スキップ可能でないものもあります。一方、ベンダーリストのロードは、スキップ可能な場合があります。フォーマット間違いや必須情報の漏れでロード出来ない場合、おおむねスキップで問題ありません。たいていの場合、こうした不正レコードはログ出力します。これについては後に述べるリスナーで処理します。

以下はskip limitの使用例です。

Java Configuration

@Bean
public Step step1() {
        return this.stepBuilderFactory.get("step1")
                                .<String, String>chunk(10)
                                .reader(flatFileItemReader())
                                .writer(itemWriter())
                                .faultTolerant()
                                .skipLimit(10)
                                .skip(FlatFileParseException.class)
                                .build();
}

上の例ではFlatFileItemReaderを使用しています。この場合、どの段階においても、FlatFileParseExceptionをスローするとそのアイテムはスキップされてskip limit合計10に対してカウントアップします。step実行中のread, process, writeのスキップ数は別々にカウントしますが、limitはその全スキップに対して適用します。skip limitに達すると、次の例外スローでstepは失敗します。つまり、11回のスキップが例外をトリガするが、10回ではありません(the eleventh skip triggers the exception, not the tenth.)。

上の例の問題点としては、FlatFileParseException以外のその他すべての例外でJobは失敗になります。場合によってはこれは正しい動作となります。しかし、さらに場合によっては、失敗させる例外を指定してその他の全例外はスキップする方が簡単な場合があります。以下はその例です。

Java Configuration

@Bean
public Step step1() {
        return this.stepBuilderFactory.get("step1")
                                .<String, String>chunk(10)
                                .reader(flatFileItemReader())
                                .writer(itemWriter())
                                .faultTolerant()
                                .skipLimit(10)
                                .skip(Exception.class)
                                .noSkip(FileNotFoundException.class)
                                .build();
}

スキップ可能例外クラスにjava.lang.Exceptionを指定することで、すべてのExceptionsをスキップ可能に指定しています。ただし、'除外'としてjava.io.FileNotFoundExceptionを指定することで、すべてのExceptionsからFileNotFoundException除外した設定になります。もしその除外した例外が発生する(つまりスキップ不能な例外)はfatalになります。

例外発生時において、スキップ可能かどうかはクラス階層上の最も近いスーパークラスが決定します。未分類の例外は'fatal'扱いになります。

skipnoSkipの呼び出し順序は特に意味を持ちません。

1.1.6. Configuring Retry Logic

たいていの場合、例外はskipするかStep失敗のどちらかにします。ただし、すべての例外が決定的なわけではありません。いま、読み込み中にFlatFileParseExceptionをスローする場合、そのレコードに対しては常に例外をスローします。ItemReaderのリセットは無意味です。対して、DeadlockLoserDataAccessExceptionなど、プロセスが別のプロセスでロックするレコードの更新をしようとする場合などは、待機して再試行すれば成功します。この場合、以下のようにリトライを設定します。

@Bean
public Step step1() {
        return this.stepBuilderFactory.get("step1")
                                .<String, String>chunk(2)
                                .reader(itemReader())
                                .writer(itemWriter())
                                .faultTolerant()
                                .retryLimit(3)
                                .retry(DeadlockLoserDataAccessException.class)
                                .build();
}

Stepでは、リトライ可能なアイテム数と、リトライ可能な例外リストを設定可能です。リトライの挙動の詳細についてはretryを参照してください。

1.1.7. Controlling Rollback

デフォルトでは、リトライかスキップかを問わず、ItemWriterがスローする例外はStepで制御するトランザクションロールバックさせます。スキップを前述のように設定する場合、ItemReaderがスローする例外はロールバックになりません。ただし、ItemWriterの例外でロールバックできないケースは色々考えられますが、これはトランザクションを無効にするアクションが無い場合があるためです。このため、以下例のように、ロールバックをしない例外リストをStepに設定可能です。

Java Configuration

@Bean
public Step step1() {
        return this.stepBuilderFactory.get("step1")
                                .<String, String>chunk(2)
                                .reader(itemReader())
                                .writer(itemWriter())
                                .faultTolerant()
                                .noRollback(ValidationException.class)
                                .build();
}

Transactional Readers

ItemReaderの基本的な役割は直線的で戻ることは無いです。stepはreaderの入力をバッファし、これはロールバック時にアイテムをreaderから再読み込みする必要を無くすためです。しかし、JMSキューなど、readerがトランザクショナルリソースの一番最初に置かれるケースがあるにはあります。この場合、キューがロールバックするトランザクションに関連付けられるので、キューからプル済みのメッセージは戻されます。このため、下記例のように、アイテムをバッファしないstepを設定可能です。

Java Configuration

@Bean
public Step step1() {
        return this.stepBuilderFactory.get("step1")
                                .<String, String>chunk(2)
                                .reader(itemReader())
                                .writer(itemWriter())
                                .readerIsTransactionalQueue()
                                .build();
}

1.1.8. Transaction Attributes

トランザクション属性(Transaction attributes)は、isolation, propagation, timeout、の設定を制御するのに使います。詳細についてはSpring core documentationを参照してください。以下のサンプルではisolation, propagation, timeoutトランザクション属性を設定しています。

Java Configuration

@Bean
public Step step1() {
        DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
        attribute.setPropagationBehavior(Propagation.REQUIRED.value());
        attribute.setIsolationLevel(Isolation.DEFAULT.value());
        attribute.setTimeout(30);

        return this.stepBuilderFactory.get("step1")
                                .<String, String>chunk(2)
                                .reader(itemReader())
                                .writer(itemWriter())
                                .transactionAttribute(attribute)
                                .build();
}

1.1.9. Registering ItemStream with a Step

stepはライフサイクルの必要時点でItemStreamのコールバックを処理する必要があります。(ItemStreamインタフェースの詳細についてはItemStreamを参照)。これはstepが失敗してリスタート可能にする必要がある場合は必須で、その理由は、ItemStreamはstepで必要な実行間の永続化状態を取得する場所なためです。

ItemReader, ItemProcessor, ItemWriterItemStreamを実装すると、それらは自動登録されます。それ以外のstreamは別途登録します。このstreamは間接的な依存性、デリゲートなど、になる事が多く、readerとwriterにインジェクションされます。streamは、以下例のように、'streams'要素でStepに登録します。

Java Configuration

@Bean
public Step step1() {
        return this.stepBuilderFactory.get("step1")
                                .<String, String>chunk(2)
                                .reader(itemReader())
                                .writer(compositeItemWriter())
                                .stream(fileItemWriter1())
                                .stream(fileItemWriter2())
                                .build();
}

/**
 * Spring Batch 4では、CompositeItemWriterはItemStreamを実装しているので以下は不要ですが
 * 例として作成しています。
 */
@Bean
public CompositeItemWriter compositeItemWriter() {
        List<ItemWriter> writers = new ArrayList<>(2);
        writers.add(fileItemWriter1());
        writers.add(fileItemWriter2());

        CompositeItemWriter itemWriter = new CompositeItemWriter();

        itemWriter.setDelegates(writers);

        return itemWriter;
}

上記のサンプルでは、CompositeItemWriterItemStreamではありませんが、デリゲート先に処理を任せます。このため、デリゲート先のwriterはフレームワーク側で正しく認識出来るようにstreamとして明示的に登録の必要があります。ItemReaderはstreamとしての明示的な登録は必要ありませんが、これはStepのプロパティとして直接指定しているためです。上記のstepはリスタート可能で、readerとwriterの状態は失敗イベント時に正しく永続化されます。

1.1.10. Intercepting Step Execution

Job同様、Step実行中には様々なイベントが発生し、そこでユーザが何らかの機能を実行したい場合があります。たとえば、フッターが必要なフラットファイルを書き出すには、フッターを書き込めるように、Step完了時にItemWriterへ通知の必要があります。是の実現にはStepスコープのリスナの一つを使います。

StepListener(これ自体はマーカーに過ぎない)の拡張の一つを実装するクラスはlisteners要素でstepに適用します。listeners要素は、step, tasklet, chunk、で妥当です。なお、そのリスナ関数を適用するレベルに対し宣言することを推奨します。もし単一クラスで複数リスナ(StepExecutionListenerItemReadListenerなど)を兼ねる場合、最も細かいレベルで宣言します。以下はchunkにリスナを適用する例です。

Java Configuration

@Bean
public Step step1() {
        return this.stepBuilderFactory.get("step1")
                                .<String, String>chunk(10)
                                .reader(reader())
                                .writer(writer())
                                .listener(chunkListener())
                                .build();
}

<step>要素もしくは*StepFactoryBeanファクトリの一つを使用する場合で、ItemReader, ItemWriter, ItemProcessor自身がStepListenerの一つを実装しているとStepに自動登録されます。これはStepに直接インジェクションするコンポーネントにだけ適用されます。もしリスナが別コンポーネント内にネストする場合、明示的な登録が必要です(Registering ItemStream with a Stepで解説)。

StepListenerインタフェースに加えて、同様な処理のためのアノテーションがあります。POJOにこれらアノテーションを付与するメソッドを持たせると、対応するStepListenerに変換します。ItemReader, ItemWriter, Taskletなどchunkコンポネントの実装にそうしたアノテーションを付与するのが良くある使い方です。このアノテーションは、<listener/>要素用のXMLパーサーがアナライズするように、ビルダーのlistenerでも登録をします。よって、stepにリスナーを登録するには、XML namespaceかビルダーのどちらかを使えば良い、ということです。

StepExecutionListener

StepExecutionListenerStep実行における最も汎用的なリスナーです。Step開始・終了後に、完了・失敗に関わらず、通知を受けます。例は以下の通りです。

public interface StepExecutionListener extends StepListener {

    void beforeStep(StepExecution stepExecution);

    ExitStatus afterStep(StepExecution stepExecution);

}

afterStepの戻り値型ExitStatusによってリスナーでStepの完了コードを修正可能です。

このインタフェースに対応するアノテーションは以下です。

  • @BeforeStep
  • @AfterStep

ChunkListener

chunkはトランザクションスコープ内で処理するアイテムとして定義します。コミットインターパルで、トランザクションをコミットすると、chunkをコミットします。ChunkListenerはchunk処理開始かchunk正常処理完了後にロジックを挟むのに使います。以下がインタフェースです。

public interface ChunkListener extends StepListener {

    void beforeChunk(ChunkContext context);
    void afterChunk(ChunkContext context);
    void afterChunkError(ChunkContext context);

}

beforeChunkメソッドはトランザクション開始後でItemReaderのread呼び出し前に呼ばれます。逆に、afterChunkメソッドはchunkコミット後によばれます(ロールバックが起きてない場合に限る)。

このインタフェースに対応するアノテーションは以下です。

  • @BeforeChunk
  • @AfterChunk
  • @AfterChunkError

ChunkListenerはchunk宣言が無い場合にも適用可能です。TaskletStepChunkListenerを呼ぶ責任があるため、非アイテム指向のtaskletにも同様に適用します(taskletの前後に呼ばれる)。

ItemReadListener

前述のskip logicで解説したように、スキップレコードを後々追えるようにログを残すと有益な場合がある、と述べました。読込エラーの場合には、以下インタフェース定義に示す、ItemReaderListenerで実現できます。

public interface ItemReadListener<T> extends StepListener {

    void beforeRead();
    void afterRead(T item);
    void onReadError(Exception ex);

}

beforeReadメソッドはItemReaderの毎回のread呼び出しの前に呼ばれます。afterReadはreadが正常に完了して読みだしたアイテムを次に渡し終えた後に呼ばれます。読み出し中に何らかのエラーが発生した場合、onReadErrorメソッドが呼ばれます。発生した例外をログ出力するなどが可能です。

このインタフェースに対応するアノテーションは以下です。

  • @BeforeRead
  • @AfterRead
  • @OnReadError

ItemProcessListener

ItemReadListener同様、アイテムのprocess処理も、以下インタフェース定義に示すような形で、リッスンが可能です。

public interface ItemProcessListener<T, S> extends StepListener {

    void beforeProcess(T item);
    void afterProcess(T item, S result);
    void onProcessError(T item, Exception e);

}

beforeProcessメソッドはItemProcessorprocess前で処理アイテムを渡す前に呼ばれます。afterProcessメソッドはアイテム正常処理後に呼ばれます。処理中にエラーが発生した場合、onProcessErrorメソッドが呼ばれます。発生した例外とそこで処理対象だったアイテムが渡されるので、それをログ出力などします。

このインタフェースに対応するアノテーションは以下です。

  • @BeforeProcess
  • @AfterProcess
  • @OnProcessError

ItemWriteListener

アイテム書き込みのリッスンはItemWriteListenerで行い、そのインタフェース定義は以下の通りです。

public interface ItemWriteListener<S> extends StepListener {

    void beforeWrite(List<? extends S> items);
    void afterWrite(List<? extends S> items);
    void onWriteError(Exception exception, List<? extends S> items);

}

beforeWriteメソッドはItemWriterwrite前で書き込みアイテムリストを渡す前に呼ばれます。afterWriteメソッドはアイテム正常書き込み後に呼ばれます。エラーが発生した場合、onWriteErrorメソッドが呼ばれます。発生した例外と書き込み対象アイテムが渡されるので、それをログ出力などします。

このインタフェースに対応するアノテーションは以下です。

  • @BeforeWrite
  • @AfterWrite
  • @OnWriteError

SkipListener

ItemReadListener, ItemProcessListener, ItemWriteListenerはいずれもエラー通知の機構を備えますが、スキップしたレコードについての情報は得られません。たとえば、onWriteErrorは、アイテムがリトライして成功したとしても呼ばれます(is called even if an item is retried and successful.)。このため、スキップしたアイテムをトラッキングするために別のインターフェスが存在し、以下のようなインタフェース定義になります。

public interface SkipListener<T,S> extends StepListener {

    void onSkipInRead(Throwable t);
    void onSkipInProcess(T item, Throwable t);
    void onSkipInWrite(S item, Throwable t);

}

onSkipInReadは読み込み中にスキップしたアイテムがあれば呼ばれます。ロールバックすると、複数回スキップにより同一アイテムを登録する場合があることに注意してください(It should be noted that rollbacks may cause the same item to be registered as skipped more than once.)。onSkipInWriteは書き込み中にアイテムスキップした場合に呼ばれます。アイテムは正常に読み込まれている(かつ未スキップ)ので、引数にそのアイテムが渡されます。

このインタフェースに対応するアノテーションは以下です。

  • @OnSkipInRead
  • @OnSkipInWrite
  • @OnSkipInProcess

SkipListeners and Transactions

SkipListenerの最もよくある使い方はスキップアイテムのログ出力で、このログは別のバッチ処理かあるいは人間がスキップとなった原因を修正したり調査するのに使います。大本のトランザクションロールバックとなるケースは多数考えられ、Spring Batchは以下2点を保証します。

  1. 適切なスキップメソッド(発生するエラーのタイミングに依存)がアイテムごとに1度だけ呼ばれる。
  2. SkipListenerトランザクションコミット前に必ず常に呼ばれる。これにより、リスナーで呼ぶなんらかのトランザクショナルリソースがItemWriterでの失敗によってロールバックしないようにしています。

1.2. TaskletStep

chunk指向処理だけがStepの唯一の処理方法ではありません。Stepがシンプルにストアド呼び出しするだけの場合はどうでしょうか。ItemReaderで呼び出してストアド実行後にnullを返すような作りには出来ます。しかし、これは少々不自然で、何もしないItemWriterを作らなければなりません。Spring BatchではこうしたケースではTaskletStepを使います。

Taskletexecuteという1つのメソッドを持つシンプルなインタフェースで、TaskletStepが反復呼び出しを行い、RepeatStatus.FINISHEDを返すか失敗を示す例外をスローするまで実行します。Taskletの毎回の呼び出しはトランザクションでラップします。Taskletの実装者は、ストアド・スクリプト・シンプルなSQL更新、などを行います。

TaskletStepを作成するには、ビルダーのtaskletメソッドにTaskletを実装したbeanを渡します。TaskletStepをビルダーで作る際にはchunkを呼ぶ必要はありません。以下はシンプルなtaskletの例です。

@Bean
public Step step1() {
    return this.stepBuilderFactory.get("step1")
                            .tasklet(myTasklet())
                            .build();
}

TaskletStepはもしtaskletがStepListenerを実装する場合はStepListenerとしても自動的に登録します。

1.2.1. TaskletAdapter

ItemReaderItemWriterのアダプタ同様、TaskletにもTaskletAdapterというSpring Batchが用意するアダプタクラスがあります。これが有用な一例としてはレコードセットのフラグ更新に既存のDAOを使う場合です。TaskletAdapterにより、以下例のように、Taskletのアダプターを記述することなくDAOのメソッドを呼び出せます。

Java Configuration

@Bean
public MethodInvokingTaskletAdapter myTasklet() {
        MethodInvokingTaskletAdapter adapter = new MethodInvokingTaskletAdapter();

        adapter.setTargetObject(fooDao());
        adapter.setTargetMethod("updateFoo");

        return adapter;
}

1.2.2. Example Tasklet Implementation

たいていのバッチjobaは、各種リソースのセットアップためメイン処理開始前に実行すべきstepや、そうしたリソースをクリーンアップする処理の終了後に実行すべきstepがあります。巨大なファイルを扱うjobの場合、別の場所へ正常にアップロード完了後にローカルの対象ファイルを削除する事がほとんどです。以下の例(Spring Batch samples projectの抜粋)はまさにそのような事をするTasklet実装の例です。

public class FileDeletingTasklet implements Tasklet, InitializingBean {

    private Resource directory;

    public RepeatStatus execute(StepContribution contribution,
                                ChunkContext chunkContext) throws Exception {
        File dir = directory.getFile();
        Assert.state(dir.isDirectory());

        File[] files = dir.listFiles();
        for (int i = 0; i < files.length; i++) {
            boolean deleted = files[i].delete();
            if (!deleted) {
                throw new UnexpectedJobExecutionException("Could not delete file " +
                                                          files[i].getPath());
            }
        }
        return RepeatStatus.FINISHED;
    }

    public void setDirectoryResource(Resource directory) {
        this.directory = directory;
    }

    public void afterPropertiesSet() throws Exception {
        Assert.notNull(directory, "directory must be set");
    }
}

上のTasklet実装は所定ディレクトリ内の全ファイルを削除します。executeメソッドは一度だけ呼ばれることに注意してください。あとはStepからこのTaskletを参照します。

Java Configuration

@Bean
public Job taskletJob() {
        return this.jobBuilderFactory.get("taskletJob")
                                .start(deleteFilesInDir())
                                .build();
}

@Bean
public Step deleteFilesInDir() {
        return this.stepBuilderFactory.get("deleteFilesInDir")
                                .tasklet(fileDeletingTasklet())
                                .build();
}

@Bean
public FileDeletingTasklet fileDeletingTasklet() {
        FileDeletingTasklet tasklet = new FileDeletingTasklet();

        tasklet.setDirectoryResource(new FileSystemResource("target/test-outputs/test-dir"));

        return tasklet;
}

1.3. Controlling Step Flow

jobで複数stepをグループ化する機能では、あるstepから別のstepへのjob "flows" を制御する方法が必要となります。Stepの失敗が必ずしもJobの失敗を意味しません。また、Stepが次を実行すべきか決定するための'success'が複数種類存在する場合もあります。Stepsのグループ設定次第では、ある特定のstepが全く実行されない場合がありえます。

1.3.1. Sequential Flow

もっともシンプルなflowのケースは、以下イメージのように、すべてのstepをシーケンシャルに実行するjobです。

Figure 3. Sequential Flow

こういう設定をするにはstep要素の'next'属性を以下の例のように使います。

Java Configuration

@Bean
public Job job() {
        return this.jobBuilderFactory.get("job")
                                .start(stepA())
                                .next(stepB())
                                .next(stepC())
                                .build();
}

上のケースでは、'step A'がStepリストの最初にあるので最初に実行します。'step A'が正常終了すると、'step B'を実行し、その後は同様です。ただし、'step A'が失敗する場合、Job全体が失敗して、'step B'は実行しません。

1.3.2. Conditional Flow

上記例では、2パターンのみ存在します。

  1. Stepが成功して次のStepを実行する。
  2. Stepが失敗するとJobが失敗する。

これで十分なケースも多いです。しかし、失敗にするのではなく、Stepの失敗が別のStepをトリガーするケースはどうでしょうか。以下がそうしたflowのイメージです。

Figure 4. Conditional Flow

複雑なケースを扱うために、Spring Batch namespaceではstep要素内に定義する遷移要素を用意しています。そうした遷移要素の一つがnext要素です。next属性同様、next要素は次に実行するStepJobに指示します。ただし、属性とは異なり、Stepには任意数のnext要素を指定可能で、その場合失敗時のデフォルトの振る舞いは存在しません。つまり、遷移要素を使う場合、Step遷移に対するすべての振る舞いを明示的に定義する必要があります。なお、単一stepはnext属性とtransition要素両方を指定できません。

next要素にはマッチするパターンと次に実行するstepを、以下サンプルのように、指定します。

Java Configuration

@Bean
public Job job() {
        return this.jobBuilderFactory.get("job")
                                .start(stepA())
                                .on("*").to(stepB())
                                .from(stepA()).on("FAILED").to(stepC())
                                .end()
                                .build();
}

java設定を使う場合はonメソッドにシンプルなパターンマッチングを使い、Step実行の結果であるExitStatusにマッチさせるものを指定します。

パターンには2つの特殊文字を使用可能です。

  • "*"は0以上の文字列にマッチ。
  • "?"は1文字にマッチ。

例えば、"c*t"は"cat"と"count"にマッチし、"c?t"は"cat"にはマッチするが"count"にはしない。

Stepに配置する遷移要素に上限はありませんが、Step実行が返すExitStatusに対応する要素が無い場合、フレームワークは例外をスローしてJobは失敗します。フレームワークは遷移順序を最もマッチするものからしないもの順で決定します。つまり、上記例で順序を"stepA"に入れ替えたとしても、"FAILED"のExitStatusは"stepC"に遷移します(This means that, even if the ordering were swapped for "stepA" in the example above, an ExitStatus of "FAILED" would still go to "stepC".)。

Batch Status Versus Exit Status

条件付きflowでJobを設定する場合、BatchStatusExitStatusの違いを意識する事は重要です。BatchStatusenumで、JobExecutionStepExecution双方のプロパティであり、フレームワークJobStepのステータスを記録するために使います。以下の値のいずれか1つになります。COMPLETED, STARTING, STARTED, STOPPING, STOPPED, FAILED, ABANDONED, UNKNOWN。これらの多くは自己説明的です。COMPLETEDはstepやjobが正常終了した場合のステータスで、FAILEDは失敗時、などです。

以下の例はJava設定で'on'要素を持つものです。

...
.from(stepA()).on("FAILED").to(stepB())
...

一目見た感じでは、'on'がStepBatchStatusを参照するように見えます。しかし、実際にはStepExitStatusを参照します。名前が示すように、ExitStatusはそのStepの終了後のステータスを表します。

英語で書くなら、"exit codeがFAILEDであればstepBに遷移する"( "go to stepB if the exit code is FAILED")、です。デフォルトではexit codeはStepBatchStatusと常に同じになり、これが上記サンプルがなぜ動作するのかの理由です。ただし、exit codeを別の値にしたい場合は? 好例がsamples projectのskip sample jobにあります。

Java Configuration

@Bean
public Job job() {
        return this.jobBuilderFactory.get("job")
                        .start(step1()).on("FAILED").end()
                        .from(step1()).on("COMPLETED WITH SKIPS").to(errorPrint1())
                        .from(step1()).on("*").to(step2())
                        .end()
                        .build();
}

step1には3パターンがありえます。

  1. Stepが失敗、jobも失敗。
  2. Stepが正常終了。
  3. Stepが正常終了するがexit codeはCOMPLETED WITH SKIPS'。この場合、エラー処理に別のstepを実行する。

上記の設定は動作します。ただし、以下例のように、レコードスキップ条件でexit codeを変更するように修正します。

public class SkipCheckingListener extends StepExecutionListenerSupport {
    public ExitStatus afterStep(StepExecution stepExecution) {
        String exitCode = stepExecution.getExitStatus().getExitCode();
        if (!exitCode.equals(ExitStatus.FAILED.getExitCode()) &&
              stepExecution.getSkipCount() > 0) {
            return new ExitStatus("COMPLETED WITH SKIPS");
        }
        else {
            return null;
        }
    }
}

上のStepExecutionListenerはまず、Stepが正常終了かつStepExecutionのskip countが0より大きい、かどうかをチェックします。両条件を満たす場合、新規ExitStatusをexit code COMPLETED WITH SKIPSで返します。

1.3.3. Configuring for Stop

BatchStatus and ExitStatusの解説を読んだ後、JobBatchStatusExitStatusの決定ルールを知りたいと感じたかもしれません。Stepのステータスは実行するコードで決定し、Jobは設定で決定します。

これまでに解説したjob設定はすべて少なくとも一つの遷移無しの最終Stepを持っています。たとえば、以下例のように、step実行後、Jobは終了します。

@Bean
public Job job() {
        return this.jobBuilderFactory.get("job")
                                .start(step1())
                                .build();
}

Stepに遷移先が無い場合、Jobのステータスは以下のように決定します。

  • その最終StepExitStatus FAILEDで終了する場合、JobBatchStatusExitStatusは両方ともFAILEDになる。
  • それ以外の場合、JobBatchStatusExitStatusは両方ともCOMPLETEDになる。

ある種のバッチジョブ、シンプルなシーケンシャルstepのjobなどでは、上記の終了ルールで十分ですが、カスタム定義のjob停止が必要な場合もあります。そうした用途のために、Spring BatchはJobを停止するための遷移要素を3つ用意しています(前述のnext要素に加えて)。それらの停止要素はJobを特定のBatchStatusで停止します。なお、その停止遷移要素はJobのいずれのStepsBatchStatusあるいはExitStatusのどちらにも影響を与えません。これらの要素はJobの最終ステータスにだけ影響を与えます。たとえば、jobのすべてのstepがFAILEDでありながら、jobはCOMPLETEDに出来ます。

Ending at a Step

stepを終了するとBatchStatusCOMPLETEDjobを停止します。ステータスCOMPLETEDで終了するJobはリスタート出来ません(フレームワークJobInstanceAlreadyCompleteExceptionをスローする)。

Java設定を使う場合、そのタスクには'end'メソッドを使います。また、endメソッドはJobExitStatusをカスタマイズするための'exitStatus'パラメータを取ることも可能です。'exitStatus'を指定しないはデフォルトでExitStatusCOMPLETEDとなり、BatchStatusもそうなります。

以下のケースでは、step2が失敗するとJobBatchStatus COMPLETEDおよびExitStatus COMPLETEDで停止してstep3は実行しません。そうでない場合、step3に遷移します。なお、step2が失敗する場合はJobはリスタート出来ません(ステータスがCOMPLETEDなので)。

@Bean
public Job job() {
        return this.jobBuilderFactory.get("job")
                                .start(step1())
                                .next(step2())
                                .on("FAILED").end()
                                .from(step2()).on("*").to(step3())
                                .end()
                                .build();
}

Failing a Step

所定ポイントでstepを失敗するよう設定するとJobBatchStatus FAILEDで停止します。endと異なり、Job失敗でリスタート不能にはなりません。

以下のケースでは、step2が失敗するとJobBatchStatus FAILED ExitStatus EARLY TERMINATIONで停止してstep3は実行しません。そうでない場合、step3に遷移します。また、step2が失敗してJobをリスタートすると、step2から再開します。

Java Configuration

@Bean
public Job job() {
        return this.jobBuilderFactory.get("job")
                        .start(step1())
                        .next(step2()).on("FAILED").fail()
                        .from(step2()).on("*").to(step3())
                        .end()
                        .build();
}

Stopping a Job at a Given Step

特定のstepでjobを停止する設定をすると、JobBatchStatus STOPPEDで停止します。Job停止により処理は一時停止するため、オペレーターはJobの再開前になんらかの作業が出来ます。

java設定の場合、stopAndRestartメソッドは'restart'属性を必要とし、この属性にはJobリスタート時にピックアップするstepを指定します。

以下のケースでは、step1COMPLETEで終了するとjobは停止します。リスタートするとstep2から再開します。

@Bean
public Job job() {
        return this.jobBuilderFactory.get("job")
                        .start(step1()).on("COMPLETED").stopAndRestart(step2())
                        .end()
                        .build();
}

1.3.4. Programmatic Flow Decisions

あるケースでは、ExitStatus以外の情報も使用して次に実行するstepを決定したい場合があります。この場合、以下サンプルのように、決定を行うJobExecutionDeciderを使用します。

public class MyDecider implements JobExecutionDecider {
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        String status;
        if (someCondition()) {
            status = "FAILED";
        }
        else {
            status = "COMPLETED";
        }
        return new FlowExecutionStatus(status);
    }
}

次の例では、Java設定でJobExecutionDeciderを実装するbeanをnextに直接渡しています。

Java Configuration

@Bean
public Job job() {
        return this.jobBuilderFactory.get("job")
                        .start(step1())
                        .next(decider()).on("FAILED").to(step2())
                        .from(decider()).on("COMPLETED").to(step3())
                        .end()
                        .build();
}

1.3.5. Split Flows

これまでに解説したケースのJobはいずれも線形に一度に1つのstepを実行します。この一般的なスタイルに加えて、Spring Batchはjobをparallel flowsに設定可能です。

Java設定ではビルダーを用いて設定をスプリットできます。以下に示す例は、'split'要素は複数の'split'要素を持ち、そこで個々のflowを定義します。また、'split'要素には、これまでに解説した遷移要素、'next'属性や'next'要素・'end'や'fail'要素、を入れられます。

@Bean
public Job job() {
        Flow flow1 = new FlowBuilder<SimpleFlow>("flow1")
                        .start(step1())
                        .next(step2())
                        .build();
        Flow flow2 = new FlowBuilder<SimpleFlow>("flow2")
                        .start(step3())
                        .build();

        return this.jobBuilderFactory.get("job")
                                .start(flow1)
                                .split(new SimpleAsyncTaskExecutor())
                                .add(flow2)
                                .next(step4())
                                .end()
                                .build();
}

1.3.6. Externalizing Flow Definitions and Dependencies Between Jobs

jobのflowの一部を別のbeanとして切り出すことで、再使用できます。これには2通りの方法があります。1つは単に、以下に示すように、flowをどこか別の場所で定義したbeanとして宣言します。

Java Configuration

@Bean
public Job job() {
        return this.jobBuilderFactory.get("job")
                                .start(flow1())
                                .next(step3())
                                .end()
                                .build();
}

@Bean
public Flow flow1() {
        return new FlowBuilder<SimpleFlow>("flow1")
                        .start(step1())
                        .next(step2())
                        .build();
}

上の例のように外部flowを定義すると、インラインで定義しているかのように外部flowをjobにstepを追加できることにになります。この方法により、多数のjobが同じテンプレートflowを参照可能となり、そのテンプレートを別のflowに組み込むことも出来ます。また、個々のflowのインテグレーションテストを分離するのにも役立ちます。

flow外部化の別の方法はJobStepの使用です。JobStepFlowStepと似てますが、flow内のstepを別のjob実行として実際に作成・実行します。

以下はJobStepの例です。

Java Configuration

@Bean
public Job jobStepJob() {
        return this.jobBuilderFactory.get("jobStepJob")
                                .start(jobStepJobStep1(null))
                                .build();
}

@Bean
public Step jobStepJobStep1(JobLauncher jobLauncher) {
        return this.stepBuilderFactory.get("jobStepJobStep1")
                                .job(job())
                                .launcher(jobLauncher)
                                .parametersExtractor(jobParametersExtractor())
                                .build();
}

@Bean
public Job job() {
        return this.jobBuilderFactory.get("job")
                                .start(step1())
                                .build();
}

@Bean
public DefaultJobParametersExtractor jobParametersExtractor() {
        DefaultJobParametersExtractor extractor = new DefaultJobParametersExtractor();

        extractor.setKeys(new String[]{"input.file"});

        return extractor;
}

job parameters extractorはStepExecutionContextを実行JobJobParametersに変換する方法を決定するものです。JobStepはjobとstepのモニタリングとレポーティングに細かいオプションをつけたい場合に有用です。また、JobStepは以下の質問に対する回答でもあります。"job間の依存関係をどのように作るのか?" これは大規模システムを小さいモジュールとjobのflow制御に分割する優れた方法です。

1.4. Late Binding of Job and Step Attributes

これまでに解説したXMLとフラットファイルサンプルはいずれもファイル取得にSpringのResourceを使用します。ResourcegetFilejava.io.Fileを返すので機能します。XMLとフラットファイルのリソースは以下サンプルのようにSpringの機能で設定できます。

Java Configuration

@Bean
public FlatFileItemReader flatFileItemReader() {
        FlatFileItemReader<Foo> reader = new FlatFileItemReaderBuilder<Foo>()
                        .name("flatFileItemReader")
                        .resource(new FileSystemResource("file://outputs/file.txt"))
                        ...
}

上のResourceは指定のファイルシステムロケーションからファイルをロードします。注意点として、抽象ロケーション(absolute locations)はスラッシュ2個(//)で開始します。たいていのSpringアプリケーションで、このやり方で十分であり、これらリソース名はコンパイル時に決定しているためです。ただし、バッチのケースでは、ファイル名はjobのパラメータで動的に決定する場合もあります。この場合にはシステムプロパティを読み込み'-D'パラメータを使います。

以下はプロパティからファイル名を読み込み方法です。

Java Configuration

@Bean
public FlatFileItemReader flatFileItemReader(@Value("${input.file.name}") String name) {
        return new FlatFileItemReaderBuilder<Foo>()
                        .name("flatFileItemReader")
                        .resource(new FileSystemResource(name))
                        ...
}

この方法を動かすにはシステム引数(-Dinput.file.name="file://outputs/file.txt"など)を指定します。

※ ここでは``PropertyPlaceholderConfigurerが使用可能ですが、システムプロパティが常にセットされる場合は不要です。これはSpringのResourceEditor```がシステムプロパティのフィルタリングとプレースホルダー置換を行うためです。

また、バッチ設定において、システムプロパティではなくjobのJobParametersでファイル名をパラメータ化してアクセスしたい場合があります。これをするには以下のように、Spring Batchでは各種JobStepで遅延バインディングが可能です。

Java Configuration

@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters['input.file.name']}") String name) {
        return new FlatFileItemReaderBuilder<Foo>()
                        .name("flatFileItemReader")
                        .resource(new FileSystemResource(name))
                        ...
}

以下例のようにJobExecutionStepExecutionExecutionContextは同様な方法でアクセス可能です。

Java Configuration

@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobExecutionContext['input.file.name']}") String name) {
        return new FlatFileItemReaderBuilder<Foo>()
                        .name("flatFileItemReader")
                        .resource(new FileSystemResource(name))
                        ...
}

Java Configuration

@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{stepExecutionContext['input.file.name']}") String name) {
        return new FlatFileItemReaderBuilder<Foo>()
                        .name("flatFileItemReader")
                        .resource(new FileSystemResource(name))
                        ...
}

※ 遅延バインディングを使うbeanはscope="step"を宣言する必要があります。詳細はStep Scopeを参照。

1.4.1. Step Scope

上で取り上げた遅延バインディングの例は、以下サンプルのように、bean定義にすべて"step"スコープを宣言しています。

Java Configuration

@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters[input.file.name]}") String name) {
        return new FlatFileItemReaderBuilder<Foo>()
                        .name("flatFileItemReader")
                        .resource(new FileSystemResource(name))
                        ...
}

遅延バインディングを使うにはStepスコープが必要で、これは属性を参照するには、Step開始までbeanをインスタンス化出来ないためです。このスコープはSpringコンテナのデフォルトには含まれないので、scopeの明示的な追加が必要です。追加するには、batch namespace・StepScopeのbean定義を明示的に追加・@EnableBatchProcessingの使用、のどれか1つを使用します。以下はbatch namespaceの使用例です。

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="...">
<batch:job .../>
...
</beans>

以下の例は明示的なbean定義の追加です。

<bean class="org.springframework.batch.core.scope.StepScope" />

1.4.2. Job Scope

Jobスコープは、Spirng Batch 3.0で導入され、Stepスコープと同様のものですがJobコンテキストのスコープで、ジョブ実行時にそのbeanのインスタンスが1つだけになります。また、参照の遅延バインディングが可能で、JobContext#{..}でアクセスできます。この機能により、以下例に示すように、jobやjob execution contextおよびジョブパラメータ、からbeanプロパティを取得できます。

Java Configuration

@JobScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters[input]}") String name) {
        return new FlatFileItemReaderBuilder<Foo>()
                        .name("flatFileItemReader")
                        .resource(new FileSystemResource(name))
                        ...
}

Java Configuration

@JobScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobExecutionContext['input.name']}") String name) {
        return new FlatFileItemReaderBuilder<Foo>()
                        .name("flatFileItemReader")
                        .resource(new FileSystemResource(name))
                        ...
}

Springコンテナにはデフォルトで含まれないスコープなので、明示的な追加が必要です。batch namespace・JobScopeのbean定義を明示的に追加・@EnableBatchProcessingの使用、のどれか1つを使用します。以下はbatch namespaceの使用例です。

<beans xmlns="http://www.springframework.org/schema/beans"
                  xmlns:batch="http://www.springframework.org/schema/batch"
                  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                  xsi:schemaLocation="...">

<batch:job .../>
...
</beans>

以下の例はJobScopeのbeanを明示的に追加しています。

<bean class="org.springframework.batch.core.scope.JobScope" />

Spring Batch 4.1.x - Reference Documentation - Configuring and Running a Jobのテキトー翻訳

https://docs.spring.io/spring-batch/4.1.x/reference/html/job.html#configureJob

https://qiita.com/kagamihoge/items/12fbbc2eac5b8a5ac1e0 俺の訳一覧リスト

1. Configuring and Running a Job

domain sectionでは、以下図を用いてアーキテクチャデザイン全体について解説しました。

Figure 1. Batch Stereotypes

Jobオブジェクトはstepの単なるコンテナのように見えますが、開発者が知っておいた方が良い多数の設定オプションがあります。また、Jobの実行方法とそのメタデータが実行中にどのように格納されるのかにも注意が必要です。このチャプターでは各種設定オプションとJob実行時の注意事項について解説します。

1.1. Configuring a Job

Jobインタフェースには複数の実装がありますが、ビルダーが設定の違いを吸収します。

@Bean
public Job footballJob() {
    return this.jobBuilderFactory.get("footballJob")
                     .start(playerLoad())
                     .next(gameLoad())
                     .next(playerSummarization())
                     .end()
                     .build();
}

Job(とそれに含むStep)はJobRepositoryを必要とします。JobRepositoryの設定はBatchConfigurerで行います。

上は3つのStepインスタンスを持つJobの例です。また、jobのビルダーでは、パラレル(Split)関連、宣言的なフロー制御(Decision)、フロー定義(Flow)の外部化、も設定出来ます。

1.1.1. Restartability

バッチjob実行時の重要な課題の一つはリスタート時のJobの振る舞いについてです。もし特定のJobInstanceに対するJobExecutionが既に存在する場合、Jobの実行はリスタートと見なします。理想的には、すべてのjobは失敗した箇所から開始可能であるべきですが、それが可能ではないケースがあります。あるケースで新規のJobInstanceを生成する事を保証するのは開発者の責任です。ただし、Spring Batchはその補助機能を提供します。Jobをリスタート不可にする場合、常に新規のJobInstanceで実行し、restartableのプロパティを'false'に設定します。

Java Configuration

@Bean
public Job footballJob() {
    return this.jobBuilderFactory.get("footballJob")
                     .preventRestart()
                     ...
                     .build();
}

別の言い方をすると、restartableをfalseにする事は"このJobは再開出来ない"を意味します。restartableがfalseのJobをリスタートするとJobRestartExceptionをスローします。

Job job = new SimpleJob();
job.setRestartable(false);

JobParameters jobParameters = new JobParameters();

JobExecution firstExecution = jobRepository.createJobExecution(job, jobParameters);
jobRepository.saveOrUpdate(firstExecution);

try {
    jobRepository.createJobExecution(job, jobParameters);
    fail();
}
catch (JobRestartException e) {
    // ここを通る
}

上のJUnitコードは、リスタート不可のjobの1回目に実行するためのJobExecutionを生成し、特に例外は起きません。そして、2回目にはJobRestartExceptionをスローします。

1.1.2. Intercepting Job Execution

Jobの実行中に、何らかのカスタムコードを差し込むために、ライフサイクルの各種イベント通知が有用な場合があります。SimpleJobは適時JobListenerを呼ぶことでこれを実装しています。

public interface JobExecutionListener {

    void beforeJob(JobExecution jobExecution);

    void afterJob(JobExecution jobExecution);

}

SimpleJobのjobにJobListenersを追加するにはlisteners要素で行います。

Java Configuration

@Bean
public Job footballJob() {
    return this.jobBuilderFactory.get("footballJob")
                     .listener(sampleListener())
                     ...
                     .build();
}

なお、Jobの成功か失敗かに依らずafterJobは呼ばれます。成功か失敗かを判別したい場合はJobExecutionから取得します。

public void afterJob(JobExecution jobExecution){
    if( jobExecution.getStatus() == BatchStatus.COMPLETED ){
        //job success
    }
    else if(jobExecution.getStatus() == BatchStatus.FAILED){
        //job failure
    }
}

インタフェースに対応するアノテーションは以下の通りです。

  • @BeforeJob
  • @AfterJob

1.1.4. JobParametersValidator

jobをXMLで宣言したり、AbstractJobのサブクラスにする場合、実行時にjobパラメータ用のvalidatorをオプションで宣言できます。たとえば、すべての必須パラメータを検証してjobを開始したい場合に役立ちます。DefaultJobParametersValidatorでは必須およびオプションの単純なパラメータの組み合わせ検証が可能で、複雑な検証にはインタフェースを自前で実装します。

validatorの設定はjavaのbuilderでも設定可能です。

@Bean
public Job job1() {
    return this.jobBuilderFactory.get("job1")
                     .validator(parametersValidator())
                     ...
                     .build();
}

1.2. Java Config

Spring 3ではXMLに加えてjavaでのアプリケーション設定機能が追加されました。Spring Batch 2.2.0現在、バッチjobはjava configで設定可能です。javaベースの設定には2つのコンポーネントがあり、@EnableBatchProcessingと2つのビルダーがあります。

@EnableBatchProcessingはSpringファミリーの@Enable~アノテーションと同様の動作をします。@EnableBatchProcessingはバッチjobを組み立てるためのベースとなる設定を提供します。ベース設定内で、StepScopeインスタンスが作られ、他にもいくつかのbeanがautowired可能にしています。

  • JobRepository - bean name "jobRepository"
  • JobLauncher - bean name "jobLauncher"
  • JobRegistry - bean name "jobRegistry"
  • PlatformTransactionManager - bean name "transactionManager"
  • JobBuilderFactory - bean name "jobBuilders"
  • StepBuilderFactory - bean name "stepBuilders"

この設定のコアとなるインタフェースはBatchConfigurerです。デフォルト実装は上のbeanを提供し、コンテキストにDataSourceのbean定義が必要です。このデータソースはJobRepositoryで使用します。BatchConfigurerのカスタム実装を作ることで上述のbeanのカスタマイズが可能です。基本的には、DefaultBatchConfigurerBatchConfigurerがコンテキスに無ければこれを使用する)を拡張して必要なgetterをオーバーライドします。なお、スクラッチから実装することも可能です。以下の例はカスタムのtransaction managerを使用する例です。

@Bean
public BatchConfigurer batchConfigurer() {
        return new DefaultBatchConfigurer() {
                @Override
                public PlatformTransactionManager getTransactionManager() {
                        return new MyTransactionManager();
                }
        };
}

@EnableBatchProcessingを持つconfigクラスは1つだけにして下さい。このアノテーションを1つでもどこかのクラスに付与すると、上述のbeanが利用可能になります。

所定のベースconfigにおいて、job設定のためのビルダーのファクトリーを使用可能です。以下はJobBuilderFactoryStepBuilderFactoryで2つのstepのjobを設定する例です。

@Configuration
@EnableBatchProcessing
@Import(DataSourceConfiguration.class)
public class AppConfig {

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory steps;

    @Bean
    public Job job(@Qualifier("step1") Step step1, @Qualifier("step2") Step step2) {
        return jobs.get("myJob").start(step1).next(step2).build();
    }

    @Bean
    protected Step step1(ItemReader<Person> reader,
                         ItemProcessor<Person, Person> processor,
                         ItemWriter<Person> writer) {
        return steps.get("step1")
            .<Person, Person> chunk(10)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .build();
    }

    @Bean
    protected Step step2(Tasklet tasklet) {
        return steps.get("step2")
            .tasklet(tasklet)
            .build();
    }
}

1.3. Configuring a JobRepository

@EnableBatchProcessingを使う場合、特に設定無くJobRepositoryを使えます。このセクションでは自前での設定についてを解説します。

前述の通り、JobRepositoryはSpring Batch内の各種永続化オブジェクト、JobExecutionStepExecutionなど、のための基本的なCRUD操作で使います。このクラスはフレームワークの主要機能、JobLauncher, Job, Step、など多くの場所で使用します。

java設定の場合、JobRepositoryが使用可能です。DataSourceがあればJDBCベースの実装が使われ、無ければMapベースが使われます。ただし、BatchConfigurerの実装を利用してJobRepositoryのカスタマイズは可能です。

Java Configuration

...
// 以下はBatchConfigurerの実装内に置くものとする。
@Override
protected JobRepository createJobRepository() throws Exception {
    JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
    factory.setDataSource(dataSource);
    factory.setTransactionManager(transactionManager);
    factory.setIsolationLevelForCreate("ISOLATION_SERIALIZABLE");
    factory.setTablePrefix("BATCH_");
    factory.setMaxVarCharLength(1000);
    return factory.getObject();
}
...

上記の設定オプションは、 dataSourceとtransactionManagerを除いて、いずれも必須ではありません。設定しない場合、上のサンプルコードに書かれたデフォルト値を使います。They are shown above for awareness purposes. max varchar lengthのデフォルトは2500で、sample schema scriptsのlong VARCHARカラムのlengthになります。

1.3.1. Transaction Configuration for the JobRepository

namespaceもしくはFactoryBeanを使用する場合、transactional adviceがrepositoryのaroundに自動的に作られます。これは、バッチメタデータに含まれる状態のうち、失敗後のリスタートに必要な状態が正しく永続化される、事を保証します。repositoryのメソッドが非トランザクションの場合、フレームワーク振る舞いが不定になります。The isolation level in the create* method attributes is specified separately to ensure that when jobs are launched, もし2つのプロセスが同時に同一jobを起動すると片方だけが成功します。このメソッド用のデフォルトのisolation levelはSERIALIZABLEで、この設定はかなり攻めているため、READ_COMMITTEDでも同様に機能します。2つのプロセスが衝突をしないような場合にREAD_COMMITTEDが適します。ただ、create*メソッドの呼び出しは極めて短時間なので、DBがサポートしていれば、SERIALIZEDが問題となる可能性は低いです。とはいえ、オーバーライドは可能です。

Java Configuration

// 以下はBatchConfigurerの実装内に置くものとする。
@Override
protected JobRepository createJobRepository() throws Exception {
    JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
    factory.setDataSource(dataSource);
    factory.setTransactionManager(transactionManager);
    factory.setIsolationLevelForCreate("ISOLATION_REPEATABLE_READ");
    return factory.getObject();
}

namespaceあるいはfactory beansを使わない場合、AOPを使用するrepositoryのtransactionalな振る舞いの設定が必要です。

Java Configuration

@Bean
public TransactionProxyFactoryBean baseProxy() {
        TransactionProxyFactoryBean transactionProxyFactoryBean = new TransactionProxyFactoryBean();
        Properties transactionAttributes = new Properties();
        transactionAttributes.setProperty("*", "PROPAGATION_REQUIRED");
        transactionProxyFactoryBean.setTransactionAttributes(transactionAttributes);
        transactionProxyFactoryBean.setTarget(jobRepository());
        transactionProxyFactoryBean.setTransactionManager(transactionManager());
        return transactionProxyFactoryBean;
}

1.3.2. Changing the Table Prefix

JobRepositoryのもう一つの設定可能なプロパティにメタデータテーブルのプレフィクスがあります。デフォルトではすべてBATCHが頭につきます。BATCH_JOB_EXECUTIONとBATCH_STEP_EXECUTIONなどです。このプレフィクスを変更しなければならないケースが存在します。スキーマ名をテーブル名に付ける必要があるとか、同一スキーマ内に複数のメタデータテーブルを作る必要があるなどで、この場合はテーブルのプレフィクスを変更する必要があります。

Java Configuration

// 以下はBatchConfigurerの実装内に置くものとする。
@Override
protected JobRepository createJobRepository() throws Exception {
    JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
    factory.setDataSource(dataSource);
    factory.setTransactionManager(transactionManager);
    factory.setTablePrefix("SYSTEM.TEST_");
    return factory.getObject();
}

上のように変更すると、メタデータテーブルに対するすべてのクエリに"SYSTEM.TEST_"のプレフィクスがつきます。BATCH_JOB_EXECUTIONはSYSTEM.TEST_JOB_EXECUTIONで参照します。

※ テーブルのプレフィクスだけが変更可能です。テーブルとカラム名は変更できません。

1.3.3. In-Memory Repository

DBにドメインオブジェクトの永続化をしたくないケースが存在します。1つは速度で、コミットポイントごとのドメインオブジェクトの保存には幾分かの時間がかかります。もう1つは、特定のjobに限っては状態を永続化したくない場合です。このような用途のために、Spring Batchはjob repositoryのインメモリMap版を用意しています。

Java Configuration

// 以下はBatchConfigurerの実装内に置くものとする。
@Override
protected JobRepository createJobRepository() throws Exception {
    JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
    factory.setDataSource(dataSource);
    factory.setTransactionManager(transactionManager);
    factory.setIsolationLevelForCreate("ISOLATION_REPEATABLE_READ");
    return factory.getObject();
}

インメモリのrepositoryは揮発性でJVMインスタンス間を超えてのリスタートが出来ない点に注意してください。同一パラメータの2つのjobインスタンスを同時に実行する事も保証出来ないので、マルチスレッドJobには適しておらず、また、locally partitioned Stepも同様です。よって、それらの機能を使いたい場合にはrepositoryのDBバージョンを使用してください。

なお、transaction managerを定義する必要があり、その理由はrepository内にはrollback semanticsがあるのとビジネスロジックにはたいていtransactionalな箇所(RDBMSアクセス)があるためです。テスト目的ではResourcelessTransactionManagerを使うと便利です。

1.3.4. Non-standard Database Types in a Repository

使用するDBがサポート対象リストに無い場合、SQLバリアントがなるべく近いものを、サポート対象から1つ選べば使える可能性があります。これを設定するにはnamespaceショートカットの代わりにJobRepositoryFactoryBeanを使用し、一番近いDBタイプをセットします。

Java Configuration

// 以下はBatchConfigurerの実装内に置くものとする。
@Override
protected JobRepository createJobRepository() throws Exception {
    JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
    factory.setDataSource(dataSource);
    factory.setDatabaseType("db2");
    factory.setTransactionManager(transactionManager);
    return factory.getObject();
}

JobRepositoryFactoryBeanはDBタイプが未指定の場合はDataSourceから自動検出します。)プラットフォームごとの主な違いはプライマリキーのインクリメント方法で、これが異なる場合にはincrementerFactoryをオーバーライドする必要があります(Spring Frameworkの標準実装の一つを選んで使用する)。

これが動作しない場合、もしくはRDBMSでは無い場合、SimpleJobRepositoryが依存する各種のDaoインタフェースを実装してSpringの作法に沿ってマニュアルでそれらのbeanをワイヤリングするのが唯一の解決策です。

1.4. Configuring a JobLauncher

@EnableBatchProcessingを使う場合、特に設定無くJobRegistryを使えます。このセクションでは自前での設定についてを解説します。

JobLauncherインタフェースのベーシックな実装がSimpleJobLauncherです。このオブジェクトの依存性は、executionを取得するための、JobRepositoryだけです。

Java Configuration

// 以下はBatchConfigurerの実装内に置くものとする。
...
// This would reside in your BatchConfigurer implementation
@Override
protected JobLauncher createJobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
}
...

JobExecutionを取得すると、Jobの実行メソッドに渡され、最終的に呼び出し元にJobExecutionを返します。

Figure 2. Job Launcher Sequence

このシーケンスは単純化したものでスケジューラから起動する場合にはうまくいきます。しかし、HTTPリクエスト経由の起動時には問題があります。この場合、起動は非同期で行う必要があり、SimpleJobLauncherは呼び出し元へ即時リターンする必要があります。バッチなど長時間実行プロセスでHTTPリクエストをオープンにしたままにするのは良くありません。

Figure 3. Asynchronous Job Launcher Sequence

SimpleJobLauncherTaskExecutorを設定することでこのようなケースに簡単に対応できます。

Java Configuration

@Bean
public JobLauncher jobLauncher() {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository());
        jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
}

jobの非同期実行制御にはTaskExecutorインタフェースの実装であれば何でも使用可能です。

1.5. Running a Job

最低点、バッチjobの実行には2つの要素、実行するJobJobLauncher、が必要です。両者は同一コンテキストでも異なるコンテキストでも構いません。例えば、CLIからjobを起動する場合、各jobごとに新規のJVMインスタンス化し、各jobごとに固有のJobLauncherを持ちます。ただし、HttpRequestスコープ内のwebコンテナから起動する場合、通常JobLauncherは1つで、これは複数リクエストがjobを起動するために非同期job実行の設定にします。

1.5.1. Running Jobs from the Command Line

エンタープライズのスケジューラからjobを起動するユーザにとっては、CLIが主要なインターフェースになります。たいていのスケジューラ(NativeJobを使用しないQuartzを除く)はOSのプロセスと直接やり取りして、基本的にはシェルスクリプトでキックします。Javaプロセスを起動するには、スクリプトの他、Perl, Ruby, antやmavenなど'build tools'、などがあります。とはいえ、たいていはシェルスクリプトが最もポピュラーで、このサンプルでもシェルにフォーカスします。

The CommandLineJobRunner

jobの起動スクリプトJava Virtual Machineをキックするので、エントリーポイントとなるmainメソッドを持つクラスが必要です。Spring BatchはまさしくそのためのCommandLineJobRunnerクラスを用意しています。ただし、このクラスはアプリケーションをブートする1つの方法に過ぎず、Javaプロセスを起動する方法は複数あり、このクラスだけが唯一の方法ではありません。CommandLineJobRunnerは4つのタスクを実行します。

  • 適切なApplicationContextのロード
  • コマンドライン引数をパースしてJobParametersに入れる
  • 引数に基づくjobの特定
  • job実行にアプリケーションコンテキストのJobLauncherを使用

これらのタスクはすべて渡す引数のみで実行します。以下が必須の引数です。

Table 1. CommandLineJobRunner arguments

|jobPath|ApplicationContextを生成するためのXMLファイルの場所。このファイルにはJob実行に必要な情報をすべて持つ必要がある。| |jobName|実行するjob名|

これらの引数は最初がpathで次がnameにします。それらの後に来る引数はすべてJobParametersと見なし、'name=value'の形式で渡します。

<bash$ java CommandLineJobRunner io.spring.EndOfDayJobConfiguration endOfDay schedule.date(date)=2007/05/05

通常はjarのmainクラスの宣言にはマニフェストを使用しますが、ここでは説明簡易化のため、直接クラスを指定しています。この例ではdomainLanguageOfBatchと同一の'EndOfDay'サンプルを使用しています。最初の引数は'io.spring.EndOfDayJobConfiguration'でJobを持つconfigクラスの完全修飾クラス名です。次の引数は'endOfDay'でjob名です。最後の引数'schedule.date(date)=2007/05/05'はJobParametersになります。java configの例は以下の通りです。

@Configuration
@EnableBatchProcessing
public class EndOfDayJobConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job endOfDay() {
        return this.jobBuilderFactory.get("endOfDay")
                                    .start(step1())
                                    .build();
    }

    @Bean
    public Step step1() {
        return this.stepBuilderFactory.get("step1")
                                    .tasklet((contribution, chunkContext) -> null)
                                    .build();
    }
}

この例はかなり単純化したもので、通常はSpring Batchでバッチjobを動かすには他にも色々なものが必要ですが、CommandLineJobRunnerが必要とする2つのコンポーネントJobJobLauncher、がある事を示しています。

ExitCodes

CLIからバッチjobを起動する場合、基本的にはエンタープライズのスケジューラを用います。スケジューラの多くはシンプル(fairly dumb)でプロセスレベルで動作します。つまり、スケジューラは、シェルスクリプトなど、スケジューラが呼び出したOSプロセスについてだけ関知します。この場合、jobの成功か失敗かをスケジューラに戻してやり取りする唯一の方法はリターンコードだけです。リターンコードはプロセスがスケジューラに返す数値で、実行結果を意味します。最もシンプルな場合、0が成功で、1が失敗です。しかし、より複雑なシナリオが考えられます。たとえば、もしjob Aのreturn 4はjob Bをキックし、return 5はjob Cをキックする、などです。この種の振る舞いはスケジューラレベルで設定しますが、重要なのは、Spring Batchなどのフレームワークは特定バッチjobの'Exit Code'の数値表現を返す方法がある、という点です。Spring BatchではこれをExitStatusで表現しており、詳細はChapter 5で解説します。完了コード(exit code)の解説の点で、知るべき最も重要な点は、ExitStatusには完了コードプロパティがありこの値はフレームワーク(か開発者)が設定し、JobLauncherが返すJobExecutionの一部として返されます。CommandLineJobRunnerExitCodeMapperインタフェースで文字列の完了コードを数値に変換します。

public interface ExitCodeMapper {

    public int intValue(String exitCode);

}

ExitCodeMapperの役割は、文字列の完了コードを数値表現で返すことです。ジョブランナーのデフォルト実装はSimpleJvmExitCodeMapperで、0が正常終了、1が汎用エラー、2はコンテキストにJobが見つからなかったなどのジョブランナーのエラーです。3以上が必要な場合、ExitCodeMapperのカスタム実装が必要です。CommandLineJobRunnerApplicationContextを作成するクラスで、明示的なワイヤリングが書けないので、上書きしたい値はautowiredする必要があります。つまり、ExitCodeMapperの実装がBeanFactoryにある場合、コンテキスト作成後にランナーにインジェクトされます。自前のExitCodeMapperを使うにはrootレベルbeanとして宣言し、ランナーがロードするApplicationContextの一部となるようにしておきます。

1.5.2. Running Jobs from within a Web Container

歴史的に、バッチジョブなどのオフライン処理は上述のようなCLIで起動していました。しかし、HttpRequest経由で実行するケースがよりベターな選択肢な事も増えました。レポーティング・アドホックなジョブ実行・webアプリケーションサポートなどなどです。定義上バッチジョブは長時間実行するので、最も重要な関心事は非同期なジョブ起動です。

Figure 4. Asynchronous Job Launcher Sequence From Web Container

ここでのcontrollerはSpring MVCのコントローラーです。Spring MVCの詳細については https://docs.spring.io/spring/docs/current/spring-framework-reference/web.html#mvc を参照してください。コントローラーは非同期モードに設定したJobLauncherJobを実行します。このJobLauncherJobExecutionを即時リターンします。Jobは実行中のままですが、ノンブロッキングな振る舞いによりコントローラーでHttpRequest処理時に必要な即時リターンが出来ます。以下はその例です。

@Controller
public class JobLauncherController {

    @Autowired
    JobLauncher jobLauncher;

    @Autowired
    Job job;

    @RequestMapping("/jobLauncher.html")
    public void handle() throws Exception{
        jobLauncher.run(job, new JobParameters());
    }
}

1.6. Advanced Meta-Data Usage

ここまで、JobLauncherJobRepositoryインタフェースを解説してきました。共に、jobの起動と、バッチドメインオブジェクトの基礎的なCRUD操作を行います。

Figure 5. Job Repository

JobLauncherJobExecutionの新規オブジェクトの生成と実行にJobRepositoryを使います。Jobの実行中、JobStepの実装は実行の更新に同一のJobRepositoryを使います。基礎的な操作はシンプルなケースでは十分ですが、数百バッチジョブと複雑なスケジューリング要求の大規模なバッチ環境では、メタデータのより高度なアクセスが必要です。

Figure 6. Advanced Job Repository Access

JobExplorerJobOperatorインタフェースは以降で解説します。これらはメタデータの問い合わせと制御機能を持ちます。

1.6.1. Querying the Repository

高度な機能の前に知るべき基本的な事としては、このインタフェースは既存のexecutionsのリポジトリを問い合わせる機能があります。JobExplorerインタフェースが提供する機能は以下です。

public interface JobExplorer {

    List<JobInstance> getJobInstances(String jobName, int start, int count);

    JobExecution getJobExecution(Long executionId);

    StepExecution getStepExecution(Long jobExecutionId, Long stepExecutionId);

    JobInstance getJobInstance(Long instanceId);

    List<JobExecution> getJobExecutions(JobInstance jobInstance);

    Set<JobExecution> findRunningJobExecutions(String jobName);
}

上のメソッドシグネチャの通り、JobExplorerJobRepositoryのリードオンリーバージョンで、JobRepositoryのように、factory beanを使用して設定します。

Java Configuration

...
// 以下はBatchConfigurerの実装内に置くものとする。
@Override
public JobExplorer getJobExplorer() throws Exception {
        JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();
        factoryBean.setDataSource(this.dataSource);
        return factoryBean.getObject();
}
...

このチャプター前半で、バージョンやスキーマが異なる場合に備えてJobRepositoryのテーブルプレフィクスは変更可能、と説明しました。JobExplorerもそれらのテーブルを参照するので、こちらでもプレフィクスを設定する必要があります。

Java Configuration

...
// 以下はBatchConfigurerの実装内に置くものとする。
@Override
public JobExplorer getJobExplorer() throws Exception {
        JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();
        factoryBean.setDataSource(this.dataSource);
        factoryBean.setTablePrefix("SYSTEM.");
        return factoryBean.getObject();
}

1.6.2. JobRegistry

JobRegistry(と親インタフェースJobLocator)は必須では無く、コンテキストで利用可能なjobをトラッキングしたい場合に有用です。別のアプリケーションコンテキスト(子コンテキストなど)で作られるjobがある場合、そうしたjobを集中管理する場合にも有用です。カスタムのJobRegistryであれば登録済みjob名とその他プロパティを操作できます。フレームワークの提供する実装は1つだけで、これはjob名からjobインスタンスというシンプルなmapベースです。

@EnableBatchProcessingを使う場合、JobRegistryは特に設定無く使えます。自前の設定をするには以下のようにします。

// @EnableBatchProcessingで提供するクラスなので、SimpleBatchConfigurationのgetterを
// オーバーライドでカスタマイズが可能。
@Override
@Bean
public JobRegistry jobRegistry() throws Exception {
        return new MapJobRegistry();
}

自動的にJobRegistryを処理するには2つの方法があり、bean post processorとregistrar lifecycle componentです。これらは以降のセクションで解説します。

JobRegistryBeanPostProcessor

bean post-processorですべてのjobが生成後にそれらを登録します。

Java Configuration

@Bean
public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor() {
    JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
    postProcessor.setJobRegistry(jobRegistry());
    return postProcessor;
}

Although it is not strictly necessary, the post-processor in the example has been given an id so that it can be included in child contexts (e.g. as a parent bean definition) and cause all jobs created there to also be registered automatically.

AutomaticJobRegistrar

子コンテキストを作成し、job作成時にそのコンテキストのjobを登録するライフサイクルコンポーネントです。この利点は、子コンテキストのjob名はレジストリ内でグローバルに一意ですが、その依存性は"natural" nameを持つ場合があります。たとえば、複数のXML設定ファイルがそれぞれ1つだけJobを持ち、同一のbean名、例えば"reader"、でItemReaderのそれぞれ異なる定義を持つ場合です。これらのXMLファイルを同一コンテキストにインポートすると、reader定義はクラッシュして別の定義をオーバーライドしてしまいますが、automatic registrarではこれを回避します。これにより、アプリケーションの分割モジュール由来のjobを簡単に統合できます。

Java Configuration

@Bean
public AutomaticJobRegistrar registrar() {

    AutomaticJobRegistrar registrar = new AutomaticJobRegistrar();
    registrar.setJobLoader(jobLoader());
    registrar.setApplicationContextFactories(applicationContextFactories());
    registrar.afterPropertiesSet();
    return registrar;

}

このregistrarは2つの必須プロパティがあり、1つはApplicationContextFactory(ここでは簡易的にfactory beanから生成している)の配列で、もう1つはJobLoaderです。JobLoaderは子コンテキストのライフサイクル管理とJobRegistry内のjob登録を担当します。

ApplicationContextFactoryは子コンテキスト生成の責務があり、最も一般的な使用法は上述のClassPathXmlApplicationContextFactoryとしてです。このファクトリの機能の1つは、デフォルトでは、設定のいくつかを親コンテキストから子にコピーします。よって、親と同じ設定を使いたい場合、子でPropertyPlaceholderConfigurerAOP設定を再定義する必要がありません。

必要に応じてAutomaticJobRegistrarJobRegistryBeanPostProcessorと組み合わせて使用可能です(DefaultJobLoaderを使用する場合に限る)。メインの親コンテキストにも子にもjobを定義する場合、こちらが望ましい場合があります。

1.6.3. JobOperator

上述の通り、JobRepositoryメタデータCRUD操作を提供し、JobExplorerメタデータのリードオンリーの操作を提供します。これらの操作の組み合わせは、バッチオペレータがよく行うJobの停止・リスタート・サマライズなどの、一般的なモニタリングタスクの実行に有用です。Spring Batchはその種の操作をJobOperatorで提供します。

public interface JobOperator {

    List<Long> getExecutions(long instanceId) throws NoSuchJobInstanceException;

    List<Long> getJobInstances(String jobName, int start, int count)
          throws NoSuchJobException;

    Set<Long> getRunningExecutions(String jobName) throws NoSuchJobException;

    String getParameters(long executionId) throws NoSuchJobExecutionException;

    Long start(String jobName, String parameters)
          throws NoSuchJobException, JobInstanceAlreadyExistsException;

    Long restart(long executionId)
          throws JobInstanceAlreadyCompleteException, NoSuchJobExecutionException,
                  NoSuchJobException, JobRestartException;

    Long startNextInstance(String jobName)
          throws NoSuchJobException, JobParametersNotFoundException, JobRestartException,
                 JobExecutionAlreadyRunningException, JobInstanceAlreadyCompleteException;

    boolean stop(long executionId)
          throws NoSuchJobExecutionException, JobExecutionNotRunningException;

    String getSummary(long executionId) throws NoSuchJobExecutionException;

    Map<Long, String> getStepExecutionSummaries(long executionId)
          throws NoSuchJobExecutionException;

    Set<String> getJobNames();

}

上記の操作はJobLauncher, JobRepository, JobExplorer, JobRegistryなど多数の異なるインタフェース由来のメソッドを集約したものです。よって、JobOperatorの実装、SimpleJobOperatorは多数の依存性を持ちます。

 /**
  * このbeanにインジェクトされるすべての依存性は@EnableBatchProcessingが作成する。
  */
 @Bean
 public SimpleJobOperator jobOperator(JobExplorer jobExplorer,
                                JobRepository jobRepository,
                                JobRegistry jobRegistry) {

        SimpleJobOperator jobOperator = new SimpleJobOperator();

        jobOperator.setJobExplorer(jobExplorer);
        jobOperator.setJobRepository(jobRepository);
        jobOperator.setJobRegistry(jobRegistry);
        jobOperator.setJobLauncher(jobLauncher);

        return jobOperator;
 }

※ job repositoryにテーブルプレフィクスを設定する場合、忘れずにjob explorerにも設定してください。

1.6.4. JobParametersIncrementer

JobOperatorの大半は見たままの挙動です、詳細についてはavadoc of the interfaceにあります。ただし、startNextInstanceには注意が必要です。このメソッドは常にJobの新規インスタンスを開始します。JobExecutionに致命的な問題が発生して最初からJobをやり直したい場合に特に有効です。しかし、JobLauncherがパラメータが前回とは異なる場合に新規JobInstanceを開始する新規のJobParametersを必要とするのとは違って、startNextInstanceメソッドは、新規インスタンスJobに強制するために、Jobに紐付くJobParametersIncrementerを使用します。

public interface JobParametersIncrementer {

    JobParameters getNext(JobParameters parameters);

}

JobParametersIncrementerの責務は、与えられたJobParametersを使用し、これに含まれる何らかの必須の値をインクリメントして'次の'JobParametersオブジェクトを返します。これは、'次の'インスタンスを作るのにJobParametersの何を変更すれば良いのかをフレームワークが知らない場合に有用です。たとえば、JobParametersにdateが1つだけの場合、次回のインスタンスを作る必要がありますが、1日進めるのか、それとも1週間でしょうか。Jobの識別に何らかの数値を使う場合にも同じことが言えます。以下がその例です。

public class SampleIncrementer implements JobParametersIncrementer {

    public JobParameters getNext(JobParameters parameters) {
        if (parameters==null || parameters.isEmpty()) {
            return new JobParametersBuilder().addLong("run.id", 1L).toJobParameters();
        }
        long id = parameters.getLong("run.id",1L) + 1;
        return new JobParametersBuilder().addLong("run.id", id).toJobParameters();
    }
}

この例では、'run.id'キーをJobInstancesの識別に使います。JobParametersがnullの場合、Jobをまだ一度も実行していないと見なして初期値を返します。そうでない場合、古い値を取得し、1増やして返します。

ビルダーのincrementerメソッドでJobにincrementerを関連付けします。

@Bean
public Job footballJob() {
    return this.jobBuilderFactory.get("footballJob")
                                     .incrementer(sampleIncrementer())
                                     ...
                     .build();
}

1.6.5. Stopping a Job

JobOperatorのよくある使い方の一つはJobのgracefullyな停止です。

Set<Long> executions = jobOperator.getRunningExecutions("sampleJob");
jobOperator.stop(executions.iterator().next());

即時シャットダウン強制する手段がないので、即時シャットダウンはしません。フレームワークの制御下に無いビジネスサービスなど開発者のコードを実行中の場合は特にそうです。しかし、制御がフレームワークに戻り次第、StepExecutionBatchStatus.STOPPEDをセットしてセーブし、JobExecutionにも終了前に同様の処理をします。

1.6.6. Aborting a Job

FAILEDのjob実行はリスタート可能です(Jobがrestartableの場合)。ABANDONEDのjob実行はフレームワークによるリスタートは出来ません。なお、ABANDONEDはリスタートしたjob実行内でstep実行をskippableにするのにも使います。もし以前に失敗したjob実行でABANDONEDとなったstepがある状態でjob実行する場合、その次のstep(jobフロー定義とstep実行完了ステータスが決定する)に移行します。

プロセスが死んだ("kill -9"やサーバエラー)場合、当然そのjobは動作していませんが、プロセスが死ぬ前に誰もJobRepositoryに通知しないのでそのことを知る術がありません。失敗かアボート(ステータスをFAILEDABANDONEDに変更)のどちらにするかを決めて手動更新する必要があります。これは運用判断で自動決定は出来ません。restartableでは無いか、リスタートデータがvalidであると判断出来れば、FAILEDに変更します。job実行をアボートするにはSpring Batch Admin JobServiceユーティリティを使います。

Spring Batch 4.1.x - Reference Documentation - The Domain Language of Batchのテキトー翻訳

https://docs.spring.io/spring-batch/4.1.x/reference/html/domain.html#domainLanguageOfBatch

https://qiita.com/kagamihoge/items/12fbbc2eac5b8a5ac1e0 俺の訳一覧リスト

1. The Domain Language of Batch

何らかの経験のあるバッチアーキテクトにとって、Spring Batchのバッチ処理のコンセプトはお馴染みのものです。"Jobs"、"Steps"、開発者が用意する処理単位のItemReaderItemWriterがあります。ただ、Springのパターン, operations, テンプレート、コールバック、イディオムを使えるので、以下も可能です。

  • 関心事の分離による品質向上。
  • インターフェースによるサービスとレイヤーの明確化。
  • シンプルなデフォルト実装により、そのままの設定ですぐ動かせる。
  • 拡張性の大幅な機能強化。

以下の図はここ数十年使われ続けてきたバッチのリファレンスアーキテクチャを単純化したものです。バッチ処理ドメイン言語を構成するコンポーネントの概略を示しています。このアーキテクチャフレームワークの設計はここ数世代の複数プラットフォーム(COBOL/Mainframe, C/Unix, and now Java/anywhere)の実装を通して実証されてきました。JCLとCOBOLの開発者は、C, C#, Java開発者と同様に、このコンセプトに慣れていると思われます。Spring Batchは、広範なバッチアプリケーション開発に使われるメンテナンス性がありロバストなシステムで、広く見られるサービス・コンポーネント・レイヤの実装を提供します。また、非常に複雑な要求を扱うための拡張とインフラも備えています。

Figure 1. Batch Stereotypes

上図はSpring Batchのドメイン言語を構成する中核コンセプトを表しています。Jobは一つか複数のstepを持ち、各stepはItemReader, ItemProcessor, ItemWriterを一つ持ちます。jobは(JobLauncher)で実行し、現在実行中のプロセスのメタデータは(JobRepository)に保存されます。

1.1. Job

このセクションではbatch jobのコンセプトに関する解説をします。Jobバッチ処理全体をカプセル化するエンティティです。他のSpringプロジェクト同様に、JobXMLJavaのどちらかの設定でワイヤリングします。この設定は"job configuration"と呼ばれる事が多いです。なお、以下の図で示すように、Jobは階層全体のトップに居ます。

Figure 2. Job Hierarchy

Spring Batchでは、JobStepインスタンスのコンテナです。jobは複数のstepを持ち、これらはフロー内の論理的なまとまりを構成し、リスタートなど全てのstepにグローバルなプロパティ設定があります。job設定は以下があります。

  • job名。
  • Stepインスタンスの定義と順序。
  • jobがリスタート可能かどうか。

Spring BatchのデフォルトJob実装はSimpleJobクラスで、Jobにいくつかの標準機能を追加しています。Java設定の場合、Jobインスタンス化に利用可能なビルダーが複数あり、以下はその例です。

@Bean
public Job footballJob() {
    return this.jobBuilderFactory.get("footballJob")
                     .start(playerLoad())
                     .next(gameLoad())
                     .next(playerSummarization())
                     .end()
                     .build();
}

1.1.1. JobInstance

JobInstanceはjob実行に相当します。いま、一日の終わりに一度だけ実行する、前述の図の'EndOfDay' Job、などのバッチjobを考えます。jobは'EndOfDay'の1つですが、Jobの個々の実行はそれぞれ別個に扱う必要があります。このjobの場合、1日1つのJobInstanceになります。例えば、1/1の実行、1/2の実行、になりま1す。1/1の実行が初回は失敗したが翌日再実行する場合、1/1の実行を再度走らせます(通常、バッチ実行は処理するデータと対応関係があるので、1/1の実行は1/1のデータを処理します)。よって、JobInstanceは複数の実行を持ちうる(JobExecutionはこの賞の後半で解説します)事になり、特定のJobに対応するJobInstanceは一つだけで、JobParametersである特定時点のJobInstanceを実行します。

JobInstance定義はロードされるデータとは完全に無関係です。データのロード方法はItemReaderの実装次第です。たとえば、EndOfDayのケースだと、 'effective date'や'schedule date'を指すデータ列が考えらます。よって、1/1の実行は1/1のデータのみロードし、1/2は1/2のデータのみロードします。これは仕様次第で、ItemReaderで実装します。ただし、同一JobInstanceを使用すると、以前の実行の'state'(ExecutionContextの事でこの章の後半で解説)かどうかが決まります。新規のJobInstanceは'最初から開始'を意味し、既存インスタンスは基本的には'抜けた場所から開始'を意味します。

1.1.2. JobParameters

JobInstanceとそのJobとの違いを解説したので、"JobInstanceの区別方法は?"という疑問が自然に浮かぶと思われます。答えはJobParametersです。JobParametersオブジェクトはバッチjobに使うパラメータを保持します。識別に使われる他、以下の図のように、実行中の参照データとしても使われます。

Figure 3. Job Parameters

先の例では、二つのインスタンスがあり、一つは1/1でもう一つが1/2、Job1つだけですが、JobParameterオブジェクトは2つあります。前者はパラメータ01-01-2017で開始しており、後者は01-02-2017で開始しています。よって、定義としてはJobInstance = Job + JobParametersになります。これにより、開発者はJobInstanceの定義を、渡すパラメータによって、制御します。

JobInstanceの識別にすべてのjobパラメータが必要なわけではありません。By default, they do so. ただし、このフレームワークでは、JobInstanceの識別に使用しないパラメータでJobのサブミットが可能です。

1.1.3. JobExecution

JobExecutionは1回のjob実行に相当します。実行は失敗か成功で終了しますが、ある実行が正常終了しない限り、対応するJobInstanceは完了したとは見なされません。例としてEndOfDayのJobで解説すると、01-01-2017のJobInstanceが初回実行時に失敗した場合を考えます。初回実行(01-01-2017)と同一のjob parametersで再度実行すると、JobExecutionを新規に作成します。ただし、JobInstanceは1つのままです。

Jobはjobの中身と実行方法を定義し、JobInstanceは実行をグループ化するオブジェクトで、これは主に正しくリスタートするためです。一方で、JobExecutionは、実行中に発生した事を格納する役割があり、制御と永続化されるプロパティがあります。

Table 1. JobExecution Properties

Property Definition
Status 実行ステータスを示すBatchStatus。実行時、BatchStatus#STARTED。失敗、BatchStatus#FAILED。正常終了、BatchStatus#COMPLETED
startTime 実行開始時の現在システム時刻のjava.util.Date 。jobがまだ開始していない場合は空。
endTime 実行終了時の現在システム時刻のjava.util.Date。成功かどうかは無関係。jobがまだ終了していない場合は空。
exitStatus 実行結果を示すExitStatus。呼び出し元に返される終了コードなので、最も重要。詳細は5章参照。jobがまだ終了していない場合は空。
createTime JobExecutionを最初に永続化した時点の現在システム時刻のjava.util.Date 。jobが未開始の場合がある(その場合startTimeも空)が、jobレベルのExecutionContextsを管理する上でフレームワークはcreateTimeを必要とします。
lastUpdated JobExecutionを最後の永続化した時間のjava.util.Date。jobが未開始の場合は空。
executionContext 複数のexecution間で永続化の必要があるユーザデータを持つ、プロパティの入れ物。
failureExceptions Job実行中に発生した例外リスト。Jobの失敗時に複数の例外が発生した場合に役立ちます。

これらプロパティは永続化され、実行ステータスの決定に使われる点が重要です。例えば、1/1のEndOfDay jobが9:00 PMに実行して9:30に失敗した場合、以下のエントリがメタデータテーブルに作られます。

Table 2. BATCH_JOB_INSTANCE

JOB_INST_ID JOB_NAME
1 EndOfDayJob

Table 3. BATCH_JOB_EXECUTION_PARAMS

JOB_EXECUTION_ID TYPE_CD KEY_NAME DATE_VAL IDENTIFYING
1 DATE schedule.Date 2017-01-01 TRUE

Table 4. BATCH_JOB_EXECUTION

JOB_EXEC_ID OB_INST_ID START_TIME END_TIME STATUS
1 1 2017-01-01 21:00 2017-01-01 21:30 FAILED

カラム名は説明のために削除したり簡略化しているものがあります。

いま、jobが失敗し、問題解決には一晩かかるとし、バッチウィンドウ('batch window')は終了しています。次に、ウィンドウ開始を9:00 PM、1/1のjobを再度開始、前回失敗箇所から開始して9:30に正常終了しました。この時すでに翌日になっており、1/2のjobも走らせる必要があるので、その直後の9:31に開始して通常の1時間で10:30に完了しました。1つのJobInstanceを連続実行する事に特に要求事項は無いですが、2つのjobが同一データにアクセスする可能性が無い限りにおいてであり、これはDBレベルでロックを引き起こしかねないためです。いつJobを実行するかどうかはスケジューラーに完全に依存します。別々のJobInstancesなので、Spring Batchはコンカレントな実行を止めません。(同一のJobInstanceが実行中の場合、JobExecutionAlreadyRunningExceptionをスローします。)最終的に、以下表のように、JobInstanceJobParametersテーブルに行が追加され、JobExecutionに2行追加されます。

Table 5. BATCH_JOB_INSTANCE

JOB_INST_ID JOB_NAME
1 EndOfDayJob
2 EndOfDayJob

Table 6. BATCH_JOB_EXECUTION_PARAMS

JOB_EXECUTION_ID TYPE_CD KEY_NAME DATE_VAL IDENTIFYING
1 DATE schedule.Date 2017-01-01 00:00:00 TRUE
2 DATE schedule.Date 2017-01-01 00:00:00 TRUE
3 DATE schedule.Date 2017-01-02 00:00:00 TRUE

Table 7. BATCH_JOB_EXECUTION

JOB_EXEC_ID JOB_INST_ID START_TIME END_TIME STATUS
1 1 2017-01-01 21:00 2017-01-01 21:30 FAILED
2 1 2017-01-02 21:00 2017-01-02 21:30 COMPLETED
3 2 2017-01-02 21:31 2017-01-02 22:29 COMPLETED

カラム名は説明のために削除したり簡略化しているものがあります。

1.2. Step

Stepは、バッチjobのシーケンシャルで独立したフェーズ、をカプセル化したドメインオブジェクトです。よって、すべてのJobは必ず1つ以上のstepで構成します。Stepには実際のバッチ処理を定義したり制御するのに必要なすべての情報を持ちます。これは全く持って曖昧な説明ですが、Stepの中身は開発者が書くJob次第なためです。Stepは開発者次第でシンプルなものから複雑なものまで作成出来ます。シンプルなStepはファイルからDBにデータをロードし、コードは全く書かないか少量で済みます(実装依存)。複雑なStepは、何らかの処理の一部となる複雑なビジネスルールを持つものが考えられます。Job同様、Stepは複数のStepExecutionを持ち、1つのJobExecutionと関連を持ちます。以下がイメージ図です。

igure 4. Job Hierarchy With Steps

1.2.1. StepExecution

StepExecutionStepの1回分の実行を表現したものです。JobExecution同様に、Stepが実行される度に新規にStepExecutionを生成します。もしstepが実行に失敗するとき、原因がそのstepより前にある場合、そのstepのexecutionは永続化しません。StepExecutionStepが実際に開始した後にだけ生成します。

Stepの実行はStepExecutionクラスのオブジェクトで表現します。それぞれの実行は、対応するstep・JobExecutionトランザクション関連データ、このデータはコミット・ロールバックカウント・開始終了時刻など、を持ちます。また、それぞれのstep executionはExecutionContextを持ち、ここには開発者がバッチ実行中に永続化しておきたい任意のデータ、例えば統計やリスタートに必要な状態、を持たせます。以下はStepExecutionのプロパティリストです。

Table 8. StepExecution Properties

Property Definition
Status 実行ステータスを示すBatchStatusオブジェクト。実行中BatchStatus.STARTED。失敗BatchStatus.FAILED。正常終了BatchStatus.COMPLETED
startTime 実行開始時の現在システム時刻のjava.util.Date 。stepがまだ開始していない場合は空。
endTime 実行終了時の現在システム時刻のjava.util.Date。成功かどうかは無関係。stepがまだ終了していない場合は空。
exitStatus 実行結果を示すExitStatus。呼び出し元に返される終了コードなので、最も重要。詳細は5章参照。jobがまだ終了していない場合は空。
executionContext 複数のexecution間で永続化の必要があるユーザデータを持つ、プロパティの入れ物。
readCount 正常に読み込んだアイテム数。
writeCount 正常に書き込んだアイテム数。
commitCount この実行でコミットしたトランザクション数。
rollbackCount Stepで制御するビジネストランザクションロールバック数。
readSkipCount readが失敗した回数。アイテムスキップとなる。
processSkipCount processが失敗した回数。アイテムスキップとなる。
filterCount ItemProcessorがフィルターしたアイテム数。
writeSkipCount writeが失敗した回数。アイテムスキップとなる。

1.3. ExecutionContext

ExecutionContextフレームワークが制御および永続化するkey/valueのコレクションで、開発者が永続化状態を保存する場所です。これはStepExecutionもしくはJobExecutionスコープになります。Quartzで言うところのJobDataMapです。最も良く使われる例としてはリスタートの調整です。入力としてフラットファイルを取る例で言うと、個々の行を処理中、フレームワークは定期的にコミットポイントでExecutionContextを永続化します。これにより、ItemReaderで実行中に致命的エラーが発生してもその状態を保存します。これを実装するには、以下例のように、コンテキストに読み込んだ現在行をputすると、あとはフレームワークがやってくれます。

executionContext.putLong(getKey(LINES_READ_COUNT), reader.getPosition());

例としてJobセクションのEndOfDayサンプルの場合、'loadData'というファイルをDBにロードするstepが一つあるとします。最初の実行が失敗後、メタデータテーブルは以下の例になります。

Table 9. BATCH_JOB_INSTANCE

JOB_INST_ID JOB_NAME
1 EndOfDayJob

Table 10. BATCH_JOB_EXECUTION_PARAMS

JOB_INST_ID TYPE_CD KEY_NAME DATE_VAL
1 DATE schedule.Date 2017-01-01

Table 11. BATCH_JOB_EXECUTION

JOB_EXEC_ID JOB_INST_ID START_TIME END_TIME STATUS
1 1 2017-01-01 21:00 2017-01-01 21:30 FAILED

Table 12. BATCH_STEP_EXECUTION

STEP_EXEC_ID JOB_EXEC_ID STEP_NAME START_TIME END_TIME STATUS
1 1 loadData 2017-01-01 21:00 2017-01-01 21:30 FAILED

Table 13. BATCH_STEP_EXECUTION_CONTEXT

STEP_EXEC_ID SHORT_CONTEXT
1 {piece.count=40321}

上の場合、Stepは30分実行し、このシナリオにおけるファイル行であるところの40,321 'pieces'を処理しています。この値はフレームワークがコミット直前に更新します。また、`この値にはExecutionContextの複数エントリに対応する形で複数行を含められます。コミット前に通知を受けるには何らかのStepListener実装(かItemStream)が必要で、詳細は本が意図の後半で扱います。前述の例同様、ここでもJobを翌日にリスタートする、とします。リスタートすると、最終実行時のExecutionContextの値をDBから再構成します。ItemReaderがopenすると、以下例のように、コンテキストに保存した状態があればチェックしてその値で初期化します。

if (executionContext.containsKey(getKey(LINES_READ_COUNT))) {
    log.debug("Initializing for restart. Restart data is: " + executionContext);

    long lineCount = executionContext.getLong(getKey(LINES_READ_COUNT));

    LineReader reader = getReader();

    Object record = "";
    while (reader.getPosition() < lineCount && record != null) {
        record = readLine();
    }
}

この例の場合、上のコードを実行すると、カレントの行は40322になり、Stepは前回失敗箇所からの再実行が可能となります。また、ExecutionContextにはその実行に関して永続化したい統計情報にも使えます。例えば、フラットファイルが複数行に存在する処理命令(orders for processing)を含むケースでは、処理した命令数(読み込んだ行数とは異なる)を保存し、Stepの最後に合計処理命令数をメールで送るような処理が可能です。それぞれのJobInstanceで正確なスコープにするため、フレームワークがそれらの情報を保存します。すでに在るExecutionContextが使われるかどうかを知ることは困難です。たとえば、上記の 'EndOfDay' の例で言うと、1/1を2回目に実行するとき、フレームワークは同一JobInstanceStepであると解釈し、DBからExecutionContextを取得し、その取得データを(StepExecutionの一部として)Stepに渡します。一方で、1/2実行時には、フレームワークは別のインスタンスであると解釈し、よって空のコンテキストをStepに渡します。There are many of these types of determinations that the framework makes for the developer, to ensure the state is given to them at the correct time. また、ある時点において1つのStepExecutionには1つのExecutionContextだけが存在する点は重要です。ExecutionContextを使う際には注意が必要で、このインスタンスは共有のキースペース(a shared keyspace)を生成するためです。このため、データを上書きしないような注意が必要です。ただし、Stepがコンテキストに何もデータを保存しないのであれば、フレームワークには何も影響を与えません。

1つのJobExecutionに少なくとも1つのExecutionContext、それぞれのStepExecutionごとに1つのExecutionContext、が存在する点も重要です。例えば、以下のコード例を考えます。

ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();
//ecStepとecJobは異なる

コメントにあるように、ecStepecJobは異なります。この場合は2つの異なるExecutionContextsが存在します。1つはStepのスコープでStepのコミットポイントごとにセーブし、一方、Jobのスコープの方はStep実行の間でセーブします。

1.4. JobRepository

JobRepositoryは上述のすべての機能のための永続化メカニズムです。JobLauncher, Job, Step実装で用いるためのCRUD操作を提供します。Jobをラウンチする時、JobExecutionリポジトリから取得し、また、実行中には、StepExecutionJobExecutionの実装をリポジトリに渡すことで永続化をします。

java configurationを使う場合、@EnableBatchProcessingアノテーションのデフォルト設定でJobRepositoryが自動的に登録されます。

1.5. JobLauncher

JobLauncherは、以下例のように、JobParametersを渡してJobを起動するためのインタフェースです。

public interface JobLauncher {

public JobExecution run(Job job, JobParameters jobParameters)
            throws JobExecutionAlreadyRunningException, JobRestartException,
                   JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}

このインタフェース実装はJobRepositoryからJobExecutionを取得してJobを実行します。

1.6. Item Reader

ItemReaderは、1回に1アイテム、Stepでの入力を取得するためのインタフェースです。ItemReaderが読み込み可能なアイテムが無くなった事を通知するにはnullを返します。ItemReaderインタフェースの詳細と各種実装についてはReaders And Writersを参照してください。

1.7. Item Writer

ItemWriterは、1回に1チャンクか1バッチ、Stepでの出力を行うためのインタフェースです。通常、ItemWriterは次に受け取るであろう入力に関しては関知せず、現在の呼び出し中に渡されたアイテムのみ処理します。ItemWriterインタフェースの詳細と各種実装についてはReaders And Writersを参照してください。

1.8. Item Processor

ItemProcessorはアイテムにビジネス処理をを行うためのインタフェースです。ItemReaderは1アイテム読み込み、ItemWriterはそれらの書き込み、ItemProcessorはそれ以外のビジネス処理の適用や変換処理のアクセスポイントとなります。なお、アイテム処理中に、アイテムがvalidで無い場合、nullを返すとそのアイテムは書き込みません。ItemProcessorインタフェースの詳細と各種実装についてはReaders And Writersを参照してください。