OracleからLogstashを経由してElasticsearchにデータを追加する。Oracleのtimestampカラムを使用して、Logstashが定期的に前回以降のデータを取得して、Elasticsearchのインデックスに追加する。KIbanaはデータ確認用としてのみ使用する。
なお、環境構築はdockerで行うのでELK Stackとは直接的に関係無い設定がこのエントリには含まれる。また、ホスト側はWIndows 10な点に注意。
手順
dockerのネットワーク作成
それぞれ異なるコンテナで動かすため、通信設定が必要。方法は幾つかあるけど、ここではあらかじめネットワークを作っておくやり方にした。
docker network create --driver bridge common_link
参考: https://qiita.com/reneice/items/20e981062b093264cd0a
Oracle Database 18c XE
http://kagamihoge.hatenablog.com/entry/2018/12/28/204636 を参考にdockerでOracle Database 18c XEをうごかす。なお、上記リンク先と異なる点として、docker-compose.ymlにネットワーク設定を追加している。修正後のファイルは以下の通り。
version: '3' services: database: image: oracle/database:18.4.0-xe volumes: - C:\mydata\oracle\oradata:/opt/oracle/oradata ports: - 11521:1521 - 18080:8080 - 15500:5500 environment: - ORACLE_PWD=oracle networks: - common_link networks: common_link: external: true
Logstashが参照するテーブル定義は以下の通り。
CREATE TABLE LOGSTASH_SAMPLE ( "ID" NUMBER(10, 0) NOT NULL, "TIMESTAMPE_VALUE" TIMESTAMP(6) default current_timestamp NOT NULL, constraint "LOGSTASH_SAMPLE_PK" PRIMARY KEY("ID") );
ELK Stackのdocker-compose.xml
Elasticsearch, Logstash, Kibanaのdocker-compose.xmlを作成する。
version: '3' services: elasticsearch: image: docker.elastic.co/elasticsearch/elasticsearch:7.0.1 ports: - 9200:9200 - 9300:9300 volumes: - ./elasticsearch/data:/usr/share/elasticsearch/data networks: - common_link environment: - discovery.type=single-node logstash: image: docker.elastic.co/logstash/logstash-oss:7.0.1 container_name: logstash volumes: - ./logstash/pipeline:/usr/share/logstash/pipeline - ./logstash/log:/usr/share/logstash/logs - ./logstash/jdbc:/opt/jdbc - ./logstash/metadata/:/usr/share/logstash/metadata depends_on: - elasticsearch networks: - common_link kibana: image: docker.elastic.co/kibana/kibana-oss:7.0.1 ports: - 5601:5601 depends_on: - elasticsearch networks: - common_link networks: common_link: external: true
networks
ですべてのコンテナを同一ネットワークに入れておく。また、Logstashのところで後述するがこれによってホスト名をdatabase
とかelasticsearch
とかで参照可能になる。
Logstash
Logstashの設定について。まず、上記docker-compose.ymlのlogstashのvolumesでマウントしてるディレクトリについて。
- logstash
パイプライン設定ファイル
logstashでは、パイプラインと呼ばれる処理設定ファイルを記述する。このエントリではlogstash/pipeline/sample.conf
ファイルを作成する。
input { jdbc { jdbc_driver_library => "/opt/jdbc/ojdbc7.jar" jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver" jdbc_connection_string => "jdbc:oracle:thin:@database:1521/XEPDB1" jdbc_user => "system" jdbc_password => "**********" schedule => "* * * * *" statement_filepath => "/usr/share/logstash/pipeline/sql/sample.sql" tracking_column => timestampe_value tracking_column_type => "timestamp" use_column_value => true jdbc_default_timezone => "Japan" last_run_metadata_path=> "/usr/share/logstash/metadata/last_run_metadata.txt" clean_run => true } } output { elasticsearch { hosts => ["elasticsearch:9200"] index => "sample_data" document_id => "sample_%{id}" } }
jdbc_driver_class
- かなりハマったが、Java::oracle.jdbc.driver.OracleDriver"
と、プレフィクスにJava::と書かなければならない。jdbc_connection_string
- 前述の通りdockerのネットワーク設定をしてあるので、ここではdatabase
というホスト名でoracleにアクセスできる。statement_filepath
- データ取得に使うSQLファイルを指定。ここではファイルに外出ししているが、直接各ことも可能。ドキュメント参照。tracking_column
- データ取得時に参照するカラムを指定する。timestampe以外も使えるのでその辺はドキュメントを参照。last_run_metadata_path
- ここでは最終取得時刻を保存するファイルを指定。clean_run
- 開発用に毎回クリーン実行するためtrue
にしている。
データ取得用sql
select id, timestampe_value from logstash_sample where timestampe_value > :sql_last_value
:sql_last_value
-last_run_metadata_path
に保存してある値をこれで参照できる。これによって、logstashがこのSQLを実行するたびに最終取得時刻以降のデータを差分取得してくる。
last_run_metadata_pathの最終取得時刻保存ファイル
ここでは最終取得時刻が保存される。中身はこんな感じ。
--- !ruby/object:DateTime '2019-05-03 17:10:45.559465000 +09:00'
実行
この状態でELK stackのdockerを起動する。正常動作すれば、logstashが定期的にoracleからelasticsearchにデータを出力するログが流れる。
logstash | [2019-05-06T02:58:02,743][INFO ][logstash.inputs.jdbc ] (0.144195s) select id, timestampe_value from logstash_sample where timestampe_value > TIMESTAMP '1970-01-01 09:00:00.000000 +09:00' logstash | [2019-05-06T02:59:00,376][INFO ][logstash.inputs.jdbc ] (0.006701s) select id, timestampe_value from logstash_sample where timestampe_value > TIMESTAMP '2019-05-03 17:10:45.559465 +09:00' logstash | [2019-05-06T03:00:00,170][INFO ][logstash.inputs.jdbc ] (0.000849s) select id, timestampe_value from logstash_sample where timestampe_value > TIMESTAMP '2019-05-03 17:10:45.559465 +09:00'