kagamihogeの日記

kagamihogeの日記です。

Spring Batchのstep間データ共有

Spring Batchでstep間でデータを共有する方法について。機能的に豊富では無いが、そこはSpring Batchとして積極的に提供する機能では無い、という設計判断なのだと思う。

以下ではそのやり方について述べる。

JobのExecutionContext

JobのExecutionContextに小規模なデータを保存し、step間でこれを共有する。

この方法の特徴は、メタデータのテーブル、デフォルトではBATCH_JOB_EXECUTION_CONTEXTのカラムに保存される。単なるカラム(例:OracleだとCLOB)にJSONシリアライズした文字列が保存される。POJOを保存可能。

このため、小規模データや中間データの参照情報に向いている。逆に、あまり巨大な中間データを保存するのには向いていない。もし中間データをメタデータにstep実行履歴として永続化したとして、「中間」の性質上後で使うことはほぼ無い*1。後で使う例としては、jobの途中のstepからリスタートしたいケースがあるが、これも中間テーブル等で対応できる。

デメリットが目をつむれる程度の小規模データか、後述する中間データの参照情報を保存、というのがこれの基本的な使い方になる、と思われる。

インメモリ

単にプログラム内の変数でstep間でデータを共有する。

@Compopent

@Component
public class IntermediateData  {
  Map<String, SampleData> sharedData;

中間データを持つ適当な@Componentを用意し、複数のstep間で@Autowiredして共有する。

ただの@Compopentなので手軽に使え、メモリの許す限りに大きいデータを持つことも可能。

ただし、インメモリでシングルトンな点に注意が必要。jobが失敗すれば中間データは失われるため、設計上step処理途中からリスタートが必要な場合には使えない。また、シングルトンなのでもしマルチスレッド処理があり得るならその考慮が必要。加えて、jobを複数同時に起動する場合、シングルトンインスタンスに同時にアクセスする問題がある。これは後述の@JobScopeで解決可能。

@Compopent + @JobScope

@Component
@JobScope
public class IntermediateData  {
  Map<String, SampleData> sharedData;

あるbeanのスコープをjob実行単位にしたい場合は@JobScopeを使用する。

中間データを外部保存

適当な永続化機構に中間データを保存する。

伝統的には中間ファイルや中間テーブルがある。他にも、組み込みDBやRedisなどKVS、RabbitMQなどのキューなど、環境に応じて様々な選択肢がある。

もし処理データ対象や範囲指定がある場合、ExecutionContextや@Compopentなどを使用する。中間テーブル名や中間ファイルのパス、処理対象データのインデックス範囲、などをそこで持つ。

中間データを使わない

中間データはなるべく無い方が望ましい。特に、失敗時のリスタートの設計考慮事項、例えば中間データが残らないようにする等、が増えてしまう。バグの元だし、オペレーションが複雑化して事故の元にもなりやすい。

個人的には、Spring Batchのstepに気をとられて分割し過ぎない方が良いように思う。たとえば、RDBだと複数stepのループ処理をSQLだけで済ませられないか、を検討してみるとか。

参照

*1:履歴を残したいならそれ用のテーブル等を用意すると思う

Spring Cloud Config + Spring Cloud Busでプロパティ自動更新

http://kagamihoge.hatenablog.com/entry/2020/02/14/091501 ではSpring Cloud Configを試した。問題点としてプロパティの更新がある。/actuator/refreshで1つのclientのリフレッシュは出来るが、100個あったら100回送信しなければならない。その解決方法について。

なおソースコードhttp://kagamihoge.hatenablog.com/entry/2020/02/14/091501 をベースにしているので、適宜参照のこと。

RabbitMQ経由でclientからclientに更新通知

あるclientにPOSTでリフレッシュ要求を送信すると、RabbigMQを介して別のclientに対してリフレッシュ要求が送信される。これを実現するためにSpring Cloud Busを使用する。

clientのbuild.gradleに依存性を追加する。

   implementation 'org.springframework.cloud:spring-cloud-starter-bus-amqp'

clientのプロパティファイルを修正する。RabbitMQの接続設定を追加。actuatorのエンドポイントを全部開放しておく。

management.endpoints.web.exposure.include=*

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

RabbigMQを起動する。

docker pull rabbitmq:3-management
docker run -d --hostname my-rabbit --name some-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management

この状態で、gitにプロパティファイルを適当に編集してpushしたあと、http://localhost:8080/actuator/bus-refresh にPOSTする。なお、動作を確認するにはclientを2つ以上立ち上げておくと良い。片方のclientの/actuator/bus-refreshをすると、もう片方のclientでも更新後のプロパティが参照できる。

RabbitMQ経由でserverからclientに更新通知

次に、serverの変更をRabbitMQ経由でclientに通知する方法。serverのリフレッシュ通知エンドポイントを叩くと、RabbitMQを介してclientに更新通知が行く、という仕組み。これの実現には、6. Push Notifications and Spring Cloud Busという、spring-cloud-config-monitorとSpring Cloud Busの組み合わせで行う。

severに依存性を追加する。

