본문 바로가기
System/Elastic Stack

logstash 패턴을 이용한 ELK 파싱(nginx access log,nginx access log + geoip)

* logstash 패턴을 이용한 ELK  파싱

ELK 구성시 logstash 를 통하여 input output 으로 로그를 elasticearch 로 삽입할 경우 filter 를 통하여
해당 로그에서 필요한 부분만 필드로 지정하여 저장 할 수 있는데 이때 사용 되는 것이 pattern 이다.

logstash 에서는 Grok를 이용하여 패턴을 관리한다.
기본적으로 %{PATTERN:IDENTIFIER} 형식으로 기본적으로 지정되어 있는 패턴을 식별자로 구분 하여 로그메세지의 여러점을 해당 
패턴에 일치 할 경우 해당 식별자로 포함한다.

grok 에 대한 정보는 아래 사이트 에서 확인 가능 하다.

logstash 기본 패턴은 아래 사이트에서 확인 가능하다.

아래 사이트에서는 파싱되는 과정을 웹상에서 디버거 하여 테스트 가능하다.




ELK 서버에서 pattern 만 따로 관리 하기 위해 아래와 같은 디렉토리를 생성 권한을 수정 한다.
[root@localhost ~]# sudo mkdir -p /opt/logstash/patterns
[root@localhost ~]# chown logstash:logstash /opt/logstash/patterns


* Logstash 패턴:Nginx

/opt/logstash/patterns/nginx 파일에 패턴을 저장, 권한을 바꾼다.

[root@localhost ~]# vi /opt/logstash/patterns/nginx
NGUSERNAME [a-zA-Z\.\@\-\+_%]+
NGUSER %{NGUSERNAME}
NGINXACCESS %{IPORHOST:clientip} %{NGUSER:ident} %{NGUSER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:
httpversion}" %{NUMBER:response} (?:%{NUMBER:bytes}|-) (?:"(?:%{URI:referrer}|-)"|%{QS:referrer}) %{QS:agent}
[root@localhost ~]# chown logstash:logstash /opt/logstash/patterns/nginx

logstash input으로 들어온 로그중 type 이 nginx-access 로 되어 있는 경우 message 를 NGINXACCESS 라는 패턴에 적용 파싱 하게 된다.

[root@localhost ~]# vi /etc/logstash/conf.d/11-nginx-filter.conf
Nddginx Filter
filter {
  if [type] == "nginx-access" {
    grok {
      match => { "message" => "%{NGINXACCESS}" }
    }
  }
}

해당 설정이 변경 되면 logstash 를 재적용 한다.

[root@localhost ~]# /etc/init.d/logstash restart
Killing logstash (pid 50120) with SIGTERM
Waiting logstash (pid 50120) to die...
Waiting logstash (pid 50120) to die...
Waiting logstash (pid 50120) to die...
logstash stopped.
logstash started.


target 이 되는 client 서버에서 nginx log 설정 및 filbeat 설정을 진행 한다.
nginx 의 기본 log format 은 기본으로 설정 되어 있으며, access log가 정상적으로 쌓이는것을 확인 할 수 있다.

[root@ELK_CLIENT ~]# vi /etc/nginx/nginx.conf
http {
    log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
                      '$status $body_bytes_sent "$http_referer" '
                      '"$http_user_agent"  $request_time ';

    access_log  /var/log/nginx/access.log  main;

[root@ELK_CLIENT ~]# tail -n 10 /var/log/nginx/access.log
::1 - - [14/Dec/2016:16:42:48 +0900] "GET / HTTP/1.1" 200 3698 "-" "curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.21 Basic ECC zlib/1.2.3 libidn/1.18 libssh2/1.4.2"  0.000
::1 - - [14/Dec/2016:16:42:49 +0900] "GET / HTTP/1.1" 200 3698 "-" "curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.21 Basic ECC zlib/1.2.3 libidn/1.18 libssh2/1.4.2"  0.000


기존 filebeat 설정에 추가 적으로 nginx accesslog 를 설정 한다 이때 document_type 을 설정 하여 pattern 으로 걸릴 수 있도록 설정 한다.

[root@ELK_CLIENT ~]# vi /etc/filebeat/filebeat.yml
    -
      paths:
        - /var/log/secure
        - /var/log/messages
      input_type: log
      document_type: syslog

    -
      paths:
        - /var/log/nginx/access.log
      document_type: nginx-access
      input_type: log

... 생략 ...

[root@ELK_CLIENT ~]#  service filebeat restart

테스트를 위해 curl 를 통해 로그가 쌓이도록 한다.
[root@ELK_CLIENT ~]# curl http://localhost

kibana 에서 filebeat 에 대한 pattern 을 refresh 하게 되면 필드가 추가 된것 을 확인 할 수 있다.


discover 에서 document type 이 nginx-access 로 된 로그가 확인이 된다.


해당 타입으로 된 로그가 파싱되어 각 필드로 저장 되는 것을 확인 할 수 있다.





* Download Latest GeoIP Database

위 파싱된 아이피및 데이터를 기준으로 source ip에 대한 geo ip 를 연동 할 수 있다.
geoip db 를 저장 한다.

[root@localhost ~]# cd /etc/logstash
[root@localhost logstash]# sudo curl -O "http://geolite.maxmind.com/download/geoip/database/GeoLiteCity.dat.gz"
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 11.1M  100 11.1M    0     0  6464k      0  0:00:01  0:00:01 --:--:-- 6466k

[root@localhost logstash]# gunzip GeoLiteCity.dat.gz


* Configure Logstash to use GeoIP

logstash filter 에서 아래와 같이 필드를 추가 하여 설정 한다.

[root@localhost logstash]# vi /etc/logstash/conf.d/11-nginx-filter.conf
filter {
  if [type] == "nginx-access" {
    grok {
      match => { "message" => "%{NGINXACCESS}" }
    }
    geoip {
      source => "clientip"
      target => "geoip"
      database => "/etc/logstash/GeoLiteCity.dat"
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
    }
    mutate {
      convert => [ "[geoip][coordinates]", "float"]
    }
  }
}

logstash 를 재시작 후 아래와 같이 kibana 에서 해당 source 아이피에 대한 goeip 정보를 확인 할 수 있다.







반응형