Backend&Devops/ElasticSearch

[ElasticSearch] logstash + mysql window 환경 연동

기은P 2022. 7. 19. 16:36
반응형

1. 개요

ElasticSearch에 RDB로 이용하고 있는 mysql 데이터를 적재하고 싶어서 찾아보다가, elk의 스택인 logstash를 이용해서 batch job 형태로 주기적으로 적재할 수 있는 방법을 찾게 되었습니다.

일단은 테스트 용도이기 때문에, 윈도우 환경에서 작업을 했고, 리눅스 환경에서의 예제는 많지만 윈도우 환경에서의 예제는 없는 것 같아 정리해보려고 합니다.(elk가 생각보다 상당히 리소스를 많이 차지합니다)

 

 

2. 준비물

1) ElasticSearch

https://www.elastic.co/kr/downloads/elasticsearch

 

Download Elasticsearch

Download Elasticsearch or the complete Elastic Stack (formerly ELK stack) for free and start searching and analyzing in minutes with Elastic.

www.elastic.co

2) Logstash

https://www.elastic.co/kr/downloads/logstash

 

Download Logstash Free | Get Started Now

Download Logstash or the complete Elastic Stack (formerly ELK stack) for free and start collecting, searching, and analyzing your data with Elastic in minutes.

www.elastic.co

3) Mysql, Mysql connector

RDB로 사용하는 Mysql은 알아서 설치가 되었다고 생각하고...

https://mvnrepository.com/artifact/mysql/mysql-connector-java/8.0.18?__cf_chl_rt_tk=zaRajG9xCh3rQuIRHVjRuu4RLOo4WgNidVrt8TTON2k-1658197549-0-gaNycGzNCL0 

 

Maven Repository: mysql » mysql-connector-java » 8.0.18

JDBC Type 4 driver for MySQL Note: There is a new version for this artifact mysql mysql-connector-java 8.0.18 // https://mvnrepository.com/artifact/mysql/mysql-connector-java implementation group: 'mysql', name: 'mysql-connector-java', version: '8.0.18' //

mvnrepository.com

mysql-connctor-java.jar를 받아줍니다

 

 

3. 기본 포트

ElasticSearch = http://localhost:9200/

 

 

 

4. Logstash 설정 파일

제 설정 파일의 폴더 구성은 아래와 같습니다.

 

config/pipelines.yml

...

# List of pipelines to be loaded by Logstash
#
# This document must be a list of dictionaries/hashes, where the keys/values are pipeline settings.
# Default values for omitted settings are read from the `logstash.yml` file.
# When declaring multiple pipelines, each MUST have its own `pipeline.id`.
#
# Example of two pipelines:
#
- pipeline.id: "mysql"
  pipeline.workers: 1
  pipeline.batch.size: 1
  path.config: "C:/elk/logstash-8.3.2/config/*.config"
#   config.string: "input { generator {} } filter { sleep { time => 1 } } output { stdout { codec => dots } }"
# - pipeline.id: another_test
#   queue.type: persisted

...

저 4개의 주석을 제외해주고 작성해주는데,

path.config: 뒤의 경로는 실제로 logstash가 설치된 config 폴더의 경로를 입력해주면 됩니다.

 

 

config/logstash.conf

# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
  jdbc {
	jdbc_driver_library => "C:/elk/logstash-8.3.2/lib/mysql-connector-java-8.0.18.jar"
	jdbc_driver_class => "com.mysql.jdbc.Driver"
	jdbc_connection_string => "jdbc:mysql://localhost:3306/maindatabase?serverTimezone=UTC"
	jdbc_user => "root"
	jdbc_password => "1234"
	jdbc_paging_enabled => true
	tracking_column => "unix_ts_in_secs"
	use_column_value => true
	tracking_column_type => "numeric"
	schedule => "*/60 * * * * *"
	statement => "SELECT *, UNIX_TIMESTAMP(updateAt) AS unix_ts_in_secs
		FROM maindatabase.shop WHERE (UNIX_TIMESTAMP(updateAt) > :sql_last_value
		AND updateAt < NOW()) ORDER BY updateAt ASC"
  }
}

filter {
  mutate {
    copy => { "id" => "[@metadata][_id]"}
    remove_field => ["id", "@version", "unix_ts_in_secs"]
  }
}

output{
   elasticsearch{
		hosts => ["127.0.0.1:9200"]
    	index => "shop" 
    	document_id => "%{[@metadata][_id]}"
    	document_type => "_doc"
    #user => "elastic"
    #password => "changeme"
   }
}

logstash를 받고 나서 logstash-sample.conf가 있을 텐데 그 파일명을 위처럼 수정해주시면 되고, 위의 내용을 입력합니다.

 

이 파일의 설정값으로 RDB에서 특정 데이터베이스의 테이블의 ID를 읽어서 ElasticSearch의 특정 주기마다 적재하기 때문에 이 설정값에 대한 이해가 중요합니다!!

 

1) jdbc_driver_library : 아까 받은 mysql connector java의 경로를 작성합니다

2) jdbc_connection_string : database 접속 주소를 설정합니다. 

3) schedule : 스케쥴 주기(크론식)

4) statement(중요함!!) : 데이터베이스에서 테이블을 조회하는 쿼리를 작성합니다. FROM 뒤에는 테이블을 작성하시고, :sql_last_value는 현재 스케쥴러가 동작한 시간이 들어가기 때문에, 이에 맞춰서 sql을 작성하면 됩니다.

 

5) hosts : ElasticSearch의 주소를 입력합니다.

6) index : _doc/index에 적재될 이름을 지정합니다. (document_id는 보통 테이블의 Unique한 id번호로 지정하게 됩니다.)

 

 

 

5. 실행

cmd창을 열고, C:\elk\logstash-8.3.2\bin과 같은 logstash가 설치된 bin 폴더에 들어가서 아래의 명령어를 입력해 실행합니다.

-f 인자로 "config/logstash.conf", 4번에서 설정한 logstash의 설정 파일 인자를 넘겨주는 것입니다.

logstash.bat -f config/logstash.conf

 

log4j에 의해서 실행 중인 로그를 확인할 수 있는데,

로그의 상태가 ERROR인 상태의 내용을 확인하면서 조정하시면 정상적으로 RDB의 테이블에서 데이터를 읽어서 ElasticSearch에 적재를 할 수 있게 됩니다.

 

아래와 같이 shop라는 인덱스에 적재가 된 내용을 확인해볼 수 있습니다.

 

 

 

반응형