kagamihogeの日記

kagamihogeの日記です。

Spring Batch 4.1.x - Reference Documentation - Spring Batch Integrationのテキトー翻訳

https://docs.spring.io/spring-batch/4.1.x/reference/html/spring-batch-integration.html#springBatchIntegration

https://qiita.com/kagamihoge/items/12fbbc2eac5b8a5ac1e0 俺の訳一覧リスト

1. Spring Batch Integration

1.1. Spring Batch Integration Introduction

Spring Batchのユーザの多くはそのスコープ外の機能を必要とする事があり、それらはSpring Integrationで効率的かつ簡潔に実装可能な場合があります。逆に、Spring IntegrationのユーザがSpring Batchの機能を必要としたり両フレームワークの効率的な連携を必要とする場合もあります。これについて、いくつかのパターンとユースケースがあり、Spring Batch Integrationで扱います。

Spring BatchとSpring Integrationの境目は常に明瞭なわけではなく、以下2つのアドバイスが役に立ちます。粒度に気を配り、共通パターンを適用する(Think about granularity, and apply common patterns.)。共通パターンのいくつかは本リファレンスで解説します。

バッチ処理にメッセージング追加によりオペレーション自動化・関心事の分離・strategizing*1が可能となります。例えば、メッセージをジョブ実行のトリガにし、メッセージ送信を様々な方法で行います。または、ジョブが完了もしくは失敗時に、そのイベントがメッセージ送信をトリガし、コンシューマはアプリケーション本体とは無関係な運用上の関心事を行います。メッセージングはジョブ内に埋め込むことも可能です(例えば、チャネルから処理対象のアイテムを読み込んだり書き込んだりする)。リモートパーティショニングとリモートチャンキングは多数のワーカーにワークロードを分散する手段を提供します。

このセクションでは以下のキーコンセプトを解説します。

1.1.2. Launching Batch Jobs through Messages

コアSpring Batch APIからバッチジョブを開始するには、基本的には以下2つの選択肢があります。

  • CommandLineJobRunnerCLI経由。
  • JobOperator.start()もしくはJobLauncher.run()のプログラム経由。

たとえば、シェルスクリプトでBatch Jobsを実行する場合はCommandLineJobRunnerを使います。もしくは、直接JobOperatorを使う場合もあります(webアプリケーションの一部としてSpring Batchを使う場合など)。しかし、さらに複雑なユースケースではどうでしょうか? Batch Job用のデータ取得に(S)FTPにポーリングすつとか、アプリケーションで同時に複数データソースを扱う必要がある場合です。たとえば、web経由のデータファイルだけでなくFTPやその他からも受信する場合です。これはSpring Batch実行前に入力ファイル変換が必要になります。

そこで、Spring Integrationと各種アダプターでバッチジョブ実行が柔軟になります。たとえば、File Inbound Channel Adapterによりファイルシステム上のディレクトリを直接モニタし、入力ファイルが到着したらBatch Jobの開始が出来ます。また、Spring Integration flowsで複数の異なるアダプタを使用し、設定のみで同時に複数データソースからバッチジョブで使うデータを簡単に取り込めます。Spring Integrationでこれらを実装するのは簡単で、疎結合が容易であり、JobLauncherのイベント駆動実行が出来ます。

Spring Batch Integrationにはバッチジョブを起動するJobLaunchingMessageHandlerクラスがあります。JobLaunchingMessageHandlerの入力はSpring Integrationのメッセージで、これはJobLaunchRequestペイロードを持ちます。このクラスは起動するJobとBatch jobの起動に必要なJobParametersのラッパーです。

以下はBatch jobを開始する一般的なSpring Integration message flowのイメージ図です。EIP (Enterprise Integration Patterns) websiteにメッセージングのアイコンとその解説があります。

Figure 1. Launch Batch Job

Transforming a file into a JobLaunchRequest

package io.spring.sbi;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;

import java.io.File;

public class FileMessageToJobRequest {
    private Job job;
    private String fileParameterName;

    public void setFileParameterName(String fileParameterName) {
        this.fileParameterName = fileParameterName;
    }

    public void setJob(Job job) {
        this.job = job;
    }

    @Transformer
    public JobLaunchRequest toRequest(Message<File> message) {
        JobParametersBuilder jobParametersBuilder =
            new JobParametersBuilder();

        jobParametersBuilder.addString(fileParameterName,
            message.getPayload().getAbsolutePath());

        return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
    }
}

The JobExecution Response

バッチジョブを実行すると、JobExecutionを返します。このインスタンスで実行ステータスを確認します。JobExecutionが正常に作成可能であれば、実際の実行が成功するかどうかに関わらず、JobExecutionは常に返されます。

JobExecutionインスタンスが返されるかどうかはTaskExecutor次第です。synchronous(single-threaded) TaskExecutor実装を使う場合、JobExecutionのレスポンスはジョブが完了してから返します。asynchronous TaskExecutorの場合、JobExecutionは即時返します。ユーザはJobExecutionidを取得(JobExecution.getJobId()を使用)してから、JobExplorerでjobの更新後ステータスをJobRepositoryにクエリーします。詳細については、 Spring Batch reference documentationのQuerying the Repositoryを参照してください。

Spring Batch Integration Configuration

以下の設定はfile inbound-channel-adapterを生成して指定ディレクトリのCSVファイルをリッスンし、そのファイルをtransformer(FileMessageToJobRequest)に渡し、Job Launching Gatewayでjobを起動し、logging-channel-adapterJobExecutionのログ出力をします。

Java Configuration

@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
    FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
    fileMessageToJobRequest.setFileParameterName("input.file.name");
    fileMessageToJobRequest.setJob(personJob());
    return fileMessageToJobRequest;
}

@Bean
public JobLaunchingGateway jobLaunchingGateway() {
    SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
    simpleJobLauncher.setJobRepository(jobRepository);
    simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
    JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);

    return jobLaunchingGateway;
}

@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {
    return IntegrationFlows.from(Files.inboundAdapter(new File("/tmp/myfiles")).
                    filter(new SimplePatternFileListFilter("*.csv")),
            c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1))).
            handle(fileMessageToJobRequest()).
            handle(jobLaunchingGateway).
            log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").
            get();
}

Example ItemReader Configuration

いま、ファイルのポーリングしてジョブ起動するとして、Spring BatchのItemReaderでjobパラメータ"input.file.name"のディレクトリを使用する設定を行います。bean設定は以下のようになります。

Java Configuration

