kagamihogeの日記

kagamihogeの日記です。

Spring Batch 4.1.x - Reference Documentation - ItemReaders and ItemWriters - 1.1-1.6のテキトー翻訳

https://docs.spring.io/spring-batch/4.1.x/reference/html/readersAndWriters.html#readersAndWriters

https://qiita.com/kagamihoge/items/12fbbc2eac5b8a5ac1e0 俺の訳一覧リスト

1. ItemReaders and ItemWriters

すべてのバッチ処理は、大規模データの読み込み・何らかの計算あるいは変換処理・結果の書き込み、というシンプルな形で説明が可能です。Spring Batchはバルク読み込みと書き込みを実行する3つの中核インタフェース、ItemReader, ItemProcessor, ItemWriter、を用意しています。

1.1. ItemReader

コンセプトはシンプルですが、ItemReaderは多数の異なる種類の入力からデータを受け取るクラスです。よくある例は以下の通りです。

  • フラットファイル: フラットファイルのitem readerはフラットファイルからデータ行を読み込みます。このファイルは基本的にレコード定義を、各フィールドはファイルの固定位置で定義するか、なんらかの特殊文字(カンマ)による区切り、で行います。
  • XML: XMLItemReadersは、パース処理するテクノロジとは独立してXMLを処理し、オブジェクトのマッピングとvalidateをします。入力データはXSDスキーマに対するXMLファイルvalidationが可能です。
  • Database: DBリソースのresultsetからprocessに回すオブジェクトにマッピングします。デフォルトのSQL ItemReader 実装は戻りオブジェクトにRowMapperを実行し、リスタートする場合はカレントの行をトラッキングし、基本的な統計を保存し、後述するトランザクション機能を提供します。

様々な用途が考えられますが、このチャプターでは基本的な事柄に焦点をあてます。利用可能なすべてのItemReader実装のリストはAppendix Aを参照してください。

ItemReaderは、以下のインタフェース定義のように、汎用入力操作用の基礎的インタフェースです。

public interface ItemReader<T> {

    T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;

}

readメソッドはItemReaderの中核を定義しています。このメソッドは1つのitemを返すか、これ以上読み込みものが無い場合はnullを返します。itemは、ファイルの1行、DBの1行、XMLの1要素、を表します。これらのitemは扱いやすいドメインオブジェクト(Trade, Fooなど)に基本的には変換しますが、必須ではありません。

ItemReaderの実装は基本的には単方向のみです。ただし、基底リソースがtransactional(JMSキューなど)の場合、ロールバック発生時のその次にread呼び出しをすると同一のlogical itemを返す可能性があります。また、ItemReaderで処理するアイテムが無いけど例外をスローしないケースは一考の余地があります。たとえば、DBのItemReaderのクエリが0件の場合、readの初回呼び出しでnullを返します。

1.2. ItemWriter

ItemWriterは機能的にはItemReaderと似ていますがそれとは逆の操作をします。何らかのリソースを、配置・open・closeが必要な点は同じで、読み込みではなく書き込む点が異なります。DBやキューの場合、行う操作は、insert, update, sendになります。出力のシリアライズフォーマットはバッチジョブそれぞれ固有のものになります。

ItemReader同様ItemWriterは、以下のような、極めて汎用的なインタフェース定義です。

public interface ItemWriter<T> {

    void write(List<? extends T> items) throws Exception;

}

ItemReaderread同様、writeItemWriterの振る舞いの基礎を提供します。open状態のwriterに渡されるitemのリストを書き込みます。基本的に、itemは複数まとめて('batched' together)chunkに入れて出力するという想定なので、インタフェースはwriter自身でitemを作成するのではなく、itemのリストを受け取ります。リストの書き込み後、writeメソッドを返す前に、状況に応じたflushを実行します。例えば、Hibernate DAOに書き込む場合、各アイテムごとに複数回書き込みを実行します。その後writerはreturn前にhibernate sessionのflushを呼び出します。

1.3. ItemProcessor

ItemReaderItemWriterはここに書きたいタスクがある場合には有用ですが、書き込む前に実行したいビジネスロジックとは何でしょうか。書き込みと読み込みの両方で可能な一つのやり方にcomposite patternがあります。これは、別のItemWriterを持つItemWriterか、もしくは、別のItemReaderを持つItemReaderです。以下はその例です。

public class CompositeItemWriter<T> implements ItemWriter<T> {

    ItemWriter<T> itemWriter;

    public CompositeItemWriter(ItemWriter<T> itemWriter) {
        this.itemWriter = itemWriter;
    }

    public void write(List<? extends T> items) throws Exception {
        //Add business logic here
       itemWriter.write(items);
    }

    public void setDelegate(ItemWriter<T> itemWriter){
        this.itemWriter = itemWriter;
    }
}

上のクラスは、何らかのビジネスロジック実行後に、別のItemWriterにデリゲートします。このパターンは同様にItemReaderでも使用可能で、メインとなるItemReaderの入力を基にして更に別の参照データを得るような場合に使えます。write呼び出しを制御したい場合にも有用です。ただし、実際の書き込み前にアイテムを変換しておきたい場合、そのクラス自身ではwriteの必要はありません。そこではただ単にアイテムの修正のみ行います。この場合、Spring Batchの、以下インフェース定義を持つItemProcessorを使います。

public interface ItemProcessor<I, O> {

    O process(I item) throws Exception;
}

ItemProcessorはシンプルです。1つのオブジェクトを与え、変換して返します。入出力のオブジェクトは同一型になる場合もならない場合もあります。ビジネスロジックはprocess内で適用する点と、その中身はロジックを作成する開発者に完全に委ねられている、という点が重要です。ItemProcessorはstepに直接ワイヤリングできます。たとえば、ItemReaderFooクラスで読み込んで書き込み前にBarに変換する必要がある、とします。以下は変換を実行するItemProcessorの例です。

public class Foo {}

public class Bar {
    public Bar(Foo foo) {}
}

public class FooProcessor implements ItemProcessor<Foo,Bar>{
    public Bar process(Foo foo) throws Exception {
        // FooからBarへのシンプルな変換
        return new Bar(foo);
    }
}

public class BarWriter implements ItemWriter<Bar>{
    public void write(List<? extends Bar> bars) throws Exception {
        //barsの書き込み
    }
}

上の例では、Foo, Barのクラスがあり、ItemProcessorを実装するFooProcessorがあります。変換はシンプルですが、これ以外の型へ変換することも可能です。BarWriterBarオブジェクトを書き込み、これ以外の型が来る場合は例外をスローします。同じく、FooProcessorFoo以外の場合に例外をスローします。FooProcessorは以下例のようにStepにインジェクションします。

Java Configuration

@Bean
public Job ioSampleJob() {
        return this.jobBuilderFactory.get("ioSampleJOb")
                                .start(step1())
                                .end()
                                .build();
}

@Bean
public Step step1() {
        return this.stepBuilderFactory.get("step1")
                                .<String, String>chunk(2)
                                .reader(fooReader())
                                .processor(fooProcessor())
                                .writer(barWriter())
                                .build();
}

1.3.1. Chaining ItemProcessors

大抵のケースで変換は一つで十分ですが、複数のItemProcessor実装を一緒に実行したい場合はどうでしょうか。前に開設したcomposite patternで実現可能です。FooからBarに変換し、それからFooBarに変換して書き込む例は以下です。

public class Foo {}

public class Bar {
    public Bar(Foo foo) {}
}

public class Foobar {
    public Foobar(Bar bar) {}
}

public class FooProcessor implements ItemProcessor<Foo,Bar>{
    public Bar process(Foo foo) throws Exception {
        //Perform simple transformation, convert a Foo to a Bar
        return new Bar(foo);
    }
}

public class BarProcessor implements ItemProcessor<Bar,Foobar>{
    public Foobar process(Bar bar) throws Exception {
        return new Foobar(bar);
    }
}

public class FoobarWriter implements ItemWriter<Foobar>{
    public void write(List<? extends Foobar> items) throws Exception {
        //write items
    }
}

FooProcessorBarProcessorは、以下例のように、結果としてFoobar```を返します。

CompositeItemProcessor<Foo,Foobar> compositeProcessor =
                                      new CompositeItemProcessor<Foo,Foobar>();
List itemProcessors = new ArrayList();
itemProcessors.add(new FooTransformer());
itemProcessors.add(new BarTransformer());
compositeProcessor.setDelegates(itemProcessors);

先の例同様に、composite processorStepに設定します。

Java ConfigurationStepListener

@Bean
public Job ioSampleJob() {
        return this.jobBuilderFactory.get("ioSampleJob")
                                .start(step1())
                                .end()
                                .build();
}

@Bean
public Step step1() {
        return this.stepBuilderFactory.get("step1")
                                .<String, String>chunk(2)
                                .reader(fooReader())
                                .processor(compositeProcessor())
                                .writer(foobarWriter())
                                .build();
}

@Bean
public CompositeItemProcessor compositeProcessor() {
        List<ItemProcessor> delegates = new ArrayList<>(2);
        delegates.add(new FooProcessor());
        delegates.add(new BarProcessor());

        CompositeItemProcessor processor = new CompositeItemProcessor();

        processor.setDelegates(delegates);

        return processor;
}

1.3.2. Filtering Records

item processorの良くある使い方の一つはItemWriterに渡す前にレコードをフィルタリングします。フィルタはスキップとは異なるアクションです。スキップはレコードがinvalidなことを示し、一方、フィルタは書き込まないレコードを意味します。

例えば、三種の異なるタイプのレコード、挿入・更新・削除、を持つファイルを読むバッチジョブを考えます。対象システムでレコード削除が未サポートの場合、削除レコードはItemWriterには送りたくありません。しかし、レコード自体は不正データでは無いので、スキップではなくフィルタをかけたいと考えます。この結果、ItemWriterは挿入・更新レコードのみ受け取ります。

レコードをフィルタするには、ItemProcessornullを返します。フレームワークは戻り値nullがあるとItemWriterに渡すレコードリストにそのアイテムを追加しません。ItemProcessorの例外スローはスキップになります。

1.3.3. Fault Tolerance

chunkをロールバックすると、読み込みでキャッシュしたアイテムのreprocessが可能です。stepにfault tolerant(skipやretry処理)を設定する場合、ItemProcessorはべき等に実装します。基本的には、ItemProcessorで入力アイテムは一切変更せず、変更は結果となるインスタンスにだけ行います。

1.4. ItemStream

ItemReadersItemWritersはそれぞれ固有の役割を持ちますが、両者に共通な役割のインタフェースがあります。基本的に、これは、バッチジョブのscopeの一部となり、readerとwriterでopen,closeをし、永続化状態のメカニズムを必要とします。ItemStreamは以下のインタフェースを提供します。

public interface ItemStream {

    void open(ExecutionContext executionContext) throws ItemStreamException;

    void update(ExecutionContext executionContext) throws ItemStreamException;

    void close() throws ItemStreamException;
}

各メソッドの説明の前に、ExecutionContextについて触れます。ItemStreamも実装するItemReaderのクライアントはread前にopenを呼び出し、ファイルなど何らかのリソースのopenやコネクション取得を行います。同様の制限がItemWriter実装にも適用されます。Chapter 2で触れたように、ExecutionContextに再開用データがある場合、初期状態ではない位置からItemReaderItemWriterを開始するためにそのデータを用います。逆に、openしたリソースの安全な解放を保証するべくcloseを呼びます。updateは現在保持している状態をExecutionContextにロードするために呼ばれます。現在の状態をコミット前にDBへの永続化を保証するため、コミット前に呼ばれます。

特殊な場合としてItemStreamのクライアントがStepの場合、ExecutionContextが各StepExecutionで作成され、ユーザはexecutitonの状態を格納でき、同一のJobInstanceで再開する場合はその状態を返します。Quartzに詳しい場合は、QuartsのJobDataMapに良く似たセマンティクスになっています。

1.5. The Delegate Pattern and Registering with the Step

CompositeItemWriterは、Spring Batchの共通パターンの一つ、delegation patternの例です。委譲先はStepListenerなどののコールバックインタフェースを実装する場合があります。そのインタフェースを実装し、かつ、JobStepの一部としてSpring Batch Coreと連携させる場合、Stepに明示的な登録が必要となる場合がほとんどです。reader, writer, processorStepに直接ワイヤリングすると、ItemStreamStepListenerを実装している場合、自動登録されます。しかし、Stepは委譲先を関知しないため、以下例のように、これらをlistenerやstreamとしてインジェクションする必要があります。

Java ConfigurationStepListener

@Bean
public Job ioSampleJob() {
        return this.jobBuilderFactory.get("ioSampleJob")
                                .start(step1())
                                .end()
                                .build();
}

@Bean
public Step step1() {
        return this.stepBuilderFactory.get("step1")
                                .<String, String>chunk(2)
                                .reader(fooReader())
                                .processor(fooProcessor())
                                .writer(compositeItemWriter())
                                .stream(barWriter())
                                .build();
}

@Bean
public CustomCompositeItemWriter compositeItemWriter() {

        CustomCompositeItemWriter writer = new CustomCompositeItemWriter();

        writer.setDelegate(barWriter());

        return writer;
}

@Bean
public BarWriter barWriter() {
        return new BarWriter();
}

1.6. Flat Files

バルクデータ連携の最も一般的な方式の一つはフラットファイルです。その構造(XSD)定義に対する合意に基づくXMLとは異なり、フラットファイルを読む側は前もってその構造を把握しておく必要があります。通常、フラットファイルは以下2つのどちらかのタイプ、デリミタか固定長、になります。デリミタファイルはデリミタ、カンマなど、でフィールドを区切ります。固定長ファイルはフィールド長が決まっています。

1.6.1. The FieldSet

Spring Batchでフラットファイルを処理する場合、入力・出力を問わず、最も重要なクラスの一つがFieldSetです。世の中多数のアーキテクチャとライブラリがファイル読み込み機能を提供しますが、これらは通常Stringかその配列を返します。これはやりたい事の半分でしかありません。FieldSetはファイルリソースとフィールドのバインディングを行うためのSpring Batchの機能です。これにより、DB入力と同じ方法でファイル入力も扱えます。FieldSetはコンセプト的にはJDBCResultSetと似ています。FieldSetは単一の引数、String配列のトークン、を取ります。オプションで、フィールド名も設定可能で、このフィールドはインデックスか名前のどちらかでアクセスします。

String[] tokens = new String[]{"foo", "1", "true"};
FieldSet fs = new DefaultFieldSet(tokens);
String name = fs.readString(0);
int value = fs.readInt(1);
boolean booleanValue = fs.readBoolean(2);

FieldSetには、Date, BigDecimalなどのオプションが多数存在します。FieldSetの最大の利点はフラットファイル入力のパースに一貫性を持たせられる点です。バッチジョブのパースをそれぞれ好き勝手に異なる方法でするより、フォーマット例外のハンドリングや、シンプルなデータ変換において、一貫性を持たせられます。

1.6.2. FlatFileItemReader

フラットファイルはおおむね2次元(表形式)データを持ちます。Spring Batchフレームワークでのフラットファイル読み込みはFlatFileItemReaderを呼ぶクラスで設定します。FlatFileItemReaderはフラットファイルの読み込みとパースの基本的な機能を提要します。FlatFileItemReaderの2つの重要な依存性はResourceLineMapperです。LineMapperは次のセクションで解説します。resourceプロパティはSpring CoreのResourceです。この型のbeanの生成方法のドキュメントはSpring Framework, Chapter 5. Resourcesにあります。よって、このガイドではResourceオブジェクトの生成については以下のシンプルな例に留めます。

Resource resource = new FileSystemResource("resources/trades.csv");

複雑なバッチ環境では、ディレクトリ構造はEAIインフラで管理する事が多く、ここでは、外部インターフェース向けのdrop zoneはFTPアップロード先からバッチ処理ディレクトリまたはその逆へのファイル移動で確立します。ファイル移動ユーティリティはSpring Batchのスコープ外ですが、ジョブストリームのステップにファイル移動ユーティリティを使う事はよくあります。バッチアーキテクチャは処理対象のファイルの場所だけは知っている必要があります。Spring Batchは開始ポイントからパイプへのデータフィード処理を開始します。なお、Spring Integrationはこの種のサービスを多数提供します。

FlatFileItemReaderのその他のプロパティには、以下表のような、データ処理方法の指定が可能です。

Table 1. FlatFileItemReader Properties

Property Type Description
comments String[] コメント行を示すline prefixesを指定
encoding String テキストエンコーディングを指定。デフォルトはCharset.defaultCharset()
lineMapper LineMapper StringからアイテムのObjectに変換
linesToSkip int ファイル先頭から無視する行数
recordSeparatorPolicy RecordSeparatorPolicy 行端識別に使用し、クオートで囲む1レコードを複数行にまたがらせたい場合などに使用
resource Resource 読込対象リソース
skippedLinesCallback LineCallbackHandler スキップファイル行を受けるインタフェース。linesToSkipが2の場合このインタフェースは2回呼ばれる。
strict boolean strictモードでは入力リソースが存在しない場合readerはExecutionContextに 例外をスローする。そうでない場合、ログ出力して処理継続する。

LineMapper

RowMapperResultSetなどの低レベル要素を受け取りObjectを返すのと同様、フラットファイルの処理ではStringの行をObjectに変換します。

public interface LineMapper<T> {

    T mapLine(String line, int lineNumber) throws Exception;

}

基本要素として、現在行とその行番号があり、このmapperはドメインオブジェクトを返します。RowMapper同様、行番号とそれに関連付けられたResultSetの各行のように、行番号と各行を持ちます。一意確認用にドメインジェクトと行番号を関連付けたり、ログに行番号を含めたりが出来ます。ただし、RowMapperと異なり、LineMapperには生データの行が渡されるので、前述のように、これだけでは不十分です。本ドキュメント後半で解説するように、オブジェクトにマッピング可能なFieldSetに行をトークンで分割してください。

LineTokenizer

入力行をFieldSet````に変換するインタフェースが必要です。FieldSetに変換したフラットファイルデータのフォーマットは多数考えられるためです。Spring Batchでは、それ用のインタフェースがLineTokenizer```です。

