kagamihogeの日記

kagamihogeの日記です。

spring-batchでJDBCのキーブレイク

背景

Javaでキーブレイク*1を意識することはあまり無い。JPAならOneToManyで自動的にコレクションにマッピングされるし、JSONXMLでも同様である。

今回、spring-batchでO/Rをとある事情で使えずSQLを直接使用、かつ、キーブレイクが必要になった。その実現方法について考える。こちらにある通り、SingleItemPeekableItemReaderとtaskletを組み合わせれば可能だが、今回はchunkで何とかする方法について。

基本的な考え方はこちらと同様にSingleItemPeekableItemReaderでキーブレイクを実現する。このときdelegate先はJdbcCursorItemReaderにする。そして、一回のItemReader#read内でキーブレイクに達するまでJDBCカーソルを回し続ける。これにより、ItemReader#readがキーブレイク単位のオブジェクトを返すようになる。

spring-batchのchunkの考え方からするとちょっと気持ち悪いが……とりえあず実装を見ていく。

実装

plugins {
  id 'org.springframework.boot' version '2.3.4.RELEASE'
  id 'io.spring.dependency-management' version '1.0.10.RELEASE'
  id 'java'
}
group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'
configurations {
  compileOnly {
    extendsFrom annotationProcessor
  }
}
repositories {
  mavenCentral()
}
dependencies {
  implementation 'org.springframework.boot:spring-boot-starter-batch'
  implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
  compileOnly 'org.projectlombok:lombok'
  runtimeOnly 'com.h2database:h2'
  annotationProcessor 'org.projectlombok:lombok'
  testImplementation('org.springframework.boot:spring-boot-starter-test') {
    exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
  }
  testImplementation 'org.springframework.batch:spring-batch-test'
}
test {
  useJUnitPlatform()
}

サンプルとして、1:nのテーブルからデータを取得し、親側のキーでキーブレイクする処理、を考える。

エンティティ的にはこんな感じ。このサンプルではテーブルを自動生成するためだけにしか使わない。

@Entity
@Data
public class Sample {
    @Id
    Long id;
    String value;

    @OneToMany
    @JoinColumn(name = "sampleId")
    List<SampleDetail> details;
}
@Entity
@Data
public class SampleDetail {
    @Id
    SampleDetailId id;
    String value;
}
@SuppressWarnings("serial")
@Embeddable
public class SampleDetailId implements Serializable {
    Long id;
    Long sampleId;
}

データ取得のために実行するSQLはこんな感じ。

select s.id, d.id, d.value 
from sample s 
join sample_detail d on s.id = d.sample_id 
order by s.id

以下はこのエントリの核となるキーブレイクするreader。

import javax.sql.DataSource;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.support.SingleItemPeekableItemReader;

public class JdbcCursorKeyBreakRreader implements ItemStreamReader<KeyBreakItem> {
    SingleItemPeekableItemReader<SampleRow> peekableReader = new SingleItemPeekableItemReader<>();

    public JdbcCursorKeyBreakRreader(DataSource dataSource) {
        JdbcCursorItemReader<SampleRow> jdbcCursor = new JdbcCursorItemReader<>();
        jdbcCursor.setDataSource(dataSource);
        jdbcCursor.setName("jdbcCursor");
        jdbcCursor.setSql("select s.id, d.id, d.value from sample s join sample_detail d on s.id = d.sample_id order by s.id");
        jdbcCursor.setRowMapper((rs, rowNum) -> new SampleRow(rs.getLong(1), rs.getLong(2), rs.getString(3)));

        peekableReader.setDelegate(jdbcCursor);
    }

    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        peekableReader.open(executionContext);
    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        peekableReader.update(executionContext);
    }

    @Override
    public void close() throws ItemStreamException {
        peekableReader.close();
    }

    KeyBreakItem data = null;

    @Override
    public KeyBreakItem read()
            throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        data = new KeyBreakItem();

        while (peekableReader.peek() != null) {
            SampleRow current = peekableReader.read();
            SampleRow next = peekableReader.peek();

            // chunkアイテムに各行の内容を反映させる
            data.setId(current.getId());
            data.getDetailIds().add(current.getDetailId());

            if (next != null) {
                // キーブレイク判定処理
                if (!current.getId().equals(next.getId())) {
                    return data;
                }
            } else {
                return data;
            }
        }

        return null;
    }
}

キーブレイク判定処理がtrueの間はJDBCカーソルから読み取り続け、falseになったらItemReader#readとしてchunkアイテムを返す。また、読み取った各行の内容を最終的な戻り値となる予定の変数に反映する。

