想定する処理としては、下記のようなごく単純な訪問ログにおける時刻の出現回数のサマリを集計処理を考える。このログファイルが複数拠点から送られるので、それらすべてを集計するもんとする。
15:28 08:36 18:57 01:58 14:43
環境
ソースコードとか
ジョブXML
入力ファイルとかが置かれるディレクトリはジョブレベルプロパティで指定する。
一つのstepが存在し、chunkとreducerが定義してある。これらの詳細は後述。
partitionの設定は静的・動的の二種類ある。まず、静的な指定方法を試す。下記は、静的にパーティーション数・スレッド数・各パーティーションごとのプロパティ(ここでは入力ファイル)を設定している。
<?xml version="1.0" encoding="UTF-8"?> <job id="sample-job-partition" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0"> <properties> <property name="input_dir" value="C:\\Java\\sampleinput" /> <property name="output_dir" value="C:\\Java\\sampleoutput" /> </properties> <step id="myStep"> <chunk> <reader ref="mySampleReader"></reader> <processor ref="mySampleProcessor"></processor> <writer ref="mySampleWriter"></writer> </chunk> <partition> <plan partitions="2" threads="2"> <properties partition="0"> <property name="file" value="input1.txt" /> </properties> <properties partition="1"> <property name="file" value="input2.txt" /> </properties> </plan> <reducer ref="myReducer" /> </partition> <end on="COMPLETED"/> </step> </job>
partitionの設定を動的にする場合、mapperを定義する。参照先のmyPartitionMapperについては後述。
<partition> <mapper ref="myPartitionMapper"/> </partition>
PartitionMapper
パーティーション数やスレッド数、パーティションごとのプロパティを動的に設定する場合はPartitionMapper
を実装し、それをmapper
に指定する。
実際にはgetPartitions
等で動的に値を返すコードを書くことになると思われる。ここでは単純に、静的の場合と同等な設定を返すようにしておく。
@Dependent @Named("myPartitionMapper") public class SamplePartitionMapper implements PartitionMapper { @Override public PartitionPlan mapPartitions() throws Exception { return new PartitionPlanImpl() { @Override public int getPartitions() { return 2; } @Override public int getThreads() { return 2; } @Override public Properties[] getPartitionProperties() { int partitions = getPartitions(); Properties[] props = new Properties[partitions]; for (int i = 0; i < partitions; i++) { props[i] = new Properties(); props[i].put("file", "input" + (i+1) + ".txt"); } return props; } }; } }
ItemReader
ジョブXMLまたはmapper
で指定されるプロパティからファイル名を取得し、そこからの読み込み処理を書く。
各パーティションごとのパラメータ(ここでは入力ファイル)をする取得コードは、静的・動的に関わらず同じコードでOK
@Dependent @Named("mySampleReader") public class SampleReader implements ItemReader { private BufferedReader br; @Inject JobContext jobCtx; @Override public void open(Serializable checkpoint) throws Exception { JobOperator jobOperator = BatchRuntime.getJobOperator(); long execID = jobCtx.getExecutionId(); Properties parameters = jobOperator.getParameters(execID); String inputDir = jobCtx.getProperties().getProperty("input_dir");; String inputFile = parameters.getProperty("file"); System.out.println("## execID" + execID + " " + inputDir + " " + inputFile); br = Files.newBufferedReader(Paths.get(inputDir, inputFile), Charset.defaultCharset()); } @Override public Object readItem() throws Exception { String line = br.readLine(); if (line == null || line.length() <= 0) { return null; } return line; } @Override public void close() throws Exception { br.close(); } @Override public Serializable checkpointInfo() throws Exception { return null; } }
ItemProcessor
各行がHH:MM
フォーマットなので:
でスプリットするだけ。
@Dependent @Named("mySampleProcessor") public class SampleProcessor implements ItemProcessor { @Override public Object processItem(Object item) throws Exception { return ((String) item).split(":")[0]; } }
ItemWriter
読み込んだファイルの時刻の出現回数をカウントアップし、終了時にパーティションごとの中間結果をファイルへ出力する。
本来的には、writeItems
は何らかの永続化機構に書き込むことを想定されていると思われる。が、ここではただ単純にファイルに書き出すだけである。
今回くらいの場合、ワザワザ中間結果作らなくても良いんだけど、サンプルなので。
@Dependent @Named("mySampleWriter") public class SampleWriter implements ItemWriter { @Inject JobContext jobCtx; private Map<String, Integer> result; @Override public void open(Serializable checkpoint) throws Exception { result = new HashMap<>(); for (int i = 0; i < 24; i++) { result.put(String.format("%02d", i), 0); } } @Override public void writeItems(List<Object> items) throws Exception { for (Object i : items) { String time = (String) i; Integer newCount = result.get(time) + 1; result.put(time, newCount); } System.out.println("## execID=" + jobCtx.getExecutionId() + " writes:" + items.size()); } @Override public void close() throws Exception { long execID = jobCtx.getExecutionId(); try (BufferedWriter bw = Files.newBufferedWriter(Paths.get(jobCtx.getProperties().getProperty("output_dir"), execID + ".txt"), Charset.defaultCharset())) { for (Map.Entry<String, Integer> e : result.entrySet()) { bw.write(e.getKey() + ":" + e.getValue()); bw.newLine(); } } } @Override public Serializable checkpointInfo() throws Exception { return null; } }
PartitionReducer
各パーティションごとの中間結果をまとめて最終的な集計結果を作成する。結果はただ単に標準出力に出すだけ。また、中間結果ディレクトリ内のファイルをすべて削除する。
@Dependent @Named("myReducer") public class SamplePartitionReducer implements PartitionReducer { @Inject JobContext jobCtx; @Override public void beginPartitionedStep() throws Exception { System.out.println("beginPartitionedStep"); } @Override public void beforePartitionedStepCompletion() throws Exception { System.out.println("beforePartitionedStepCompletion"); final HashMap<String, Integer> summary = new HashMap<>(); for (int i = 0; i < 24; i++) { summary.put(String.format("%02d", i), 0); } FileVisitor<Path> visitor = new SimpleFileVisitor<Path>() { @Override public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { try (BufferedReader br = Files.newBufferedReader(file, Charset.defaultCharset());) { String line; while ((line = br.readLine()) != null) { String[] r = line.split(":"); summary.put(r[0], summary.get(r[0]) + Integer.parseInt(r[1])); } } return FileVisitResult.CONTINUE; } }; Files.walkFileTree(Paths.get(jobCtx.getProperties().getProperty("output_dir")), visitor); System.out.println(summary); } @Override public void afterPartitionedStepCompletion(PartitionStatus status) throws Exception { FileVisitor<Path> visitor = new SimpleFileVisitor<Path>() { @Override public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { Files.delete(file); return FileVisitResult.CONTINUE; } }; Files.walkFileTree(Paths.get(jobCtx.getProperties().getProperty("output_dir")), visitor); System.out.println("afterPartitionedStepCompletion"); } @Override public void rollbackPartitionedStep() throws Exception { System.out.println("rollbackPartitionedStep"); } }
実行の様子
各パーティーションがそれぞれのファイルの集計を中間結果にまとめ、reducerでその中間結果を集約しているログが表示されている。
情報: id = 155 情報: beginPartitionedStep 情報: ## execID=156 C:\\Java\\sampleinput input1.txt 情報: ## execID=157 C:\\Java\\sampleinput input2.txt 情報: ## execID=156 writes:10 情報: ## execID=157 writes:10 情報: ## execID=156 writes:8 情報: ## execID=157 writes:10 情報: ## execID=157 writes:4 情報: beforePartitionedStepCompletion 情報: {08=3, 09=0, 19=3, 22=3, 17=1, 04=1, 23=2, 18=5, 05=2, 15=3, 06=2, 16=0, 07=2, 13=1, 00=1, 14=1, 01=3, 02=1, 11=3, 12=0, 03=1, 21=2, 20=2, 10=0} 情報: afterPartitionedStepCompletion
ハマったこととか
setTransientUserDataによるデータの受け渡しは同一スレッド間
jBatchを使用して悩んだのはstep間でどうデータを受け渡すのか、だった。
今回のケースではJobContext#setTransientUserData
があるのでコレを使えばよいかと思いきや、JobContextのインスタンスはスレッドごとである。パーティションのスレッドと、PartitionReducer
が動作するスレッドは異なるので、setTransientUserData
ではインスタンスを共有できない。
なので、中間ファイルを経由させることにした。