public interface LineTokenizer {

    FieldSet tokenize(String line);

}

LineTokenizerの役割は、入力行(理論上Stringには複数行含める事も可能)を与えると、その行に対するFieldSetを返します。FieldSetFieldSetMapperに渡します。Spring Batchは以下のLineTokenizer実装を提供します。

  • DelimitedLineTokenizer: レコードのフィールドがデリミタで区切られているファイルで使用する。最も一般的なのはカンマで、pipeやセミコロンもよく使います。
  • FixedLengthTokenizer: レコードのフィールドが固定長のファイルで使用する。各フィールド長は個々のレコードタイプごとに定義する。
  • PatternMatchingCompositeLineTokenizer: 行がパターンにマッチすると対応するLineTokenizerを使用する。

FieldSetMapper

FieldSetMapperには単一メソッドmapFieldSetがあり、FieldSetを取りオブジェクトにマッピングします。このオブジェクトは、job仕様に応じて、DTOドメインオブジェクト・配列などになります。FieldSetMapperLineTokenizerと組み合わせて、リソースから適当な型にデータ行を変換するために使います。インタフェース定義は以下の通りです。

public interface FieldSetMapper<T> {

    T mapFieldSet(FieldSet fieldSet) throws BindException;

}

JdbcTemplateRowMapperと同じようなパターンになっています。

DefaultLineMapper

これまでの解説でフラットファイル読み込みのための基本的なインターフェース定義を見てきました。3つの基本的なステップが明確になりました。

  1. ファイルから1行読み込む。
  2. StringLineTokenizer#tokenize()に渡してFieldSetを取得する。
  3. トークン処理が返すFieldSetFieldSetMapperに渡し、ItemReader#read()が結果を返す。

上述の2つのインタフェースが2つのタスク、FieldSet変換とFieldSetからドメインオブジェクトへのマッピング、を分離しています。LineTokenizerの入力はLineMapper(行)の入力と対応関係にあり、FieldSetMapperの出力はLineMapperの出力と対応関係があるので、LineTokenizerFieldSetMapperの両方を使うデフォルト実装を用意しています。DefaultLineMapperは、以下のようなクラス定義で、大抵はこの振る舞いで十分です。

public class DefaultLineMapper<T> implements LineMapper<>, InitializingBean {

    private LineTokenizer tokenizer;

    private FieldSetMapper<T> fieldSetMapper;

    public T mapLine(String line, int lineNumber) throws Exception {
        return fieldSetMapper.mapFieldSet(tokenizer.tokenize(line));
    }

    public void setLineTokenizer(LineTokenizer tokenizer) {
        this.tokenizer = tokenizer;
    }

    public void setFieldSetMapper(FieldSetMapper<T> fieldSetMapper) {
        this.fieldSetMapper = fieldSetMapper;
    }
}

上述のデフォルト実装の機能は、(以前のバージョンのように)reader自体に組み込むのではなく、行を直接処理する場合のパース処理に高い柔軟性をユーザに提供しています。

Simple Delimited File Reading Example

以下は実際のシナリオに沿ってフラットファイルを読み込む方法の例の解説です。このバッチジョブは以下のファイルからフットボールプレイヤーを読み込みます。

ID,lastName,firstName,position,birthYear,debutYear
"AbduKa00,Abdul-Jabbar,Karim,rb,1974,1996",
"AbduRa00,Abdullah,Rabih,rb,1975,1999",
"AberWa00,Abercrombie,Walter,rb,1959,1982",
"AbraDa00,Abramowicz,Danny,wr,1945,1967",
"AdamBo00,Adams,Bob,te,1946,1969",
"AdamCh00,Adams,Charlie,wr,1979,2003"

ファイルの中身は以下のPlayerドメインオブジェクトにマッピングします。

public class Player implements Serializable {

    private String ID;
    private String lastName;
    private String firstName;
    private String position;
    private int birthYear;
    private int debutYear;

    public String toString() {
        return "PLAYER:ID=" + ID + ",Last Name=" + lastName +
            ",First Name=" + firstName + ",Position=" + position +
            ",Birth Year=" + birthYear + ",DebutYear=" +
            debutYear;
    }

    // setters and getters...
}

FieldSetPlayerマッピングするには、以下のように、プレイヤーを返すFieldSetMapperを定義します。

protected static class PlayerFieldSetMapper implements FieldSetMapper<Player> {
    public Player mapFieldSet(FieldSet fieldSet) {
        Player player = new Player();

        player.setID(fieldSet.readString(0));
        player.setLastName(fieldSet.readString(1));
        player.setFirstName(fieldSet.readString(2));
        player.setPosition(fieldSet.readString(3));
        player.setBirthYear(fieldSet.readInt(4));
        player.setDebutYear(fieldSet.readInt(5));

        return player;
    }
}

次に、FlatFileItemReaderを正しく設定してreadを呼ぶことで、ファイルを読み込みます。

FlatFileItemReader<Player> itemReader = new FlatFileItemReader<Player>();
itemReader.setResource(new FileSystemResource("resources/players.csv"));
//DelimitedLineTokenizer defaults to comma as its delimiter
DefaultLineMapper<Player> lineMapper = new DefaultLineMapper<Player>();
lineMapper.setLineTokenizer(new DelimitedLineTokenizer());
lineMapper.setFieldSetMapper(new PlayerFieldSetMapper());
itemReader.setLineMapper(lineMapper);
itemReader.open(new ExecutionContext());
Player player = itemReader.read();

readはファイルの各行を基にPlayerオブジェクトを返します。EOFに達するとnullを返します。

Mapping Fields by Name

DelimitedLineTokenizerFixedLengthTokenizerの双方で使用可能な機能があり、JDBCResultSetと似たような機能を持ちます。フィールド名をLineTokenizerに設定してマッピング関数の可読性を上げられます。まず、フラットファイルの全フィールドのカラム名をtokenizerに設定します。以下がその例です。

tokenizer.setNames(new String[] {"ID", "lastName","firstName","position","birthYear","debutYear"});

FieldSetMapperは以下のように上記のカラム名を使います。

public class PlayerMapper implements FieldSetMapper<Player> {
    public Player mapFieldSet(FieldSet fs) {

       if(fs == null){
           return null;
       }

       Player player = new Player();
       player.setID(fs.readString("ID"));
       player.setLastName(fs.readString("lastName"));
       player.setFirstName(fs.readString("firstName"));
       player.setPosition(fs.readString("position"));
       player.setDebutYear(fs.readInt("debutYear"));
       player.setBirthYear(fs.readInt("birthYear"));

       return player;
   }
}

Automapping FieldSets to Domain Objects

多くの場合、FieldSetMapperを書くことはJdbcTemplateRowMapper書くことと同じくらいに面倒です。名前とマッチするフィールドに、JavaBeanのsetterを用いて、自動マッピングするFieldSetMapperをSpring Batchは提供しています。再度フットボールの例にとると、BeanWrapperFieldSetMapperは以下のようになります。

Java Configuration

@Bean
public FieldSetMapper fieldSetMapper() {
        BeanWrapperFieldSetMapper fieldSetMapper = new BeanWrapperFieldSetMapper();

        fieldSetMapper.setPrototypeBeanName("player");

        return fieldSetMapper;
}

@Bean
@Scope("prototype")
public Player player() {
        return new Player();
}

FieldSetの各エントリに対し、マッパーはPlayer新規インスタンス(このためprototypeスコープが飛鳥。)の対応するsetterを参照します。これはSpringコンテナがプロパティ名にマッチするsetterを参照するのと同様です。FieldSetの使用可能なフィールドをマッピングし、Playerオブジェクトを返します。上記設定以外のコードは不要です。

Fixed Length File Formats

これまではデリミタファイルの詳細のみ解説してきました。しかし、それだけでは片手落ちです。固定長フォーマットのフラットファイルを使用する組織は数多く存在します。固定長の例は以下の通りです。

UK21341EAH4121131.11customer1
UK21341EAH4221232.11customer2
UK21341EAH4321333.11customer3
UK21341EAH4421434.11customer4
UK21341EAH4521535.11customer5

1つの大きなフィールドに見えますが、実際には4つの独立したフィールドです。

  1. ISIN: 注文商品の一意識別子 - 12文字
  2. Quantity: 個数 - 3文字
  3. Price: 価格 - 5文字
  4. Customer: 顧客ID - 9文字

FixedLengthLineTokenizerを設定する場合、以下例のように、長さをレンジ形式で指定する必要があります。

※ レンジ形式は専用のproperty editorのRangeArrayPropertyEditorApplicationContextに入れる必要があります。ただし、このbeanはbatch namespaceを使う場合はApplicationContextに自動設定されます。

Java Configuration

@Bean
public FixedLengthTokenizer fixedLengthTokenizer() {
        FixedLengthTokenizer tokenizer = new FixedLengthTokenizer();

        tokenizer.setNames("ISIN", "Quantity", "Price", "Customer");
        tokenizer.setColumns(new Range(1-12),
                                                new Range(13-15),
                                                new Range(16-20),
                                                new Range(21-29));

        return tokenizer;
}

FixedLengthLineTokenizerはこれまでに説明してきたLineTokenizerの一種なので、デリミタの場合と同様にFieldSetを返します。その出力処理についても同様で、BeanWrapperFieldSetMapperなどが使えます。

Multiple Record Types within a Single File

これまでのファイル読み込みサンプルは説明簡略化のため、ファイルの全レコードが同一フォーマットである、という仮定を置いていました。しかし、そうでない場合もあります。各レコードの異なるフォーマットに異なるトークン処理をして異なるオブジェクトにマッピングするファイルもありえます。以下はそうしたファイルの抜粋です。

USER;Smith;Peter;;T;20014539;F
LINEA;1044391041ABC037.49G201XX1383.12H
LINEB;2134776319DEF422.99M005LI

このファイルには3種類のレコード、USER", "LINEA", and "LINEB"、があります。"USER"はUserに相当し、"LINEA"と"LINEB"は共にLineに相当し、"LINEA"は"LINEB"よりも多くのデータを持ちます。

ItemReaderは個々の行を独立に読みますが、ItemWriterが正しくアイテムを受け取れるように、各行に対して異なるLineTokenizerFieldSetMapperを指定する必要があります。PatternMatchingCompositeLineMapperは、パターンとLineTokenizerマッピングおよびパターンとFieldSetMapperマッピング、を指定することで、これを簡単に設定できます。

Java Configuration

@Bean
public PatternMatchingCompositeLineMapper orderFileLineMapper() {
        PatternMatchingCompositeLineMapper lineMapper =
                new PatternMatchingCompositeLineMapper();

        Map<String, LineTokenizer> tokenizers = new HashMap<>(3);
        tokenizers.put("USER*", userTokenizer());
        tokenizers.put("LINEA*", lineATokenizer());
        tokenizers.put("LINEB*", lineBTokenizer());

        lineMapper.setTokenizers(tokenizers);

        Map<String, FieldSetMapper> mappers = new HashMap<>(2);
        mappers.put("USER*", userFieldSetMapper());
        mappers.put("LINE*", lineFieldSetMapper());

        lineMapper.setFieldSetMappers(mappers);

        return lineMapper;
}

この例では、"LINEA"と"LINEB" はそれぞれ別のLineTokenizerですが、FieldSetMapperは同じものを使います。

PatternMatchingCompositeLineMapperPatternMatcher#matchで各行に対する委譲先の選択を行います。PatternMatcherには2つのワイルドカード特殊文字が使えます。クエスチョンマーク("?")は1文字のみにマッチし、アスタリスク("")はゼロ文字以上にマッチします。上の設定例では、すべてのパターンの最後にアスタリスクを付与しており、各行のプレフィックスとマッチするようにしています。PatternMatcherは、設定順序に関わらず、常に可能な限り最も一致するパターンにマッチします(matches the most specific pattern possible)。このため、"LINE"と"LINEA"がパターンリストにある場合、"LINEA"はパターン"LINEA"にマッチし、"LINEB"はパターン"LINE"にマッチします。なお、アスタリスクのみ("")はデフォルトとして振る舞い、他パターンがマッチしないすべての行にマッチします。

Java Configuration

...
tokenizers.put("*", defaultLineTokenizer());
...

また、トークン処理にPatternMatchingCompositeLineTokenizerを単独で使う事も可能です。

複数行にまたがるレコードを持つフラットファイルもあります。これの対処には、さらに複雑な方法が必要となります。このための一般的なパターンの解説はmultiLineRecordsのサンプルコードにあります。

Exception Handling in Flat Files

トークン処理が例外をスローするケースは多数考えられます。フラットファイルが不完全で不正確なフォーマットのレコードを持つ場合があります。大抵の場合は、ログに問題・オリジナルの行・行番後を出力し、エラー行をスキップします。ログは後々に別のバッチジョブや手動調査に使います。このため、Spring Batchはパース例外処理、FlatFileParseExceptionFlatFileFormatException、の例外の階層を持っています。ファイル読み込み時に何らかのエラーが発生する場合、FlatFileItemReaderFlatFileParseExceptionをスローします。LineTokenizer実装はトークン処理中に発生したエラーを示すFlatFileFormatExceptionをスローします。

IncorrectTokenCountException