@Bean
@StepScope
public ItemReader sampleReader(@Value("#{jobParameters[input.file.name]}") String resource) {
...
    FlatFileItemReader flatFileItemReader = new FlatFileItemReader();
    flatFileItemReader.setResource(new FileSystemResource(resource));
...
    return flatFileItemReader;
}

上記例のポイントはResourceプロパティ用に#{jobParameters['input.file.name']}をインジェクトし、ItemReader beanをStep scopeにしています。beanをStepスコープにすることで遅延バインディングが使用可能となり、これによりjobParameters変数にアクセスできます。

1.2. Available Attributes of the Job-Launching Gateway

job-launching gatewayの以下の属性でjobの制御設定が出来ます。

  • id: 基底Spring bean定義である以下どちらかのインスタンスを識別する。
    • EventDrivenConsumer
    • PollingConsumer(実際の実装はコンポーネントの入力チャネルがSubscribableChannelPollableChannelかどうかに依存)
  • auto-startup: エンドポイントがスタートアップ時に自動開始するかのBooleanフラグ。デフォルトtrue
  • request-channel: エンドポイントの入力MessageChannel
  • reply-channel: 実行結果のJobExecutionペイロードの送り先MessageChannel
  • reply-timeout: リプライチャネルにリプライメッセージが正常に送信されるのをゲートウェイがwaitする時間(ミリ秒)を指定します。超える場合は例外をスローします。この属性はチャネルがブロックする場合にだけ適用します(例、bounded queue channelがフルなど)。また、注意点として、DirectChannelに送信の場合、その実行は送信スレッドで行います。よって、送信オペレーションの失敗はさらに下流コンポーネントが起こしている可能性があります。reply-timeout属性は基底のMessagingTemplateインスタンスsendTimeoutプロパティにマップします。未指定の場合、属性のデフォルトは-1、これの意味は、デフォルトではGatewayは無限にwaitする、になります。
  • job-launcher: オプション。カスタムJobLauncher beanの参照。未指定ではアダプタはjobLauncheridで登録するインスタンスを再利用します。デフォルトインスタンスが無い場合、例外をスローします。
  • order: エンドポイントをサブスクライバとしてSubscribableChannelに接続した時の呼び出し順序を指定。

1.3. Sub-Elements

GatewayPollableChannelからメッセージを受信する場合、グローバルデフォルトのPollerを指定するか、Job Launching GatewayPoller sub-elementを指定する必要があります。

Java Configuration

@Bean
@ServiceActivator(inputChannel = "queueChannel", poller = @Poller(fixedRate="1000"))
public JobLaunchingGateway sampleJobLaunchingGateway() {
    JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher());
    jobLaunchingGateway.setOutputChannel(replyChannel());
    return jobLaunchingGateway;
}

1.3.1. Providing Feedback with Informational Messages

Spring Batchのjobは長時間実行が可能なので、進行状況の取得が重要な場合があります。たとえば、バッチジョブの一部または全部を失敗する場合、ステークホルダで通知を受けたい場合があります。Spring Batchは以下を通じてそうした情報を収集する仕組みがあります。

  • Active polling
  • Event-driven listeners

Spring Batchのjobを非同期実行する場合(例えばJob Launching Gatewayを使用)、JobExecutionインスタンスを返します。この場合、継続的にポーリングしてステータス更新するためにJobExecution.getJobId()を使えます。JobExplorerを使用してJobRepositoryからJobExecutionの更新されたインスタンスを取得します。しかし、これは最良とは言えず、イベント駆動がより適しています。

このため、Spring Batchは、3つの汎用リスナーを用意しています。

  • StepListener
  • ChunkListener
  • JobExecutionListener

以下イメージでは、Spring BatchのjobはStepExecutionListenerも設定しています。このとき、Spring Integrationはイベント受信とイベント前後にstepを処理します。たとえば、受信したStepExecutionRouterでチェックします。チェック結果に基づき、各種処理(例:Mail Outbound Channel Adapterにメッセージをルーティング)が可能で、状態に応じたEmail通知を送信できます。

Figure 2. Handling Informational Messages

以下2例は、StepExecutionイベントでGatewayにメッセージ送信するリスナー設定と、その出力をlogging-channel-adapterにログ出力します。

まず、通知インテグレーションのbeanを生成します。

Java Configuration

@Bean
@ServiceActivator(inputChannel = "stepExecutionsChannel")
public LoggingHandler loggingHandler() {
    LoggingHandler adapter = new LoggingHandler(LoggingHandler.Level.WARN);
    adapter.setLoggerName("TEST_LOGGER");
    adapter.setLogExpressionString("headers.id + ': ' + payload");
    return adapter;
}

@MessagingGateway(name = "notificationExecutionsListener", defaultRequestChannel = "stepExecutionsChannel")
public interface NotificationExecutionListener extends StepExecutionListener {}

※ configurationに@IntegrationComponentScanが必要です。

次に、jobにstep-levelリスナーを追加します。

Java Configuration

