https://docs.spring.io/spring-batch/4.1.x/reference/html/common-patterns.html#commonPatterns
https://qiita.com/kagamihoge/items/12fbbc2eac5b8a5ac1e0 俺の訳一覧リスト
1. Common Batch Patterns
ある種のジョブはSpring Batch標準コンポーネントのみの組み合わせで構築できます。ItemReader
とItemWriter
の実装は様々なケースに適用可能です。しかし、基本的には、カスタムコードの実装が必要です。アプリケーション開発者に対するSpring BatchのエントリーポイントはTasklet
, ItemReader
, ItemWriter
と各種リスナーです。シンプルなバッチジョブではSpring BatchのItemReader
を使用出来ますが、ItemWriter
やItemProcessor
にカスタムの処理な書き込みが必要となる場合があります。
このチャプターでは、カスタムビジネスロジックにおける共通パターンの例について解説します。これらは主としてリスナーを活用します。なお、ItemReader
やItemWriter
実装はリスナーも実装可能です。
1.1. Logging Item Processing and Failures
1アイテムごとにstepでエラーハンドリングを行い、特別なチャネルにロギングしたりDBにレコードを追加したり、をするための共通パターンです。chunk指向Step
(stepファクトリビーンで生成)では単純に、read
エラーはItemReadListener
、write
エラーはItemWriteListener
の実装により実現します。以下コードはreadとwriteエラーログ出力するリスナの例です。
public class ItemFailureLoggerListener extends ItemListenerSupport {
private static Log logger = LogFactory.getLog("item.error");
public void onReadError(Exception ex) {
logger.error("Encountered error on read", e);
}
public void onWriteError(Exception ex, List<? extends Object> items) {
logger.error("Encountered error on write", ex);
}
}
リスナ実装したら以下例のようにstepに登録します。
Java Configuration
@Bean
public Step simpleStep() {
return this.stepBuilderFactory.get("simpleStep")
...
.listener(new ItemFailureLoggerListener())
.build();
}
※ リスナーのonError()
での処理は、後にロールバックされるトランザクション内での処理になります。DBなどトランザクショナルなリソースをonError()
で使う場合、リスナーメソッドに宣言的トランザクション(詳細はSpring Core Reference Guide参照)を追加し、propagation attributeをREQUIRES_NEW
にしてください。
1.2. Stopping a Job Manually for Business Reasons
Spring BatchにはJobLauncher
にstop()
がありますが、これはアプリケーションプログラマというよりオペレータが使うものです。ただ、ビジネスロジック内でのジョブ実行停止したい場合がありえます。
一番シンプルな方法はRuntimeException
のスローです(無期限リトライやスキップしない例外)。例えば以下ではカスタム例外を使用しています。
public class PoisonPillItemProcessor<T> implements ItemProcessor<T, T> {
@Override
public T process(T item) throws Exception {
if (isPoisonPill(item)) {
throw new PoisonPillException("Poison pill detected: " + item);
}
return item;
}
}
別の方法としてstepを止めるには、以下例のように、ItemReader
でnull
を返します。
public class EarlyCompletionItemReader implements ItemReader<T> {
private ItemReader<T> delegate;
public void setDelegate(ItemReader<T> delegate) { ... }
public T read() throws Exception {
T item = delegate.read();
if (isEndItem(item)) {
return null;
}
return item;
}
}
上の例はCompletionPolicy
のデフォルト実装のnull
がバッチ完了を意味する挙動を利用しています。より複雑な完了ポリシーを実装してStep
に設定するにはSimpleStepFactoryBean
を使用します。
Java Configuration
@Bean
public Step simpleStep() {
return this.stepBuilderFactory.get("simpleStep")
.<String, String>chunk(new SpecialCompletionPolicy())
.reader(reader())
.writer(writer())
.build();
}
また別の方にStepExecution
にフラグを設定し、アイテム処理中にフレームワーク内でStep
実装がこれをチェックします。これを実装するには、StepExecution
にアクセスするためにStepListener
を実装してStep
に登録します。以下はフラグを設定するリスナーの例です。
public class CustomItemWriter extends ItemListenerSupport implements StepListener {
private StepExecution stepExecution;
public void beforeStep(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
public void afterRead(Object item) {
if (isPoisonPill(item)) {
stepExecution.setTerminateOnly(true);
}
}
}
フラグをセットする場合、デフォルトの振る舞いはstepでJobInterruptedException
をスローします。この振る舞いはStepInterruptionPolicy
で制御できます。なお、例外スローかしないかの選択肢しか無いので、ジョブは常にabnormal endingになります。
1.3. Adding a Footer Record
フラットファイルに書き込む場合、すべての処理完了後にファイル末尾にフッター行を追加したい場合があります。これはSpring BatchのFlatFileFooterCallback
により実装できます。FlatFileFooterCallback
(と対になるFlatFileHeaderCallback
)はFlatFileItemWriter
のオプションプロパティで以下のようにwriterに追加します。
Java Configuration
@Bean
public FlatFileItemWriter<String> itemWriter(Resource outputResource) {
return new FlatFileItemWriterBuilder<String>()
.name("itemWriter")
.resource(outputResource)
.lineAggregator(lineAggregator())
.headerCallback(headerCallback())
.footerCallback(footerCallback())
.build();
}
フッターのコールバックには1つだけメソッドがあり、フッター書き込み時に呼び出されます。
public interface FlatFileFooterCallback {
void writeFooter(Writer writer) throws IOException;
}
1.3.1. Writing a Summary Footer
フッター行に対するよくある要望に、出力処理中の情報を集約してファイル末尾にそれを追加する、があります。このフッターはファイルの要約やチェックサムに使われます。
例えば、バッチジョブがフラットファイルにTrade
行を書き込むとして、全Trades
の総合計をフッターに配置したい場合、ItemWriter
実装を以下のようにします。
public class TradeItemWriter implements ItemWriter<Trade>,
FlatFileFooterCallback {
private ItemWriter<Trade> delegate;
private BigDecimal totalAmount = BigDecimal.ZERO;
public void write(List<? extends Trade> items) throws Exception {
BigDecimal chunkTotal = BigDecimal.ZERO;
for (Trade trade : items) {
chunkTotal = chunkTotal.add(trade.getAmount());
}
delegate.write(items);
totalAmount = totalAmount.add(chunkTotal);
}
public void writeFooter(Writer writer) throws IOException {
writer.write("Total Amount Processed: " + totalAmount);
}
public void setDelegate(ItemWriter delegate) {...}
}
TradeItemWriter
のtotalAmount
はTrade
アイテム書き込み後に加算します。すべてのTrade
処理後、フレームワークはwriteFooter
を呼び、ファイルにtotalAmount
を挿入します。なお、write
には一時変数chunkTotal
があり、これにchunkのTrade
の合計を格納します。これは、write
でskipが発生した場合はtotalAmount
を変更しないためです。write
メソッドを最後まで実行し、例外がスローされなかったことを確認できたら、totalAmount
を更新します。
writeFooter
の使用には、TradeItemWriter
(FlatFileFooterCallback
の実装クラス)をfooterCallback
としてFlatFileItemWriter
にワイヤリングします。以下はその方法の例です。
Java Configuration
@Bean
public TradeItemWriter tradeItemWriter() {
TradeItemWriter itemWriter = new TradeItemWriter();
itemWriter.setDelegate(flatFileItemWriter(null));
return itemWriter;
}
@Bean
public FlatFileItemWriter<String> flatFileItemWriter(Resource outputResource) {
return new FlatFileItemWriterBuilder<String>()
.name("itemWriter")
.resource(outputResource)
.lineAggregator(lineAggregator())
.footerCallback(tradeItemWriter())
.build();
}
このTradeItemWriter
ような作り方はStep
が非リスタート可能の場合のみ正しく動作します。これは、このクラスはstateful(totalAmount
がある)だが、そのtotalAmount
をDBに永続化していないためです。よって、リスタート時にtotalAmount
を取得できません。このクラスをリスタート可能にするには、ItemStream
は以下例のようにopen
とupdate
を実装します。
public void open(ExecutionContext executionContext) {
if (executionContext.containsKey("total.amount") {
totalAmount = (BigDecimal) executionContext.get("total.amount");
}
}
public void update(ExecutionContext executionContext) {
executionContext.put("total.amount", totalAmount);
}
updateメソッドはExecutionContext
にtotalAmount
の現在値を保存します。openメソッドはExecutionContext
からtotalAmount
を取得し処理開始時の値として使用し、Step
の前回終了時点からリスタートします。
1.4. Driving Query Based ItemReaders
chapter on readers and writersで、ページングを使用するDB入力について解説しました。DB2など、たいていのDBで、他のオンラインアプリケーションなどで必要なテーブルを読み込む場合、過度の悲観的ロックは問題を起こす場合があります。加えて、過度に大きなデータセットに対するカーソルオープンが特定DBで問題を起こす場合があります。よって、データ読み取りに'Driving Query'を取るプロジェクトがあります。この方針は、以下図のように、返す必要のあるオブジェクト全体ではなく、キーをイテレートする動作をします。
Figure 1. Driving Query Job
上記例は、カーソルベースのサンプルで使用したのと同じ'FOO'テーブルです。ただし、行全体を選択するのではなく、ここのSQLステートメントではIDのみ選択しています。よって、read
はFOO
オブジェクトではなく、Integer
を返します。このIDの数値を使用し、後でFoo
オブジェクトという"詳細"を取得します。
Figure 2. Driving Query Example
ItemProcessor
はdriving queryから得たキーを基に'Foo'オブジェクトへと変換します。DAOを使用してキーを基にオブジェクト全体をクエリします。
1.5. Multi-Line Records
基本的にはフラットファイルは各レコードを1行になりますが、レコードを複数のフォーマットで複数行に展開することもあります。以下はそうした展開例の抜粋です。
HEA;0013100345;2007-02-15
NCU;Smith;Peter;;T;20014539;F
BAD;;Oak Street 31/A;;Small Town;00235;IL;US
FOT;2;2;267.34
1行の開始は'HEA'で始まり、1行の最終行は'FOT'で始まります。これを正しく扱うには以下を考慮する必要があります。
- 1度に1レコード読み込むのでなく、
ItemReader
は複数行レコードのグループとして読み込み、そのグループをItemWriter
に渡します。
- 各行の種類ごとにトークン処理を行う。
単一レコードを複数行に展開し、かつ、全体で何行来るかは事前に不明なため、ItemReader
はレコード全体を注意深く読みこむ必要があります。これを行うには、FlatFileItemReader
のラッパーとしてItemReader
を実装します。
Java Configuration
@Bean
public MultiLineTradeItemReader itemReader() {
MultiLineTradeItemReader itemReader = new MultiLineTradeItemReader();
itemReader.setDelegate(flatFileItemReader());
return itemReader;
}
@Bean
public FlatFileItemReader flatFileItemReader() {
FlatFileItemReader<Trade> reader = new FlatFileItemReaderBuilder<Trade>()
.name("flatFileItemReader")
.resource(new ClassPathResource("data/iosample/input/multiLine.txt"))
.lineTokenizer(orderFileTokenizer())
.fieldSetMapper(orderFieldSetMapper())
.build();
return reader;
}
各行に応じた適切なトークン分割をするため、特に重要なのは固定幅入力なので、デリゲート先のFlatFileItemReader
に対してPatternMatchingCompositeLineTokenizer
を使います。FlatFileItemReader in the Readers and Writers chapterに詳細があります。それから、デリゲート先のreaderは各行をラップ元のItemReader
にFieldSet
で返すためにPassThroughFieldSetMapper
を使います。
Java Content
@Bean
public PatternMatchingCompositeLineTokenizer orderFileTokenizer() {
PatternMatchingCompositeLineTokenizer tokenizer =
new PatternMatchingCompositeLineTokenizer();
Map<String, LineTokenizer> tokenizers = new HashMap<>(4);
tokenizers.put("HEA*", headerRecordTokenizer());
tokenizers.put("FOT*", footerRecordTokenizer());
tokenizers.put("NCU*", customerLineTokenizer());
tokenizers.put("BAD*", billingAddressLineTokenizer());
tokenizer.setTokenizers(tokenizers);
return tokenizer;
}
ラッパーはレコード終端を解釈する必要があります。よって、レコード終端に達するまで、デリゲート先のread
を繰り返し呼びます。各行を読み込むためには、ラッパーで返すアイテムを組み立てる必要があります。フッターに達すると、アイテムはItemProcessor
やItemWriter
に渡せるようになります。
private FlatFileItemReader<FieldSet> delegate;
public Trade read() throws Exception {
Trade t = null;
for (FieldSet line = null; (line = this.delegate.read()) != null;) {
String prefix = line.readString(0);
if (prefix.equals("HEA")) {
t = new Trade();
}
else if (prefix.equals("NCU")) {
Assert.notNull(t, "No header was found.");
t.setLast(line.readString(1));
t.setFirst(line.readString(2));
...
}
else if (prefix.equals("BAD")) {
Assert.notNull(t, "No header was found.");
t.setCity(line.readString(4));
t.setState(line.readString(6));
...
}
else if (prefix.equals("FOT")) {
return t;
}
}
Assert.isNull(t, "No 'END' was found.");
return null;
}
1.6. Executing System Commands
バッチジョブ内から外部のコマンドを呼ぶ必要があるケースは多いです。そうした処理はスケジューラで別途に開始出来ますが、実行時のメタデータの利点が失われます。また、マルチステップジョブを複数ジョブに分割する必要も発生します。
よくあるケースなので、Spring Batchは以下のようにシステムコマンドを呼ぶTasklet
を用意しています。
Java Configuration
@Bean
public SystemCommandTasklet tasklet() {
SystemCommandTasklet tasklet = new SystemCommandTasklet();
tasklet.setCommand("echo hello");
tasklet.setTimeout(5000);
return tasklet;
}
1.7. Handling Step Completion When No Input is Found
DBからの取得結果やファイルが0件なケースは良くあります。Step
では単純に何も処理が無いので0アイテムを読み込んで完了します。Spring Batch標準クラスのItemReader
はすべてデフォルトではこの方針になっています。入力があるのに何も書き込まないのは、混乱を招く場合があります(ファイル名がおかしくなったり、他同様な問題の発生)。For this reason, the metadata itself should be inspected to determine how much work the framework found to be processed. しかし、入力0件が例外的の場合にはどうすれば良いでしょうか。この場合、0件処理のメタデータをプログラム的にチェックして失敗とするのがベストです。これは一般的なユースケースなため、Spring Batchはその機能をリスナーNoWorkFoundStepExecutionListener
で提供しています。定義は以下の通りです。
public class NoWorkFoundStepExecutionListener extends StepExecutionListenerSupport {
public ExitStatus afterStep(StepExecution stepExecution) {
if (stepExecution.getReadCount() == 0) {
return ExitStatus.FAILED;
}
return null;
}
}
上のStepExecutionListener
は'afterStep'でStepExecution
のreadCount
プロパティで読み込み0件かどうかをチェックします。0件の場合、FAILEDのexit codeでリターンし、Step
は失敗になります。そうでない場合、null
が返され、Step
のステータスは何も変更しません。
1.8. Passing Data to Future Steps
stepから別のstepにデータを渡したい場合が良くあります。これにはExecutionContext
を使います。2つのExecutionContexts
、Step
とJob
、はそれぞれ一長一短です。Step
のExecutionContext
はstep中のみ、Job
のExecutionContext
はJob
全体です。また、Step
のExecutionContext
はStep
のchunkがコミットする際に更新し、Job
のExecutionContext
はStep
終了時にだけ更新します。
このためStep
実行中はStep
のExecutionContext
に全データを保存してください。これによりStep
実行中にデータが適切に保存されます。もしデータをJob
のExecutionContext
に置く場合、Step
の実行中には永続化しません。Step
が失敗するとそのデータはロストします。
public class SavingItemWriter implements ItemWriter<Object> {
private StepExecution stepExecution;
public void write(List<? extends Object> items) throws Exception {
ExecutionContext stepContext = this.stepExecution.getExecutionContext();
stepContext.put("someKey", someObject);
}
@BeforeStep
public void saveStepExecution(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
}
次以降のSteps
でデータを利用可能にするには、step終了後にJob
のExecutionContext
へ"昇格"の必要があります。Spring BatchではExecutionContextPromotionListener
を使います。このリスナーにはExecutionContext
で昇格させたいキーを設定します。また、オプションで、昇格するexit codeのリストパターンも設定出来ます(COMPLETED
がデフォルト)。他リスナー同様、以下サンプルのようにStep
に登録します。
Java Configuration
@Bean
public Job job1() {
return this.jobBuilderFactory.get("job1")
.start(step1())
.next(step1())
.build();
}
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(10)
.reader(reader())
.writer(savingWriter())
.listener(promotionListener())
.build();
}
@Bean
public ExecutionContextPromotionListener promotionListener() {
ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
listener.setKeys(new String[] {"someKey" });
return listener;
}
以下サンプルのように、Job
のExecutionContext
から保存した値を取得するには以下のようにします。
public class RetrievingItemWriter implements ItemWriter<Object> {
private Object someObject;
public void write(List<? extends Object> items) throws Exception {
}
@BeforeStep
public void retrieveInterstepData(StepExecution stepExecution) {
JobExecution jobExecution = stepExecution.getJobExecution();
ExecutionContext jobContext = jobExecution.getExecutionContext();
this.someObject = jobContext.get("someKey");
}
}