kagamihogeの日記

kagamihogeの日記です。

OracleからLogstashでElasticsearchにデータおくる

OracleからLogstashを経由してElasticsearchにデータを追加する。Oracleのtimestampカラムを使用して、Logstashが定期的に前回以降のデータを取得して、Elasticsearchのインデックスに追加する。KIbanaはデータ確認用としてのみ使用する。

なお、環境構築はdockerで行うのでELK Stackとは直接的に関係無い設定がこのエントリには含まれる。また、ホスト側はWIndows 10な点に注意。

手順

dockerのネットワーク作成

それぞれ異なるコンテナで動かすため、通信設定が必要。方法は幾つかあるけど、ここではあらかじめネットワークを作っておくやり方にした。

docker network create --driver bridge common_link

参考: https://qiita.com/reneice/items/20e981062b093264cd0a

Oracle Database 18c XE

http://kagamihoge.hatenablog.com/entry/2018/12/28/204636 を参考にdockerでOracle Database 18c XEをうごかす。なお、上記リンク先と異なる点として、docker-compose.ymlにネットワーク設定を追加している。修正後のファイルは以下の通り。

version: '3'
services:
  database:
    image: oracle/database:18.4.0-xe
    volumes:
      - C:\mydata\oracle\oradata:/opt/oracle/oradata
    ports:
      - 11521:1521
      - 18080:8080
      - 15500:5500
    environment:
      - ORACLE_PWD=oracle
    networks:
      - common_link
networks:
    common_link:
        external: true

Logstashが参照するテーブル定義は以下の通り。

CREATE TABLE LOGSTASH_SAMPLE (
  "ID"                 NUMBER(10, 0) NOT NULL,
  "TIMESTAMPE_VALUE"   TIMESTAMP(6) default current_timestamp NOT NULL,
  constraint "LOGSTASH_SAMPLE_PK" PRIMARY KEY("ID")
);

ELK Stackのdocker-compose.xml

Elasticsearch, Logstash, Kibanaのdocker-compose.xmlを作成する。

version: '3'
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.0.1
    ports:
      - 9200:9200
      - 9300:9300
    volumes:
      - ./elasticsearch/data:/usr/share/elasticsearch/data
    networks:
      - common_link
    environment:
      - discovery.type=single-node
  logstash:
    image: docker.elastic.co/logstash/logstash-oss:7.0.1
    container_name: logstash
    volumes:
      - ./logstash/pipeline:/usr/share/logstash/pipeline
      - ./logstash/log:/usr/share/logstash/logs
      - ./logstash/jdbc:/opt/jdbc
      - ./logstash/metadata/:/usr/share/logstash/metadata
    depends_on:
      - elasticsearch
    networks:
      - common_link
  kibana:
    image: docker.elastic.co/kibana/kibana-oss:7.0.1
    ports:
      - 5601:5601
    depends_on:
      - elasticsearch
    networks:
      - common_link
networks:
    common_link:
        external: true
  • networks ですべてのコンテナを同一ネットワークに入れておく。また、Logstashのところで後述するがこれによってホスト名をdatabaseとかelasticsearchとかで参照可能になる。

Logstash

Logstashの設定について。まず、上記docker-compose.ymlのlogstashのvolumesでマウントしてるディレクトリについて。

  • logstash
    • jdbc - oracleに接続するためのJDBCドライバを配置。
    • metadata - timestampカラムに対する最終取得時刻を保存するファイルをここに自動保存する
    • pipeline - パイプラインと呼ばれる処理設定ファイルを配置。

パイプライン設定ファイル

logstashでは、パイプラインと呼ばれる処理設定ファイルを記述する。このエントリではlogstash/pipeline/sample.confファイルを作成する。

input {
  jdbc {
    jdbc_driver_library => "/opt/jdbc/ojdbc7.jar"
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    jdbc_connection_string => "jdbc:oracle:thin:@database:1521/XEPDB1"
    jdbc_user => "system"
    jdbc_password => "**********"
    schedule => "* * * * *"
    statement_filepath => "/usr/share/logstash/pipeline/sql/sample.sql"
    tracking_column => timestampe_value
    tracking_column_type  => "timestamp"
    use_column_value => true
    jdbc_default_timezone => "Japan"
    last_run_metadata_path=> "/usr/share/logstash/metadata/last_run_metadata.txt"
    clean_run => true
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "sample_data"
    document_id => "sample_%{id}"
  }
}
  • jdbc_driver_class - かなりハマったが、Java::oracle.jdbc.driver.OracleDriver"と、プレフィクスにJava::と書かなければならない。
  • jdbc_connection_string - 前述の通りdockerのネットワーク設定をしてあるので、ここではdatabaseというホスト名でoracleにアクセスできる。
  • statement_filepath - データ取得に使うSQLファイルを指定。ここではファイルに外出ししているが、直接各ことも可能。ドキュメント参照。
  • tracking_column - データ取得時に参照するカラムを指定する。timestampe以外も使えるのでその辺はドキュメントを参照。
  • last_run_metadata_path - ここでは最終取得時刻を保存するファイルを指定。
  • clean_run - 開発用に毎回クリーン実行するためtrueにしている。

データ取得用sql

select id, timestampe_value from logstash_sample where timestampe_value > :sql_last_value
  • :sql_last_value - last_run_metadata_pathに保存してある値をこれで参照できる。これによって、logstashがこのSQLを実行するたびに最終取得時刻以降のデータを差分取得してくる。

last_run_metadata_pathの最終取得時刻保存ファイル

ここでは最終取得時刻が保存される。中身はこんな感じ。

--- !ruby/object:DateTime '2019-05-03 17:10:45.559465000 +09:00'

実行

この状態でELK stackのdockerを起動する。正常動作すれば、logstashが定期的にoracleからelasticsearchにデータを出力するログが流れる。

logstash         | [2019-05-06T02:58:02,743][INFO ][logstash.inputs.jdbc     ] (0.144195s) select id, timestampe_value from logstash_sample where timestampe_value > TIMESTAMP '1970-01-01 09:00:00.000000 +09:00'
logstash         | [2019-05-06T02:59:00,376][INFO ][logstash.inputs.jdbc     ] (0.006701s) select id, timestampe_value from logstash_sample where timestampe_value > TIMESTAMP '2019-05-03 17:10:45.559465 +09:00'
logstash         | [2019-05-06T03:00:00,170][INFO ][logstash.inputs.jdbc     ] (0.000849s) select id, timestampe_value from logstash_sample where timestampe_value > TIMESTAMP '2019-05-03 17:10:45.559465 +09:00'