public Job importPaymentsJob() {
    return jobBuilderFactory.get("importPayments")
        .start(stepBuilderFactory.get("step1")
                .chunk(200)
                .listener(notificationExecutionsListener())
                ...
}

1.3.2. Asynchronous Processors

非同期プロセッサ(Asynchronous Processors)でアイテム処理のスケーリングが出来ます。非同期プロセッサのユースケースでは、AsyncItemProcessorはディスパッチャとして振る舞い、新スレッドでItemProcessorのロジックを実行します。アイテムが完了すると、書き込むためにAsynchItemWriterFutureが渡されます。

このため、基本的にはfork-joinを実装出来るような、非同期アイテム処理でパフォーマンスが向上する可能性があります。AsyncItemWriterは結果を収集してすべての結果が利用可能になり次第chunkにwrite backします。

以下はAsyncItemProcessor設定例です。

Java Configuration

@Bean
public AsyncItemProcessor processor(ItemProcessor itemProcessor, TaskExecutor taskExecutor) {
    AsyncItemProcessor asyncItemProcessor = new AsyncItemProcessor();
    asyncItemProcessor.setTaskExecutor(taskExecutor);
    asyncItemProcessor.setDelegate(itemProcessor);
    return asyncItemProcessor;
}

delegateプロパティはItemProcessor beanを設定し、taskExecutorプロパティは適当なTaskExecutorを設定します。

以下はAsyncItemWriterの設定例です。

Java Configuration

@Bean
public AsyncItemWriter writer(ItemWriter itemWriter) {
    AsyncItemWriter asyncItemWriter = new AsyncItemWriter();
    asyncItemWriter.setDelegate(itemWriter);
    return asyncItemWriter;
}

こちらも、delegateプロパティには実際の処理をするItemWriter beanを設定します。

1.3.3. Externalizing Batch Process Execution

これまでの解説ではSpring IntegrationがSpring Batchをラップするユースケースを見てきました。しかし、Spring Batchが内部的にSpring Integrationを使うことも可能です。この場合、Spring Batchのユーザは外部プロセスにアイテム処理あるいはchunkをデリゲートします。これにより複雑な処理の負荷軽減が可能です。Spring Batch Integrationは以下のための専用機能があります。

  • Remote Chunking
  • Remote Partitioning

Remote Chunking

Figure 3. Remote Chunking

ChunkMessageChannelItemWriterでchunk処理を外部化し、アイテムを外に送信して結果を収集します。送信すると、Spring Batchは読み込みとアイテムのグループ化を継続し、結果待ちをしません。ChunkMessageChannelItemWriterは結果の収集と統合をしてSpring Batchに処理を戻します。

Spring Integrationを使用して、処理のコンカンレンシーを完全に制御します(DirectChannelの代わりにQueueChannelを使用するなど)。 Channel Adapters (JMSやAMQPなど)のSpring Integrationリッチコレクションに依存することで、Batchジョブのchunkを外部システムに分散出来ます。

remote chunkするstepのシンプルなjobは以下のような設定になります。

Java Configuration

public Job chunkJob() {
     return jobBuilderFactory.get("personJob")
             .start(stepBuilderFactory.get("step1")
                     .<Person, Person>chunk(200)
                     .reader(itemReader())
                     .writer(itemWriter())
                     .build())
             .build();
 }

ItemReaderにはマスターでデータ読み込みに使用するbeanを指定します。ItemWriterには上述の専用ItemWriterChunkMessageChannelItemWriter)を指定します。processor(必要であれば設定)はマスターの設定にはありませんが、これはワーカーで設定します。以下の設定は基本的なマスターのセットアップです。必要に応じて、スロットルリミットなど、コンポーネントのプロパティを設定してください。

Java Configuration

@Bean
public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
    factory.setBrokerURL("tcp://localhost:61616");
    return factory;
}

/*
 * Configure outbound flow (requests going to workers)
 */
@Bean
public DirectChannel requests() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlows
            .from(requests())
            .handle(Jms.outboundAdapter(connectionFactory).destination("requests"))
            .get();
}

/*
 * Configure inbound flow (replies coming from workers)
 */
@Bean
public QueueChannel replies() {
    return new QueueChannel();
}

@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlows
            .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("replies"))
            .channel(replies())
            .get();
}

/*
 * Configure the ChunkMessageChannelItemWriter
 */
@Bean
public ItemWriter<Integer> itemWriter() {
    MessagingTemplate messagingTemplate = new MessagingTemplate();
    messagingTemplate.setDefaultChannel(requests());
    messagingTemplate.setReceiveTimeout(2000);
    ChunkMessageChannelItemWriter<Integer> chunkMessageChannelItemWriter
            = new ChunkMessageChannelItemWriter<>();
    chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate);
    chunkMessageChannelItemWriter.setReplyChannel(replies());
    return chunkMessageChannelItemWriter;
}

上の設定ではいくつかのbeanを設定しています。ActiveMQでメッセージングミドルウェアと、Spring Integrationのinbound/outbound JMSアダプタを設定しています。前述のitemWriter beanはjob stepで参照しており、ChunkMessageChannelItemWriterでメッセージングを介してchunkを書き込みます。

次にワーカーの設定を見ていきます。

Java Configuration

@Bean
public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
    factory.setBrokerURL("tcp://localhost:61616");
    return factory;
}

/*
 * Configure inbound flow (requests coming from the master)
 */
@Bean
public DirectChannel requests() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlows
            .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
            .channel(requests())
            .get();
}

/*
 * Configure outbound flow (replies going to the master)
 */
@Bean
public DirectChannel replies() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlows
            .from(replies())
            .handle(Jms.outboundAdapter(connectionFactory).destination("replies"))
            .get();
}

/*
 * Configure the ChunkProcessorChunkHandler
 */
@Bean
@ServiceActivator(inputChannel = "requests", outputChannel = "replies")
public ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler() {
    ChunkProcessor<Integer> chunkProcessor
            = new SimpleChunkProcessor<>(itemProcessor(), itemWriter());
    ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler
            = new ChunkProcessorChunkHandler<>();
    chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor);
    return chunkProcessorChunkHandler;
}

これら設定項目の多くはマスターのものと似通っています。ワーカーはSpring Batch JobRepositoryにもjob設定にもアクセスしません。中核のbeanはchunkProcessorChunkHandlerです。ChunkProcessorChunkHandlerchunkProcessorプロパティはSimpleChunkProcessorを取り、これはItemWriter(とオプションでItemProcessor)を指定し、これ(ら)がマスターからchunk受信時にワーカーで動作します。

詳細については、Remote Chunkingの"Scalability"チャプターを参照してください。

4.1以降、Spring Batch Integrationはremote chunkingセットアップの簡素化用に@EnableBatchIntegrationを追加しました。これによりアプリケーションコンテキストに2つのbeanを作ります。

  • RemoteChunkingMasterStepBuilderFactory: マスターstepの設定に使用
  • RemoteChunkingWorkerBuilder: リモートワーカのintegration flow設定に使用

これらAPIは以下図のように多数のコンポーネントの設定に使用します。

Figure 4. Remote Chunking Configuration

マスター側では、以下の宣言によりマスターstepを設定します。

  • アイテム読み込みとワーカー送信用のreader
  • ワーカーにリクエストを送信する出力チャネル(リクエスト送信("Outgoing requests"))
  • ワーカーから返信を受信する入力チャネル(返信受信("Incoming replies"))

ChunkMessageChannelItemWriterMessagingTemplateの明示的な設定は必要ありません(必要であれば明示的な設定を行う)。

ワーカー側では、RemoteChunkingWorkerBuilderでワーカーに以下を設定します。

  • 入力チャネルでマスターが送信したリクエストをリッスン(リクエスト受信("Incoming requests"))
  • 各リクエストに対しChunkProcessorChunkHandlerhandleChunkメソッドを介してItemProcessorItemWriterを呼び出す。
  • 出力チャネルでマスターに返信を送信(返信送信"Outgoing replies")