DelimitedLineTokenizerFixedLengthLineTokenizerFieldSetの生成にカラム名を指定します。しかし、カラム名の個数がトークン処理時にマッチしない場合、FieldSetは生成できず、トークンの個数と期待値を持つIncorrectTokenCountExceptionをスローします。

tokenizer.setNames(new String[] {"A", "B", "C", "D"});

try {
    tokenizer.tokenize("a,b,c");
}
catch(IncorrectTokenCountException e){
    assertEquals(4, e.getExpectedCount());
    assertEquals(3, e.getActualCount());
}

tokenizerには4つのカラム名を設定していますが、ファイルからは3トークンしか検出出来ないと、IncorrectTokenCountExceptionをスローします。

IncorrectLineLengthException

固定長フォーマットの場合、デリミタとは異なり、パース時に各カラムが厳密に定義した長さに従う必要があります。行の長さが異なる場合、例外をスローします。

tokenizer.setColumns(new Range[] { new Range(1, 5),
                                   new Range(6, 10),
                                   new Range(11, 15) });
try {
    tokenizer.tokenize("12345");
    fail("Expected IncorrectLineLengthException");
}
catch (IncorrectLineLengthException ex) {
    assertEquals(15, ex.getExpectedLength());
    assertEquals(5, ex.getActualLength());
}

上記のtokenizerの設定範囲は1-5, 6-10, and 11-15です。よって、行の合計の長さは15です。しかし、上の例では、長さ5の行が渡され、IncorrectLineLengthExceptionをスローします。最初のカラムのみマッピングするよりも、例外スローによって行処理を早めに失敗させる事が可能となり、FieldSetMapperで2カラム目を読み込む際にエラーにするよりも多くの情報を返せます。しかし、行の長さが常に一定ではないケースも存在します。このため、行の長さのvalidationは'strict'プロパティによりオフにできます。

tokenizer.setColumns(new Range[] { new Range(1, 5), new Range(6, 10) });
tokenizer.setStrict(false);
FieldSet tokens = tokenizer.tokenize("12345");
assertEquals("12345", tokens.readString(0));
assertEquals("", tokens.readString(1));

上の例は、okenizer.setStrict(false)以外、一つ前の例とおおむね同一です。この設定により、tokenizerで行のトークン処理時に長さチェックをしなくなります。これでFieldSetは正しく生成されて返されます。なお、残りの値については空のトークンになります。

1.6.3. FlatFileItemWriter

フラットファイル書き込みには読み込みと同じ解決すべき問題と課題があります。transactionalにデリミタあるいは固定長フォーマットで書き込み可能なstepにする必要があります。

LineAggregator

LineTokenizer同様、アイテムをStringに変換する必要があり、ファイルへ書き込むために複数フィールドを単一の文字列へ集約する必要があります。Spring Batchでは、これはLineAggregatorが担います。インタフェース定義は以下の通りです。

public interface LineAggregator<T> {

    public String aggregate(T item);

}

LineAggregatorLineTokenizerは論理的な対応関係にあります。LineTokenizerStringFieldSetにし、一方、LineAggregatoritemStringにします。

PassThroughLineAggregator

LineAggregatorの一番簡単な実装はPassThroughLineAggregatorで、オブジェクトがすでに文字列であるか、オブジェクトの文字列表現を書き込んでも問題無い、と想定できる場合に使います。

public class PassThroughLineAggregator<T> implements LineAggregator<T> {

    public String aggregate(T item) {
        return item.toString();
    }
}

上の実装は、文字列生成を直接制御する必要はあるものの、FlatFileItemWriterの利点であるトランザクションやリスタート機能などは必要な場合に便利です。

Simplified File Writing Example

LineAggregatorインタフェースとその一番簡単な実装のPassThroughLineAggregatorを見たので書き込みの基本的なフローを解説します。

  1. 書き込むオブジェクトをLineAggregatorに渡してStringを得る。
  2. 返されたStringが設定したファイルに書き込まれる。

以下のFlatFileItemWriterの抜粋がそのコード部分です。

public void write(T item) throws Exception {
    write(lineAggregator.aggregate(item) + LINE_SEPARATOR);
}

簡単な設定例は以下の通りです。

Java Configuration

@Bean
public FlatFileItemWriter itemWriter() {
        return  new FlatFileItemWriterBuilder<Foo>()
                                   .name("itemWriter")
                                   .resource(new FileSystemResource("target/test-outputs/output.txt"))
                                   .lineAggregator(new PassThroughLineAggregator<>())
                                   .build();
}

FieldExtractor

上の例はファイル書き込みの一番簡単な使用法として有用です。しかし、FlatFileItemWriterのユーザは基本的にはドメインオブジェクトを書き込む必要があるため、これを行に変換する必要があります。ファイル読み込みでは、以下が必要でした。

  1. ファイルから1行読み込む。
  2. その行をLineTokenizer#tokenize()に渡してFieldSetを得る。
  3. トークン処理結果のFieldSetFieldSetMapperに渡し、ItemReader#read()が結果を返す。

ファイル書き込みも似た構造になりますが手順は逆になります。

  1. writerに書き込むアイテムを渡す。
  2. アイテムのフィールドを配列に変換。
  3. 配列を行に集約する。

フレームワークは書き込みたいオブジェクトのフィールドを知らないので、アイテムを配列に変換するタスク用のFieldExtractorを指定する必要があります。インタフェース定義は以下になります。

public interface FieldExtractor<T> {

    Object[] extract(T item);

}

FieldExtractorの実装はオブジェクトのフィールドから配列を生成し、デリミタ区切りの要素や固定長の行の一部にこの配列を使用します。

PassThroughFieldExtractor

配列・CollectionFieldSetなどコレクションを書き込む必要があるケースは多くあります。これらのコレクション型から配列を"抽出"するのは極めて単純で、コレクションから配列に変換します。よって、PassThroughFieldExtractorはそういう場合に使います。なお、非コレクション型のオブジェクトを渡す場合、PassThroughFieldExtractorはそのアイテムのみを含む配列を返します。

BeanWrapperFieldExtractor

前述の読み込みセクションのBeanWrapperFieldSetMapper同様、変換を自分で書くのではなく、ドメインオブジェクトを配列に変換したい場合に使います。BeanWrapperFieldExtractorのこの機能は以下の例のように使います。

BeanWrapperFieldExtractor<Name> extractor = new BeanWrapperFieldExtractor<Name>();
extractor.setNames(new String[] { "first", "last", "born" });

String first = "Alan";
String last = "Turing";
int born = 1912;

Name n = new Name(first, last, born);
Object[] values = extractor.extract(n);

assertEquals(first, values[0]);
assertEquals(last, values[1]);
assertEquals(born, values[2]);

extractor実装には必須プロパティが1つだけあり、マッピング用に使うフィールド名を渡しますBeanWrapperFieldSetMapperはオブジェクトのsetterにFieldSetマッピングするのにフィールド名を使います。BeanWrapperFieldExtractorはオブジェクト配列を生成するためにgetterとフィールド名の配列とをマッピングします。フィールド名配列の順序が配列内のフィールドの順序となる点に注意してください。

Delimited File Writing Example

一番簡単なフラットファイルフォーマットはすべてのフィールドがデリミタで区切ったものです。この場合はDelimitedLineAggregatorを使います。以下の例はクレジットと顧客アカウントを表すシンプルなオブジェクトを出力する例です。

public class CustomerCredit {

    private int id;
    private String name;
    private BigDecimal credit;

    //説明簡略化のためgetters,settersは省略
}

ドメインオブジェクトを使うため、FieldExtractor実装をデリミタ指定で必要があります。

Java Configuration

@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
        BeanWrapperFieldExtractor<CustomerCredit> fieldExtractor = new BeanWrapperFieldExtractor<>();
        fieldExtractor.setNames(new String[] {"name", "credit"});
        fieldExtractor.afterPropertiesSet();

        DelimitedLineAggregator<CustomerCredit> lineAggregator = new DelimitedLineAggregator<>();
        lineAggregator.setDelimiter(",");
        lineAggregator.setFieldExtractor(fieldExtractor);

        return new FlatFileItemWriterBuilder<CustomerCredit>()
                                .name("customerCreditWriter")
                                .resource(outputResource)
                                .lineAggregator(lineAggregator)
                                .build();
}

上の例では、このチャプター前半で解説したBeanWrapperFieldExtractorCustomerCreditのnameとcreditフィールドを配列に変換し、各フィールドをカンマで書き込みます。

なお、以下例のように、FlatFileItemWriterBuilder.DelimitedBuilderBeanWrapperFieldExtractorDelimitedLineAggregatorを内部的に自動生成する使い方も可能です。

Java Configuration

@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
        return new FlatFileItemWriterBuilder<CustomerCredit>()
                                .name("customerCreditWriter")
                                .resource(outputResource)
                                .delimited()
                                .delimiter("|")
                                .names(new String[] {"name", "credit"})
                                .build();
}

Fixed Width File Writing Example

デリミタがフラットファイルの唯一のフォーマットではありません。固定長という、フィールドの区切りに各カラムが長さを持つ場合もあります。Spring BatchはFormatterLineAggregatorで固定長を扱います。上の説明で使用したCustomerCreditの場合、以下のように使います。

Java Configuration

@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
        BeanWrapperFieldExtractor<CustomerCredit> fieldExtractor = new BeanWrapperFieldExtractor<>();
        fieldExtractor.setNames(new String[] {"name", "credit"});
        fieldExtractor.afterPropertiesSet();

        FormatterLineAggregator<CustomerCredit> lineAggregator = new FormatterLineAggregator<>();
        lineAggregator.setFormat("%-9s%-2.0f");
        lineAggregator.setFieldExtractor(fieldExtractor);

        return new FlatFileItemWriterBuilder<CustomerCredit>()
                                .name("customerCreditWriter")
                                .resource(outputResource)
                                .lineAggregator(lineAggregator)
                                .build();
}

今までに見てきた例とほぼ同じですが、formatプロパティ値はここで初めて登場します。

...
FormatterLineAggregator<CustomerCredit> lineAggregator = new FormatterLineAggregator<>();
lineAggregator.setFormat("%-9s%-2.0f");
...

これの内部実装はJava 5で導入されたFormatterをベースにしています。JavaFormatterC言語printfをベースにしています。formatter設定の詳細についてはFormatterを参照してください。

なお、以下例のように、FlatFileItemWriterBuilder.FormattedBuilderBeanWrapperFieldExtractorFormatterLineAggregatorを内部的に自動生成する使い方も可能です。

Java Configuration

@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
        return new FlatFileItemWriterBuilder<CustomerCredit>()
                                .name("customerCreditWriter")
                                .resource(outputResource)
                                .formatted()
                                .format("%-9s%-2.0f")
                                .names(new String[] {"name", "credit"})
                                .build();
}

Handling File Creation

FlatFileItemReaderとファイルリソースとの関係は極めてシンプルです。readerを初期化すると、(存在する)ファイルをオープンし、無ければ例外をスローします。ファイル書き込みはこのようにシンプルではありません。一見、FlatFileItemWriterにも同様なシンプルな関係があるように思えます。ファイルがあれば例外をスローし、無ければ生成して書き込みを開始する。ただし、Jobリスタートがあり得る場合は問題を起こす可能性があります。通常のリスタートでは、ファイルが存在する場合、最終ポジションから書き込みを開始し、存在しなければ例外をスローします。しかし、このjobのファイル名が常に同一の場合どうなるでしょうか? この場合、リスタートで無ければ、存在するファイルを削除したいと考えるはずです。こういった課題があるため、FlatFileItemWriterにはshouldDeleteIfExistsプロパティがあります。このプロパティをtrueにするとwriterオープン時に同一名のファイルが存在すれば削除します。

Spring Batch 4.1.x - Reference Documentation - Configuring a Stepのテキトー翻訳

https://docs.spring.io/spring-batch/4.1.x/reference/html/step.html#configureStep

https://qiita.com/kagamihoge/items/12fbbc2eac5b8a5ac1e0 俺の訳一覧リスト

1. Configuring a Step

ドメインのチャプターで解説したように、Stepは、バッチjobのシーケンシャルで独立したフェーズをカプセル化したもので、バッチ処理の制御と定義に必要なすべての情報を持ちます。Stepの中身はJobを書く開発者次第なため、やや曖昧な説明になっています。Stepは開発者次第でシンプルにも複雑にもなります。シンプルなStepはファイルからDBにロードし、要求されるコードは少しかゼロです(実装次第)。複雑なStepは、以下イメージ図のような、複雑なビジネスルール処理の一部になるものがあります。

Figure 1. Step

1.1. Chunk-oriented Processing

Spring Batchは一般的な実装スタイルのうちチャンク指向処理スタイルを使います。チャンク指向の処理は、1度に1データ読み取ってトランザクション境界内で書き込むチャンクを作成します。ItemReaderから1アイテム読み、ItemProcessorに渡し、集約します。読み込んだアイテム数がコミット間隔に達すると、ItemWriterがチャンクを書き込み、トランザクションをコミットします。以下のイメージが一連の処理です。

Figure 2. Chunk-oriented Processing

以下のコードは概念を示すコードです。

List items = new Arraylist();
for(int i = 0; i < commitInterval; i++){
    Object item = itemReader.read()
    Object processedItem = itemProcessor.process(item);
    items.add(processedItem);
}
itemWriter.write(items);

1.1.1. Configuring a Step

Stepの依存性のリストは比較的短いですが、多くの関連クラスを持つ場合があるので非常に複雑です。

java設定の場合、以下例のように、Spring Batchのビルダーが使えます。

Java Configuration

/**
 * JobRepositoryは基本的にはautowiredされるので明示的な設定は不要です。
 */
@Bean
public Job sampleJob(JobRepository jobRepository, Step sampleStep) {
    return this.jobBuilderFactory.get("sampleJob")
                            .repository(jobRepository)
                .start(sampleStep)
                .build();
}

/**
 * TransactionManagerは基本的にはautowiredされるので明示的な設定は不要です。
 */
@Bean
public Step sampleStep(PlatformTransactionManager transactionManager) {
        return this.stepBuilderFactory.get("sampleStep")
                                .transactionManager(transactionManager)
                                .<String, String>chunk(10)
                                .reader(itemReader())
                                .writer(itemWriter())
                                .build();
}

上の設定はアイテム指向stepを作るのに必要な依存性だけを入れています。

  • reader: processorにアイテムを与えるItemReader
  • writer: ItemReaderからのアイテムを処理するItemWriter
  • transactionManager: トランザクションの開始とコミットをするSpringのPlatformTransactionManager
  • repository: JobRepositoryは定期的にStepExecutionExecutionContextを保存する(コミットの直前)。
  • chunk: ここではアイテムベースのstepを意味し、トランザクションコミットの処理アイテム数を指定している。

なお、repositoryのデフォルトのbeanはjobRepositorytransactionManagertransactionManger(どちらも@EnableBatchProcessingで自動設定されます)。また、ItemProcessorは無くても良く、これによりアイテムをreaderからwriterに直接渡せます。

1.1.3. The Commit Interval

前述の通り、stepはアイテムを読んで書き込み、設定したPlatformTransactionManagerで定期的にコミットします。commit-intervalが1の場合、1アイテム書き込むたびにコミットします。ただ、トランザクションの開始とコミットは高コストなので、多くの場合にこれは理想的とは言えません。理想的には、各トランザクションでなるべく多数のアイテム処理するのが好ましく、これはstepが相互作用するリソースと処理データの特性に完全に依存します。このため、あるコミットで処理するアイテム数は変更可能です。以下の例はcommit-intervalが10のtaskletstepです。

Java Configuration

@Bean
public Job sampleJob() {
    return this.jobBuilderFactory.get("sampleJob")
                     .start(step1())
                     .end()
                     .build();
}

