https://docs.spring.io/spring-batch/4.1.x/reference/html/step.html#configureStep
https://qiita.com/kagamihoge/items/12fbbc2eac5b8a5ac1e0 俺の訳一覧リスト
1. Configuring a Step
ドメインのチャプターで解説したように、Step
は、バッチjobのシーケンシャルで独立したフェーズをカプセル化したもので、バッチ処理の制御と定義に必要なすべての情報を持ちます。Step
の中身はJob
を書く開発者次第なため、やや曖昧な説明になっています。Step
は開発者次第でシンプルにも複雑にもなります。シンプルなStep
はファイルからDBにロードし、要求されるコードは少しかゼロです(実装次第)。複雑なStep
は、以下イメージ図のような、複雑なビジネスルール処理の一部になるものがあります。
Figure 1. Step
1.1. Chunk-oriented Processing
Spring Batchは一般的な実装スタイルのうちチャンク指向処理スタイルを使います。チャンク指向の処理は、1度に1データ読み取ってトランザクション境界内で書き込むチャンクを作成します。ItemReader
から1アイテム読み、ItemProcessor
に渡し、集約します。読み込んだアイテム数がコミット間隔に達すると、ItemWriter
がチャンクを書き込み、トランザクションをコミットします。以下のイメージが一連の処理です。
Figure 2. Chunk-oriented Processing
以下のコードは概念を示すコードです。
List items = new Arraylist(); for(int i = 0; i < commitInterval; i++){ Object item = itemReader.read() Object processedItem = itemProcessor.process(item); items.add(processedItem); } itemWriter.write(items);
1.1.1. Configuring a Step
Step
の依存性のリストは比較的短いですが、多くの関連クラスを持つ場合があるので非常に複雑です。
java設定の場合、以下例のように、Spring Batchのビルダーが使えます。
Java Configuration
/** * JobRepositoryは基本的にはautowiredされるので明示的な設定は不要です。 */ @Bean public Job sampleJob(JobRepository jobRepository, Step sampleStep) { return this.jobBuilderFactory.get("sampleJob") .repository(jobRepository) .start(sampleStep) .build(); } /** * TransactionManagerは基本的にはautowiredされるので明示的な設定は不要です。 */ @Bean public Step sampleStep(PlatformTransactionManager transactionManager) { return this.stepBuilderFactory.get("sampleStep") .transactionManager(transactionManager) .<String, String>chunk(10) .reader(itemReader()) .writer(itemWriter()) .build(); }
上の設定はアイテム指向stepを作るのに必要な依存性だけを入れています。
reader
: processorにアイテムを与えるItemReader
。writer
:ItemReader
からのアイテムを処理するItemWriter
transactionManager
: トランザクションの開始とコミットをするSpringのPlatformTransactionManager
repository
:JobRepository
は定期的にStepExecution
とExecutionContext
を保存する(コミットの直前)。chunk
: ここではアイテムベースのstepを意味し、トランザクションコミットの処理アイテム数を指定している。
なお、repository
のデフォルトのbeanはjobRepository
でtransactionManager
はtransactionManger
(どちらも@EnableBatchProcessing
で自動設定されます)。また、ItemProcessor
は無くても良く、これによりアイテムをreaderからwriterに直接渡せます。
1.1.3. The Commit Interval
前述の通り、stepはアイテムを読んで書き込み、設定したPlatformTransactionManager
で定期的にコミットします。commit-interval
が1の場合、1アイテム書き込むたびにコミットします。ただ、トランザクションの開始とコミットは高コストなので、多くの場合にこれは理想的とは言えません。理想的には、各トランザクションでなるべく多数のアイテム処理するのが好ましく、これはstepが相互作用するリソースと処理データの特性に完全に依存します。このため、あるコミットで処理するアイテム数は変更可能です。以下の例はcommit-interval
が10のtasklet
のstep
です。
Java Configuration
@Bean public Job sampleJob() { return this.jobBuilderFactory.get("sampleJob") .start(step1()) .end() .build(); } @Bean public Step step1() { return this.stepBuilderFactory.get("step1") .<String, String>chunk(10) .reader(itemReader()) .writer(itemWriter()) .build(); }
上の例では、各トランザクションで10アイテム処理します。処理開始時にトランザクションが開始します。そして、各アイテムをItemReader
のread
で読むたびにカウンターがインクリメントします。10に達すると、集約アイテムのリストをItemWriter
に渡し、トランザクションをコミットします。
Configuring a Step for Restart
Configuring and Running a JobではJob
のリスタートについて解説しました。リスタートは様々な影響をstepに与えるため、従って、何らかの設定が必要となります。
Setting a Start Limit
Step
の最大実行回数を制御したい場合があります。たとえば、ある特定のStep
がリソースを無効化して再実行前に手動で戻す必要があるので、一度だけ実行するようにしたい、などです。設定はstepレベルで行い、異なるstepで異なる設定が可能です。一度しか実行出来ないStep
は、制限無しのStep
があるJob
と一緒に入れられます。以下のコード例はstart limit設定の例です。
Java Configuration
@Bean public Step step1() { return this.stepBuilderFactory.get("step1") .<String, String>chunk(10) .reader(itemReader()) .writer(itemWriter()) .startLimit(1) .build(); }
上のstepは一度だけ実行可能です。再実行するとStartLimitExceededException
をスローします。なお、start-limitのデフォルト値はInteger.MAX_VALUE
です。
Restarting a Completed Step
restartable jobの場合、初めは成功したかどうかに関わらず、常に実行したいstepがある場合があります。validation stepや処理前にリソースのクリーンアップをするStep
などです。restarted jobの通常処理では、正常終了したことを示す'COMPLETED'ステータスとなり、これはスキップします。allow-start-if-complete
を"true"に設定するとその挙動をオーバーライドして常に実行するようになります。以下が例です。
Java Configuration
@Bean public Step step1() { return this.stepBuilderFactory.get("step1") .<String, String>chunk(10) .reader(itemReader()) .writer(itemWriter()) .allowStartIfComplete(true) .build(); }
Step Restart Configuration Example
以下はリスタート可能なstepを持つjobの設定例です。
Java Configuration
@Bean public Job footballJob() { return this.jobBuilderFactory.get("footballJob") .start(playerLoad()) .next(gameLoad()) .next(playerSummarization()) .end() .build(); } @Bean public Step playerLoad() { return this.stepBuilderFactory.get("playerLoad") .<String, String>chunk(10) .reader(playerFileItemReader()) .writer(playerWriter()) .build(); } @Bean public Step gameLoad() { return this.stepBuilderFactory.get("gameLoad") .allowStartIfComplete(true) .<String, String>chunk(10) .reader(gameFileItemReader()) .writer(gameWriter()) .build(); } @Bean public Step playerSummarization() { return this.stepBuilderFactor.get("playerSummarization") .startLimit(2) .<String, String>chunk(10) .reader(playerSummarizationSource()) .writer(summaryWriter()) .build(); }
上の設定例のjobは、フットボールのデータロードしてサマリをします。3つのstep、playerLoad
, gameLoad
, playerSummarization
があります。playerLoad
はフラットファイルからプレイヤーデータをロードし、gameLoad
は同様にゲームデータをロードします。最後に、playerSummarization
は、ゲームをベースに各プレイヤーの統計を出力します。ここでの想定は、playerLoad
のファイルロードは一度だけにしたいが、gameLoad
はゲームデータをディレクトリからファイルで取得し、DBに正常ロード後はそのファイルを削除する、とします。この場合、playerLoad
は特に設定を必要としません。何度でも実行可能ですが、completeになると、以降はスキップします。しかしgameLoad
は最終実行以降にファイルが追加された場合は都度実行の必要があります。毎回実行するために'allow-start-if-complete'を'true'にしています。(DBのgamesテーブルはprocess indicatorを持ち、次のsummarization stepでその新規ゲームデータを参照可能になっている、という想定)。summarization stepはこのjobで最も重要で、start limitは2にしています。stepが連続して失敗し、job実行を制御するオペレータにexit codeを返します。そして、手動変更しない限り再開は出来なくなります。
※ 本ドキュメントのjob例はサンプルプロジェクトのfootballJob
とは異なります。
このセクションのまとめとしてfootballJob
の例を3回実行したときに起きることを解説します。
Run 1:
playerLoad
を実行すると正常終了し、'PLAYERS'テーブルに400プレイヤー追加する。gameLoad
を実行するとゲームデータの11ファイルを処理し、'GAMES'テーブルにロードする。playerSummarization
を開始すると5分後に失敗する。
Run 2:
playerLoad
は既に正常終了済みなので実行しない。allow-start-if-complete
は'false'(デフォルト)gameLoad
は再実行して更に別の2ファイルを処理し、前回様に'GAMES'テーブルにロードする(このデータについてはprocess indicatorが未処理となる)。playerSummarization
はすべての残ゲームデータ(process indicatorでフィルタリング)を処理開始して30分後に失敗する。
Run 3:
playerLoad
は既に正常終了済みなので実行しない。allow-start-if-complete
は'false'(デフォルト)gameLoad
は再実行して更に別の2ファイルを処理し、前回様に'GAMES'テーブルにロードする(このデータについてはprocess indicatorが未処理となる)。playerSummarization
は開始せずjobが即時killされる。この時点でplayerSummarization
は3回目で、リミットが2なため。リミットを上げるか、そのJob
で新規のJobInstance
を作る必要がある。
1.1.5. Configuring Skip Logic
エラーが発生してもStep
を失敗にはせず、代わりにスキップしたい場合があります。基本的には、そこでのデータや事象の意味を理解する人が決定します。たとえば、金融データでは、送金は完全に実行する必要あるので、スキップ可能でないものもあります。一方、ベンダーリストのロードは、スキップ可能な場合があります。フォーマット間違いや必須情報の漏れでロード出来ない場合、おおむねスキップで問題ありません。たいていの場合、こうした不正レコードはログ出力します。これについては後に述べるリスナーで処理します。
以下はskip limitの使用例です。
Java Configuration
@Bean public Step step1() { return this.stepBuilderFactory.get("step1") .<String, String>chunk(10) .reader(flatFileItemReader()) .writer(itemWriter()) .faultTolerant() .skipLimit(10) .skip(FlatFileParseException.class) .build(); }
上の例ではFlatFileItemReader
を使用しています。この場合、どの段階においても、FlatFileParseException
をスローするとそのアイテムはスキップされてskip limit合計10に対してカウントアップします。step実行中のread, process, writeのスキップ数は別々にカウントしますが、limitはその全スキップに対して適用します。skip limitに達すると、次の例外スローでstepは失敗します。つまり、11回のスキップが例外をトリガするが、10回ではありません(the eleventh skip triggers the exception, not the tenth.)。
上の例の問題点としては、FlatFileParseException
以外のその他すべての例外でJob
は失敗になります。場合によってはこれは正しい動作となります。しかし、さらに場合によっては、失敗させる例外を指定してその他の全例外はスキップする方が簡単な場合があります。以下はその例です。
Java Configuration
@Bean public Step step1() { return this.stepBuilderFactory.get("step1") .<String, String>chunk(10) .reader(flatFileItemReader()) .writer(itemWriter()) .faultTolerant() .skipLimit(10) .skip(Exception.class) .noSkip(FileNotFoundException.class) .build(); }
スキップ可能例外クラスにjava.lang.Exception
を指定することで、すべてのExceptions
をスキップ可能に指定しています。ただし、'除外'としてjava.io.FileNotFoundException
を指定することで、すべてのExceptions
からFileNotFoundException
を除外した設定になります。もしその除外した例外が発生する(つまりスキップ不能な例外)はfatalになります。
例外発生時において、スキップ可能かどうかはクラス階層上の最も近いスーパークラスが決定します。未分類の例外は'fatal'扱いになります。
skip
とnoSkip
の呼び出し順序は特に意味を持ちません。
1.1.6. Configuring Retry Logic
たいていの場合、例外はskipするかStep
失敗のどちらかにします。ただし、すべての例外が決定的なわけではありません。いま、読み込み中にFlatFileParseException
をスローする場合、そのレコードに対しては常に例外をスローします。ItemReader
のリセットは無意味です。対して、DeadlockLoserDataAccessException
など、プロセスが別のプロセスでロックするレコードの更新をしようとする場合などは、待機して再試行すれば成功します。この場合、以下のようにリトライを設定します。
@Bean public Step step1() { return this.stepBuilderFactory.get("step1") .<String, String>chunk(2) .reader(itemReader()) .writer(itemWriter()) .faultTolerant() .retryLimit(3) .retry(DeadlockLoserDataAccessException.class) .build(); }
Step
では、リトライ可能なアイテム数と、リトライ可能な例外リストを設定可能です。リトライの挙動の詳細についてはretryを参照してください。
1.1.7. Controlling Rollback
デフォルトでは、リトライかスキップかを問わず、ItemWriter
がスローする例外はStep
で制御するトランザクションをロールバックさせます。スキップを前述のように設定する場合、ItemReader
がスローする例外はロールバックになりません。ただし、ItemWriter
の例外でロールバックできないケースは色々考えられますが、これはトランザクションを無効にするアクションが無い場合があるためです。このため、以下例のように、ロールバックをしない例外リストをStep
に設定可能です。
Java Configuration
@Bean public Step step1() { return this.stepBuilderFactory.get("step1") .<String, String>chunk(2) .reader(itemReader()) .writer(itemWriter()) .faultTolerant() .noRollback(ValidationException.class) .build(); }
Transactional Readers
ItemReader
の基本的な役割は直線的で戻ることは無いです。stepはreaderの入力をバッファし、これはロールバック時にアイテムをreaderから再読み込みする必要を無くすためです。しかし、JMSキューなど、readerがトランザクショナルリソースの一番最初に置かれるケースがあるにはあります。この場合、キューがロールバックするトランザクションに関連付けられるので、キューからプル済みのメッセージは戻されます。このため、下記例のように、アイテムをバッファしないstepを設定可能です。
Java Configuration
@Bean public Step step1() { return this.stepBuilderFactory.get("step1") .<String, String>chunk(2) .reader(itemReader()) .writer(itemWriter()) .readerIsTransactionalQueue() .build(); }
1.1.8. Transaction Attributes
トランザクション属性(Transaction attributes)は、isolation
, propagation
, timeout
、の設定を制御するのに使います。詳細についてはSpring core documentationを参照してください。以下のサンプルではisolation
, propagation
, timeout
のトランザクション属性を設定しています。
Java Configuration
@Bean public Step step1() { DefaultTransactionAttribute attribute = new DefaultTransactionAttribute(); attribute.setPropagationBehavior(Propagation.REQUIRED.value()); attribute.setIsolationLevel(Isolation.DEFAULT.value()); attribute.setTimeout(30); return this.stepBuilderFactory.get("step1") .<String, String>chunk(2) .reader(itemReader()) .writer(itemWriter()) .transactionAttribute(attribute) .build(); }
1.1.9. Registering ItemStream with a Step
stepはライフサイクルの必要時点でItemStream
のコールバックを処理する必要があります。(ItemStream
インタフェースの詳細についてはItemStreamを参照)。これはstepが失敗してリスタート可能にする必要がある場合は必須で、その理由は、ItemStream
はstepで必要な実行間の永続化状態を取得する場所なためです。
ItemReader
, ItemProcessor
, ItemWriter
がItemStream
を実装すると、それらは自動登録されます。それ以外のstreamは別途登録します。このstreamは間接的な依存性、デリゲートなど、になる事が多く、readerとwriterにインジェクションされます。streamは、以下例のように、'streams'要素でStep
に登録します。
Java Configuration
@Bean public Step step1() { return this.stepBuilderFactory.get("step1") .<String, String>chunk(2) .reader(itemReader()) .writer(compositeItemWriter()) .stream(fileItemWriter1()) .stream(fileItemWriter2()) .build(); } /** * Spring Batch 4では、CompositeItemWriterはItemStreamを実装しているので以下は不要ですが * 例として作成しています。 */ @Bean public CompositeItemWriter compositeItemWriter() { List<ItemWriter> writers = new ArrayList<>(2); writers.add(fileItemWriter1()); writers.add(fileItemWriter2()); CompositeItemWriter itemWriter = new CompositeItemWriter(); itemWriter.setDelegates(writers); return itemWriter; }
上記のサンプルでは、CompositeItemWriter
はItemStream
ではありませんが、デリゲート先に処理を任せます。このため、デリゲート先のwriterはフレームワーク側で正しく認識出来るようにstreamとして明示的に登録の必要があります。ItemReader
はstreamとしての明示的な登録は必要ありませんが、これはStep
のプロパティとして直接指定しているためです。上記のstepはリスタート可能で、readerとwriterの状態は失敗イベント時に正しく永続化されます。
1.1.10. Intercepting Step Execution
Job
同様、Step
実行中には様々なイベントが発生し、そこでユーザが何らかの機能を実行したい場合があります。たとえば、フッターが必要なフラットファイルを書き出すには、フッターを書き込めるように、Step
完了時にItemWriter
へ通知の必要があります。是の実現にはStep
スコープのリスナの一つを使います。
StepListener
(これ自体はマーカーに過ぎない)の拡張の一つを実装するクラスはlisteners
要素でstepに適用します。listeners
要素は、step, tasklet, chunk、で妥当です。なお、そのリスナ関数を適用するレベルに対し宣言することを推奨します。もし単一クラスで複数リスナ(StepExecutionListener
とItemReadListener
など)を兼ねる場合、最も細かいレベルで宣言します。以下はchunkにリスナを適用する例です。
Java Configuration
@Bean public Step step1() { return this.stepBuilderFactory.get("step1") .<String, String>chunk(10) .reader(reader()) .writer(writer()) .listener(chunkListener()) .build(); }
<step>
要素もしくは*StepFactoryBean
ファクトリの一つを使用する場合で、ItemReader
, ItemWriter
, ItemProcessor
自身がStepListener
の一つを実装しているとStep
に自動登録されます。これはStep
に直接インジェクションするコンポーネントにだけ適用されます。もしリスナが別コンポーネント内にネストする場合、明示的な登録が必要です(Registering ItemStream with a Stepで解説)。
StepListener
インタフェースに加えて、同様な処理のためのアノテーションがあります。POJOにこれらアノテーションを付与するメソッドを持たせると、対応するStepListener
に変換します。ItemReader
, ItemWriter
, Tasklet
などchunkコンポネントの実装にそうしたアノテーションを付与するのが良くある使い方です。このアノテーションは、<listener/>
要素用のXMLパーサーがアナライズするように、ビルダーのlistener
でも登録をします。よって、stepにリスナーを登録するには、XML namespaceかビルダーのどちらかを使えば良い、ということです。
StepExecutionListener
StepExecutionListener
はStep
実行における最も汎用的なリスナーです。Step
開始・終了後に、完了・失敗に関わらず、通知を受けます。例は以下の通りです。
public interface StepExecutionListener extends StepListener { void beforeStep(StepExecution stepExecution); ExitStatus afterStep(StepExecution stepExecution); }
afterStep
の戻り値型ExitStatus
によってリスナーでStep
の完了コードを修正可能です。
このインタフェースに対応するアノテーションは以下です。
@BeforeStep
@AfterStep
ChunkListener
chunkはトランザクションスコープ内で処理するアイテムとして定義します。コミットインターパルで、トランザクションをコミットすると、chunkをコミットします。ChunkListener
はchunk処理開始かchunk正常処理完了後にロジックを挟むのに使います。以下がインタフェースです。
public interface ChunkListener extends StepListener { void beforeChunk(ChunkContext context); void afterChunk(ChunkContext context); void afterChunkError(ChunkContext context); }
beforeChunkメソッドはトランザクション開始後でItemReader
のread呼び出し前に呼ばれます。逆に、afterChunk
メソッドはchunkコミット後によばれます(ロールバックが起きてない場合に限る)。
このインタフェースに対応するアノテーションは以下です。
@BeforeChunk
@AfterChunk
@AfterChunkError
ChunkListener
はchunk宣言が無い場合にも適用可能です。TaskletStep
はChunkListener
を呼ぶ責任があるため、非アイテム指向のtaskletにも同様に適用します(taskletの前後に呼ばれる)。
ItemReadListener
前述のskip logicで解説したように、スキップレコードを後々追えるようにログを残すと有益な場合がある、と述べました。読込エラーの場合には、以下インタフェース定義に示す、ItemReaderListener
で実現できます。
public interface ItemReadListener<T> extends StepListener { void beforeRead(); void afterRead(T item); void onReadError(Exception ex); }
beforeRead
メソッドはItemReader
の毎回のread呼び出しの前に呼ばれます。afterRead
はreadが正常に完了して読みだしたアイテムを次に渡し終えた後に呼ばれます。読み出し中に何らかのエラーが発生した場合、onReadError
メソッドが呼ばれます。発生した例外をログ出力するなどが可能です。
このインタフェースに対応するアノテーションは以下です。
@BeforeRead
@AfterRead
@OnReadError
ItemProcessListener
ItemReadListener
同様、アイテムのprocess処理も、以下インタフェース定義に示すような形で、リッスンが可能です。
public interface ItemProcessListener<T, S> extends StepListener { void beforeProcess(T item); void afterProcess(T item, S result); void onProcessError(T item, Exception e); }
beforeProcess
メソッドはItemProcessor
のprocess
前で処理アイテムを渡す前に呼ばれます。afterProcess
メソッドはアイテム正常処理後に呼ばれます。処理中にエラーが発生した場合、onProcessError
メソッドが呼ばれます。発生した例外とそこで処理対象だったアイテムが渡されるので、それをログ出力などします。
このインタフェースに対応するアノテーションは以下です。
@BeforeProcess
@AfterProcess
@OnProcessError
ItemWriteListener
アイテム書き込みのリッスンはItemWriteListener
で行い、そのインタフェース定義は以下の通りです。
public interface ItemWriteListener<S> extends StepListener { void beforeWrite(List<? extends S> items); void afterWrite(List<? extends S> items); void onWriteError(Exception exception, List<? extends S> items); }
beforeWrite
メソッドはItemWriter
のwrite
前で書き込みアイテムリストを渡す前に呼ばれます。afterWrite
メソッドはアイテム正常書き込み後に呼ばれます。エラーが発生した場合、onWriteError
メソッドが呼ばれます。発生した例外と書き込み対象アイテムが渡されるので、それをログ出力などします。
このインタフェースに対応するアノテーションは以下です。
@BeforeWrite
@AfterWrite
@OnWriteError
SkipListener
ItemReadListener
, ItemProcessListener
, ItemWriteListener
はいずれもエラー通知の機構を備えますが、スキップしたレコードについての情報は得られません。たとえば、onWriteError
は、アイテムがリトライして成功したとしても呼ばれます(is called even if an item is retried and successful.)。このため、スキップしたアイテムをトラッキングするために別のインターフェスが存在し、以下のようなインタフェース定義になります。
public interface SkipListener<T,S> extends StepListener { void onSkipInRead(Throwable t); void onSkipInProcess(T item, Throwable t); void onSkipInWrite(S item, Throwable t); }
onSkipInRead
は読み込み中にスキップしたアイテムがあれば呼ばれます。ロールバックすると、複数回スキップにより同一アイテムを登録する場合があることに注意してください(It should be noted that rollbacks may cause the same item to be registered as skipped more than once.)。onSkipInWrite
は書き込み中にアイテムスキップした場合に呼ばれます。アイテムは正常に読み込まれている(かつ未スキップ)ので、引数にそのアイテムが渡されます。
このインタフェースに対応するアノテーションは以下です。
@OnSkipInRead
@OnSkipInWrite
@OnSkipInProcess
SkipListeners and Transactions
SkipListener
の最もよくある使い方はスキップアイテムのログ出力で、このログは別のバッチ処理かあるいは人間がスキップとなった原因を修正したり調査するのに使います。大本のトランザクションがロールバックとなるケースは多数考えられ、Spring Batchは以下2点を保証します。
- 適切なスキップメソッド(発生するエラーのタイミングに依存)がアイテムごとに1度だけ呼ばれる。
SkipListener
はトランザクションコミット前に必ず常に呼ばれる。これにより、リスナーで呼ぶなんらかのトランザクショナルリソースがItemWriter
での失敗によってロールバックしないようにしています。
1.2. TaskletStep
chunk指向処理だけがStep
の唯一の処理方法ではありません。Step
がシンプルにストアド呼び出しするだけの場合はどうでしょうか。ItemReader
で呼び出してストアド実行後にnullを返すような作りには出来ます。しかし、これは少々不自然で、何もしないItemWriter
を作らなければなりません。Spring BatchではこうしたケースではTaskletStep
を使います。
Tasklet
はexecute
という1つのメソッドを持つシンプルなインタフェースで、TaskletStep
が反復呼び出しを行い、RepeatStatus.FINISHED
を返すか失敗を示す例外をスローするまで実行します。Tasklet
の毎回の呼び出しはトランザクションでラップします。Tasklet
の実装者は、ストアド・スクリプト・シンプルなSQL更新、などを行います。
TaskletStep
を作成するには、ビルダーのtasklet
メソッドにTasklet
を実装したbeanを渡します。TaskletStep
をビルダーで作る際にはchunk
を呼ぶ必要はありません。以下はシンプルなtaskletの例です。
@Bean public Step step1() { return this.stepBuilderFactory.get("step1") .tasklet(myTasklet()) .build(); }
※ TaskletStep
はもしtaskletがStepListener
を実装する場合はStepListener
としても自動的に登録します。
1.2.1. TaskletAdapter
ItemReader
とItemWriter
のアダプタ同様、Tasklet
にもTaskletAdapter
というSpring Batchが用意するアダプタクラスがあります。これが有用な一例としてはレコードセットのフラグ更新に既存のDAOを使う場合です。TaskletAdapter
により、以下例のように、Tasklet
のアダプターを記述することなくDAOのメソッドを呼び出せます。
Java Configuration
@Bean public MethodInvokingTaskletAdapter myTasklet() { MethodInvokingTaskletAdapter adapter = new MethodInvokingTaskletAdapter(); adapter.setTargetObject(fooDao()); adapter.setTargetMethod("updateFoo"); return adapter; }
1.2.2. Example Tasklet Implementation
たいていのバッチjobaは、各種リソースのセットアップためメイン処理開始前に実行すべきstepや、そうしたリソースをクリーンアップする処理の終了後に実行すべきstepがあります。巨大なファイルを扱うjobの場合、別の場所へ正常にアップロード完了後にローカルの対象ファイルを削除する事がほとんどです。以下の例(Spring Batch samples projectの抜粋)はまさにそのような事をするTasklet
実装の例です。
public class FileDeletingTasklet implements Tasklet, InitializingBean { private Resource directory; public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { File dir = directory.getFile(); Assert.state(dir.isDirectory()); File[] files = dir.listFiles(); for (int i = 0; i < files.length; i++) { boolean deleted = files[i].delete(); if (!deleted) { throw new UnexpectedJobExecutionException("Could not delete file " + files[i].getPath()); } } return RepeatStatus.FINISHED; } public void setDirectoryResource(Resource directory) { this.directory = directory; } public void afterPropertiesSet() throws Exception { Assert.notNull(directory, "directory must be set"); } }
上のTasklet
実装は所定ディレクトリ内の全ファイルを削除します。execute
メソッドは一度だけ呼ばれることに注意してください。あとはStep
からこのTasklet
を参照します。
Java Configuration
@Bean public Job taskletJob() { return this.jobBuilderFactory.get("taskletJob") .start(deleteFilesInDir()) .build(); } @Bean public Step deleteFilesInDir() { return this.stepBuilderFactory.get("deleteFilesInDir") .tasklet(fileDeletingTasklet()) .build(); } @Bean public FileDeletingTasklet fileDeletingTasklet() { FileDeletingTasklet tasklet = new FileDeletingTasklet(); tasklet.setDirectoryResource(new FileSystemResource("target/test-outputs/test-dir")); return tasklet; }
1.3. Controlling Step Flow
jobで複数stepをグループ化する機能では、あるstepから別のstepへのjob "flows" を制御する方法が必要となります。Step
の失敗が必ずしもJob
の失敗を意味しません。また、Step
が次を実行すべきか決定するための'success'が複数種類存在する場合もあります。Steps
のグループ設定次第では、ある特定のstepが全く実行されない場合がありえます。
1.3.1. Sequential Flow
もっともシンプルなflowのケースは、以下イメージのように、すべてのstepをシーケンシャルに実行するjobです。
Figure 3. Sequential Flow
こういう設定をするにはstep要素の'next'属性を以下の例のように使います。
Java Configuration
@Bean public Job job() { return this.jobBuilderFactory.get("job") .start(stepA()) .next(stepB()) .next(stepC()) .build(); }
上のケースでは、'step A'がStep
リストの最初にあるので最初に実行します。'step A'が正常終了すると、'step B'を実行し、その後は同様です。ただし、'step A'が失敗する場合、Job
全体が失敗して、'step B'は実行しません。
1.3.2. Conditional Flow
上記例では、2パターンのみ存在します。
Step
が成功して次のStep
を実行する。Step
が失敗するとJob
が失敗する。
これで十分なケースも多いです。しかし、失敗にするのではなく、Step
の失敗が別のStep
をトリガーするケースはどうでしょうか。以下がそうしたflowのイメージです。
Figure 4. Conditional Flow
複雑なケースを扱うために、Spring Batch namespaceではstep要素内に定義する遷移要素を用意しています。そうした遷移要素の一つがnext
要素です。next
属性同様、next
要素は次に実行するStep
をJob
に指示します。ただし、属性とは異なり、Step
には任意数のnext
要素を指定可能で、その場合失敗時のデフォルトの振る舞いは存在しません。つまり、遷移要素を使う場合、Step
遷移に対するすべての振る舞いを明示的に定義する必要があります。なお、単一stepはnext
属性とtransition
要素両方を指定できません。
next
要素にはマッチするパターンと次に実行するstepを、以下サンプルのように、指定します。
Java Configuration
@Bean public Job job() { return this.jobBuilderFactory.get("job") .start(stepA()) .on("*").to(stepB()) .from(stepA()).on("FAILED").to(stepC()) .end() .build(); }
java設定を使う場合はon
メソッドにシンプルなパターンマッチングを使い、Step
実行の結果であるExitStatus
にマッチさせるものを指定します。
パターンには2つの特殊文字を使用可能です。
- "*"は0以上の文字列にマッチ。
- "?"は1文字にマッチ。
例えば、"c*t"は"cat"と"count"にマッチし、"c?t"は"cat"にはマッチするが"count"にはしない。
Step
に配置する遷移要素に上限はありませんが、Step
実行が返すExitStatus
に対応する要素が無い場合、フレームワークは例外をスローしてJob
は失敗します。フレームワークは遷移順序を最もマッチするものからしないもの順で決定します。つまり、上記例で順序を"stepA"に入れ替えたとしても、"FAILED"のExitStatus
は"stepC"に遷移します(This means that, even if the ordering were swapped for "stepA" in the example above, an ExitStatus of "FAILED" would still go to "stepC".)。
Batch Status Versus Exit Status
条件付きflowでJob
を設定する場合、BatchStatus
とExitStatus
の違いを意識する事は重要です。BatchStatus
はenumで、JobExecution
とStepExecution
双方のプロパティであり、フレームワークがJob
やStep
のステータスを記録するために使います。以下の値のいずれか1つになります。COMPLETED
, STARTING
, STARTED
, STOPPING
, STOPPED
, FAILED
, ABANDONED
, UNKNOWN
。これらの多くは自己説明的です。COMPLETED
はstepやjobが正常終了した場合のステータスで、FAILED
は失敗時、などです。
以下の例はJava設定で'on'要素を持つものです。
... .from(stepA()).on("FAILED").to(stepB()) ...
一目見た感じでは、'on'がStep
のBatchStatus
を参照するように見えます。しかし、実際にはStep
のExitStatus
を参照します。名前が示すように、ExitStatus
はそのStep
の終了後のステータスを表します。
英語で書くなら、"exit codeがFAILEDであればstepBに遷移する"( "go to stepB if the exit code is FAILED
")、です。デフォルトではexit codeはStep
のBatchStatus
と常に同じになり、これが上記サンプルがなぜ動作するのかの理由です。ただし、exit codeを別の値にしたい場合は? 好例がsamples projectのskip sample jobにあります。
Java Configuration
@Bean public Job job() { return this.jobBuilderFactory.get("job") .start(step1()).on("FAILED").end() .from(step1()).on("COMPLETED WITH SKIPS").to(errorPrint1()) .from(step1()).on("*").to(step2()) .end() .build(); }
step1
には3パターンがありえます。
Step
が失敗、jobも失敗。Step
が正常終了。Step
が正常終了するがexit codeはCOMPLETED WITH SKIPS'。この場合、エラー処理に別のstepを実行する。
上記の設定は動作します。ただし、以下例のように、レコードスキップ条件でexit codeを変更するように修正します。
public class SkipCheckingListener extends StepExecutionListenerSupport { public ExitStatus afterStep(StepExecution stepExecution) { String exitCode = stepExecution.getExitStatus().getExitCode(); if (!exitCode.equals(ExitStatus.FAILED.getExitCode()) && stepExecution.getSkipCount() > 0) { return new ExitStatus("COMPLETED WITH SKIPS"); } else { return null; } } }
上のStepExecutionListener
はまず、Step
が正常終了かつStepExecution
のskip countが0より大きい、かどうかをチェックします。両条件を満たす場合、新規ExitStatus
をexit code COMPLETED WITH SKIPS
で返します。
1.3.3. Configuring for Stop
BatchStatus and ExitStatusの解説を読んだ後、Job
でBatchStatus
とExitStatus
の決定ルールを知りたいと感じたかもしれません。Step
のステータスは実行するコードで決定し、Job
は設定で決定します。
これまでに解説したjob設定はすべて少なくとも一つの遷移無しの最終Step
を持っています。たとえば、以下例のように、step実行後、Job
は終了します。
@Bean public Job job() { return this.jobBuilderFactory.get("job") .start(step1()) .build(); }
Step
に遷移先が無い場合、Job
のステータスは以下のように決定します。
- その最終
Step
がExitStatus
FAILEDで終了する場合、Job
のBatchStatus
とExitStatus
は両方ともFAILED
になる。 - それ以外の場合、
Job
のBatchStatus
とExitStatus
は両方ともCOMPLETED
になる。
ある種のバッチジョブ、シンプルなシーケンシャルstepのjobなどでは、上記の終了ルールで十分ですが、カスタム定義のjob停止が必要な場合もあります。そうした用途のために、Spring BatchはJob
を停止するための遷移要素を3つ用意しています(前述のnext要素に加えて)。それらの停止要素はJob
を特定のBatchStatus
で停止します。なお、その停止遷移要素はJob
のいずれのSteps
のBatchStatus
あるいはExitStatus
のどちらにも影響を与えません。これらの要素はJob
の最終ステータスにだけ影響を与えます。たとえば、jobのすべてのstepがFAILED
でありながら、jobはCOMPLETED
に出来ます。
Ending at a Step
stepを終了するとBatchStatus
がCOMPLETED
でjob
を停止します。ステータスCOMPLETED
で終了するJob
はリスタート出来ません(フレームワークがJobInstanceAlreadyCompleteException
をスローする)。
Java設定を使う場合、そのタスクには'end'メソッドを使います。また、end
メソッドはJob
のExitStatus
をカスタマイズするための'exitStatus'パラメータを取ることも可能です。'exitStatus'を指定しないはデフォルトでExitStatus
がCOMPLETED
となり、BatchStatus
もそうなります。
以下のケースでは、step2
が失敗するとJob
はBatchStatus
COMPLETED
およびExitStatus
COMPLETED
で停止してstep3
は実行しません。そうでない場合、step3
に遷移します。なお、step2
が失敗する場合はJob
はリスタート出来ません(ステータスがCOMPLETED
なので)。
@Bean public Job job() { return this.jobBuilderFactory.get("job") .start(step1()) .next(step2()) .on("FAILED").end() .from(step2()).on("*").to(step3()) .end() .build(); }
Failing a Step
所定ポイントでstepを失敗するよう設定するとJob
はBatchStatus
FAILED
で停止します。endと異なり、Job
失敗でリスタート不能にはなりません。
以下のケースでは、step2
が失敗するとJob
はBatchStatus
FAILED
ExitStatus
EARLY TERMINATION
で停止してstep3
は実行しません。そうでない場合、step3
に遷移します。また、step2
が失敗してJob
をリスタートすると、step2
から再開します。
Java Configuration
@Bean public Job job() { return this.jobBuilderFactory.get("job") .start(step1()) .next(step2()).on("FAILED").fail() .from(step2()).on("*").to(step3()) .end() .build(); }
Stopping a Job at a Given Step
特定のstepでjobを停止する設定をすると、Job
はBatchStatus
STOPPED
で停止します。Job
停止により処理は一時停止するため、オペレーターはJob
の再開前になんらかの作業が出来ます。
java設定の場合、stopAndRestart
メソッドは'restart'属性を必要とし、この属性にはJobリスタート時にピックアップするstepを指定します。
以下のケースでは、step1
がCOMPLETE
で終了するとjobは停止します。リスタートするとstep2
から再開します。
@Bean public Job job() { return this.jobBuilderFactory.get("job") .start(step1()).on("COMPLETED").stopAndRestart(step2()) .end() .build(); }
1.3.4. Programmatic Flow Decisions
あるケースでは、ExitStatus
以外の情報も使用して次に実行するstepを決定したい場合があります。この場合、以下サンプルのように、決定を行うJobExecutionDecider
を使用します。
public class MyDecider implements JobExecutionDecider { public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) { String status; if (someCondition()) { status = "FAILED"; } else { status = "COMPLETED"; } return new FlowExecutionStatus(status); } }
次の例では、Java設定でJobExecutionDecider
を実装するbeanをnext
に直接渡しています。
Java Configuration
@Bean public Job job() { return this.jobBuilderFactory.get("job") .start(step1()) .next(decider()).on("FAILED").to(step2()) .from(decider()).on("COMPLETED").to(step3()) .end() .build(); }
1.3.5. Split Flows
これまでに解説したケースのJob
はいずれも線形に一度に1つのstepを実行します。この一般的なスタイルに加えて、Spring Batchはjobをparallel flowsに設定可能です。
Java設定ではビルダーを用いて設定をスプリットできます。以下に示す例は、'split'要素は複数の'split'要素を持ち、そこで個々のflowを定義します。また、'split'要素には、これまでに解説した遷移要素、'next'属性や'next'要素・'end'や'fail'要素、を入れられます。
@Bean public Job job() { Flow flow1 = new FlowBuilder<SimpleFlow>("flow1") .start(step1()) .next(step2()) .build(); Flow flow2 = new FlowBuilder<SimpleFlow>("flow2") .start(step3()) .build(); return this.jobBuilderFactory.get("job") .start(flow1) .split(new SimpleAsyncTaskExecutor()) .add(flow2) .next(step4()) .end() .build(); }
1.3.6. Externalizing Flow Definitions and Dependencies Between Jobs
jobのflowの一部を別のbeanとして切り出すことで、再使用できます。これには2通りの方法があります。1つは単に、以下に示すように、flowをどこか別の場所で定義したbeanとして宣言します。
Java Configuration
@Bean public Job job() { return this.jobBuilderFactory.get("job") .start(flow1()) .next(step3()) .end() .build(); } @Bean public Flow flow1() { return new FlowBuilder<SimpleFlow>("flow1") .start(step1()) .next(step2()) .build(); }
上の例のように外部flowを定義すると、インラインで定義しているかのように外部flowをjobにstepを追加できることにになります。この方法により、多数のjobが同じテンプレートflowを参照可能となり、そのテンプレートを別のflowに組み込むことも出来ます。また、個々のflowのインテグレーションテストを分離するのにも役立ちます。
flow外部化の別の方法はJobStep
の使用です。JobStep
はFlowStep
と似てますが、flow内のstepを別のjob実行として実際に作成・実行します。
以下はJobStep
の例です。
Java Configuration
@Bean public Job jobStepJob() { return this.jobBuilderFactory.get("jobStepJob") .start(jobStepJobStep1(null)) .build(); } @Bean public Step jobStepJobStep1(JobLauncher jobLauncher) { return this.stepBuilderFactory.get("jobStepJobStep1") .job(job()) .launcher(jobLauncher) .parametersExtractor(jobParametersExtractor()) .build(); } @Bean public Job job() { return this.jobBuilderFactory.get("job") .start(step1()) .build(); } @Bean public DefaultJobParametersExtractor jobParametersExtractor() { DefaultJobParametersExtractor extractor = new DefaultJobParametersExtractor(); extractor.setKeys(new String[]{"input.file"}); return extractor; }
job parameters extractorはStep
のExecutionContext
を実行Job
のJobParameters
に変換する方法を決定するものです。JobStep
はjobとstepのモニタリングとレポーティングに細かいオプションをつけたい場合に有用です。また、JobStep
は以下の質問に対する回答でもあります。"job間の依存関係をどのように作るのか?" これは大規模システムを小さいモジュールとjobのflow制御に分割する優れた方法です。
1.4. Late Binding of Job and Step Attributes
これまでに解説したXMLとフラットファイルサンプルはいずれもファイル取得にSpringのResource
を使用します。Resource
のgetFile
がjava.io.File
を返すので機能します。XMLとフラットファイルのリソースは以下サンプルのようにSpringの機能で設定できます。
Java Configuration
@Bean public FlatFileItemReader flatFileItemReader() { FlatFileItemReader<Foo> reader = new FlatFileItemReaderBuilder<Foo>() .name("flatFileItemReader") .resource(new FileSystemResource("file://outputs/file.txt")) ... }
上のResource
は指定のファイルシステムロケーションからファイルをロードします。注意点として、抽象ロケーション(absolute locations)はスラッシュ2個(//
)で開始します。たいていのSpringアプリケーションで、このやり方で十分であり、これらリソース名はコンパイル時に決定しているためです。ただし、バッチのケースでは、ファイル名はjobのパラメータで動的に決定する場合もあります。この場合にはシステムプロパティを読み込み'-D'パラメータを使います。
以下はプロパティからファイル名を読み込み方法です。
Java Configuration
@Bean public FlatFileItemReader flatFileItemReader(@Value("${input.file.name}") String name) { return new FlatFileItemReaderBuilder<Foo>() .name("flatFileItemReader") .resource(new FileSystemResource(name)) ... }
この方法を動かすにはシステム引数(-Dinput.file.name="file://outputs/file.txt"
など)を指定します。
※ ここでは``PropertyPlaceholderConfigurerが使用可能ですが、システムプロパティが常にセットされる場合は不要です。これはSpringの
ResourceEditor```がシステムプロパティのフィルタリングとプレースホルダー置換を行うためです。
また、バッチ設定において、システムプロパティではなくjobのJobParameters
でファイル名をパラメータ化してアクセスしたい場合があります。これをするには以下のように、Spring Batchでは各種Job
とStep
で遅延バインディングが可能です。
Java Configuration
@StepScope @Bean public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters['input.file.name']}") String name) { return new FlatFileItemReaderBuilder<Foo>() .name("flatFileItemReader") .resource(new FileSystemResource(name)) ... }
以下例のようにJobExecution
とStepExecution
のExecutionContext
は同様な方法でアクセス可能です。
Java Configuration
@StepScope @Bean public FlatFileItemReader flatFileItemReader(@Value("#{jobExecutionContext['input.file.name']}") String name) { return new FlatFileItemReaderBuilder<Foo>() .name("flatFileItemReader") .resource(new FileSystemResource(name)) ... }
Java Configuration
@StepScope @Bean public FlatFileItemReader flatFileItemReader(@Value("#{stepExecutionContext['input.file.name']}") String name) { return new FlatFileItemReaderBuilder<Foo>() .name("flatFileItemReader") .resource(new FileSystemResource(name)) ... }
※ 遅延バインディングを使うbeanはscope="step"を宣言する必要があります。詳細はStep Scopeを参照。
1.4.1. Step Scope
上で取り上げた遅延バインディングの例は、以下サンプルのように、bean定義にすべて"step"スコープを宣言しています。
Java Configuration
@StepScope @Bean public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters[input.file.name]}") String name) { return new FlatFileItemReaderBuilder<Foo>() .name("flatFileItemReader") .resource(new FileSystemResource(name)) ... }
遅延バインディングを使うにはStep
スコープが必要で、これは属性を参照するには、Step
開始までbeanをインスタンス化出来ないためです。このスコープはSpringコンテナのデフォルトには含まれないので、scopeの明示的な追加が必要です。追加するには、batch
namespace・StepScope
のbean定義を明示的に追加・@EnableBatchProcessing
の使用、のどれか1つを使用します。以下はbatch
namespaceの使用例です。
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:batch="http://www.springframework.org/schema/batch" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="..."> <batch:job .../> ... </beans>
以下の例は明示的なbean定義の追加です。
<bean class="org.springframework.batch.core.scope.StepScope" />
1.4.2. Job Scope
Job
スコープは、Spirng Batch 3.0で導入され、Step
スコープと同様のものですがJob
コンテキストのスコープで、ジョブ実行時にそのbeanのインスタンスが1つだけになります。また、参照の遅延バインディングが可能で、JobContext
を#{..}
でアクセスできます。この機能により、以下例に示すように、jobやjob execution contextおよびジョブパラメータ、からbeanプロパティを取得できます。
Java Configuration
@JobScope @Bean public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters[input]}") String name) { return new FlatFileItemReaderBuilder<Foo>() .name("flatFileItemReader") .resource(new FileSystemResource(name)) ... }
Java Configuration
@JobScope @Bean public FlatFileItemReader flatFileItemReader(@Value("#{jobExecutionContext['input.name']}") String name) { return new FlatFileItemReaderBuilder<Foo>() .name("flatFileItemReader") .resource(new FileSystemResource(name)) ... }
Springコンテナにはデフォルトで含まれないスコープなので、明示的な追加が必要です。batch
namespace・JobScope
のbean定義を明示的に追加・@EnableBatchProcessing
の使用、のどれか1つを使用します。以下はbatch
namespaceの使用例です。
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:batch="http://www.springframework.org/schema/batch" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="..."> <batch:job .../> ... </beans>
以下の例はJobScope
のbeanを明示的に追加しています。
<bean class="org.springframework.batch.core.scope.JobScope" />