kagamihogeの日記

kagamihogeの日記です。

spring-integrationでcommons-ioを使用したファイルのtail

Spring Integrationでファイルのtailを実現する。デフォルトではOSのtailコマンドを実行するが、設定によりOS非依存のApache Commons IOに切り替えられる。今回はそのサンプルコードについて。

ソースコードなど

build.gradle

https://start.spring.io/ を使用してspring-initegration関連を含めた依存性を作成する。

plugins {
  id 'java'
  id 'org.springframework.boot' version '3.1.4'
  id 'io.spring.dependency-management' version '1.1.3'
}

group = 'com.example'
version = '0.0.1-SNAPSHOT'

java {
  sourceCompatibility = '17'
}

repositories {
  mavenCentral()
}

dependencies {
  implementation 'org.springframework.boot:spring-boot-starter-integration'
  implementation 'org.springframework.integration:spring-integration-file'
  
  developmentOnly 'org.springframework.boot:spring-boot-devtools'
  testImplementation 'org.springframework.boot:spring-boot-starter-test'
  testImplementation 'org.springframework.integration:spring-integration-test'
}

tasks.named('test') {
  useJUnitPlatform()
}

java

import java.nio.file.Path;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.file.dsl.Files;
import org.springframework.integration.file.dsl.TailAdapterSpec;

@SpringBootApplication
public class ApacheCommonsFileTailingApplication {
  public static void main(String[] args) {
    new SpringApplicationBuilder(ApacheCommonsFileTailingApplication.class)
      .web(WebApplicationType.NONE)
      .run(args);
  }

  @Bean
  public IntegrationFlow fileReadingFlow() {
    TailAdapterSpec tailAdapter = Files.tailAdapter(Path.of("sample.txt").toFile())
        .delay(1000)
        .end(true)
        .reopen(false);

    return IntegrationFlow.from(tailAdapter).handle("sampleBean", "sampleHandle").get();
  }
}
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
public class SampleBean {
  public void sampleHandle(Message<String> e) {
    System.out.println(e);
  }
}

これを実行してsample.txtを修正すると以下のような出力が得られる。

GenericMessage [payload=hogehoge, headers={file_originalFile=sample.txt, id=97e015a2-ffdf-28b5-2b3e-b0f9e8119d08, file_name=sample.txt, timestamp=1696850274578}]

基本的にはFiles.tailAdapterを使用するだけ。FileTailInboundChannelAdapterFactoryBean.java#L243 を見ると if (this.delay == null && this.end == null && this.reopen == null) のelseの場合にApacheCommonsFileTailingMessageProducerに切り替わるのが分かる。

ハマった点

windowsでデフォルト設定のFiles.tailAdapterを使用するとtailコマンドが無くてエラーになる

以下のエラーのように、デフォルト設定でwindowsだとtailコマンドが見つからなくて実行時エラーになる。

Exception in thread "SimpleAsyncTaskExecutor-1" org.springframework.messaging.MessagingException: Failed to exec tail command: 'tail -F -n 0 C:\java\workspaces\e202212\d\sp\integfilepolling\sample.txt'
    at org.springframework.integration.file.tail.OSDelegatingFileTailingMessageProducer.runExec(OSDelegatingFileTailingMessageProducer.java:136)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.io.IOException: Cannot run program "tail": CreateProcess error=2, 指定されたファイルが見つかりません。
    at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1143)
    at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1073)
    at java.base/java.lang.Runtime.exec(Runtime.java:594)
    at java.base/java.lang.Runtime.exec(Runtime.java:453)

tailはWSLでwindowsでも実行可能。ただOSDelegatingFileTailingMessageProducer.java#L97を見るとtailとべた書きしてあるのでwindowsでどうにかこうにか動かす方法は分からなかった。

commons-ioのTailerが2回実行される

こちらはspring-integrationと直接の関係は無い。ApacheCommonsFileTailingMessageProducerが裏で使用するライブラリを直接使用する場合、javadocを見てなんとなく以下のように書くとhandleが2回呼ばれてしまう。

import java.io.File;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.io.input.Tailer;
import org.apache.commons.io.input.TailerListener;
import org.apache.commons.io.input.TailerListenerAdapter;

public class SampleTailer extends TailerListenerAdapter {
  @Override
  public void handle(String line) {
    System.out.println(line);
  }
  
  @Override
  public void handle(Exception ex) {
    System.out.println(ex);
  }
  
  public static void main(String[] args) throws InterruptedException {
    TailerListener listener = new SampleTailer();
    Tailer tailer = Tailer.create(new File("sample.txt"), listener, 1000, true); 

    ExecutorService ex = Executors.newFixedThreadPool(1);
    ex.submit(tailer); 
  }
}

この原因はTailer.createの中でThread#startするため。TailerRunnableをimplementsしているので、単にこのクラスを生成して適当なExecutorServiceにsubmitで良い。

Tailer tailer = new Tailer(new File("sample.txt"), listener, 1000, true);
ExecutorService ex = Executors.newFixedThreadPool(1);
ex.submit(tailer);