https://docs.spring.io/spring-batch/4.1.x/reference/html/common-patterns.html#commonPatterns
https://qiita.com/kagamihoge/items/12fbbc2eac5b8a5ac1e0 俺の訳一覧リスト
1. Common Batch Patterns
ある種のジョブはSpring Batch標準コンポーネントのみの組み合わせで構築できます。ItemReader
とItemWriter
の実装は様々なケースに適用可能です。しかし、基本的には、カスタムコードの実装が必要です。アプリケーション開発者に対するSpring BatchのエントリーポイントはTasklet
, ItemReader
, ItemWriter
と各種リスナーです。シンプルなバッチジョブではSpring BatchのItemReader
を使用出来ますが、ItemWriter
やItemProcessor
にカスタムの処理な書き込みが必要となる場合があります。
このチャプターでは、カスタムビジネスロジックにおける共通パターンの例について解説します。これらは主としてリスナーを活用します。なお、ItemReader
やItemWriter
実装はリスナーも実装可能です。
1.1. Logging Item Processing and Failures
1アイテムごとにstepでエラーハンドリングを行い、特別なチャネルにロギングしたりDBにレコードを追加したり、をするための共通パターンです。chunk指向Step
(stepファクトリビーンで生成)では単純に、read
エラーはItemReadListener
、write
エラーはItemWriteListener
の実装により実現します。以下コードはreadとwriteエラーログ出力するリスナの例です。
public class ItemFailureLoggerListener extends ItemListenerSupport { private static Log logger = LogFactory.getLog("item.error"); public void onReadError(Exception ex) { logger.error("Encountered error on read", e); } public void onWriteError(Exception ex, List<? extends Object> items) { logger.error("Encountered error on write", ex); } }
リスナ実装したら以下例のようにstepに登録します。
Java Configuration
@Bean public Step simpleStep() { return this.stepBuilderFactory.get("simpleStep") ... .listener(new ItemFailureLoggerListener()) .build(); }
※ リスナーのonError()
での処理は、後にロールバックされるトランザクション内での処理になります。DBなどトランザクショナルなリソースをonError()
で使う場合、リスナーメソッドに宣言的トランザクション(詳細はSpring Core Reference Guide参照)を追加し、propagation attributeをREQUIRES_NEW
にしてください。
1.2. Stopping a Job Manually for Business Reasons
Spring BatchにはJobLauncher
にstop()
がありますが、これはアプリケーションプログラマというよりオペレータが使うものです。ただ、ビジネスロジック内でのジョブ実行停止したい場合がありえます。
一番シンプルな方法はRuntimeException
のスローです(無期限リトライやスキップしない例外)。例えば以下ではカスタム例外を使用しています。
public class PoisonPillItemProcessor<T> implements ItemProcessor<T, T> { @Override public T process(T item) throws Exception { if (isPoisonPill(item)) { throw new PoisonPillException("Poison pill detected: " + item); } return item; } }
別の方法としてstepを止めるには、以下例のように、ItemReader
でnull
を返します。
public class EarlyCompletionItemReader implements ItemReader<T> { private ItemReader<T> delegate; public void setDelegate(ItemReader<T> delegate) { ... } public T read() throws Exception { T item = delegate.read(); if (isEndItem(item)) { return null; // ここでstep終了 } return item; } }
上の例はCompletionPolicy
のデフォルト実装のnull
がバッチ完了を意味する挙動を利用しています。より複雑な完了ポリシーを実装してStep
に設定するにはSimpleStepFactoryBean
を使用します。
Java Configuration
@Bean public Step simpleStep() { return this.stepBuilderFactory.get("simpleStep") .<String, String>chunk(new SpecialCompletionPolicy()) .reader(reader()) .writer(writer()) .build(); }
また別の方にStepExecution
にフラグを設定し、アイテム処理中にフレームワーク内でStep
実装がこれをチェックします。これを実装するには、StepExecution
にアクセスするためにStepListener
を実装してStep
に登録します。以下はフラグを設定するリスナーの例です。
public class CustomItemWriter extends ItemListenerSupport implements StepListener { private StepExecution stepExecution; public void beforeStep(StepExecution stepExecution) { this.stepExecution = stepExecution; } public void afterRead(Object item) { if (isPoisonPill(item)) { stepExecution.setTerminateOnly(true); } } }
フラグをセットする場合、デフォルトの振る舞いはstepでJobInterruptedException
をスローします。この振る舞いはStepInterruptionPolicy
で制御できます。なお、例外スローかしないかの選択肢しか無いので、ジョブは常にabnormal endingになります。
1.3. Adding a Footer Record
フラットファイルに書き込む場合、すべての処理完了後にファイル末尾にフッター行を追加したい場合があります。これはSpring BatchのFlatFileFooterCallback
により実装できます。FlatFileFooterCallback
(と対になるFlatFileHeaderCallback
)はFlatFileItemWriter
のオプションプロパティで以下のようにwriterに追加します。
Java Configuration
@Bean public FlatFileItemWriter<String> itemWriter(Resource outputResource) { return new FlatFileItemWriterBuilder<String>() .name("itemWriter") .resource(outputResource) .lineAggregator(lineAggregator()) .headerCallback(headerCallback()) .footerCallback(footerCallback()) .build(); }
フッターのコールバックには1つだけメソッドがあり、フッター書き込み時に呼び出されます。
public interface FlatFileFooterCallback { void writeFooter(Writer writer) throws IOException; }
1.3.1. Writing a Summary Footer
フッター行に対するよくある要望に、出力処理中の情報を集約してファイル末尾にそれを追加する、があります。このフッターはファイルの要約やチェックサムに使われます。
例えば、バッチジョブがフラットファイルにTrade
行を書き込むとして、全Trades
の総合計をフッターに配置したい場合、ItemWriter
実装を以下のようにします。
public class TradeItemWriter implements ItemWriter<Trade>, FlatFileFooterCallback { private ItemWriter<Trade> delegate; private BigDecimal totalAmount = BigDecimal.ZERO; public void write(List<? extends Trade> items) throws Exception { BigDecimal chunkTotal = BigDecimal.ZERO; for (Trade trade : items) { chunkTotal = chunkTotal.add(trade.getAmount()); } delegate.write(items); // アイテム正常書き込み後に総合計を加算 totalAmount = totalAmount.add(chunkTotal); } public void writeFooter(Writer writer) throws IOException { writer.write("Total Amount Processed: " + totalAmount); } public void setDelegate(ItemWriter delegate) {...} }
TradeItemWriter
のtotalAmount
はTrade
アイテム書き込み後に加算します。すべてのTrade
処理後、フレームワークはwriteFooter
を呼び、ファイルにtotalAmount
を挿入します。なお、write
には一時変数chunkTotal
があり、これにchunkのTrade
の合計を格納します。これは、write
でskipが発生した場合はtotalAmount
を変更しないためです。write
メソッドを最後まで実行し、例外がスローされなかったことを確認できたら、totalAmount
を更新します。
writeFooter
の使用には、TradeItemWriter
(FlatFileFooterCallback
の実装クラス)をfooterCallback
としてFlatFileItemWriter
にワイヤリングします。以下はその方法の例です。
Java Configuration
@Bean public TradeItemWriter tradeItemWriter() { TradeItemWriter itemWriter = new TradeItemWriter(); itemWriter.setDelegate(flatFileItemWriter(null)); return itemWriter; } @Bean public FlatFileItemWriter<String> flatFileItemWriter(Resource outputResource) { return new FlatFileItemWriterBuilder<String>() .name("itemWriter") .resource(outputResource) .lineAggregator(lineAggregator()) .footerCallback(tradeItemWriter()) .build(); }
このTradeItemWriter
ような作り方はStep
が非リスタート可能の場合のみ正しく動作します。これは、このクラスはstateful(totalAmount
がある)だが、そのtotalAmount
をDBに永続化していないためです。よって、リスタート時にtotalAmount
を取得できません。このクラスをリスタート可能にするには、ItemStream
は以下例のようにopen
とupdate
を実装します。
public void open(ExecutionContext executionContext) { if (executionContext.containsKey("total.amount") { totalAmount = (BigDecimal) executionContext.get("total.amount"); } } public void update(ExecutionContext executionContext) { executionContext.put("total.amount", totalAmount); }
updateメソッドはExecutionContext
にtotalAmount
の現在値を保存します。openメソッドはExecutionContext
からtotalAmount
を取得し処理開始時の値として使用し、Step
の前回終了時点からリスタートします。
1.4. Driving Query Based ItemReaders
chapter on readers and writersで、ページングを使用するDB入力について解説しました。DB2など、たいていのDBで、他のオンラインアプリケーションなどで必要なテーブルを読み込む場合、過度の悲観的ロックは問題を起こす場合があります。加えて、過度に大きなデータセットに対するカーソルオープンが特定DBで問題を起こす場合があります。よって、データ読み取りに'Driving Query'を取るプロジェクトがあります。この方針は、以下図のように、返す必要のあるオブジェクト全体ではなく、キーをイテレートする動作をします。
Figure 1. Driving Query Job
上記例は、カーソルベースのサンプルで使用したのと同じ'FOO'テーブルです。ただし、行全体を選択するのではなく、ここのSQLステートメントではIDのみ選択しています。よって、read
はFOO
オブジェクトではなく、Integer
を返します。このIDの数値を使用し、後でFoo
オブジェクトという"詳細"を取得します。
Figure 2. Driving Query Example
ItemProcessor
はdriving queryから得たキーを基に'Foo'オブジェクトへと変換します。DAOを使用してキーを基にオブジェクト全体をクエリします。
1.5. Multi-Line Records
基本的にはフラットファイルは各レコードを1行になりますが、レコードを複数のフォーマットで複数行に展開することもあります。以下はそうした展開例の抜粋です。
HEA;0013100345;2007-02-15 NCU;Smith;Peter;;T;20014539;F BAD;;Oak Street 31/A;;Small Town;00235;IL;US FOT;2;2;267.34
1行の開始は'HEA'で始まり、1行の最終行は'FOT'で始まります。これを正しく扱うには以下を考慮する必要があります。
- 1度に1レコード読み込むのでなく、
ItemReader
は複数行レコードのグループとして読み込み、そのグループをItemWriter
に渡します。 - 各行の種類ごとにトークン処理を行う。
単一レコードを複数行に展開し、かつ、全体で何行来るかは事前に不明なため、ItemReader
はレコード全体を注意深く読みこむ必要があります。これを行うには、FlatFileItemReader
のラッパーとしてItemReader
を実装します。
Java Configuration
@Bean public MultiLineTradeItemReader itemReader() { MultiLineTradeItemReader itemReader = new MultiLineTradeItemReader(); itemReader.setDelegate(flatFileItemReader()); return itemReader; } @Bean public FlatFileItemReader flatFileItemReader() { FlatFileItemReader<Trade> reader = new FlatFileItemReaderBuilder<Trade>() .name("flatFileItemReader") .resource(new ClassPathResource("data/iosample/input/multiLine.txt")) .lineTokenizer(orderFileTokenizer()) .fieldSetMapper(orderFieldSetMapper()) .build(); return reader; }
各行に応じた適切なトークン分割をするため、特に重要なのは固定幅入力なので、デリゲート先のFlatFileItemReader
に対してPatternMatchingCompositeLineTokenizer
を使います。FlatFileItemReader in the Readers and Writers chapterに詳細があります。それから、デリゲート先のreaderは各行をラップ元のItemReader
にFieldSet
で返すためにPassThroughFieldSetMapper
を使います。
Java Content
@Bean public PatternMatchingCompositeLineTokenizer orderFileTokenizer() { PatternMatchingCompositeLineTokenizer tokenizer = new PatternMatchingCompositeLineTokenizer(); Map<String, LineTokenizer> tokenizers = new HashMap<>(4); tokenizers.put("HEA*", headerRecordTokenizer()); tokenizers.put("FOT*", footerRecordTokenizer()); tokenizers.put("NCU*", customerLineTokenizer()); tokenizers.put("BAD*", billingAddressLineTokenizer()); tokenizer.setTokenizers(tokenizers); return tokenizer; }
ラッパーはレコード終端を解釈する必要があります。よって、レコード終端に達するまで、デリゲート先のread
を繰り返し呼びます。各行を読み込むためには、ラッパーで返すアイテムを組み立てる必要があります。フッターに達すると、アイテムはItemProcessor
やItemWriter
に渡せるようになります。
private FlatFileItemReader<FieldSet> delegate; public Trade read() throws Exception { Trade t = null; for (FieldSet line = null; (line = this.delegate.read()) != null;) { String prefix = line.readString(0); if (prefix.equals("HEA")) { t = new Trade(); // Record must start with header } else if (prefix.equals("NCU")) { Assert.notNull(t, "No header was found."); t.setLast(line.readString(1)); t.setFirst(line.readString(2)); ... } else if (prefix.equals("BAD")) { Assert.notNull(t, "No header was found."); t.setCity(line.readString(4)); t.setState(line.readString(6)); ... } else if (prefix.equals("FOT")) { return t; // Record must end with footer } } Assert.isNull(t, "No 'END' was found."); return null; }
1.6. Executing System Commands
バッチジョブ内から外部のコマンドを呼ぶ必要があるケースは多いです。そうした処理はスケジューラで別途に開始出来ますが、実行時のメタデータの利点が失われます。また、マルチステップジョブを複数ジョブに分割する必要も発生します。
よくあるケースなので、Spring Batchは以下のようにシステムコマンドを呼ぶTasklet
を用意しています。
Java Configuration
@Bean public SystemCommandTasklet tasklet() { SystemCommandTasklet tasklet = new SystemCommandTasklet(); tasklet.setCommand("echo hello"); tasklet.setTimeout(5000); return tasklet; }
1.7. Handling Step Completion When No Input is Found
DBからの取得結果やファイルが0件なケースは良くあります。Step
では単純に何も処理が無いので0アイテムを読み込んで完了します。Spring Batch標準クラスのItemReader
はすべてデフォルトではこの方針になっています。入力があるのに何も書き込まないのは、混乱を招く場合があります(ファイル名がおかしくなったり、他同様な問題の発生)。For this reason, the metadata itself should be inspected to determine how much work the framework found to be processed. しかし、入力0件が例外的の場合にはどうすれば良いでしょうか。この場合、0件処理のメタデータをプログラム的にチェックして失敗とするのがベストです。これは一般的なユースケースなため、Spring Batchはその機能をリスナーNoWorkFoundStepExecutionListener
で提供しています。定義は以下の通りです。
public class NoWorkFoundStepExecutionListener extends StepExecutionListenerSupport { public ExitStatus afterStep(StepExecution stepExecution) { if (stepExecution.getReadCount() == 0) { return ExitStatus.FAILED; } return null; } }
上のStepExecutionListener
は'afterStep'でStepExecution
のreadCount
プロパティで読み込み0件かどうかをチェックします。0件の場合、FAILEDのexit codeでリターンし、Step
は失敗になります。そうでない場合、null
が返され、Step
のステータスは何も変更しません。
1.8. Passing Data to Future Steps
stepから別のstepにデータを渡したい場合が良くあります。これにはExecutionContext
を使います。2つのExecutionContexts
、Step
とJob
、はそれぞれ一長一短です。Step
のExecutionContext
はstep中のみ、Job
のExecutionContext
はJob
全体です。また、Step
のExecutionContext
はStep
のchunkがコミットする際に更新し、Job
のExecutionContext
はStep
終了時にだけ更新します。
このためStep
実行中はStep
のExecutionContext
に全データを保存してください。これによりStep
実行中にデータが適切に保存されます。もしデータをJob
のExecutionContext
に置く場合、Step
の実行中には永続化しません。Step
が失敗するとそのデータはロストします。
public class SavingItemWriter implements ItemWriter<Object> { private StepExecution stepExecution; public void write(List<? extends Object> items) throws Exception { // ... ExecutionContext stepContext = this.stepExecution.getExecutionContext(); stepContext.put("someKey", someObject); } @BeforeStep public void saveStepExecution(StepExecution stepExecution) { this.stepExecution = stepExecution; } }
次以降のSteps
でデータを利用可能にするには、step終了後にJob
のExecutionContext
へ"昇格"の必要があります。Spring BatchではExecutionContextPromotionListener
を使います。このリスナーにはExecutionContext
で昇格させたいキーを設定します。また、オプションで、昇格するexit codeのリストパターンも設定出来ます(COMPLETED
がデフォルト)。他リスナー同様、以下サンプルのようにStep
に登録します。
Java Configuration
@Bean public Job job1() { return this.jobBuilderFactory.get("job1") .start(step1()) .next(step1()) .build(); } @Bean public Step step1() { return this.stepBuilderFactory.get("step1") .<String, String>chunk(10) .reader(reader()) .writer(savingWriter()) .listener(promotionListener()) .build(); } @Bean public ExecutionContextPromotionListener promotionListener() { ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener(); listener.setKeys(new String[] {"someKey" }); return listener; }
以下サンプルのように、Job
のExecutionContext
から保存した値を取得するには以下のようにします。
public class RetrievingItemWriter implements ItemWriter<Object> { private Object someObject; public void write(List<? extends Object> items) throws Exception { // ... } @BeforeStep public void retrieveInterstepData(StepExecution stepExecution) { JobExecution jobExecution = stepExecution.getJobExecution(); ExecutionContext jobContext = jobExecution.getExecutionContext(); this.someObject = jobContext.get("someKey"); } }