SimpleChunkProcessorChunkProcessorChunkHandlerの明示的な設定は必要ありません(必要であれば明示的な設定を行う)。

以下はこれらAPIの設定例です。

@EnableBatchIntegration
@EnableBatchProcessing
public class RemoteChunkingJobConfiguration {

    @Configuration
    public static class MasterConfiguration {

        @Autowired
        private RemoteChunkingMasterStepBuilderFactory masterStepBuilderFactory;

        @Bean
        public TaskletStep masterStep() {
            return this.masterStepBuilderFactory.get("masterStep")
                       .chunk(100)
                       .reader(itemReader())
                       .outputChannel(requests()) // リクエストをワーカーに送信
                       .inputChannel(replies())   // ワーカーから返信を受信
                       .build();
        }

        // ミドルウェアのbeanをここでセットアップ(詳細省略)

    }

    @Configuration
    public static class WorkerConfiguration {

        @Autowired
        private RemoteChunkingWorkerBuilder workerBuilder;

        @Bean
        public IntegrationFlow workerFlow() {
            return this.workerBuilder
                       .itemProcessor(itemProcessor())
                       .itemWriter(itemWriter())
                       .inputChannel(requests()) // リクエストをマスターから受信
                       .outputChannel(replies()) // マスターに返信を送信
                       .build();
        }

        // ミドルウェアのbeanをここでセットアップ(詳細省略)

    }

}

remote chunking jobのより複雑な例についてはこちらを参照してください。

Remote Partitioning

Figure 5. Remote Partitioning

Remote Partitioningはアイテム処理ではなくボトルネックを引き起こすI/O関連で有効です。Remote Partitioningの場合、Spring Batchのstepを実行するワーカーに処理を外出しします。このため、ワーカーはItemReader, ItemProcessor, ItemWriterを持ちます。これ用に、Spring Batch IntegrationにはMessageChannelPartitionHandlerがあります

PartitionHandlerの実装はMessageChannelを使用してリモートワーカに指示を出してそのレスポンスを受け取ります。これらはリモートワーカと通信するのに使う(JMSとAMQPなど)トランポートのクラス群です。

"Scalability"チャプターのセクションのremote partitioningでは、リモートパーティショニングの概念と設定に必要なコンポーネントの概要と、ジョブ実行を複数のローカルスレッドにパーティショニングするデフォルトのTaskExecutorPartitionHandlerを使うサンプルについて、解説しています。複数JVMにリモートパーティショニングするには、追加で以下2つのコンポーネントが必要です。

  • A remoting fabric or grid environment
  • remoting fabricあるいはgrid environmentを使用するPartitionHandler実装。

remote chunking同様、JMSを"remoting fabric"に使えます。その場合、上述の通り、PartitionHandler実装にMessageChannelPartitionHandlerを使います。以下のサンプルはpartitioned jobが設定済みとして、JMS設定とMessageChannelPartitionHandlerの例です。

Java Configuration

/*
 * Configuration of the master side
 */
@Bean
public PartitionHandler partitionHandler() {
    MessageChannelPartitionHandler partitionHandler = new MessageChannelPartitionHandler();
    partitionHandler.setStepName("step1");
    partitionHandler.setGridSize(3);
    partitionHandler.setReplyChannel(outboundReplies());
    MessagingTemplate template = new MessagingTemplate();
    template.setDefaultChannel(outboundRequests());
    template.setReceiveTimeout(100000);
    partitionHandler.setMessagingOperations(template);
    return partitionHandler;
}

@Bean
public QueueChannel outboundReplies() {
    return new QueueChannel();
}

@Bean
public DirectChannel outboundRequests() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundJmsRequests() {
    return IntegrationFlows.from("outboundRequests")
            .handle(Jms.outboundGateway(connectionFactory())
                    .requestDestination("requestsQueue"))
            .get();
}

@Bean
@ServiceActivator(inputChannel = "inboundStaging")
public AggregatorFactoryBean partitioningMessageHandler() throws Exception {
    AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
    aggregatorFactoryBean.setProcessorBean(partitionHandler());
    aggregatorFactoryBean.setOutputChannel(outboundReplies());
    // configure other propeties of the aggregatorFactoryBean
    return aggregatorFactoryBean;
}

@Bean
public DirectChannel inboundStaging() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow inboundJmsStaging() {
    return IntegrationFlows
            .from(Jms.messageDrivenChannelAdapter(connectionFactory())
                    .configureListenerContainer(c -> c.subscriptionDurable(false))
                    .destination("stagingQueue"))
            .channel(inboundStaging())
            .get();
}

/*
 * Configuration of the worker side
 */
@Bean
public StepExecutionRequestHandler stepExecutionRequestHandler() {
    StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();
    stepExecutionRequestHandler.setJobExplorer(jobExplorer);
    stepExecutionRequestHandler.setStepLocator(stepLocator());
    return stepExecutionRequestHandler;
}

@Bean
@ServiceActivator(inputChannel = "inboundRequests", outputChannel = "outboundStaging")
public StepExecutionRequestHandler serviceActivator() throws Exception {
    return stepExecutionRequestHandler();
}

@Bean
public DirectChannel inboundRequests() {
    return new DirectChannel();
}

public IntegrationFlow inboundJmsRequests() {
    return IntegrationFlows
            .from(Jms.messageDrivenChannelAdapter(connectionFactory())
                    .configureListenerContainer(c -> c.subscriptionDurable(false))
                    .destination("requestsQueue"))
            .channel(inboundRequests())
            .get();
}

@Bean
public DirectChannel outboundStaging() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundJmsStaging() {
    return IntegrationFlows.from("outboundStaging")
            .handle(Jms.outboundGateway(connectionFactory())
                    .requestDestination("stagingQueue"))
            .get();
}

また、以下サンプルのようにhandler属性にpartitionHandler beanを設定をしてください。

Java Configuration

        public Job personJob() {
                return jobBuilderFactory.get("personJob")
                                .start(stepBuilderFactory.get("step1.master")
                                                .partitioner("step1.worker", partitioner())
                                                .partitionHandler(partitionHandler())
                                                .build())
                                .build();
        }

リモートパーティショニングのjobのより複雑な例についてはこちらを参照してください。

リモートパーティショニングのセットアップを簡易化するための@EnableBatchIntegrationがあります。このアノテーションはリモートパーティショニングの2つのbeanを提供します。

  • RemotePartitioningMasterStepBuilderFactory: マスターのstepを設定するのに使用。
  • RemotePartitioningWorkerStepBuilderFactory: ワーカーのstepを設定するのに使用。