@Bean
public Step step1() {
        return this.stepBuilderFactory.get("step1")
                                .<String, String>chunk(10)
                                .reader(itemReader())
                                .writer(itemWriter())
                                .build();
}

上の例では、各トランザクションで10アイテム処理します。処理開始時にトランザクションが開始します。そして、各アイテムをItemReaderreadで読むたびにカウンターがインクリメントします。10に達すると、集約アイテムのリストをItemWriterに渡し、トランザクションをコミットします。

Configuring a Step for Restart

Configuring and Running a JobではJobのリスタートについて解説しました。リスタートは様々な影響をstepに与えるため、従って、何らかの設定が必要となります。

Setting a Start Limit

Stepの最大実行回数を制御したい場合があります。たとえば、ある特定のStepがリソースを無効化して再実行前に手動で戻す必要があるので、一度だけ実行するようにしたい、などです。設定はstepレベルで行い、異なるstepで異なる設定が可能です。一度しか実行出来ないStepは、制限無しのStepがあるJobと一緒に入れられます。以下のコード例はstart limit設定の例です。

Java Configuration

@Bean
public Step step1() {
        return this.stepBuilderFactory.get("step1")
                                .<String, String>chunk(10)
                                .reader(itemReader())
                                .writer(itemWriter())
                                .startLimit(1)
                                .build();
}

上のstepは一度だけ実行可能です。再実行するとStartLimitExceededExceptionをスローします。なお、start-limitのデフォルト値はInteger.MAX_VALUEです。

Restarting a Completed Step

restartable jobの場合、初めは成功したかどうかに関わらず、常に実行したいstepがある場合があります。validation stepや処理前にリソースのクリーンアップをするStepなどです。restarted jobの通常処理では、正常終了したことを示す'COMPLETED'ステータスとなり、これはスキップします。allow-start-if-completeを"true"に設定するとその挙動をオーバーライドして常に実行するようになります。以下が例です。

Java Configuration

@Bean
public Step step1() {
        return this.stepBuilderFactory.get("step1")
                                .<String, String>chunk(10)
                                .reader(itemReader())
                                .writer(itemWriter())
                                .allowStartIfComplete(true)
                                .build();
}

Step Restart Configuration Example

以下はリスタート可能なstepを持つjobの設定例です。

Java Configuration

@Bean
public Job footballJob() {
        return this.jobBuilderFactory.get("footballJob")
                                .start(playerLoad())
                                .next(gameLoad())
                                .next(playerSummarization())
                                .end()
                                .build();
}

@Bean
public Step playerLoad() {
        return this.stepBuilderFactory.get("playerLoad")
                        .<String, String>chunk(10)
                        .reader(playerFileItemReader())
                        .writer(playerWriter())
                        .build();
}

@Bean
public Step gameLoad() {
        return this.stepBuilderFactory.get("gameLoad")
                        .allowStartIfComplete(true)
                        .<String, String>chunk(10)
                        .reader(gameFileItemReader())
                        .writer(gameWriter())
                        .build();
}

@Bean
public Step playerSummarization() {
        return this.stepBuilderFactor.get("playerSummarization")
                        .startLimit(2)
                        .<String, String>chunk(10)
                        .reader(playerSummarizationSource())
                        .writer(summaryWriter())
                        .build();
}

上の設定例のjobは、フットボールのデータロードしてサマリをします。3つのstep、playerLoad, gameLoad, playerSummarizationがあります。playerLoadはフラットファイルからプレイヤーデータをロードし、gameLoadは同様にゲームデータをロードします。最後に、playerSummarizationは、ゲームをベースに各プレイヤーの統計を出力します。ここでの想定は、playerLoadのファイルロードは一度だけにしたいが、gameLoadはゲームデータをディレクトリからファイルで取得し、DBに正常ロード後はそのファイルを削除する、とします。この場合、playerLoadは特に設定を必要としません。何度でも実行可能ですが、completeになると、以降はスキップします。しかしgameLoadは最終実行以降にファイルが追加された場合は都度実行の必要があります。毎回実行するために'allow-start-if-complete'を'true'にしています。(DBのgamesテーブルはprocess indicatorを持ち、次のsummarization stepでその新規ゲームデータを参照可能になっている、という想定)。summarization stepはこのjobで最も重要で、start limitは2にしています。stepが連続して失敗し、job実行を制御するオペレータにexit codeを返します。そして、手動変更しない限り再開は出来なくなります。

※ 本ドキュメントのjob例はサンプルプロジェクトのfootballJobとは異なります。

このセクションのまとめとしてfootballJobの例を3回実行したときに起きることを解説します。

Run 1:

  1. playerLoadを実行すると正常終了し、'PLAYERS'テーブルに400プレイヤー追加する。
  2. gameLoadを実行するとゲームデータの11ファイルを処理し、'GAMES'テーブルにロードする。
  3. playerSummarizationを開始すると5分後に失敗する。

Run 2:

  1. playerLoadは既に正常終了済みなので実行しない。allow-start-if-completeは'false'(デフォルト)
  2. gameLoadは再実行して更に別の2ファイルを処理し、前回様に'GAMES'テーブルにロードする(このデータについてはprocess indicatorが未処理となる)。
  3. playerSummarizationはすべての残ゲームデータ(process indicatorでフィルタリング)を処理開始して30分後に失敗する。

Run 3:

  1. playerLoadは既に正常終了済みなので実行しない。allow-start-if-completeは'false'(デフォルト)
  2. gameLoadは再実行して更に別の2ファイルを処理し、前回様に'GAMES'テーブルにロードする(このデータについてはprocess indicatorが未処理となる)。
  3. playerSummarizationは開始せずjobが即時killされる。この時点でplayerSummarizationは3回目で、リミットが2なため。リミットを上げるか、そのJobで新規のJobInstanceを作る必要がある。

1.1.5. Configuring Skip Logic

エラーが発生してもStepを失敗にはせず、代わりにスキップしたい場合があります。基本的には、そこでのデータや事象の意味を理解する人が決定します。たとえば、金融データでは、送金は完全に実行する必要あるので、スキップ可能でないものもあります。一方、ベンダーリストのロードは、スキップ可能な場合があります。フォーマット間違いや必須情報の漏れでロード出来ない場合、おおむねスキップで問題ありません。たいていの場合、こうした不正レコードはログ出力します。これについては後に述べるリスナーで処理します。

以下はskip limitの使用例です。

Java Configuration

@Bean
public Step step1() {
        return this.stepBuilderFactory.get("step1")
                                .<String, String>chunk(10)
                                .reader(flatFileItemReader())
                                .writer(itemWriter())
                                .faultTolerant()
                                .skipLimit(10)
                                .skip(FlatFileParseException.class)
                                .build();
}

上の例ではFlatFileItemReaderを使用しています。この場合、どの段階においても、FlatFileParseExceptionをスローするとそのアイテムはスキップされてskip limit合計10に対してカウントアップします。step実行中のread, process, writeのスキップ数は別々にカウントしますが、limitはその全スキップに対して適用します。skip limitに達すると、次の例外スローでstepは失敗します。つまり、11回のスキップが例外をトリガするが、10回ではありません(the eleventh skip triggers the exception, not the tenth.)。

上の例の問題点としては、FlatFileParseException以外のその他すべての例外でJobは失敗になります。場合によってはこれは正しい動作となります。しかし、さらに場合によっては、失敗させる例外を指定してその他の全例外はスキップする方が簡単な場合があります。以下はその例です。

Java Configuration

@Bean
public Step step1() {
        return this.stepBuilderFactory.get("step1")
                                .<String, String>chunk(10)
                                .reader(flatFileItemReader())
                                .writer(itemWriter())
                                .faultTolerant()
                                .skipLimit(10)
                                .skip(Exception.class)
                                .noSkip(FileNotFoundException.class)
                                .build();
}

スキップ可能例外クラスにjava.lang.Exceptionを指定することで、すべてのExceptionsをスキップ可能に指定しています。ただし、'除外'としてjava.io.FileNotFoundExceptionを指定することで、すべてのExceptionsからFileNotFoundException除外した設定になります。もしその除外した例外が発生する(つまりスキップ不能な例外)はfatalになります。

例外発生時において、スキップ可能かどうかはクラス階層上の最も近いスーパークラスが決定します。未分類の例外は'fatal'扱いになります。

skipnoSkipの呼び出し順序は特に意味を持ちません。

1.1.6. Configuring Retry Logic

たいていの場合、例外はskipするかStep失敗のどちらかにします。ただし、すべての例外が決定的なわけではありません。いま、読み込み中にFlatFileParseExceptionをスローする場合、そのレコードに対しては常に例外をスローします。ItemReaderのリセットは無意味です。対して、DeadlockLoserDataAccessExceptionなど、プロセスが別のプロセスでロックするレコードの更新をしようとする場合などは、待機して再試行すれば成功します。この場合、以下のようにリトライを設定します。

@Bean
public Step step1() {
        return this.stepBuilderFactory.get("step1")
                                .<String, String>chunk(2)
                                .reader(itemReader())
                                .writer(itemWriter())
                                .faultTolerant()
                                .retryLimit(3)
                                .retry(DeadlockLoserDataAccessException.class)
                                .build();
}

Stepでは、リトライ可能なアイテム数と、リトライ可能な例外リストを設定可能です。リトライの挙動の詳細についてはretryを参照してください。

1.1.7. Controlling Rollback

デフォルトでは、リトライかスキップかを問わず、ItemWriterがスローする例外はStepで制御するトランザクションロールバックさせます。スキップを前述のように設定する場合、ItemReaderがスローする例外はロールバックになりません。ただし、ItemWriterの例外でロールバックできないケースは色々考えられますが、これはトランザクションを無効にするアクションが無い場合があるためです。このため、以下例のように、ロールバックをしない例外リストをStepに設定可能です。

Java Configuration

@Bean
public Step step1() {
        return this.stepBuilderFactory.get("step1")
                                .<String, String>chunk(2)
                                .reader(itemReader())
                                .writer(itemWriter())
                                .faultTolerant()
                                .noRollback(ValidationException.class)
                                .build();
}

Transactional Readers

ItemReaderの基本的な役割は直線的で戻ることは無いです。stepはreaderの入力をバッファし、これはロールバック時にアイテムをreaderから再読み込みする必要を無くすためです。しかし、JMSキューなど、readerがトランザクショナルリソースの一番最初に置かれるケースがあるにはあります。この場合、キューがロールバックするトランザクションに関連付けられるので、キューからプル済みのメッセージは戻されます。このため、下記例のように、アイテムをバッファしないstepを設定可能です。

Java Configuration

@Bean
public Step step1() {
        return this.stepBuilderFactory.get("step1")
                                .<String, String>chunk(2)
                                .reader(itemReader())
                                .writer(itemWriter())
                                .readerIsTransactionalQueue()
                                .build();
}

1.1.8. Transaction Attributes

トランザクション属性(Transaction attributes)は、isolation, propagation, timeout、の設定を制御するのに使います。詳細についてはSpring core documentationを参照してください。以下のサンプルではisolation, propagation, timeoutトランザクション属性を設定しています。

Java Configuration

@Bean
public Step step1() {
        DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
        attribute.setPropagationBehavior(Propagation.REQUIRED.value());
        attribute.setIsolationLevel(Isolation.DEFAULT.value());
        attribute.setTimeout(30);

        return this.stepBuilderFactory.get("step1")
                                .<String, String>chunk(2)
                                .reader(itemReader())
                                .writer(itemWriter())
                                .transactionAttribute(attribute)
                                .build();
}

1.1.9. Registering ItemStream with a Step

stepはライフサイクルの必要時点でItemStreamのコールバックを処理する必要があります。(ItemStreamインタフェースの詳細についてはItemStreamを参照)。これはstepが失敗してリスタート可能にする必要がある場合は必須で、その理由は、ItemStreamはstepで必要な実行間の永続化状態を取得する場所なためです。

ItemReader, ItemProcessor, ItemWriterItemStreamを実装すると、それらは自動登録されます。それ以外のstreamは別途登録します。このstreamは間接的な依存性、デリゲートなど、になる事が多く、readerとwriterにインジェクションされます。streamは、以下例のように、'streams'要素でStepに登録します。

Java Configuration

@Bean
public Step step1() {
        return this.stepBuilderFactory.get("step1")
                                .<String, String>chunk(2)
                                .reader(itemReader())
                                .writer(compositeItemWriter())
                                .stream(fileItemWriter1())
                                .stream(fileItemWriter2())
                                .build();
}

/**
 * Spring Batch 4では、CompositeItemWriterはItemStreamを実装しているので以下は不要ですが
 * 例として作成しています。
 */
@Bean
public CompositeItemWriter compositeItemWriter() {
        List<ItemWriter> writers = new ArrayList<>(2);
        writers.add(fileItemWriter1());
        writers.add(fileItemWriter2());

        CompositeItemWriter itemWriter = new CompositeItemWriter();

        itemWriter.setDelegates(writers);

        return itemWriter;
}

上記のサンプルでは、CompositeItemWriterItemStreamではありませんが、デリゲート先に処理を任せます。このため、デリゲート先のwriterはフレームワーク側で正しく認識出来るようにstreamとして明示的に登録の必要があります。ItemReaderはstreamとしての明示的な登録は必要ありませんが、これはStepのプロパティとして直接指定しているためです。上記のstepはリスタート可能で、readerとwriterの状態は失敗イベント時に正しく永続化されます。

1.1.10. Intercepting Step Execution

Job同様、Step実行中には様々なイベントが発生し、そこでユーザが何らかの機能を実行したい場合があります。たとえば、フッターが必要なフラットファイルを書き出すには、フッターを書き込めるように、Step完了時にItemWriterへ通知の必要があります。是の実現にはStepスコープのリスナの一つを使います。

StepListener(これ自体はマーカーに過ぎない)の拡張の一つを実装するクラスはlisteners要素でstepに適用します。listeners要素は、step, tasklet, chunk、で妥当です。なお、そのリスナ関数を適用するレベルに対し宣言することを推奨します。もし単一クラスで複数リスナ(StepExecutionListenerItemReadListenerなど)を兼ねる場合、最も細かいレベルで宣言します。以下はchunkにリスナを適用する例です。

Java Configuration

@Bean
public Step step1() {
        return this.stepBuilderFactory.get("step1")
                                .<String, String>chunk(10)
                                .reader(reader())
                                .writer(writer())
                                .listener(chunkListener())
                                .build();
}

<step>要素もしくは*StepFactoryBeanファクトリの一つを使用する場合で、ItemReader, ItemWriter, ItemProcessor自身がStepListenerの一つを実装しているとStepに自動登録されます。これはStepに直接インジェクションするコンポーネントにだけ適用されます。もしリスナが別コンポーネント内にネストする場合、明示的な登録が必要です(Registering ItemStream with a Stepで解説)。

StepListenerインタフェースに加えて、同様な処理のためのアノテーションがあります。POJOにこれらアノテーションを付与するメソッドを持たせると、対応するStepListenerに変換します。ItemReader, ItemWriter, Taskletなどchunkコンポネントの実装にそうしたアノテーションを付与するのが良くある使い方です。このアノテーションは、<listener/>要素用のXMLパーサーがアナライズするように、ビルダーのlistenerでも登録をします。よって、stepにリスナーを登録するには、XML namespaceかビルダーのどちらかを使えば良い、ということです。

StepExecutionListener

StepExecutionListenerStep実行における最も汎用的なリスナーです。Step開始・終了後に、完了・失敗に関わらず、通知を受けます。例は以下の通りです。

public interface StepExecutionListener extends StepListener {

    void beforeStep(StepExecution stepExecution);

    ExitStatus afterStep(StepExecution stepExecution);

}