   implementation 'org.springframework.cloud:spring-cloud-config-monitor'
    implementation 'org.springframework.cloud:spring-cloud-starter-stream-rabbit'

severのプロパティファイルにRabbigMQの設定を追加する。

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

この状態で、gitにプロパティファイルを適当に編集してpushしたあと、http://localhost:8888/monitor に以下のJSONをPOSTする。

{"path": "sample"}

とりあえず動かしたいならpath*でも良い。デフォルトではアプリケーション名(spring.application.name)にマッチするみたい。

以上で、git push後にserverの http://localhost:8888/monitor にPOSTすればclientに更新が行く仕組みができる。

webhookで/monitorを叩く

これは実際に試してないけど。webhookでプロパティファイル更新をフックして/monitorを叩けば、gitをトリガーにしたプロパティ更新が可能になる、という仕組みらしい。

参考

Spring Cloud Configをためす

Spring Cloud Config を試す。

gitリポジトリの準備

適当なローカルディレクトリとか、GitHubとか、http://kagamihoge.hatenablog.com/entry/2020/02/13/095145 とか、なんでも良いが適当なgitリポジトリを準備する。

プロパティファイルとして/sample.propertiesというファイルをpushしておく。中身はごく普通のプロパティファイル。

hoge.message=sample hello config.

Spring Cloud Config Server

httpでプロパティファイルを公開するserverを準備する。Spring Initializr でDependenciesにConfig Serverを追加する。それで生成されるbuild.gradleは下記のような感じ。

plugins {
    id 'org.springframework.boot' version '2.2.4.RELEASE'
    id 'io.spring.dependency-management' version '1.0.9.RELEASE'
    id 'java'
}

group = 'kagamihoge.sample.springcloudconfig'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '1.8'

repositories {
    mavenCentral()
}

ext {
    set('springCloudVersion', "Hoxton.SR1")
}

dependencies {
    implementation 'org.springframework.cloud:spring-cloud-config-server'
    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
}

dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
    }
}

test {
    useJUnitPlatform()
}

起動用のエントリポイントを作る。@EnableConfigServerが必要。

import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.config.server.EnableConfigServer;

@EnableConfigServer
@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        new SpringApplicationBuilder(Application.class).web(WebApplicationType.SERVLET).run(args);
    }
}

serverのプロパティファイル。gitのuri, username, passwordを指定する。後述のclientも一緒に動かすのでポートも変更している。

server.port=8888
spring.cloud.config.server.git.uri=ssh://root@localhost:32768/root/config.git
spring.cloud.config.server.git.username=root
spring.cloud.config.server.git.password=a

Config Client

serverからプロパティを取得するclientをつくる。プロジェクト作成は同様にSpring Initializrを使用し、Dependenciesにweb, Config Client, actuatorを追加する。actuatorはプロパティのリフレッシュに使用するので、必要無ければ無しでも良い。

以下は生成されたbuild.gradleをそのままコピペしたもの。

plugins {
    id 'org.springframework.boot' version '2.2.4.RELEASE'
    id 'io.spring.dependency-management' version '1.0.9.RELEASE'
    id 'java'
}

group = 'kagamihoge.sample.springcloudconfig'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '1.8'

repositories {
    mavenCentral()
}

ext {
    set('springCloudVersion', "Hoxton.SR1")
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.cloud:spring-cloud-starter-config'
    implementation 'org.springframework.boot:spring-boot-starter-actuator'
    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
}

dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
    }
}

test {
    useJUnitPlatform()
}

プロパティ取得は今まで通りの@Valueが使える。@RefreshScopeは後述。

@SpringBootApplication
@RestController
@RefreshScope
public class Application {
    @Value("${hoge.message}")
    private String message;
    
    @RequestMapping("/")
    public String home() {
        return "Hello World!" + message;
    }
    
    public static void main(String[] args) {
        new SpringApplicationBuilder(Application.class).web(WebApplicationType.SERVLET).run(args);
    }
}

clientのプロパティにいくつかの設定をする。まずspring.cloud.config.uriにserverのurlを指定する。

spring.application.nameはアプリケーション名で、プロパティファイル名の解決にこれが使われる。ルールは https://cloud.spring.io/spring-cloud-config/reference/html/#_quick_start にあるとおり。今回はclientでspring.application.name=sampleと指定しているので{application}sampleとなり、{profile}は未指定なので空、よって/{application}-{profile}.propertiesのルールで/sample.propertiesを参照しにいく事になる。

spring.cloud.config.uri=http://localhost:8888
spring.application.name=sample
management.endpoints.web.exposure.include=refresh

これを起動して http://localhost:8080/ にアクセスするとserverからプロパティを取得して表示が行われる。

プロパティのリフレッシュ

プロパティファイル変更のリフレッシュにはそれ用のエンドポイントをたたく。

このためにはactuatorが必要で、build.gradleでactuatorの依存性とmanagement.endpoints.web.exposure.include=refreshでrefreshエンポイントを使用可能にしておく。

動作検証としてgitのsample.propertiesを適当に変更してpush後、http://localhost:8080/actuator/refreshにPOSTする。なお、@Valueのプロパティは@RefreshScopeを付与する必要がある。これで再度 http://localhost:8080/ アクセスすると変更後のプロパティが取得できる。

参考