説明用に汎用性は無くしている。といっても、SQLとかマッピングやキーブレイク判定とかを関数化して渡せるようにすれば良い程度だが。

上のJdbcCursorItemReaderRowMapperが返すクラスはこんな感じ。

@Data
@AllArgsConstructor
@NoArgsConstructor
public class SampleRow {
    Long id;
    Long detailId;
    String value;
}

chunkアイテムのItemReader#readが返すクラスはこんな感じ。

@Data
@AllArgsConstructor
@NoArgsConstructor
public class KeyBreakItem {
    Long id;
    List<Long> detailIds = new ArrayList<>();
}

サンプル実行するためのjob定義はこんな感じ。

@EnableJpaRepositories
@EnableBatchProcessing
@SpringBootApplication
public class Application {
    @Bean
    public Job job(JobBuilderFactory jobs, @Qualifier("myjobstep1") Step s1) {
        return jobs.get("demo-batch-job").incrementer(new RunIdIncrementer()).start(s1).build();
    }

    @Bean(name = "myjobstep1")
    public Step step1(StepBuilderFactory steps, DataSource dataSource) {
        return steps
                .get("myjobstep1")
                .<KeyBreakItem, KeyBreakItem>chunk(2)
                .reader(new JdbcCursorKeyBreakRreader(dataSource)).writer(items -> {
                    items.forEach(item -> System.out.println(item));
                    System.out.println("=== write ===");
                })

                .build();
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

サンプルデータ。src/main/resources/data.sql

insert into sample (id, value) values (1 , 'a');
insert into sample (id, value) values (2 , 'a');
insert into sample (id, value) values (3 , 'a');
insert into sample (id, value) values (4 , 'a');
insert into sample (id, value) values (5 , 'a');

insert into sample_detail (id, sample_id, value) values (1 , 1,  'a');
insert into sample_detail (id, sample_id, value) values (2 , 1,  'b');
insert into sample_detail (id, sample_id, value) values (3 , 1,  'c');
insert into sample_detail (id, sample_id, value) values (4 , 2,  'a');
insert into sample_detail (id, sample_id, value) values (5 , 2,  'b');
insert into sample_detail (id, sample_id, value) values (6 , 2,  'c');
insert into sample_detail (id, sample_id, value) values (7 , 3,  'a');
insert into sample_detail (id, sample_id, value) values (8 , 3,  'b');
insert into sample_detail (id, sample_id, value) values (9 , 3,  'c');
insert into sample_detail (id, sample_id, value) values (10 , 4,  'a');
insert into sample_detail (id, sample_id, value) values (11 , 4,  'b');
insert into sample_detail (id, sample_id, value) values (12 , 4,  'c');
insert into sample_detail (id, sample_id, value) values (13 , 5,  'a');
insert into sample_detail (id, sample_id, value) values (14 , 5,  'b');
insert into sample_detail (id, sample_id, value) values (15 , 5,  'c');

実行時の様子。chunk(2)にしているので、キーブレイク2回ごとにwriterが実行されているのがわかる。

KeyBreakItem(id=1, detailIds=[1, 2, 3])
KeyBreakItem(id=2, detailIds=[4, 5, 6])
=== write ===
KeyBreakItem(id=3, detailIds=[7, 8, 9])
KeyBreakItem(id=4, detailIds=[10, 11, 12])
=== write ===
KeyBreakItem(id=5, detailIds=[13, 14, 15])
=== write ===

spring-securityでログイン画面無しの認証

spring securityで独自の認証を実装する | エンジニアっぽいことを書くブログで紹介されているようにAbstractPreAuthenticatedProcessingFilterが使える。このクラスは、javadocによると「外部の認証システムがヘッダーやcookieなどのリクエスト経由で認証情報を渡し、それからpre-authenticationがその情報を基に、自システム固有のユーザ情報を取得する」といった役割を持つ。

ログイン画面が無い場合のよくある方法としては、認証トークンなどをヘッダーやcookieに載せる。それを受け取って、認証トークンの検証をしたり、DBから追加のユーザ情報を取得したり、といった処理をする。以下はそれをspring-securityで実装する方法について述べる。

ソースコード

build.gradle

plugins {
  id 'org.springframework.boot' version '2.3.2.RELEASE'
  id 'io.spring.dependency-management' version '1.0.9.RELEASE'
  id 'java'
}
group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'
configurations {
  compileOnly {
    extendsFrom annotationProcessor
  }
}
repositories {
  mavenCentral()
}
dependencies {
  implementation 'org.springframework.boot:spring-boot-starter-security'
  implementation 'org.springframework.boot:spring-boot-starter-web'
  compileOnly 'org.projectlombok:lombok'
  developmentOnly 'org.springframework.boot:spring-boot-devtools'
  annotationProcessor 'org.projectlombok:lombok'
  testImplementation('org.springframework.boot:spring-boot-starter-test') {
    exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
  }
  testImplementation 'org.springframework.security:spring-security-test'
}
test {
  useJUnitPlatform()
}

AbstractPreAuthenticatedProcessingFilter

AbstractPreAuthenticatedProcessingFilterはリクエストからprincipalとcredentialを取得するabstractメソッドの実装が必須となる。これらはヘッダーなりcookieなりから取得する。以下はサンプルとして単純にリクエストパラメータから取得している。

   static class TokenPreAuthenticatedProcessingFilter extends AbstractPreAuthenticatedProcessingFilter {
        @Override
        protected Object getPreAuthenticatedPrincipal(HttpServletRequest request) {
            return request.getParameter("token");
        }

        @Override
        protected Object getPreAuthenticatedCredentials(HttpServletRequest request) {
            return "";
        }
    }

上記メソッドで認証情報取得後、基本的には、PreAuthenticatedAuthenticationProviderに委譲する。後述するが、このプロバイダには認証情報が渡されるので、トークンの検証をしたり、自システムのユーザ情報などの取得をする。

また、一旦プロバイダの認証処理が通過すると、フィルタの以上の処理はスキップするようになる。

PreAuthenticatedAuthenticationProviderとAuthenticationUserDetailsService

上述の通り、プロバイダは認証情報を基にユーザ情報などの取得をする。このプロバイダにユーザ情報取得処理を行うAuthenticationUserDetailsServiceを設定する。このインタフェースは、ユーザ情報に相当するUserDetailsを返すloadUserDetails(PreAuthenticatedAuthenticationToken token)メソッドを持つ。

   @Bean
    PreAuthenticatedAuthenticationProvider tokenProvider() {
        PreAuthenticatedAuthenticationProvider provider = new PreAuthenticatedAuthenticationProvider();
        provider.setPreAuthenticatedUserDetailsService((PreAuthenticatedAuthenticationToken token) -> {
            System.out.println("Principal:" + token.getPrincipal());
            System.out.println("Credential:" + token.getCredentials());
            // ここでトークン(token.getPrincipal())を使用し、ユーザ情報をどこかからか取得する。
            
            User user = new User("hoge", "", Collections.emptyList());
            return user;
        });
        return provider;
    }

上記コードでは具体的なユーザ情報取得処理はなんも書いてない。が、本来はDBなり認証サーバなりでユーザ情報を取得する。必要に応じて認可情報も取得してUserに設定する。上記ではspring-securityのorg.springframework.security.core.userdetails.Userにしているが、必要に応じて拡張が必要になるケースもあると思う。

実行

適当なrestcontrollerを用意して実行してみる。

@RestController
public class MyRestController {

    @GetMapping("/hoge")
    public String hoge() {
        System.out.println("/hoge");
        return "hoge";
    }
}
Principal:hogeToken
Credential:
/hoge
/hoge

といった感じでpre-authenticationフィルタは最初のみ通過しているのがわかる。

ソースコード全体

上記をすべてのっけたconfigのコード全体。説明のためbeanをぜんぶ単一クラスにまとめたが、実際には必要に応じてクラスに分割すると思われる。

import java.util.Collections;

import javax.servlet.http.HttpServletRequest;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
import org.springframework.security.config.annotation.web.configuration.WebSecurityConfigurerAdapter;
import org.springframework.security.core.userdetails.User;
import org.springframework.security.web.authentication.preauth.AbstractPreAuthenticatedProcessingFilter;
import org.springframework.security.web.authentication.preauth.PreAuthenticatedAuthenticationProvider;
import org.springframework.security.web.authentication.preauth.PreAuthenticatedAuthenticationToken;

@Configuration
@EnableWebSecurity
public class SecurityConfig extends WebSecurityConfigurerAdapter {
    @Override
    protected void configure(HttpSecurity http) throws Exception {
        AbstractPreAuthenticatedProcessingFilter tokenFilter = new TokenPreAuthenticatedProcessingFilter();
        tokenFilter.setAuthenticationManager(authenticationManager());
        //WebSecurityConfigurerAdapterのデフォルトのauthenticationManagerを設定している。
        //このクラスはProviderManagerで、認証処理をAuthenticationProviderのリストに委譲する。
        
        http
            .authorizeRequests()
            .anyRequest()
            .authenticated()
            .and()
            .addFilter(tokenFilter)
        ;
    }
    
    static class TokenPreAuthenticatedProcessingFilter extends AbstractPreAuthenticatedProcessingFilter {
        @Override
        protected Object getPreAuthenticatedPrincipal(HttpServletRequest request) {
            return request.getParameter("token");
        }

        @Override
        protected Object getPreAuthenticatedCredentials(HttpServletRequest request) {
            return "";
        }
    }

    @Bean
    PreAuthenticatedAuthenticationProvider tokenProvider() {
        PreAuthenticatedAuthenticationProvider provider = new PreAuthenticatedAuthenticationProvider();
        provider.setPreAuthenticatedUserDetailsService((PreAuthenticatedAuthenticationToken token) -> {
            System.out.println("Principal:" + token.getPrincipal());
            System.out.println("credential:" + token.getCredentials());
            // ここでトークン(token.getPrincipal())を使用し、ユーザ情報をどこかからか取得する。
            
            User user = new User("hoge", "", Collections.emptyList());
            return user;
        });
        return provider;
    }
}

参考URL

このエントリでは最低限の認証以外の細かい点は省略している。実際の使用に際しては本家ドキュメントや下記参考URLの参照が必要になると思われる。

Spring Batch 4.2でメトリクスをPrometheus Pushgatewayにおくる

https://docs.spring.io/spring-batch/docs/current/reference/html/monitoring-and-metrics.html を試す。

Spring Batch 4.2はMicrometerベースのメトリクスを自動的に収集する。なので、プロパティでそのメトリクスの送信設定をすれば、データがそちらに送られる。

ここではPrometheus Pushgatewayにメトリクスを送信する。

Prometheus Pushgatewayの準備

dockerで起動する。

docker run -d -p 9091:9091 prom/pushgateway

起動するとhttp://localhost:9091/#でpushgatewayの画面が見れる。

コード

build.gradle。spring-batchに加えて、actuator, prometheus, pushgatewayクライアントの依存性を入れる。

plugins {
  id 'org.springframework.boot' version '2.3.0.RELEASE'
  id 'io.spring.dependency-management' version '1.0.9.RELEASE'
  id 'java'
}

configurations {
  developmentOnly
  runtimeClasspath {
    extendsFrom developmentOnly
  }
  compileOnly {
    extendsFrom annotationProcessor
  }
}

repositories {
  mavenCentral()
}

dependencies {
  implementation 'org.springframework.boot:spring-boot-starter-batch'
  implementation 'org.springframework.boot:spring-boot-starter-actuator'
  implementation 'io.micrometer:micrometer-registry-prometheus'
  implementation 'io.prometheus:simpleclient_pushgateway'

  compileOnly 'org.projectlombok:lombok'
  annotationProcessor 'org.projectlombok:lombok'
  testImplementation('org.springframework.boot:spring-boot-starter-test') {
    exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
  }
  testImplementation 'org.springframework.batch:spring-batch-test'
  implementation 'com.h2database:h2'
}

test {
  useJUnitPlatform()
}

src/main/resources/application.propertiesでpushgateway関連の設定をする。とりあえず動いた、というレベルなので設定の詳細までは良く調べていない。

management.metrics.export.prometheus.pushgateway.enabled=true
management.metrics.export.prometheus.pushgateway.grouping-key.app_name=demo-batch-hoge
management.metrics.export.prometheus.pushgateway.job=demo-batch
management.metrics.export.prometheus.pushgateway.shutdown-operation=push

適当にspring-batchのアプリを作る。

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@EnableBatchProcessing
@SpringBootApplication
public class Main {

  @Bean
  public Job job(JobBuilderFactory jobs, @Qualifier("myjobstep1") Step s1) {
    return jobs.get("demo-batch-job").incrementer(new RunIdIncrementer()).start(s1).build();
  }

  @Bean(name = "myjobstep1")
  public Step step1(StepBuilderFactory steps) {
    return steps.get("myjobstep1").tasklet((contribution, chunkContext) -> {
      System.out.println("hoge");
      return RepeatStatus.FINISHED;
    }).build();
  }

  public static void main(String[] args) {
    SpringApplication.run(Main.class, args);
  }
}

実行後にhttp://localhost:9091/#を見ると以下のようなメトリクスが確認できる。

f:id:kagamihoge:20200528161608j:plain

参考文献