afterStepの戻り値型ExitStatusによってリスナーでStepの完了コードを修正可能です。

このインタフェースに対応するアノテーションは以下です。

  • @BeforeStep
  • @AfterStep

ChunkListener

chunkはトランザクションスコープ内で処理するアイテムとして定義します。コミットインターパルで、トランザクションをコミットすると、chunkをコミットします。ChunkListenerはchunk処理開始かchunk正常処理完了後にロジックを挟むのに使います。以下がインタフェースです。

public interface ChunkListener extends StepListener {

    void beforeChunk(ChunkContext context);
    void afterChunk(ChunkContext context);
    void afterChunkError(ChunkContext context);

}

beforeChunkメソッドはトランザクション開始後でItemReaderのread呼び出し前に呼ばれます。逆に、afterChunkメソッドはchunkコミット後によばれます(ロールバックが起きてない場合に限る)。

このインタフェースに対応するアノテーションは以下です。

  • @BeforeChunk
  • @AfterChunk
  • @AfterChunkError

ChunkListenerはchunk宣言が無い場合にも適用可能です。TaskletStepChunkListenerを呼ぶ責任があるため、非アイテム指向のtaskletにも同様に適用します(taskletの前後に呼ばれる)。

ItemReadListener

前述のskip logicで解説したように、スキップレコードを後々追えるようにログを残すと有益な場合がある、と述べました。読込エラーの場合には、以下インタフェース定義に示す、ItemReaderListenerで実現できます。

public interface ItemReadListener<T> extends StepListener {

    void beforeRead();
    void afterRead(T item);
    void onReadError(Exception ex);

}

beforeReadメソッドはItemReaderの毎回のread呼び出しの前に呼ばれます。afterReadはreadが正常に完了して読みだしたアイテムを次に渡し終えた後に呼ばれます。読み出し中に何らかのエラーが発生した場合、onReadErrorメソッドが呼ばれます。発生した例外をログ出力するなどが可能です。

このインタフェースに対応するアノテーションは以下です。

  • @BeforeRead
  • @AfterRead
  • @OnReadError

ItemProcessListener

ItemReadListener同様、アイテムのprocess処理も、以下インタフェース定義に示すような形で、リッスンが可能です。

public interface ItemProcessListener<T, S> extends StepListener {

    void beforeProcess(T item);
    void afterProcess(T item, S result);
    void onProcessError(T item, Exception e);

}

beforeProcessメソッドはItemProcessorprocess前で処理アイテムを渡す前に呼ばれます。afterProcessメソッドはアイテム正常処理後に呼ばれます。処理中にエラーが発生した場合、onProcessErrorメソッドが呼ばれます。発生した例外とそこで処理対象だったアイテムが渡されるので、それをログ出力などします。

このインタフェースに対応するアノテーションは以下です。

  • @BeforeProcess
  • @AfterProcess
  • @OnProcessError

ItemWriteListener

アイテム書き込みのリッスンはItemWriteListenerで行い、そのインタフェース定義は以下の通りです。

public interface ItemWriteListener<S> extends StepListener {

    void beforeWrite(List<? extends S> items);
    void afterWrite(List<? extends S> items);
    void onWriteError(Exception exception, List<? extends S> items);

}

beforeWriteメソッドはItemWriterwrite前で書き込みアイテムリストを渡す前に呼ばれます。afterWriteメソッドはアイテム正常書き込み後に呼ばれます。エラーが発生した場合、onWriteErrorメソッドが呼ばれます。発生した例外と書き込み対象アイテムが渡されるので、それをログ出力などします。

このインタフェースに対応するアノテーションは以下です。

  • @BeforeWrite
  • @AfterWrite
  • @OnWriteError

SkipListener

ItemReadListener, ItemProcessListener, ItemWriteListenerはいずれもエラー通知の機構を備えますが、スキップしたレコードについての情報は得られません。たとえば、onWriteErrorは、アイテムがリトライして成功したとしても呼ばれます(is called even if an item is retried and successful.)。このため、スキップしたアイテムをトラッキングするために別のインターフェスが存在し、以下のようなインタフェース定義になります。

public interface SkipListener<T,S> extends StepListener {

    void onSkipInRead(Throwable t);
    void onSkipInProcess(T item, Throwable t);
    void onSkipInWrite(S item, Throwable t);

}

onSkipInReadは読み込み中にスキップしたアイテムがあれば呼ばれます。ロールバックすると、複数回スキップにより同一アイテムを登録する場合があることに注意してください(It should be noted that rollbacks may cause the same item to be registered as skipped more than once.)。onSkipInWriteは書き込み中にアイテムスキップした場合に呼ばれます。アイテムは正常に読み込まれている(かつ未スキップ)ので、引数にそのアイテムが渡されます。

このインタフェースに対応するアノテーションは以下です。

  • @OnSkipInRead
  • @OnSkipInWrite
  • @OnSkipInProcess

SkipListeners and Transactions

SkipListenerの最もよくある使い方はスキップアイテムのログ出力で、このログは別のバッチ処理かあるいは人間がスキップとなった原因を修正したり調査するのに使います。大本のトランザクションロールバックとなるケースは多数考えられ、Spring Batchは以下2点を保証します。

  1. 適切なスキップメソッド(発生するエラーのタイミングに依存)がアイテムごとに1度だけ呼ばれる。
  2. SkipListenerトランザクションコミット前に必ず常に呼ばれる。これにより、リスナーで呼ぶなんらかのトランザクショナルリソースがItemWriterでの失敗によってロールバックしないようにしています。

1.2. TaskletStep

chunk指向処理だけがStepの唯一の処理方法ではありません。Stepがシンプルにストアド呼び出しするだけの場合はどうでしょうか。ItemReaderで呼び出してストアド実行後にnullを返すような作りには出来ます。しかし、これは少々不自然で、何もしないItemWriterを作らなければなりません。Spring BatchではこうしたケースではTaskletStepを使います。

Taskletexecuteという1つのメソッドを持つシンプルなインタフェースで、TaskletStepが反復呼び出しを行い、RepeatStatus.FINISHEDを返すか失敗を示す例外をスローするまで実行します。Taskletの毎回の呼び出しはトランザクションでラップします。Taskletの実装者は、ストアド・スクリプト・シンプルなSQL更新、などを行います。

TaskletStepを作成するには、ビルダーのtaskletメソッドにTaskletを実装したbeanを渡します。TaskletStepをビルダーで作る際にはchunkを呼ぶ必要はありません。以下はシンプルなtaskletの例です。

@Bean
public Step step1() {
    return this.stepBuilderFactory.get("step1")
                            .tasklet(myTasklet())
                            .build();
}

TaskletStepはもしtaskletがStepListenerを実装する場合はStepListenerとしても自動的に登録します。

1.2.1. TaskletAdapter

ItemReaderItemWriterのアダプタ同様、TaskletにもTaskletAdapterというSpring Batchが用意するアダプタクラスがあります。これが有用な一例としてはレコードセットのフラグ更新に既存のDAOを使う場合です。TaskletAdapterにより、以下例のように、Taskletのアダプターを記述することなくDAOのメソッドを呼び出せます。

Java Configuration

@Bean
public MethodInvokingTaskletAdapter myTasklet() {
        MethodInvokingTaskletAdapter adapter = new MethodInvokingTaskletAdapter();

        adapter.setTargetObject(fooDao());
        adapter.setTargetMethod("updateFoo");

        return adapter;
}

1.2.2. Example Tasklet Implementation

たいていのバッチjobaは、各種リソースのセットアップためメイン処理開始前に実行すべきstepや、そうしたリソースをクリーンアップする処理の終了後に実行すべきstepがあります。巨大なファイルを扱うjobの場合、別の場所へ正常にアップロード完了後にローカルの対象ファイルを削除する事がほとんどです。以下の例(Spring Batch samples projectの抜粋)はまさにそのような事をするTasklet実装の例です。

public class FileDeletingTasklet implements Tasklet, InitializingBean {

    private Resource directory;

    public RepeatStatus execute(StepContribution contribution,
                                ChunkContext chunkContext) throws Exception {
        File dir = directory.getFile();
        Assert.state(dir.isDirectory());

        File[] files = dir.listFiles();
        for (int i = 0; i < files.length; i++) {
            boolean deleted = files[i].delete();
            if (!deleted) {
                throw new UnexpectedJobExecutionException("Could not delete file " +
                                                          files[i].getPath());
            }
        }
        return RepeatStatus.FINISHED;
    }

    public void setDirectoryResource(Resource directory) {
        this.directory = directory;
    }

    public void afterPropertiesSet() throws Exception {
        Assert.notNull(directory, "directory must be set");
    }
}

上のTasklet実装は所定ディレクトリ内の全ファイルを削除します。executeメソッドは一度だけ呼ばれることに注意してください。あとはStepからこのTaskletを参照します。

Java Configuration

@Bean
public Job taskletJob() {
        return this.jobBuilderFactory.get("taskletJob")
                                .start(deleteFilesInDir())
                                .build();
}

@Bean
public Step deleteFilesInDir() {
        return this.stepBuilderFactory.get("deleteFilesInDir")
                                .tasklet(fileDeletingTasklet())
                                .build();
}

@Bean
public FileDeletingTasklet fileDeletingTasklet() {
        FileDeletingTasklet tasklet = new FileDeletingTasklet();

        tasklet.setDirectoryResource(new FileSystemResource("target/test-outputs/test-dir"));

        return tasklet;
}

1.3. Controlling Step Flow

jobで複数stepをグループ化する機能では、あるstepから別のstepへのjob "flows" を制御する方法が必要となります。Stepの失敗が必ずしもJobの失敗を意味しません。また、Stepが次を実行すべきか決定するための'success'が複数種類存在する場合もあります。Stepsのグループ設定次第では、ある特定のstepが全く実行されない場合がありえます。

1.3.1. Sequential Flow

もっともシンプルなflowのケースは、以下イメージのように、すべてのstepをシーケンシャルに実行するjobです。

Figure 3. Sequential Flow

こういう設定をするにはstep要素の'next'属性を以下の例のように使います。

Java Configuration

@Bean
public Job job() {
        return this.jobBuilderFactory.get("job")
                                .start(stepA())
                                .next(stepB())
                                .next(stepC())
                                .build();
}

上のケースでは、'step A'がStepリストの最初にあるので最初に実行します。'step A'が正常終了すると、'step B'を実行し、その後は同様です。ただし、'step A'が失敗する場合、Job全体が失敗して、'step B'は実行しません。

1.3.2. Conditional Flow

上記例では、2パターンのみ存在します。

  1. Stepが成功して次のStepを実行する。
  2. Stepが失敗するとJobが失敗する。

これで十分なケースも多いです。しかし、失敗にするのではなく、Stepの失敗が別のStepをトリガーするケースはどうでしょうか。以下がそうしたflowのイメージです。

Figure 4. Conditional Flow

複雑なケースを扱うために、Spring Batch namespaceではstep要素内に定義する遷移要素を用意しています。そうした遷移要素の一つがnext要素です。next属性同様、next要素は次に実行するStepJobに指示します。ただし、属性とは異なり、Stepには任意数のnext要素を指定可能で、その場合失敗時のデフォルトの振る舞いは存在しません。つまり、遷移要素を使う場合、Step遷移に対するすべての振る舞いを明示的に定義する必要があります。なお、単一stepはnext属性とtransition要素両方を指定できません。

next要素にはマッチするパターンと次に実行するstepを、以下サンプルのように、指定します。

Java Configuration

@Bean
public Job job() {
        return this.jobBuilderFactory.get("job")
                                .start(stepA())
                                .on("*").to(stepB())
                                .from(stepA()).on("FAILED").to(stepC())
                                .end()
                                .build();
}

java設定を使う場合はonメソッドにシンプルなパターンマッチングを使い、Step実行の結果であるExitStatusにマッチさせるものを指定します。

パターンには2つの特殊文字を使用可能です。

  • "*"は0以上の文字列にマッチ。
  • "?"は1文字にマッチ。

例えば、"c*t"は"cat"と"count"にマッチし、"c?t"は"cat"にはマッチするが"count"にはしない。

Stepに配置する遷移要素に上限はありませんが、Step実行が返すExitStatusに対応する要素が無い場合、フレームワークは例外をスローしてJobは失敗します。フレームワークは遷移順序を最もマッチするものからしないもの順で決定します。つまり、上記例で順序を"stepA"に入れ替えたとしても、"FAILED"のExitStatusは"stepC"に遷移します(This means that, even if the ordering were swapped for "stepA" in the example above, an ExitStatus of "FAILED" would still go to "stepC".)。

Batch Status Versus Exit Status

条件付きflowでJobを設定する場合、BatchStatusExitStatusの違いを意識する事は重要です。BatchStatusenumで、JobExecutionStepExecution双方のプロパティであり、フレームワークJobStepのステータスを記録するために使います。以下の値のいずれか1つになります。COMPLETED, STARTING, STARTED, STOPPING, STOPPED, FAILED, ABANDONED, UNKNOWN。これらの多くは自己説明的です。COMPLETEDはstepやjobが正常終了した場合のステータスで、FAILEDは失敗時、などです。

以下の例はJava設定で'on'要素を持つものです。

...
.from(stepA()).on("FAILED").to(stepB())
...

一目見た感じでは、'on'がStepBatchStatusを参照するように見えます。しかし、実際にはStepExitStatusを参照します。名前が示すように、ExitStatusはそのStepの終了後のステータスを表します。

英語で書くなら、"exit codeがFAILEDであればstepBに遷移する"( "go to stepB if the exit code is FAILED")、です。デフォルトではexit codeはStepBatchStatusと常に同じになり、これが上記サンプルがなぜ動作するのかの理由です。ただし、exit codeを別の値にしたい場合は? 好例がsamples projectのskip sample jobにあります。

Java Configuration

@Bean
public Job job() {
        return this.jobBuilderFactory.get("job")
                        .start(step1()).on("FAILED").end()
                        .from(step1()).on("COMPLETED WITH SKIPS").to(errorPrint1())
                        .from(step1()).on("*").to(step2())
                        .end()
                        .build();
}

step1には3パターンがありえます。

  1. Stepが失敗、jobも失敗。
  2. Stepが正常終了。
  3. Stepが正常終了するがexit codeはCOMPLETED WITH SKIPS'。この場合、エラー処理に別のstepを実行する。

上記の設定は動作します。ただし、以下例のように、レコードスキップ条件でexit codeを変更するように修正します。

public class SkipCheckingListener extends StepExecutionListenerSupport {
    public ExitStatus afterStep(StepExecution stepExecution) {
        String exitCode = stepExecution.getExitStatus().getExitCode();
        if (!exitCode.equals(ExitStatus.FAILED.getExitCode()) &&
              stepExecution.getSkipCount() > 0) {
            return new ExitStatus("COMPLETED WITH SKIPS");
        }
        else {
            return null;
        }
    }
}

上のStepExecutionListenerはまず、Stepが正常終了かつStepExecutionのskip countが0より大きい、かどうかをチェックします。両条件を満たす場合、新規ExitStatusをexit code COMPLETED WITH SKIPSで返します。

1.3.3. Configuring for Stop

BatchStatus and ExitStatusの解説を読んだ後、JobBatchStatusExitStatusの決定ルールを知りたいと感じたかもしれません。Stepのステータスは実行するコードで決定し、Jobは設定で決定します。

これまでに解説したjob設定はすべて少なくとも一つの遷移無しの最終Stepを持っています。たとえば、以下例のように、step実行後、Jobは終了します。

@Bean
public Job job() {
        return this.jobBuilderFactory.get("job")
                                .start(step1())
                                .build();
}

