1. 개요
ElasticSearch에 RDB로 이용하고 있는 mysql 데이터를 적재하고 싶어서 찾아보다가, elk의 스택인 logstash를 이용해서 batch job 형태로 주기적으로 적재할 수 있는 방법을 찾게 되었습니다.
일단은 테스트 용도이기 때문에, 윈도우 환경에서 작업을 했고, 리눅스 환경에서의 예제는 많지만 윈도우 환경에서의 예제는 없는 것 같아 정리해보려고 합니다.(elk가 생각보다 상당히 리소스를 많이 차지합니다)
2. 준비물
1) ElasticSearch
https://www.elastic.co/kr/downloads/elasticsearch
2) Logstash
https://www.elastic.co/kr/downloads/logstash
3) Mysql, Mysql connector
RDB로 사용하는 Mysql은 알아서 설치가 되었다고 생각하고...
mysql-connctor-java.jar를 받아줍니다
3. 기본 포트
ElasticSearch = http://localhost:9200/
4. Logstash 설정 파일
제 설정 파일의 폴더 구성은 아래와 같습니다.
config/pipelines.yml
...
# List of pipelines to be loaded by Logstash
#
# This document must be a list of dictionaries/hashes, where the keys/values are pipeline settings.
# Default values for omitted settings are read from the `logstash.yml` file.
# When declaring multiple pipelines, each MUST have its own `pipeline.id`.
#
# Example of two pipelines:
#
- pipeline.id: "mysql"
pipeline.workers: 1
pipeline.batch.size: 1
path.config: "C:/elk/logstash-8.3.2/config/*.config"
# config.string: "input { generator {} } filter { sleep { time => 1 } } output { stdout { codec => dots } }"
# - pipeline.id: another_test
# queue.type: persisted
...
저 4개의 주석을 제외해주고 작성해주는데,
path.config: 뒤의 경로는 실제로 logstash가 설치된 config 폴더의 경로를 입력해주면 됩니다.
config/logstash.conf
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.
input {
jdbc {
jdbc_driver_library => "C:/elk/logstash-8.3.2/lib/mysql-connector-java-8.0.18.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/maindatabase?serverTimezone=UTC"
jdbc_user => "root"
jdbc_password => "1234"
jdbc_paging_enabled => true
tracking_column => "unix_ts_in_secs"
use_column_value => true
tracking_column_type => "numeric"
schedule => "*/60 * * * * *"
statement => "SELECT *, UNIX_TIMESTAMP(updateAt) AS unix_ts_in_secs
FROM maindatabase.shop WHERE (UNIX_TIMESTAMP(updateAt) > :sql_last_value
AND updateAt < NOW()) ORDER BY updateAt ASC"
}
}
filter {
mutate {
copy => { "id" => "[@metadata][_id]"}
remove_field => ["id", "@version", "unix_ts_in_secs"]
}
}
output{
elasticsearch{
hosts => ["127.0.0.1:9200"]
index => "shop"
document_id => "%{[@metadata][_id]}"
document_type => "_doc"
#user => "elastic"
#password => "changeme"
}
}
logstash를 받고 나서 logstash-sample.conf가 있을 텐데 그 파일명을 위처럼 수정해주시면 되고, 위의 내용을 입력합니다.
이 파일의 설정값으로 RDB에서 특정 데이터베이스의 테이블의 ID를 읽어서 ElasticSearch의 특정 주기마다 적재하기 때문에 이 설정값에 대한 이해가 중요합니다!!
1) jdbc_driver_library : 아까 받은 mysql connector java의 경로를 작성합니다
2) jdbc_connection_string : database 접속 주소를 설정합니다.
3) schedule : 스케쥴 주기(크론식)
4) statement(중요함!!) : 데이터베이스에서 테이블을 조회하는 쿼리를 작성합니다. FROM 뒤에는 테이블을 작성하시고, :sql_last_value는 현재 스케쥴러가 동작한 시간이 들어가기 때문에, 이에 맞춰서 sql을 작성하면 됩니다.
5) hosts : ElasticSearch의 주소를 입력합니다.
6) index : _doc/index에 적재될 이름을 지정합니다. (document_id는 보통 테이블의 Unique한 id번호로 지정하게 됩니다.)
5. 실행
cmd창을 열고, C:\elk\logstash-8.3.2\bin과 같은 logstash가 설치된 bin 폴더에 들어가서 아래의 명령어를 입력해 실행합니다.
-f 인자로 "config/logstash.conf", 4번에서 설정한 logstash의 설정 파일 인자를 넘겨주는 것입니다.
logstash.bat -f config/logstash.conf
log4j에 의해서 실행 중인 로그를 확인할 수 있는데,
로그의 상태가 ERROR인 상태의 내용을 확인하면서 조정하시면 정상적으로 RDB의 테이블에서 데이터를 읽어서 ElasticSearch에 적재를 할 수 있게 됩니다.
아래와 같이 shop라는 인덱스에 적재가 된 내용을 확인해볼 수 있습니다.