これらAPIは以下図のように多数のコンポーネントの設定に使用します。

Figure 6. Remote Partitioning Configuration (with job repository polling)

Figure 7. Remote Partitioning Configuration (with replies aggregation)

マスター側では、RemotePartitioningMasterStepBuilderFactoryを使用して以下を宣言することでマスターstepを設定します。

  • データをパーティション化するのに使用するPartitioner
  • ワーカーにリクエスト送信するための出力チャネル ("Outgoing requests")
  • ワーカーからの返信を受信するための入力チャネル("Incoming replies")(replies aggregation設定時)
  • ポーリングのインターバルとタイムアウト(job repository polling設定時)

MessageChannelPartitionHandlerMessagingTemplateの明示的な設定は必要ありません(必要な場合は設定する)。

反対のワーカー側では、RemotePartitioningWorkerStepBuilderFactoryを使用してワーカーを設定します。

  • 入力チャネルでマスターからのリクエスト送信をリッスンする("Incoming requests")
  • リクエストに対してStepExecutionRequestHandlerhandleメソッドを呼ぶ。
  • マスターに出力チャネルを通じて返信を送信する("Outgoing replies")

StepExecutionRequestHandlerの明示的な設定は必要ありません(必要な場合は設定する)。

これらのAPIの使用例は以下の通りです。

@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
public class RemotePartitioningJobConfiguration {

    @Configuration
    public static class MasterConfiguration {

        @Autowired
        private RemotePartitioningMasterStepBuilderFactory masterStepBuilderFactory;

        @Bean
        public Step masterStep() {
                 return this.masterStepBuilderFactory
                    .get("masterStep")
                    .partitioner("workerStep", partitioner())
                    .gridSize(10)
                    .outputChannel(outgoingRequestsToWorkers())
                    .inputChannel(incomingRepliesFromWorkers())
                    .build();
        }

        // Middleware beans setup omitted

    }

    @Configuration
    public static class WorkerConfiguration {

        @Autowired
        private RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory;

        @Bean
        public Step workerStep() {
                 return this.workerStepBuilderFactory
                    .get("workerStep")
                    .inputChannel(incomingRequestsFromMaster())
                    .outputChannel(outgoingRepliesToMaster())
                    .chunk(100)
                    .reader(itemReader())
                    .processor(itemProcessor())
                    .writer(itemWriter())
                    .build();
        }

        // Middleware beans setup omitted

    }

}

*1:strategyパターンのようにバッチ処理を独立した関数的な単位と見なせるようになる、といった意味合いと思われ

JEP 351: ZGC: Uncommit Unused Memoryをテキトーに訳した

http://openjdk.java.net/jeps/351

JEP 351: ZGC: Uncommit Unused Memory

Owner    Per Liden
Type    Feature
Scope   Implementation
Status  Candidate
Component   hotspot / gc
Discussion  hotspot dash gc dash dev at openjdk dot java dot net
Effort  S
Duration    S
Reviewed by Mikael Vidstedt, Stefan Karlsson
Created 2019/03/08 10:35
Updated 2019/03/14 21:48
Issue   8220347

Summary

未使用ヒープメモリをOSに返却するようZGCを改良します。

Motivation

現行のZGCは、あるメモリが長時間未使用だとしても、uncommitおよびそのメモリのOS返却を行いません。この振る舞いはあらゆる種類のアプリケーションと環境に最適では無く、特にメモリフットプリントが関心事である場合に顕著です。たとえば、

  • リソース使用分で請求されるコンテナ環境。
  • アプリケーションが長期間アイドルしたり、多数のアプリケーションでリソース共有や競合が発生する環境。
  • アプリケーション実行中にヒープ空間に対する要求が極めて変化する場合。たとえば、スタートアップ中に必要なヒープが通常状態よりも大量に必要となるケース。

HotSpot, G1とShenandoahのその他のGCは、今日ではこれら機能を有しており、上記カテゴリのユーザに受け入れられています。ZGCにこれら機能を追加し、同カテゴリのユーザにZGCを受け入れやすくします。

Description

ZGCヒープはZPagesというヒープ領域セットで構成しています。ZPageはそれぞれ可変量のコミット済ヒープメモリと関連付けられています。ZGCがヒープをコンパクト化すると、ZPageは解放されてZPageCacheというページキャッシュに挿入されます。ページキャッシュのZPagesは新規ヒープアロケーションに対して再使用可能状態にあり、アロケーション後はキャッシュから削除されます。ページキャッシュはパフォーマンスにとって極めて重要で、これはメモリのコミットとuncommitが高価なオペレーションなためです。

ページキャッシュのZPagesはヒープの未使用部分を表現し、これはuncommitとOS返却が"可能(could)"です。このためメモリuncommitはページキャッシュの適当に選択したZPagesを単に切り離せば良く、それからページに関連付けられたメモリをuncommitします。ページキャッシュはLRUと分離サイズ(small, medium, large)でZPagesを維持する仕組みがあるため、ZPages切り離しのメカニズムとメモリuncommitは比較的単純です。チャレンジングなのは、キャッシュからZPageを切り離すタイミングの決定ポリシーです。

シンプルなポリシーとしてはタイムアウトや、ページキャッシュを切り離すZPage生存時間を指定するディレイ値があります。タイムアウトは妥当なデフォルト値で、これはコマンドラインオプションでオーバーライドします。Shenandoah GCはこれと似たポリシーを持ち、デフォルト値5分でコマンドラインオプション-XX:ShenandoahUncommitDelay=<milliseconds>でオーバーライドします。

上記のようなポリシーは妥当な動作をします。しかし、もっと賢い切り離しポリシー設定が可能で、これは別途コマンドラインオプションを必要としません。たとえば、GC頻度や何らかのデータに基づいて最適なタイムアウトを得るヒューリスティックがあります。現時点では採用ポリシーは決定していません。各種ポリシーを評価予定です。まず最初は、シンプルなタイムアウトポリシーと-XX:ZUncommitDelay=<seconds>を提供し、それから、より賢いポリシー(もし見つかれば)を導入します。

ポリシーが決定したとしても、ヒープが最小サイズ(-Xms)以下になるようなZGCのメモリuncommitを絶対にしません。つまり、最小ヒープサイズ(-Xms)が最大ヒープサイズ(-Xmx)と等しい状態でJVMを開始する場合、uncommit機能は無効化します。