Stepに遷移先が無い場合、Jobのステータスは以下のように決定します。

  • その最終StepExitStatus FAILEDで終了する場合、JobBatchStatusExitStatusは両方ともFAILEDになる。
  • それ以外の場合、JobBatchStatusExitStatusは両方ともCOMPLETEDになる。

ある種のバッチジョブ、シンプルなシーケンシャルstepのjobなどでは、上記の終了ルールで十分ですが、カスタム定義のjob停止が必要な場合もあります。そうした用途のために、Spring BatchはJobを停止するための遷移要素を3つ用意しています(前述のnext要素に加えて)。それらの停止要素はJobを特定のBatchStatusで停止します。なお、その停止遷移要素はJobのいずれのStepsBatchStatusあるいはExitStatusのどちらにも影響を与えません。これらの要素はJobの最終ステータスにだけ影響を与えます。たとえば、jobのすべてのstepがFAILEDでありながら、jobはCOMPLETEDに出来ます。

Ending at a Step

stepを終了するとBatchStatusCOMPLETEDjobを停止します。ステータスCOMPLETEDで終了するJobはリスタート出来ません(フレームワークJobInstanceAlreadyCompleteExceptionをスローする)。

Java設定を使う場合、そのタスクには'end'メソッドを使います。また、endメソッドはJobExitStatusをカスタマイズするための'exitStatus'パラメータを取ることも可能です。'exitStatus'を指定しないはデフォルトでExitStatusCOMPLETEDとなり、BatchStatusもそうなります。

以下のケースでは、step2が失敗するとJobBatchStatus COMPLETEDおよびExitStatus COMPLETEDで停止してstep3は実行しません。そうでない場合、step3に遷移します。なお、step2が失敗する場合はJobはリスタート出来ません(ステータスがCOMPLETEDなので)。

@Bean
public Job job() {
        return this.jobBuilderFactory.get("job")
                                .start(step1())
                                .next(step2())
                                .on("FAILED").end()
                                .from(step2()).on("*").to(step3())
                                .end()
                                .build();
}

Failing a Step

所定ポイントでstepを失敗するよう設定するとJobBatchStatus FAILEDで停止します。endと異なり、Job失敗でリスタート不能にはなりません。

以下のケースでは、step2が失敗するとJobBatchStatus FAILED ExitStatus EARLY TERMINATIONで停止してstep3は実行しません。そうでない場合、step3に遷移します。また、step2が失敗してJobをリスタートすると、step2から再開します。

Java Configuration

@Bean
public Job job() {
        return this.jobBuilderFactory.get("job")
                        .start(step1())
                        .next(step2()).on("FAILED").fail()
                        .from(step2()).on("*").to(step3())
                        .end()
                        .build();
}

Stopping a Job at a Given Step

特定のstepでjobを停止する設定をすると、JobBatchStatus STOPPEDで停止します。Job停止により処理は一時停止するため、オペレーターはJobの再開前になんらかの作業が出来ます。

java設定の場合、stopAndRestartメソッドは'restart'属性を必要とし、この属性にはJobリスタート時にピックアップするstepを指定します。

以下のケースでは、step1COMPLETEで終了するとjobは停止します。リスタートするとstep2から再開します。

@Bean
public Job job() {
        return this.jobBuilderFactory.get("job")
                        .start(step1()).on("COMPLETED").stopAndRestart(step2())
                        .end()
                        .build();
}

1.3.4. Programmatic Flow Decisions

あるケースでは、ExitStatus以外の情報も使用して次に実行するstepを決定したい場合があります。この場合、以下サンプルのように、決定を行うJobExecutionDeciderを使用します。

public class MyDecider implements JobExecutionDecider {
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        String status;
        if (someCondition()) {
            status = "FAILED";
        }
        else {
            status = "COMPLETED";
        }
        return new FlowExecutionStatus(status);
    }
}

次の例では、Java設定でJobExecutionDeciderを実装するbeanをnextに直接渡しています。

Java Configuration

@Bean
public Job job() {
        return this.jobBuilderFactory.get("job")
                        .start(step1())
                        .next(decider()).on("FAILED").to(step2())
                        .from(decider()).on("COMPLETED").to(step3())
                        .end()
                        .build();
}

1.3.5. Split Flows

これまでに解説したケースのJobはいずれも線形に一度に1つのstepを実行します。この一般的なスタイルに加えて、Spring Batchはjobをparallel flowsに設定可能です。

Java設定ではビルダーを用いて設定をスプリットできます。以下に示す例は、'split'要素は複数の'split'要素を持ち、そこで個々のflowを定義します。また、'split'要素には、これまでに解説した遷移要素、'next'属性や'next'要素・'end'や'fail'要素、を入れられます。

@Bean
public Job job() {
        Flow flow1 = new FlowBuilder<SimpleFlow>("flow1")
                        .start(step1())
                        .next(step2())
                        .build();
        Flow flow2 = new FlowBuilder<SimpleFlow>("flow2")
                        .start(step3())
                        .build();

        return this.jobBuilderFactory.get("job")
                                .start(flow1)
                                .split(new SimpleAsyncTaskExecutor())
                                .add(flow2)
                                .next(step4())
                                .end()
                                .build();
}

1.3.6. Externalizing Flow Definitions and Dependencies Between Jobs

jobのflowの一部を別のbeanとして切り出すことで、再使用できます。これには2通りの方法があります。1つは単に、以下に示すように、flowをどこか別の場所で定義したbeanとして宣言します。

Java Configuration

@Bean
public Job job() {
        return this.jobBuilderFactory.get("job")
                                .start(flow1())
                                .next(step3())
                                .end()
                                .build();
}

@Bean
public Flow flow1() {
        return new FlowBuilder<SimpleFlow>("flow1")
                        .start(step1())
                        .next(step2())
                        .build();
}

上の例のように外部flowを定義すると、インラインで定義しているかのように外部flowをjobにstepを追加できることにになります。この方法により、多数のjobが同じテンプレートflowを参照可能となり、そのテンプレートを別のflowに組み込むことも出来ます。また、個々のflowのインテグレーションテストを分離するのにも役立ちます。

flow外部化の別の方法はJobStepの使用です。JobStepFlowStepと似てますが、flow内のstepを別のjob実行として実際に作成・実行します。

以下はJobStepの例です。

Java Configuration

@Bean
public Job jobStepJob() {
        return this.jobBuilderFactory.get("jobStepJob")
                                .start(jobStepJobStep1(null))
                                .build();
}

@Bean
public Step jobStepJobStep1(JobLauncher jobLauncher) {
        return this.stepBuilderFactory.get("jobStepJobStep1")
                                .job(job())
                                .launcher(jobLauncher)
                                .parametersExtractor(jobParametersExtractor())
                                .build();
}

@Bean
public Job job() {
        return this.jobBuilderFactory.get("job")
                                .start(step1())
                                .build();
}

@Bean
public DefaultJobParametersExtractor jobParametersExtractor() {
        DefaultJobParametersExtractor extractor = new DefaultJobParametersExtractor();

        extractor.setKeys(new String[]{"input.file"});

        return extractor;
}

job parameters extractorはStepExecutionContextを実行JobJobParametersに変換する方法を決定するものです。JobStepはjobとstepのモニタリングとレポーティングに細かいオプションをつけたい場合に有用です。また、JobStepは以下の質問に対する回答でもあります。"job間の依存関係をどのように作るのか?" これは大規模システムを小さいモジュールとjobのflow制御に分割する優れた方法です。

1.4. Late Binding of Job and Step Attributes

これまでに解説したXMLとフラットファイルサンプルはいずれもファイル取得にSpringのResourceを使用します。ResourcegetFilejava.io.Fileを返すので機能します。XMLとフラットファイルのリソースは以下サンプルのようにSpringの機能で設定できます。

Java Configuration

@Bean
public FlatFileItemReader flatFileItemReader() {
        FlatFileItemReader<Foo> reader = new FlatFileItemReaderBuilder<Foo>()
                        .name("flatFileItemReader")
                        .resource(new FileSystemResource("file://outputs/file.txt"))
                        ...
}

