kagamihogeの日記

kagamihogeの日記です。

jBatchでデータロードしてみる

やること

jBatchを使用してテキストファイルの中身をOracleにロードする。

とはいえ、単なるデータロードであればワザワザjBatchを使うのはオーバーヘッドがかさむだけである。OracleSQL*Loaderや、手動ならSQL DeveloperやA5:SQL等各種のツールを使うのが手っ取り早い。

ただし、ごく単純なデータロードの実装を通して学べることも多い。このエントリの目的は、ごく単純なデータロードを通してjBatchの挙動を知ることにある。

やることとしては、jBatchを使用してOracleにデータをロードし、また、いくつかのパラメータを変更して、そのときの実行時間の変化を確認する。

環境

準備

データロード対象のテーブル

DROP TABLE hoge PURGE;
CREATE TABLE hoge 
(
  hoge_id INTEGER,
  hoge_value VARCHAR(20)
);

idとvalueだけのテーブルにデータをINSERTしていく。

前準備

ALTER SYSTEM FLUSH BUFFER_CACHE;

バッチを一回流すごとに、Oracleのバッファキャッシュをクリアしていく。

データロード対象のテキストファイ

gvMWxxfOjYsLNgsggALU
NEadKxrnnkhWtfQSmyai
AhALLooojhwCXzNrsDJk
(省略)

各行20バイトが30000行。また、partitionのところでは別途説明。

ソース

sample-job-dataload.xml

<?xml version="1.0" encoding="UTF-8"?>
<job id="sample-job-partition" xmlns="http://xmlns.jcp.org/xml/ns/javaee"
    version="1.0">

    <listeners>
        <listener ref="executionTimeJobListener"></listener>
    </listeners>

    <step id="myStep">
        <chunk>
            <reader ref="myDataloadReader"></reader>
            <processor ref="myDataloadProcessor"></processor>
            <writer ref="myDataloadWriter"></writer>
        </chunk>
        <partition>
            <plan partitions="1" threads="1">
                <properties partition="0">
                    <property name="startId" value="1" />
                    <property name="file" value="C:\\Java\\sampledataload\\input_p_1.txt" />
                    </properties>
            </plan>
        </partition>

        <end on="COMPLETED" />
    </step>
</job>

バッチの定義。あとでパーティションを試すのでこうなっている。idの初期値はプロパティで定義する。

persistence.xml

<?xml version="1.0" encoding="UTF-8"?>
<persistence version="2.1" xmlns="http://xmlns.jcp.org/xml/ns/persistence"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/persistence http://xmlns.jcp.org/xml/ns/persistence/persistence_2_1.xsd">
    <persistence-unit name="glassfish4oracle11gee"
        transaction-type="JTA">
        <jta-data-source>jdbc/oracle11gee</jta-data-source>
    </persistence-unit>
</persistence>

GlassFish4でOracle11gXEのJDBC接続をつくる とか参照にGlassFishにデータソースの設定してあるものとする。

SampleDataloadReader

@Dependent
@Named("myDataloadReader")
public class SampleDataloadReader implements ItemReader {

    private BufferedReader br;

    @Inject
    JobContext jobCtx;

    @Override
    public void open(Serializable checkpoint) throws Exception {
        long execID = jobCtx.getExecutionId();
        Properties parameters = BatchRuntime.getJobOperator().getParameters(execID);
        String inputFile = parameters.getProperty("file");
        
        br = Files.newBufferedReader(Paths.get(inputFile), Charset.defaultCharset());
    }

    @Override
    public Object readItem() throws Exception {
        String line = br.readLine();
        if (line == null) {
            return null;
        }
        return line;
    }

    @Override
    public void close() throws Exception {
        br.close();
    }
    
    @Override
    public Serializable checkpointInfo() throws Exception {
        return null;
    }
}

reader. テキストファイルからデータ読んでるだけ。

SampleDataloadProcessor

@Dependent
@Named("myDataloadProcessor")
public class SampleDataloadProcessor implements ItemProcessor {

    @Override
    public Object processItem(Object item) throws Exception {
        return item;
    }
}

processor. 処理はなんも無し。

SampleDataloadWriter

@Dependent
@Named("myDataloadWriter")
public class SampleDataloadWriter implements ItemWriter {

    @PersistenceContext(unitName = "glassfish4oracle11gee")
    private EntityManager em;

    @Inject
    JobContext jobCtx;
    
    private int hogeId = 0;
    
    private int count = 0;

    @Override
    public void open(Serializable checkpoint) throws Exception {
        long eId = jobCtx.getExecutionId();
        String t = BatchRuntime.getJobOperator().getParameters(eId).getProperty("startId");
        hogeId =  Integer.parseInt(t);
    }

    @Override
    public void writeItems(List<Object> items) throws Exception {
        for (Object o : items) {
            Hoge newRecord = new Hoge(hogeId, (String)o);
            em.persist(newRecord);
            
            hogeId++;
            count++;
        }
    }