最後に、Linux/x64のZGCはヒープ返却にtmpfsやhugetlbfsファイルを使います。これらファイルのメモリuncommitはFALLOC_FL_PUNCH_HOLEサポートのあるfallocate(2)が必要で、これが最初に導入されたのはinux 3.5 (tmpfs)と4.3 (hugetlbfs)です。ZGCはこれより古いLinuxでも以前同様に動作する必要があります。ただし、uncommit機能が無効化される点は除きます。

Testing

  • 開発するuncommit機能の検証に一つ以上のjtregテストを使用する。
  • 既存のベンチマーク、SPECjbbとSPECjvmなど、でデフォルトポリシー使用時にレイテンシやスループット劣化が見られないことを検証する。

Risks and Assumptions

現行プランではuncommit機能は常にONで、明示的な無効化オプションはありません。ただし、-Xms-Xmxを同一値にすることで間接的には可能です。これは余計なオプション追加をしないというZGCの意志の現れです。明示オプション(-XX:-ZUncommit)が必要かどうかは時が決めるでしょう。現状、必要でないと判断しています。

Spring Batch 4.1.x - Reference Documentation - JSR-352 Supportのテキトー翻訳

https://docs.spring.io/spring-batch/4.1.x/reference/html/jsr-352.html#jsr-352

https://qiita.com/kagamihoge/items/12fbbc2eac5b8a5ac1e0 俺の訳一覧リスト

*1

1. JSR-352 Support

Spring Batch 3.0以降はJSR-352を完全に実装しています。このセクションは仕様自体の入れ替えというより、JSR-352固有のコンセプトをSpring Batchに適用する方法について説明します。JSR-352の情報についてはJCP https://jcp.org/en/jsr/detail?id=352 を参照してください。

1.1. General Notes about Spring Batch and JSR-352

Spring BatchとJSR-352は構造的には同じです。どちらもjobはstepで作り、reader, processor, writer, listenerを持ちます。しかし、微妙に異なる箇所があります。たとえば、Spring Batchのorg.springframework.batch.core.SkipListener#onSkipInWrite(S item, Throwable t)は引数を2つ、スキップされたアイテムとスキップ原因の例外、を取ります。JSR-352の方(javax.batch.api.chunk.listener.SkipWriteListener#onSkipWriteItem(List<Object> items, Exception ex))も同様に引数を2つとります。しかし、最初の引数はそのchunkの全アイテムListで2つ目はスキップ原因のExceptionです。こうした差異があるため、Spring Batchでjobを実行する方法が2つ、従来通りのSpring BatchのjobかJSR-352ベースのjob、がある点には注意が必要です。Spring Batchのアーティファクト(reader, writerなど)のjobをJSR-352のJSLで設定してJsrJobOperatorで実行する場合、JSR-352のルールで動作します。なお、JSR-352で開発したバッチアーティファクトはSpring Batchのjobでは動作しない点に注意してください。

1.2. Setup

1.2.1. Application Contexts

Spring Batch内でのJSR-352ベースのjobは2つのアプリケーションコンテキストを構成します。親コンテキストは、JobRepository, PlatformTransactionManagerなどSpring Batchのインフラ部分に関連するbeanを持ち、子コンテキストは実行するjobの設定を持ちます。親コンテキストはフレームワークが提供するjsrBaseContext.xmlで定義します。このコンテキストはJSR-352-BASE-CONTEXTシステムプロパティでオーバーライドできます。

※ ベースコンテキスト(base context)はプロパティインジェクションなどのJSR-352プロセッサで処理しないため、追加処理の必要なコンポーネントはそこで設定しないでください*2

1.2.2. Launching a JSR-352 based job

JSR-352でバッチジョブを実行する方法は大変シンプルです。ジョブを実行するのに必要なコードは以下の通りです。

JobOperator operator = BatchRuntime.getJobOperator();
jobOperator.start("myJob", new Properties());

上記は開発者にとって分かりやすいですが、落とし穴があります*3。Spring Batchは裏側でいくつかのインフラとなるbeanを初期化します。これは開発者がオーバーライド可能です。以下は初回BatchRuntime.getJobOperator()呼び出し時に初期化される一覧です。

Bean Name Default Configuration Notes
dataSource 設定値使用のApache DBCP BasicDataSource デフォルトではHSQLDBが初期化される
transactionManager org.springframework.jdbc.datasource.DataSourceTransactionManager 上記で定義するdataSource beanを参照する
A Datasource initializer   batch.drop.scriptbatch.schema.scriptプロパティのスクリプトを実行するために設定します。デフォルトではHSQLDBスキーマスクリプトを実行します。この振る舞いはbatch.data.source.initプロパティでdisableに出来ます。
jobRepository JDBCベースのSimpleJobRepository JobRepositoryは上述のデータソースとトランザクションマネージャを使用します。スキーマのテーブルプレフィクス(デフォルトBATCH_)はbatch.table.prefixプロパティで設定可能です
jobLauncher org.springframework.batch.core.launch.support.SimpleJobLauncher job実行に使用
batchJobOperator org.springframework.batch.core.launch.support.SimpleJobOperator JsrJobOperatorが各種機能を提供するのにこのbeanをラップする
jobExplorer org.springframework.batch.core.explore.support.JobExplorerFactoryBean JsrJobOperatorが提供する機能のルックアップに使用する
jobParametersConverter org.springframework.batch.core.jsr.JsrJobParametersConverter JobParametersConverterのJSR-352固有実装
jobRegistry org.springframework.batch.core.configuration.support.MapJobRegistry SimpleJobOperatorが使用
placeholderProperties org.springframework.beans.factory.config.PropertyPlaceholderConfigure 上述のプロパティを設定するためにbatch-${ENVIRONMENT:hsql}.propertiesをロードする。ENVIRONMENTはシステムプロパティ(デフォルトhsql)でSpring BatchがサポートするDBを指定可能です

※ 上記beanのいずれもJSR-352ベースのjob実行ではオプション扱いです。どのbeanも必要に応じてカスタマイズのためにオーバライドが可能です。

1.3. Dependency Injection

JSR-352はかなりの程度Spring Batchのプログラミングモデルをベースにしています。よって、明示的に何らかのDI実装を用意する必要はありません。Spring BatchはJSR-352で定義するバッチアーティファクトのロードに3つの方法をサポートしています。

  • Implementation Specific Loader - Spring BatchはSpring上で動作するので、JSR-352バッチジョブ内のSpring DIもサポートする。
  • Archive Loader - JSR-352は論理名とクラス名をマッピングするbatch.xmlを定義する。このファイルを使用する場合は/META-INF/ディレクトリ内に配置する。
  • Thread Context Class Loader - JSR-352はインライン完全修飾クラス名によるJSLでバッチアーティファクトの実装を指定する設定が可能です。Spring BatchはJSR-352設定のjobも同様にサポートします。