上のResourceは指定のファイルシステムロケーションからファイルをロードします。注意点として、抽象ロケーション(absolute locations)はスラッシュ2個(//)で開始します。たいていのSpringアプリケーションで、このやり方で十分であり、これらリソース名はコンパイル時に決定しているためです。ただし、バッチのケースでは、ファイル名はjobのパラメータで動的に決定する場合もあります。この場合にはシステムプロパティを読み込み'-D'パラメータを使います。

以下はプロパティからファイル名を読み込み方法です。

Java Configuration

@Bean
public FlatFileItemReader flatFileItemReader(@Value("${input.file.name}") String name) {
        return new FlatFileItemReaderBuilder<Foo>()
                        .name("flatFileItemReader")
                        .resource(new FileSystemResource(name))
                        ...
}

この方法を動かすにはシステム引数(-Dinput.file.name="file://outputs/file.txt"など)を指定します。

※ ここでは``PropertyPlaceholderConfigurerが使用可能ですが、システムプロパティが常にセットされる場合は不要です。これはSpringのResourceEditor```がシステムプロパティのフィルタリングとプレースホルダー置換を行うためです。

また、バッチ設定において、システムプロパティではなくjobのJobParametersでファイル名をパラメータ化してアクセスしたい場合があります。これをするには以下のように、Spring Batchでは各種JobStepで遅延バインディングが可能です。

Java Configuration

@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters['input.file.name']}") String name) {
        return new FlatFileItemReaderBuilder<Foo>()
                        .name("flatFileItemReader")
                        .resource(new FileSystemResource(name))
                        ...
}

以下例のようにJobExecutionStepExecutionExecutionContextは同様な方法でアクセス可能です。

Java Configuration

@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobExecutionContext['input.file.name']}") String name) {
        return new FlatFileItemReaderBuilder<Foo>()
                        .name("flatFileItemReader")
                        .resource(new FileSystemResource(name))
                        ...
}

Java Configuration

@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{stepExecutionContext['input.file.name']}") String name) {
        return new FlatFileItemReaderBuilder<Foo>()
                        .name("flatFileItemReader")
                        .resource(new FileSystemResource(name))
                        ...
}

※ 遅延バインディングを使うbeanはscope="step"を宣言する必要があります。詳細はStep Scopeを参照。

1.4.1. Step Scope

上で取り上げた遅延バインディングの例は、以下サンプルのように、bean定義にすべて"step"スコープを宣言しています。

Java Configuration

@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters[input.file.name]}") String name) {
        return new FlatFileItemReaderBuilder<Foo>()
                        .name("flatFileItemReader")
                        .resource(new FileSystemResource(name))
                        ...
}

遅延バインディングを使うにはStepスコープが必要で、これは属性を参照するには、Step開始までbeanをインスタンス化出来ないためです。このスコープはSpringコンテナのデフォルトには含まれないので、scopeの明示的な追加が必要です。追加するには、batch namespace・StepScopeのbean定義を明示的に追加・@EnableBatchProcessingの使用、のどれか1つを使用します。以下はbatch namespaceの使用例です。

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="...">
<batch:job .../>
...
</beans>

以下の例は明示的なbean定義の追加です。

<bean class="org.springframework.batch.core.scope.StepScope" />

1.4.2. Job Scope

Jobスコープは、Spirng Batch 3.0で導入され、Stepスコープと同様のものですがJobコンテキストのスコープで、ジョブ実行時にそのbeanのインスタンスが1つだけになります。また、参照の遅延バインディングが可能で、JobContext#{..}でアクセスできます。この機能により、以下例に示すように、jobやjob execution contextおよびジョブパラメータ、からbeanプロパティを取得できます。

Java Configuration

@JobScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters[input]}") String name) {
        return new FlatFileItemReaderBuilder<Foo>()
                        .name("flatFileItemReader")
                        .resource(new FileSystemResource(name))
                        ...
}

Java Configuration

@JobScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobExecutionContext['input.name']}") String name) {
        return new FlatFileItemReaderBuilder<Foo>()
                        .name("flatFileItemReader")
                        .resource(new FileSystemResource(name))
                        ...
}

Springコンテナにはデフォルトで含まれないスコープなので、明示的な追加が必要です。batch namespace・JobScopeのbean定義を明示的に追加・@EnableBatchProcessingの使用、のどれか1つを使用します。以下はbatch namespaceの使用例です。

<beans xmlns="http://www.springframework.org/schema/beans"
                  xmlns:batch="http://www.springframework.org/schema/batch"
                  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                  xsi:schemaLocation="...">

<batch:job .../>
...
</beans>

以下の例はJobScopeのbeanを明示的に追加しています。

<bean class="org.springframework.batch.core.scope.JobScope" />

Spring Batch 4.1.x - Reference Documentation - Configuring and Running a Jobのテキトー翻訳

https://docs.spring.io/spring-batch/4.1.x/reference/html/job.html#configureJob

https://qiita.com/kagamihoge/items/12fbbc2eac5b8a5ac1e0 俺の訳一覧リスト

1. Configuring and Running a Job

domain sectionでは、以下図を用いてアーキテクチャデザイン全体について解説しました。

Figure 1. Batch Stereotypes

Jobオブジェクトはstepの単なるコンテナのように見えますが、開発者が知っておいた方が良い多数の設定オプションがあります。また、Jobの実行方法とそのメタデータが実行中にどのように格納されるのかにも注意が必要です。このチャプターでは各種設定オプションとJob実行時の注意事項について解説します。

1.1. Configuring a Job

Jobインタフェースには複数の実装がありますが、ビルダーが設定の違いを吸収します。

@Bean
public Job footballJob() {
    return this.jobBuilderFactory.get("footballJob")
                     .start(playerLoad())
                     .next(gameLoad())
                     .next(playerSummarization())
                     .end()
                     .build();
}

Job(とそれに含むStep)はJobRepositoryを必要とします。JobRepositoryの設定はBatchConfigurerで行います。

上は3つのStepインスタンスを持つJobの例です。また、jobのビルダーでは、パラレル(Split)関連、宣言的なフロー制御(Decision)、フロー定義(Flow)の外部化、も設定出来ます。

1.1.1. Restartability

バッチjob実行時の重要な課題の一つはリスタート時のJobの振る舞いについてです。もし特定のJobInstanceに対するJobExecutionが既に存在する場合、Jobの実行はリスタートと見なします。理想的には、すべてのjobは失敗した箇所から開始可能であるべきですが、それが可能ではないケースがあります。あるケースで新規のJobInstanceを生成する事を保証するのは開発者の責任です。ただし、Spring Batchはその補助機能を提供します。Jobをリスタート不可にする場合、常に新規のJobInstanceで実行し、restartableのプロパティを'false'に設定します。

Java Configuration

@Bean
public Job footballJob() {
    return this.jobBuilderFactory.get("footballJob")
                     .preventRestart()
                     ...
                     .build();
}

別の言い方をすると、restartableをfalseにする事は"このJobは再開出来ない"を意味します。restartableがfalseのJobをリスタートするとJobRestartExceptionをスローします。

Job job = new SimpleJob();
job.setRestartable(false);

JobParameters jobParameters = new JobParameters();

JobExecution firstExecution = jobRepository.createJobExecution(job, jobParameters);
jobRepository.saveOrUpdate(firstExecution);

try {
    jobRepository.createJobExecution(job, jobParameters);
    fail();
}
catch (JobRestartException e) {
    // ここを通る
}

上のJUnitコードは、リスタート不可のjobの1回目に実行するためのJobExecutionを生成し、特に例外は起きません。そして、2回目にはJobRestartExceptionをスローします。

1.1.2. Intercepting Job Execution

Jobの実行中に、何らかのカスタムコードを差し込むために、ライフサイクルの各種イベント通知が有用な場合があります。SimpleJobは適時JobListenerを呼ぶことでこれを実装しています。

public interface JobExecutionListener {

    void beforeJob(JobExecution jobExecution);

    void afterJob(JobExecution jobExecution);

}

SimpleJobのjobにJobListenersを追加するにはlisteners要素で行います。

Java Configuration

@Bean
public Job footballJob() {
    return this.jobBuilderFactory.get("footballJob")
                     .listener(sampleListener())
                     ...
                     .build();
}

なお、Jobの成功か失敗かに依らずafterJobは呼ばれます。成功か失敗かを判別したい場合はJobExecutionから取得します。

public void afterJob(JobExecution jobExecution){
    if( jobExecution.getStatus() == BatchStatus.COMPLETED ){
        //job success
    }
    else if(jobExecution.getStatus() == BatchStatus.FAILED){
        //job failure
    }
}

インタフェースに対応するアノテーションは以下の通りです。

  • @BeforeJob
  • @AfterJob

1.1.4. JobParametersValidator

jobをXMLで宣言したり、AbstractJobのサブクラスにする場合、実行時にjobパラメータ用のvalidatorをオプションで宣言できます。たとえば、すべての必須パラメータを検証してjobを開始したい場合に役立ちます。DefaultJobParametersValidatorでは必須およびオプションの単純なパラメータの組み合わせ検証が可能で、複雑な検証にはインタフェースを自前で実装します。

validatorの設定はjavaのbuilderでも設定可能です。

@Bean
public Job job1() {
    return this.jobBuilderFactory.get("job1")
                     .validator(parametersValidator())
                     ...
                     .build();
}

1.2. Java Config

Spring 3ではXMLに加えてjavaでのアプリケーション設定機能が追加されました。Spring Batch 2.2.0現在、バッチjobはjava configで設定可能です。javaベースの設定には2つのコンポーネントがあり、@EnableBatchProcessingと2つのビルダーがあります。

@EnableBatchProcessingはSpringファミリーの@Enable~アノテーションと同様の動作をします。@EnableBatchProcessingはバッチjobを組み立てるためのベースとなる設定を提供します。ベース設定内で、StepScopeインスタンスが作られ、他にもいくつかのbeanがautowired可能にしています。

  • JobRepository - bean name "jobRepository"
  • JobLauncher - bean name "jobLauncher"
  • JobRegistry - bean name "jobRegistry"
  • PlatformTransactionManager - bean name "transactionManager"
  • JobBuilderFactory - bean name "jobBuilders"
  • StepBuilderFactory - bean name "stepBuilders"

この設定のコアとなるインタフェースはBatchConfigurerです。デフォルト実装は上のbeanを提供し、コンテキストにDataSourceのbean定義が必要です。このデータソースはJobRepositoryで使用します。BatchConfigurerのカスタム実装を作ることで上述のbeanのカスタマイズが可能です。基本的には、DefaultBatchConfigurerBatchConfigurerがコンテキスに無ければこれを使用する)を拡張して必要なgetterをオーバーライドします。なお、スクラッチから実装することも可能です。以下の例はカスタムのtransaction managerを使用する例です。

@Bean
public BatchConfigurer batchConfigurer() {
        return new DefaultBatchConfigurer() {
                @Override
                public PlatformTransactionManager getTransactionManager() {
                        return new MyTransactionManager();
                }
        };
}

@EnableBatchProcessingを持つconfigクラスは1つだけにして下さい。このアノテーションを1つでもどこかのクラスに付与すると、上述のbeanが利用可能になります。

所定のベースconfigにおいて、job設定のためのビルダーのファクトリーを使用可能です。以下はJobBuilderFactoryStepBuilderFactoryで2つのstepのjobを設定する例です。

@Configuration
@EnableBatchProcessing
@Import(DataSourceConfiguration.class)
public class AppConfig {

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory steps;

    @Bean
    public Job job(@Qualifier("step1") Step step1, @Qualifier("step2") Step step2) {
        return jobs.get("myJob").start(step1).next(step2).build();
    }

    @Bean
    protected Step step1(ItemReader<Person> reader,
                         ItemProcessor<Person, Person> processor,
                         ItemWriter<Person> writer) {
        return steps.get("step1")
            .<Person, Person> chunk(10)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .build();
    }

    @Bean
    protected Step step2(Tasklet tasklet) {
        return steps.get("step2")
            .tasklet(tasklet)
            .build();
    }
}

1.3. Configuring a JobRepository

@EnableBatchProcessingを使う場合、特に設定無くJobRepositoryを使えます。このセクションでは自前での設定についてを解説します。

前述の通り、JobRepositoryはSpring Batch内の各種永続化オブジェクト、JobExecutionStepExecutionなど、のための基本的なCRUD操作で使います。このクラスはフレームワークの主要機能、JobLauncher, Job, Step、など多くの場所で使用します。

java設定の場合、JobRepositoryが使用可能です。DataSourceがあればJDBCベースの実装が使われ、無ければMapベースが使われます。ただし、BatchConfigurerの実装を利用してJobRepositoryのカスタマイズは可能です。

Java Configuration

...
// 以下はBatchConfigurerの実装内に置くものとする。
@Override
protected JobRepository createJobRepository() throws Exception {
    JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
    factory.setDataSource(dataSource);
    factory.setTransactionManager(transactionManager);
    factory.setIsolationLevelForCreate("ISOLATION_SERIALIZABLE");
    factory.setTablePrefix("BATCH_");
    factory.setMaxVarCharLength(1000);
    return factory.getObject();
}
...

上記の設定オプションは、 dataSourceとtransactionManagerを除いて、いずれも必須ではありません。設定しない場合、上のサンプルコードに書かれたデフォルト値を使います。They are shown above for awareness purposes. max varchar lengthのデフォルトは2500で、sample schema scriptsのlong VARCHARカラムのlengthになります。

1.3.1. Transaction Configuration for the JobRepository

namespaceもしくはFactoryBeanを使用する場合、transactional adviceがrepositoryのaroundに自動的に作られます。これは、バッチメタデータに含まれる状態のうち、失敗後のリスタートに必要な状態が正しく永続化される、事を保証します。repositoryのメソッドが非トランザクションの場合、フレームワーク振る舞いが不定になります。The isolation level in the create* method attributes is specified separately to ensure that when jobs are launched, もし2つのプロセスが同時に同一jobを起動すると片方だけが成功します。このメソッド用のデフォルトのisolation levelはSERIALIZABLEで、この設定はかなり攻めているため、READ_COMMITTEDでも同様に機能します。2つのプロセスが衝突をしないような場合にREAD_COMMITTEDが適します。ただ、create*メソッドの呼び出しは極めて短時間なので、DBがサポートしていれば、SERIALIZEDが問題となる可能性は低いです。とはいえ、オーバーライドは可能です。

Java Configuration

// 以下はBatchConfigurerの実装内に置くものとする。
@Override
protected JobRepository createJobRepository() throws Exception {
    JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
    factory.setDataSource(dataSource);
    factory.setTransactionManager(transactionManager);
    factory.setIsolationLevelForCreate("ISOLATION_REPEATABLE_READ");
    return factory.getObject();
}

namespaceあるいはfactory beansを使わない場合、AOPを使用するrepositoryのtransactionalな振る舞いの設定が必要です。

Java Configuration

@Bean
public TransactionProxyFactoryBean baseProxy() {
        TransactionProxyFactoryBean transactionProxyFactoryBean = new TransactionProxyFactoryBean();
        Properties transactionAttributes = new Properties();
        transactionAttributes.setProperty("*", "PROPAGATION_REQUIRED");
        transactionProxyFactoryBean.setTransactionAttributes(transactionAttributes);
        transactionProxyFactoryBean.setTarget(jobRepository());
        transactionProxyFactoryBean.setTransactionManager(transactionManager());
        return transactionProxyFactoryBean;
}

1.3.2. Changing the Table Prefix

JobRepositoryのもう一つの設定可能なプロパティにメタデータテーブルのプレフィクスがあります。デフォルトではすべてBATCHが頭につきます。BATCH_JOB_EXECUTIONとBATCH_STEP_EXECUTIONなどです。このプレフィクスを変更しなければならないケースが存在します。スキーマ名をテーブル名に付ける必要があるとか、同一スキーマ内に複数のメタデータテーブルを作る必要があるなどで、この場合はテーブルのプレフィクスを変更する必要があります。

Java Configuration

// 以下はBatchConfigurerの実装内に置くものとする。
@Override
protected JobRepository createJobRepository() throws Exception {
    JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
    factory.setDataSource(dataSource);
    factory.setTransactionManager(transactionManager);
    factory.setTablePrefix("SYSTEM.TEST_");
    return factory.getObject();
}

上のように変更すると、メタデータテーブルに対するすべてのクエリに"SYSTEM.TEST_"のプレフィクスがつきます。BATCH_JOB_EXECUTIONはSYSTEM.TEST_JOB_EXECUTIONで参照します。

※ テーブルのプレフィクスだけが変更可能です。テーブルとカラム名は変更できません。

1.3.3. In-Memory Repository

DBにドメインオブジェクトの永続化をしたくないケースが存在します。1つは速度で、コミットポイントごとのドメインオブジェクトの保存には幾分かの時間がかかります。もう1つは、特定のjobに限っては状態を永続化したくない場合です。このような用途のために、Spring Batchはjob repositoryのインメモリMap版を用意しています。

Java Configuration

// 以下はBatchConfigurerの実装内に置くものとする。
@Override
protected JobRepository createJobRepository() throws Exception {
    JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
    factory.setDataSource(dataSource);
    factory.setTransactionManager(transactionManager);
    factory.setIsolationLevelForCreate("ISOLATION_REPEATABLE_READ");
    return factory.getObject();
}

インメモリのrepositoryは揮発性でJVMインスタンス間を超えてのリスタートが出来ない点に注意してください。同一パラメータの2つのjobインスタンスを同時に実行する事も保証出来ないので、マルチスレッドJobには適しておらず、また、locally partitioned Stepも同様です。よって、それらの機能を使いたい場合にはrepositoryのDBバージョンを使用してください。

なお、transaction managerを定義する必要があり、その理由はrepository内にはrollback semanticsがあるのとビジネスロジックにはたいていtransactionalな箇所(RDBMSアクセス)があるためです。テスト目的ではResourcelessTransactionManagerを使うと便利です。

1.3.4. Non-standard Database Types in a Repository

使用するDBがサポート対象リストに無い場合、SQLバリアントがなるべく近いものを、サポート対象から1つ選べば使える可能性があります。これを設定するにはnamespaceショートカットの代わりにJobRepositoryFactoryBeanを使用し、一番近いDBタイプをセットします。

Java Configuration

// 以下はBatchConfigurerの実装内に置くものとする。
@Override
protected JobRepository createJobRepository() throws Exception {
    JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
    factory.setDataSource(dataSource);
    factory.setDatabaseType("db2");
    factory.setTransactionManager(transactionManager);
    return factory.getObject();
}

JobRepositoryFactoryBeanはDBタイプが未指定の場合はDataSourceから自動検出します。)プラットフォームごとの主な違いはプライマリキーのインクリメント方法で、これが異なる場合にはincrementerFactoryをオーバーライドする必要があります(Spring Frameworkの標準実装の一つを選んで使用する)。

これが動作しない場合、もしくはRDBMSでは無い場合、SimpleJobRepositoryが依存する各種のDaoインタフェースを実装してSpringの作法に沿ってマニュアルでそれらのbeanをワイヤリングするのが唯一の解決策です。

1.4. Configuring a JobLauncher

@EnableBatchProcessingを使う場合、特に設定無くJobRegistryを使えます。このセクションでは自前での設定についてを解説します。

JobLauncherインタフェースのベーシックな実装がSimpleJobLauncherです。このオブジェクトの依存性は、executionを取得するための、JobRepositoryだけです。

Java Configuration

// 以下はBatchConfigurerの実装内に置くものとする。
...
// This would reside in your BatchConfigurer implementation
@Override
protected JobLauncher createJobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
}
...

JobExecutionを取得すると、Jobの実行メソッドに渡され、最終的に呼び出し元にJobExecutionを返します。

Figure 2. Job Launcher Sequence

このシーケンスは単純化したものでスケジューラから起動する場合にはうまくいきます。しかし、HTTPリクエスト経由の起動時には問題があります。この場合、起動は非同期で行う必要があり、SimpleJobLauncherは呼び出し元へ即時リターンする必要があります。バッチなど長時間実行プロセスでHTTPリクエストをオープンにしたままにするのは良くありません。

Figure 3. Asynchronous Job Launcher Sequence

SimpleJobLauncherTaskExecutorを設定することでこのようなケースに簡単に対応できます。

Java Configuration

@Bean
public JobLauncher jobLauncher() {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository());
        jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
}

jobの非同期実行制御にはTaskExecutorインタフェースの実装であれば何でも使用可能です。

1.5. Running a Job

最低点、バッチjobの実行には2つの要素、実行するJobJobLauncher、が必要です。両者は同一コンテキストでも異なるコンテキストでも構いません。例えば、CLIからjobを起動する場合、各jobごとに新規のJVMインスタンス化し、各jobごとに固有のJobLauncherを持ちます。ただし、HttpRequestスコープ内のwebコンテナから起動する場合、通常JobLauncherは1つで、これは複数リクエストがjobを起動するために非同期job実行の設定にします。

1.5.1. Running Jobs from the Command Line

エンタープライズのスケジューラからjobを起動するユーザにとっては、CLIが主要なインターフェースになります。たいていのスケジューラ(NativeJobを使用しないQuartzを除く)はOSのプロセスと直接やり取りして、基本的にはシェルスクリプトでキックします。Javaプロセスを起動するには、スクリプトの他、Perl, Ruby, antやmavenなど'build tools'、などがあります。とはいえ、たいていはシェルスクリプトが最もポピュラーで、このサンプルでもシェルにフォーカスします。

The CommandLineJobRunner

jobの起動スクリプトJava Virtual Machineをキックするので、エントリーポイントとなるmainメソッドを持つクラスが必要です。Spring BatchはまさしくそのためのCommandLineJobRunnerクラスを用意しています。ただし、このクラスはアプリケーションをブートする1つの方法に過ぎず、Javaプロセスを起動する方法は複数あり、このクラスだけが唯一の方法ではありません。CommandLineJobRunnerは4つのタスクを実行します。

  • 適切なApplicationContextのロード
  • コマンドライン引数をパースしてJobParametersに入れる
  • 引数に基づくjobの特定
  • job実行にアプリケーションコンテキストのJobLauncherを使用

これらのタスクはすべて渡す引数のみで実行します。以下が必須の引数です。

Table 1. CommandLineJobRunner arguments

|jobPath|ApplicationContextを生成するためのXMLファイルの場所。このファイルにはJob実行に必要な情報をすべて持つ必要がある。| |jobName|実行するjob名|

これらの引数は最初がpathで次がnameにします。それらの後に来る引数はすべてJobParametersと見なし、'name=value'の形式で渡します。

<bash$ java CommandLineJobRunner io.spring.EndOfDayJobConfiguration endOfDay schedule.date(date)=2007/05/05

通常はjarのmainクラスの宣言にはマニフェストを使用しますが、ここでは説明簡易化のため、直接クラスを指定しています。この例ではdomainLanguageOfBatchと同一の'EndOfDay'サンプルを使用しています。最初の引数は'io.spring.EndOfDayJobConfiguration'でJobを持つconfigクラスの完全修飾クラス名です。次の引数は'endOfDay'でjob名です。最後の引数'schedule.date(date)=2007/05/05'はJobParametersになります。java configの例は以下の通りです。

@Configuration
@EnableBatchProcessing
public class EndOfDayJobConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job endOfDay() {
        return this.jobBuilderFactory.get("endOfDay")
                                    .start(step1())
                                    .build();
    }

    @Bean
    public Step step1() {
        return this.stepBuilderFactory.get("step1")
                                    .tasklet((contribution, chunkContext) -> null)
                                    .build();
    }
}

この例はかなり単純化したもので、通常はSpring Batchでバッチjobを動かすには他にも色々なものが必要ですが、CommandLineJobRunnerが必要とする2つのコンポーネントJobJobLauncher、がある事を示しています。

ExitCodes

CLIからバッチjobを起動する場合、基本的にはエンタープライズのスケジューラを用います。スケジューラの多くはシンプル(fairly dumb)でプロセスレベルで動作します。つまり、スケジューラは、シェルスクリプトなど、スケジューラが呼び出したOSプロセスについてだけ関知します。この場合、jobの成功か失敗かをスケジューラに戻してやり取りする唯一の方法はリターンコードだけです。リターンコードはプロセスがスケジューラに返す数値で、実行結果を意味します。最もシンプルな場合、0が成功で、1が失敗です。しかし、より複雑なシナリオが考えられます。たとえば、もしjob Aのreturn 4はjob Bをキックし、return 5はjob Cをキックする、などです。この種の振る舞いはスケジューラレベルで設定しますが、重要なのは、Spring Batchなどのフレームワークは特定バッチjobの'Exit Code'の数値表現を返す方法がある、という点です。Spring BatchではこれをExitStatusで表現しており、詳細はChapter 5で解説します。完了コード(exit code)の解説の点で、知るべき最も重要な点は、ExitStatusには完了コードプロパティがありこの値はフレームワーク(か開発者)が設定し、JobLauncherが返すJobExecutionの一部として返されます。CommandLineJobRunnerExitCodeMapperインタフェースで文字列の完了コードを数値に変換します。

public interface ExitCodeMapper {

    public int intValue(String exitCode);

}

ExitCodeMapperの役割は、文字列の完了コードを数値表現で返すことです。ジョブランナーのデフォルト実装はSimpleJvmExitCodeMapperで、0が正常終了、1が汎用エラー、2はコンテキストにJobが見つからなかったなどのジョブランナーのエラーです。3以上が必要な場合、ExitCodeMapperのカスタム実装が必要です。CommandLineJobRunnerApplicationContextを作成するクラスで、明示的なワイヤリングが書けないので、上書きしたい値はautowiredする必要があります。つまり、ExitCodeMapperの実装がBeanFactoryにある場合、コンテキスト作成後にランナーにインジェクトされます。自前のExitCodeMapperを使うにはrootレベルbeanとして宣言し、ランナーがロードするApplicationContextの一部となるようにしておきます。

1.5.2. Running Jobs from within a Web Container

歴史的に、バッチジョブなどのオフライン処理は上述のようなCLIで起動していました。しかし、HttpRequest経由で実行するケースがよりベターな選択肢な事も増えました。レポーティング・アドホックなジョブ実行・webアプリケーションサポートなどなどです。定義上バッチジョブは長時間実行するので、最も重要な関心事は非同期なジョブ起動です。

Figure 4. Asynchronous Job Launcher Sequence From Web Container

ここでのcontrollerはSpring MVCのコントローラーです。Spring MVCの詳細については https://docs.spring.io/spring/docs/current/spring-framework-reference/web.html#mvc を参照してください。コントローラーは非同期モードに設定したJobLauncherJobを実行します。このJobLauncherJobExecutionを即時リターンします。Jobは実行中のままですが、ノンブロッキングな振る舞いによりコントローラーでHttpRequest処理時に必要な即時リターンが出来ます。以下はその例です。

@Controller
public class JobLauncherController {

    @Autowired
    JobLauncher jobLauncher;

    @Autowired
    Job job;

    @RequestMapping("/jobLauncher.html")
    public void handle() throws Exception{
        jobLauncher.run(job, new JobParameters());
    }
}

1.6. Advanced Meta-Data Usage

ここまで、JobLauncherJobRepositoryインタフェースを解説してきました。共に、jobの起動と、バッチドメインオブジェクトの基礎的なCRUD操作を行います。

Figure 5. Job Repository

JobLauncherJobExecutionの新規オブジェクトの生成と実行にJobRepositoryを使います。Jobの実行中、JobStepの実装は実行の更新に同一のJobRepositoryを使います。基礎的な操作はシンプルなケースでは十分ですが、数百バッチジョブと複雑なスケジューリング要求の大規模なバッチ環境では、メタデータのより高度なアクセスが必要です。

Figure 6. Advanced Job Repository Access

JobExplorerJobOperatorインタフェースは以降で解説します。これらはメタデータの問い合わせと制御機能を持ちます。

1.6.1. Querying the Repository

高度な機能の前に知るべき基本的な事としては、このインタフェースは既存のexecutionsのリポジトリを問い合わせる機能があります。JobExplorerインタフェースが提供する機能は以下です。

public interface JobExplorer {

    List<JobInstance> getJobInstances(String jobName, int start, int count);

    JobExecution getJobExecution(Long executionId);

    StepExecution getStepExecution(Long jobExecutionId, Long stepExecutionId);

    JobInstance getJobInstance(Long instanceId);

    List<JobExecution> getJobExecutions(JobInstance jobInstance);

    Set<JobExecution> findRunningJobExecutions(String jobName);
}

上のメソッドシグネチャの通り、JobExplorerJobRepositoryのリードオンリーバージョンで、JobRepositoryのように、factory beanを使用して設定します。

Java Configuration

...
// 以下はBatchConfigurerの実装内に置くものとする。
@Override
public JobExplorer getJobExplorer() throws Exception {
        JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();
        factoryBean.setDataSource(this.dataSource);
        return factoryBean.getObject();
}
...

このチャプター前半で、バージョンやスキーマが異なる場合に備えてJobRepositoryのテーブルプレフィクスは変更可能、と説明しました。JobExplorerもそれらのテーブルを参照するので、こちらでもプレフィクスを設定する必要があります。

Java Configuration

...
// 以下はBatchConfigurerの実装内に置くものとする。
@Override
public JobExplorer getJobExplorer() throws Exception {
        JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();
        factoryBean.setDataSource(this.dataSource);
        factoryBean.setTablePrefix("SYSTEM.");
        return factoryBean.getObject();
}

1.6.2. JobRegistry

JobRegistry(と親インタフェースJobLocator)は必須では無く、コンテキストで利用可能なjobをトラッキングしたい場合に有用です。別のアプリケーションコンテキスト(子コンテキストなど)で作られるjobがある場合、そうしたjobを集中管理する場合にも有用です。カスタムのJobRegistryであれば登録済みjob名とその他プロパティを操作できます。フレームワークの提供する実装は1つだけで、これはjob名からjobインスタンスというシンプルなmapベースです。

@EnableBatchProcessingを使う場合、JobRegistryは特に設定無く使えます。自前の設定をするには以下のようにします。

// @EnableBatchProcessingで提供するクラスなので、SimpleBatchConfigurationのgetterを
// オーバーライドでカスタマイズが可能。
@Override
@Bean
public JobRegistry jobRegistry() throws Exception {
        return new MapJobRegistry();
}

自動的にJobRegistryを処理するには2つの方法があり、bean post processorとregistrar lifecycle componentです。これらは以降のセクションで解説します。

JobRegistryBeanPostProcessor

bean post-processorですべてのjobが生成後にそれらを登録します。

Java Configuration

@Bean
public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor() {
    JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
    postProcessor.setJobRegistry(jobRegistry());
    return postProcessor;
}

Although it is not strictly necessary, the post-processor in the example has been given an id so that it can be included in child contexts (e.g. as a parent bean definition) and cause all jobs created there to also be registered automatically.

AutomaticJobRegistrar

子コンテキストを作成し、job作成時にそのコンテキストのjobを登録するライフサイクルコンポーネントです。この利点は、子コンテキストのjob名はレジストリ内でグローバルに一意ですが、その依存性は"natural" nameを持つ場合があります。たとえば、複数のXML設定ファイルがそれぞれ1つだけJobを持ち、同一のbean名、例えば"reader"、でItemReaderのそれぞれ異なる定義を持つ場合です。これらのXMLファイルを同一コンテキストにインポートすると、reader定義はクラッシュして別の定義をオーバーライドしてしまいますが、automatic registrarではこれを回避します。これにより、アプリケーションの分割モジュール由来のjobを簡単に統合できます。

Java Configuration

@Bean
public AutomaticJobRegistrar registrar() {

    AutomaticJobRegistrar registrar = new AutomaticJobRegistrar();
    registrar.setJobLoader(jobLoader());
    registrar.setApplicationContextFactories(applicationContextFactories());
    registrar.afterPropertiesSet();
    return registrar;

}

このregistrarは2つの必須プロパティがあり、1つはApplicationContextFactory(ここでは簡易的にfactory beanから生成している)の配列で、もう1つはJobLoaderです。JobLoaderは子コンテキストのライフサイクル管理とJobRegistry内のjob登録を担当します。

ApplicationContextFactoryは子コンテキスト生成の責務があり、最も一般的な使用法は上述のClassPathXmlApplicationContextFactoryとしてです。このファクトリの機能の1つは、デフォルトでは、設定のいくつかを親コンテキストから子にコピーします。よって、親と同じ設定を使いたい場合、子でPropertyPlaceholderConfigurerAOP設定を再定義する必要がありません。

必要に応じてAutomaticJobRegistrarJobRegistryBeanPostProcessorと組み合わせて使用可能です(DefaultJobLoaderを使用する場合に限る)。メインの親コンテキストにも子にもjobを定義する場合、こちらが望ましい場合があります。

1.6.3. JobOperator

上述の通り、JobRepositoryメタデータCRUD操作を提供し、JobExplorerメタデータのリードオンリーの操作を提供します。これらの操作の組み合わせは、バッチオペレータがよく行うJobの停止・リスタート・サマライズなどの、一般的なモニタリングタスクの実行に有用です。Spring Batchはその種の操作をJobOperatorで提供します。

public interface JobOperator {

    List<Long> getExecutions(long instanceId) throws NoSuchJobInstanceException;

    List<Long> getJobInstances(String jobName, int start, int count)
          throws NoSuchJobException;

    Set<Long> getRunningExecutions(String jobName) throws NoSuchJobException;

    String getParameters(long executionId) throws NoSuchJobExecutionException;

    Long start(String jobName, String parameters)
          throws NoSuchJobException, JobInstanceAlreadyExistsException;

    Long restart(long executionId)
          throws JobInstanceAlreadyCompleteException, NoSuchJobExecutionException,
                  NoSuchJobException, JobRestartException;

    Long startNextInstance(String jobName)
          throws NoSuchJobException, JobParametersNotFoundException, JobRestartException,
                 JobExecutionAlreadyRunningException, JobInstanceAlreadyCompleteException;

    boolean stop(long executionId)
          throws NoSuchJobExecutionException, JobExecutionNotRunningException;

    String getSummary(long executionId) throws NoSuchJobExecutionException;

    Map<Long, String> getStepExecutionSummaries(long executionId)
          throws NoSuchJobExecutionException;

    Set<String> getJobNames();

}

上記の操作はJobLauncher, JobRepository, JobExplorer, JobRegistryなど多数の異なるインタフェース由来のメソッドを集約したものです。よって、JobOperatorの実装、SimpleJobOperatorは多数の依存性を持ちます。

 /**
  * このbeanにインジェクトされるすべての依存性は@EnableBatchProcessingが作成する。
  */
 @Bean
 public SimpleJobOperator jobOperator(JobExplorer jobExplorer,
                                JobRepository jobRepository,
                                JobRegistry jobRegistry) {

        SimpleJobOperator jobOperator = new SimpleJobOperator();

        jobOperator.setJobExplorer(jobExplorer);
        jobOperator.setJobRepository(jobRepository);
        jobOperator.setJobRegistry(jobRegistry);
        jobOperator.setJobLauncher(jobLauncher);

        return jobOperator;
 }

※ job repositoryにテーブルプレフィクスを設定する場合、忘れずにjob explorerにも設定してください。

1.6.4. JobParametersIncrementer

JobOperatorの大半は見たままの挙動です、詳細についてはavadoc of the interfaceにあります。ただし、startNextInstanceには注意が必要です。このメソッドは常にJobの新規インスタンスを開始します。JobExecutionに致命的な問題が発生して最初からJobをやり直したい場合に特に有効です。しかし、JobLauncherがパラメータが前回とは異なる場合に新規JobInstanceを開始する新規のJobParametersを必要とするのとは違って、startNextInstanceメソッドは、新規インスタンスJobに強制するために、Jobに紐付くJobParametersIncrementerを使用します。

public interface JobParametersIncrementer {

    JobParameters getNext(JobParameters parameters);

}

JobParametersIncrementerの責務は、与えられたJobParametersを使用し、これに含まれる何らかの必須の値をインクリメントして'次の'JobParametersオブジェクトを返します。これは、'次の'インスタンスを作るのにJobParametersの何を変更すれば良いのかをフレームワークが知らない場合に有用です。たとえば、JobParametersにdateが1つだけの場合、次回のインスタンスを作る必要がありますが、1日進めるのか、それとも1週間でしょうか。Jobの識別に何らかの数値を使う場合にも同じことが言えます。以下がその例です。

public class SampleIncrementer implements JobParametersIncrementer {

    public JobParameters getNext(JobParameters parameters) {
        if (parameters==null || parameters.isEmpty()) {
            return new JobParametersBuilder().addLong("run.id", 1L).toJobParameters();
        }
        long id = parameters.getLong("run.id",1L) + 1;
        return new JobParametersBuilder().addLong("run.id", id).toJobParameters();
    }
}

この例では、'run.id'キーをJobInstancesの識別に使います。JobParametersがnullの場合、Jobをまだ一度も実行していないと見なして初期値を返します。そうでない場合、古い値を取得し、1増やして返します。

ビルダーのincrementerメソッドでJobにincrementerを関連付けします。

@Bean
public Job footballJob() {
    return this.jobBuilderFactory.get("footballJob")
                                     .incrementer(sampleIncrementer())
                                     ...
                     .build();
}

1.6.5. Stopping a Job

JobOperatorのよくある使い方の一つはJobのgracefullyな停止です。

Set<Long> executions = jobOperator.getRunningExecutions("sampleJob");
jobOperator.stop(executions.iterator().next());

即時シャットダウン強制する手段がないので、即時シャットダウンはしません。フレームワークの制御下に無いビジネスサービスなど開発者のコードを実行中の場合は特にそうです。しかし、制御がフレームワークに戻り次第、StepExecutionBatchStatus.STOPPEDをセットしてセーブし、JobExecutionにも終了前に同様の処理をします。

1.6.6. Aborting a Job

FAILEDのjob実行はリスタート可能です(Jobがrestartableの場合)。ABANDONEDのjob実行はフレームワークによるリスタートは出来ません。なお、ABANDONEDはリスタートしたjob実行内でstep実行をskippableにするのにも使います。もし以前に失敗したjob実行でABANDONEDとなったstepがある状態でjob実行する場合、その次のstep(jobフロー定義とstep実行完了ステータスが決定する)に移行します。

プロセスが死んだ("kill -9"やサーバエラー)場合、当然そのjobは動作していませんが、プロセスが死ぬ前に誰もJobRepositoryに通知しないのでそのことを知る術がありません。失敗かアボート(ステータスをFAILEDABANDONEDに変更)のどちらにするかを決めて手動更新する必要があります。これは運用判断で自動決定は出来ません。restartableでは無いか、リスタートデータがvalidであると判断出来れば、FAILEDに変更します。job実行をアボートするにはSpring Batch Admin JobServiceユーティリティを使います。