    @Override
    public void close() throws Exception {
        System.out.println("# insert count=" + count);
    }

    @Override
    public Serializable checkpointInfo() throws Exception {
        return null;
    }
}

INSERTを実行する。コミットはjBatch側で勝手にやってくれる。

StartDataload

@Path("/startdataload")
public class StartDataload {
    
    @GET
    public void go() {
        JobOperator job = BatchRuntime.getJobOperator();

        long id = job.start("sample-job-dataload", null);
        System.out.println("id = " + id);
    }
}

jBatchをキックするためのエントリポイント。ArquillianでもServletでも何でも良いが今回はJAX-RSにした。http://localhost:8080/jbatchsample/rest/startdataloadのURLでキックする。

MyApplication

@ApplicationPath("/rest")
public class MyApplication extends Application {

}

JAX-RS用の設定クラス。

ExecutionTimeJobListener

@Dependent
@Named("executionTimeJobListener")
public class ExecutionTimeJobListener implements JobListener {

    private long start = 0;
    
    @Override
    public void beforeJob() throws Exception {
        start = System.currentTimeMillis();

    }

    @Override
    public void afterJob() throws Exception {
        long end = System.currentTimeMillis();
        System.out.println(end - start);
    }

}

実行時間の計測用クラス。jBatchのジョブレベルリスナーで、ジョブの開始時と終了時のSystem.currentTimeMillis()の差を取っている。

実行時間

パターン 1 2 3
なし 166647 179543 175293

これを基準値として、各種パラメータ変更後の動作と比較する。

パラメータ変更

実行時間に影響が出そうなパラメータなどを変更し、差が出るかどうかを確認してみる。

コミット間隔

jBatchを使用しているので、厳密に言うとchunkのitem-countである。ただ、今回はOracleが前提なので実質的にはコミット間隔と同義である。

item-countのデフォルト値は10なので、これを100に変更してみる。

<chunk item-count="100">
パターン 1 2 3
なし 166647 179543 175293
item-count="100" 28317 28825 27947

おおむね6~7倍の差が出ている。伝統的なセオリー通り、このテのバッチ処理ではコミット間隔の調整が有効なことが伺える。

バッチ更新

JPAからJDBCのバッチ更新を使う などにある通り、JDBCのバッチ更新を使用してクエリを一度に送り込む量を増やし、それによって性能改善が見込めるケースがある。

persistence.xmlにEclipseLink用のバッチ更新用の設定を追加する。なお、コミット間隔は100のままで実行時間を計測した。

    <persistence-unit name="glassfish4oracle11gee"
        transaction-type="JTA">
        <jta-data-source>jdbc/oracle11gee</jta-data-source>
        <properties>
            <property name="eclipselink.jdbc.batch-writing" value="jdbc"/>
            <property name="eclipselink.jdbc.batch-writing.size" value="100"/>
        </properties>
    </persistence-unit>
パターン 1 2 3
なし 166647 179543 175293
item-count="100" 28317 28825 27947
item-count="100", batch-writing.size=100 16002 16710 16880

partition

jBatchのpartitionを使用し、マルチスレッドで処理を行う。

jBatchのpartition数を3、スレッド数を3に設定する。簡略化のため、処理対象ファイル30000件は10000 * 3ファイルにあらかじめ分割しておく。

        <partition>
            <plan partitions="3" threads="3">
                <properties partition="0">
                    <property name="startId" value="1" />
                    <property name="file" value="C:\\Java\\sampledataload\\input_p_1.txt" />
                </properties>
                <properties partition="1">
                    <property name="startId" value="10001" />
                    <property name="file" value="C:\\Java\\sampledataload\\input_p_2.txt" />
                </properties>
                <properties partition="2">
                    <property name="startId" value="20001" />
                    <property name="file" value="C:\\Java\\sampledataload\\input_p_3.txt" />
                </properties>
            </plan>
        </partition>
パターン 1 2 3
なし 166647 179543 175293
partitions="3" 148529 148020 158559

若干ではあるが改善が見られたが、これは環境依存が大きいと思われる。Oracle側のプロセスがスレッド数と同じだけ確保できなければ意味が無いので、そこはアプリケーションサーバのコネクションプーリングに依存する。

また、表内の行順序はインデックス範囲スキャンと深い関連があるため、パーティションの分割でデータロードの時間だけが早くなっても意味が無い場合もありえる。参考:表の行順序とインデックス範囲スキャン

実行時間のまとめ

パターン 1 2 3
なし 166647 179543 175293
item-count="100" 28317 28825 27947
item-count="100", batch-writing.size=100 16002 16710 16880
partitions="3" 148529 148020 158559
ぜんぶ 14249 15237 13272

上記表の「ぜんぶ」は、コミット間隔・バッチ更新・パーティションを適用したもの。なんも指定無しの1/10くらいにはなっていることが伺える。

ソースコード

https://github.com/kagamihoge/jbatchsample