JSR-352ベースのバッチジョブでSpringのDIを使うには、Springアプリケーションコンテキストを使用するバッチアーティファクトをbeanとして設定します。bean定義後は、jobはそのbeanをbatch.xmlで定義されたかのように参照可能です。

Java Configuration

@Configuration
public class BatchConfiguration {

    @Bean
    public Batchlet fooBatchlet() {
        FooBatchlet batchlet = new FooBatchlet();
        batchlet.setProp("bar");
        return batchlet;
       }
}


<?xml version="1.0" encoding="UTF-8"?>
<job id="fooJob" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
    <step id="step1" >
        <batchlet ref="fooBatchlet" />
    </step>
</job>

Springコンテキストのassembly(importsなど)は、Springベースのアプリケーション同様に、JSR-352のjobでも動作しまう。JSR-352ベースのjobとの唯一の相違点は、コンテキスト定義のエントリーポイントは/META-INF/batch-jobs/のjob定義となります。

thread context class loaderの方法を使うには、refで完全修飾クラス名を渡します。この方法かbatch.xmlを使う場合、参照クラスはbean生成のために引数無しコンストラクタが必要です。

<?xml version="1.0" encoding="UTF-8"?>
<job id="fooJob" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
    <step id="step1" >
        <batchlet ref="io.spring.FooBatchlet" />
    </step>
</job>

1.4. Batch Properties

1.4.1. Property Support

JSR-352は、JSLの設定方法を使用して、Job,Step,バッチアーティファクトレベルで定義するプロパティを使用可能です。バッチプロパティは以下の方法で各レベルで設定します。

<properties>
    <property name="propertyName1" value="propertyValue1"/>
    <property name="propertyName2" value="propertyValue2"/>
</properties>

Propertiesは各バッチアーティファクトに対して設定します。

1.4.2. @BatchProperty annotation

バッチアーティファクトPropertiesはクラスのフィールドに@BatchProperty@Inject(仕様上両方のアノテーションが必要)を付与することで参照します。JSR-352の定義上、プロパティのフィールドはString型が必須です。型変換の実装と実行は開発者の責任です。

javax.batch.api.chunk.ItemReaderアーティファクトは上述のプロパティ定義と共に使用可能で以下のようにアクセスします。

public class MyItemReader extends AbstractItemReader {
    @Inject
    @BatchProperty
    private String propertyName1;

    ...
}

"propertyName1"フィールドの値は"propertyValue1"になります。

1.4.3. Property Substitution

デフォルトプロパティ(Property substitution)*4演算子とシンプルな条件付きの式を使います。一般的な使用法は#{operator['key']}です。

サポートする演算子は以下です。

  • jobParameters - job開始/再開後にjobパラメータ値にアクセス。
  • jobProperties - JSLのjobレベルのプロパティにアクセス。
  • systemProperties - 名前付きシステムプロパティにアクセス。
  • partitionPlan - パーティション化stepのパーティーションプランの名前付きプロパティにアクセス。
#{jobParameters['unresolving.prop']}?:#{systemProperties['file.separator']}

左側が使いたい値で、右側がデフォルト値です。この例では、#{jobParameters['unresolving.prop']}が解決不能であれば、システムプロパティfile.separatorの値となります。両方とも解決不能であれば、空文字列を返します。複数の条件を使用可能で、';'で区切ります。

1.5. Processing Models

JSR-352にはSpring Batchと同様の2つの処理モデルがあります。

  • Item based processing - javax.batch.api.chunk.ItemReaderjavax.batch.api.chunk.ItemWriter、任意でjavax.batch.api.chunk.ItemProcessor
  • Task based processing - javax.batch.api.Batchletの実装。この処理モデルはorg.springframework.batch.core.step.tasklet.Taskletと同様。

1.5.1. Item based processing

このコンテキストにおけるitemベースの処理はItemReaderで読み込むアイテム数がセットするchunkサイズです。stepの設定には、item-count(デフォルト10)を指定し、任意でcheckpoint-policyをitem(デフォルト値)に設定します。

...
<step id="step1">
    <chunk checkpoint-policy="item" item-count="3">
        <reader ref="fooReader"/>
        <processor ref="fooProcessor"/>
        <writer ref="fooWriter"/>
    </chunk>
</step>
...

itemベースのチェックポイントを使う場合、time-limitも使えます。指定アイテム数を処理すべきタイムリミットを設定します。タイムアウトに達すると、chunkは読み込んだitemがあったとしても、item-countの設定に関わらず、完了します。

1.5.2. Custom checkpointing

JSR-352はstepのチェックポイント内のコミットインターバルで処理を呼び出します。上述の通りitemベースのチェックポイントはその1つです。しかし、多くの場合これは十分にロバストではありません。このため、仕様でjavax.batch.api.chunk.CheckpointAlgorithmインタフェースの実装によりカスタムチェックポイントを作成可能です。この機能はSpring Batchのカスタムcompletion policyと同等です。CheckpointAlgorithmの実装を使うには、checkpoint-policyを、以下のようにstepのfooCheckpointerCheckpointAlgorithmの実装を参照します。

...
<step id="step1">
    <chunk checkpoint-policy="custom">
        <checkpoint-algorithm ref="fooCheckpointer"/>
        <reader ref="fooReader"/>
        <processor ref="fooProcessor"/>
        <writer ref="fooWriter"/>
    </chunk>
</step>
...

1.6. Running a job

JSR-352ベースのjob実行のエントリーポイントはjavax.batch.operations.JobOperatorです。Spring Batchにはこのインタフェースの実装(org.springframework.batch.core.jsr.launch.JsrJobOperator)があります。この実装クラスはjavax.batch.runtime.BatchRuntimeがロードします。JSR-352ベースのバッチジョブの起動は以下のように実装します。

JobOperator jobOperator = BatchRuntime.getJobOperator();
long jobExecutionId = jobOperator.start("fooJob", new Properties());

上記コードは以下を行います。

  • ベースApplicationContextの初期化 - バッチの各種機能を使えるように、フレームワークで基盤部分の初期化を行います。これはJVMごとに1回発生します。初期化コンポーネント@EnableBatchProcessingのそれと似ています。詳細はJsrJobOperatorjavadocを参照してください。
  • jobが要求するApplicationContextのロード - 上の例では、フレームワークは/META-INF/batch-jobsのfooJob.xmlを参照し、前に解説した共有コンテキストの子コンテキストとしてロードします。
  • job起動 - コンテキスト内に定義したjobを非同期に実行する。JobExecutionのidが返される。

※ すべてのJSR-352ベースのバッチジョブは非同期に実行します。

SimpleJobOperatorJobOperator#startを呼ぶ場合、Spring Batchは初回実行か以前の実行のリトライかを判断します。JSR-352のJobOperator#start(String jobXMLName, Properties jobParameters)を使う場合、フレームワークは常に新規JobInstanceを生成します(JSR-352のjobパラメータは一意性を持たない(JSR-352 job parameters are non-identifying))。jobをリスタートするには、JobOperator#restart(long executionId, Properties restartParameters)を使用して下さい。

1.7. Contexts

JSR-352には2つのコンテキストオブジェクトがあり、jobのメタデータにアクセスするものと、バッチアーティファクトからstepにアクセスします。javax.batch.runtime.context.JobContextjavax.batch.runtime.context.StepContextです。これらはstepレベルアーティファクトBatchlet, ItemReaderなど)で利用可能で、JobContextはjobレベルアーティファクトJobListenerなど)でも利用可能です。

カレントのスコープ内でJobContextStepContextの参照を得るには、@Injectを使います。

@Inject
JobContext jobContext;

@Autowire for JSR-352 contexts Springの@Autowireは上記コンテキストのインジェクションには使用出来ません。

Spring Batchでは、JobContextStepContextはこれらに対応するexecutionオブジェクト(JobExecutionStepExecution)をラップします。StepContext#setPersistentUserData(Serializable data)はSpring BatchのStepExecution#executionContextに保存します。

1.8. Step Flow

JSR-352ベースのjobの内部の、stepのflowはSpring Batchのそれと同様の動作します。ただし、多少微妙に異なる点があります。

  • Decision’s are steps - In a regular Spring Batch job, a decision is a state that does not have an independent StepExecution or any of the rights and responsibilities that go along with being a full step.. However, with JSR-352, a decision is a step just like any other and will behave just as any other steps (transactionality, it gets a StepExecution, etc). This means that they are treated the same as any other step on restarts as well.*5
  • next属性とstep transitions - Spring Batchのjobでは、これらは同一stepで一緒に使う事が可能です。JSR-352でも同一stepで使う事が可能で、next属性が評価において優先します。
  • transitions要素の順序 - Spring Batchのjobでは、transition要素は最も一致するものからしないものにソートしてその順序で評価します。JSR-352のjobsはXMLで指定した順序でtransition要素を評価します。

1.9. Scaling a JSR-352 batch job

Spring Batchには4つのスケーリングの方法があります(最後の2つは複数JVMで実行)。

  • Split - パラレルに複数stepを実行。
  • Multiple threads - 複数スレッドで単一stepを実行。
  • Partitioning - パラレル処理でデータを分割(master/slave)。
  • Remote Chunking - ロジックのprocessor pieceをリモートに実行。

JSR-352はバッチジョブのスケーリングに2つのオプションがあります。両オプションとも単一JVMのみをサポートします。

  • Split - Spring Batchのものと同等。
  • Partitioning - Spring Batchと概念的には同等だが微妙に実装は異なる。

1.9.1. Partitioning

概念的には、JSR-352のパーティショニングはSpring Batchと同等です。処理対象の入力の識別のためにメタデータが各スレーブに渡され、スレーブはマスターに処理結果を返します。しかし、いくつか重要な違いがあります。

  • Partitioned Batchlet - 複数スレッドで複数インスタンスBatchletを動かす。各インスタンスJSLもしくはPartitionPlanのプロパティをそれぞれ固有で持つ。
  • PartitionPlan - Spring Batchのパーティショニングでは、ExecutionContextを各パーティションに渡します。JSR-352では、単一のjavax.batch.api.partition.PartitionPlanメタデータProperties配列と共に各パーティションに渡します。
  • PartitionMapper - JSR-352はパーティションメタデータの生成に2種類の方法があります。1つ目はJSL (partition properties)です。2つ目はjavax.batch.api.partition.PartitionMapperの実装です。機能的には、このインタフェースはSpring Batchのorg.springframework.batch.core.partition.support.Partitionerと、パーティショニングのメタデータをプログラム的に生成する、という点で似ています。
  • StepExecutions - Spring Batchでは、パーティションstepはmaster/slaveで動作します。JSR-352でも同一設定で動作します。ただし、スレーブのstepはofficial StepExecutionsを取得しません。このため、JsrJobOperator#getStepExecutions(long jobExecutionId)はマスターにだけStepExecutionを返します。

※ 子StepExecutionsはジョブリポジトリには存在し、JobExplorerおよびSpring Batch Adminを介して利用可能です。

  • Compensating logic - Spring Batchがstepでパーティショニングのmaster/slaveを実装する場合、StepExecutionListenersで何らかの補正処理を実行可能です。しかし、JSR-352のslaveは、エラー発生時の補正処理と、動的なexit status設定が可能なように、コンポーネントのコレクションを渡します。コンポーネントは以下の通りです。
Artifact Interface Description
javax.batch.api.partition.PartitionCollector スレーブstepからマスターに情報を送り返す手段の提供。スレーブスレッドごとに1インスタンス
javax.batch.api.partition.PartitionAnalyzer PartitionCollectorが収集する情報と完了パーティションの結果ステータスを受け取るエンドポイント。
javax.batch.api.partition.PartitionReducer パーティションstepの補正ロジックの提供。

1.10. Testing

JSR-352ベースのjobはすべて非同期実行なため、job完了の確認が困難です。テスト用に、Spring Batchはorg.springframework.batch.test.JsrTestUtilsを提供します。このユーティリティークラスは、jobの開始・jobのリスタート・完了待ち、が出来ます。jobが完了するとJobExecutionを返します。

*1:あんま興味無いセクションなんで他にまして訳がテキトーな点に注意してください。

*2:The base context is not processed by the JSR-352 processors for things like property injection so no components requiring that additional processing should be configured there. が原文

*3:the devil is in the details. が原文。ぐぐれば分かるが「思わぬところに落とし穴」という意味合いの慣用句。オサレに訳せなかったんで直訳

*4:正確に訳すのなら「代替プロパティ」とかになるんだろうけど。まぁええわ。

*5:よくわからんかった