System/Elastic Stack

Logstash Drop filter

cyuu 2018. 7. 22. 12:24


logstash Drop 함수는 특정 조건문에 해당하는 데이터에 대하여 삭제 하는 기능을 하고 있어서, 불필요하게 저장 할 데이터를 줄일 수 있다( 공식문서 )
기본 형식은 아래와 같이  loglevel  라는 field 에 대하여 debug라는 문자가 조건문에 걸릴 경우 drop 함수를 통하여 삭제 하게 된다.
Drop Filter

filter {
  if [loglevel] == "debug" {
    drop { }
  }
}

client 에서 아래와 같이 access_log를  192.168.10.190 서버의 logstash(5044)로 전송 하도록 구성 하였다.
filebeat.yml

- type: log
  enabled: true
  paths:
    - /var/log/httpd/httpd-access.log
    - /var/log/httpd/access_log
  reload.enabled: false
  hosts: ["192.168.10.190:5044"]


192.168.10.190호스트의 logstash에서는 일반적으로 설정 하는 것 처럼 grok 패턴 을 통하여 field 를 쿼리 가능한 형태로 패턴을 추가 하여 주며 ,
cy라는 이름으로 시작하는 index로 저장 될 수 있도록 구성 하였다.

logstash.conf

input {
  beats {
      port => 5044
  }
}
 
filter {
        grok {
                match => [ "message""%{COMMONAPACHELOG} %{QS:referrer} %{QS:agent}"]
                add_field => [ "received_at""%{@timestamp}" ]
                add_field => [ "received_from""%{host}" ]
                remove_tag => ['_grokparsefailure']
        }
        syslog_pri { }
 
}
 
 
 
output {
  elasticsearch {
      hosts => "localhost:9200"
      index => "cy-%{+YYYY.MM.dd}"
  }


Kibana 에서 확인 하면 다음과 같은 field로 구성 된 것을 확인 할 수 있는데, 여기서 request field 에 특정 문자가 들어 올 경우 삭제 할 수 있도록 하려고 한다.
기존 logstash.conf 중 filter에 grok 함수 아래로 if "1.html" in [request] 라는 조건문으로
request field에 1.html 일 경우 drop 함수를 실행 하도록 하였다.
logstash.conf

input {
  beats {
      port => 5044
  }
}
 
filter {
        grok {
                match => [ "message""%{COMMONAPACHELOG} %{QS:referrer} %{QS:agent}"]
                add_field => [ "received_at""%{@timestamp}" ]
                add_field => [ "received_from""%{host}" ]
                remove_tag => ['_grokparsefailure']
        }
        syslog_pri { }
 
        if "1.html" in [request] {
                drop { }
        }
}
 
 
output {
  elasticsearch {
      hosts => "localhost:9200"
      index => "cy-%{+YYYY.MM.dd}"
  }
}

logstash 재시작후 curl 로 1.html과 2.html을 같이 여러번 호출하면 1. html이 저장 안되는 것을 확인 할 수 있다. 




또한, drop으로 삭제 하더라도 일정한 비율로 drop 되는 데이터를 확인 할 수 있도록 drop 비율을 조절 할 수 있다.
drop { percentage => 50 } 와 같은 형태로 percentage를 이용 한다면 1.html을 10번 호출 시 5번만 저장 될 수 있도록 하여, 
상황에 맞도록 로그를 저장 확인 할 수 있도록 하고 있다.

logstash.conf

input {
  beats {
      port => 5044
  }
}
 
filter {
        grok {
                match => [ "message""%{COMMONAPACHELOG} %{QS:referrer} %{QS:agent}"]
                add_field => [ "received_at""%{@timestamp}" ]
                add_field => [ "received_from""%{host}" ]
                remove_tag => ['_grokparsefailure']
        }
        syslog_pri { }
 
        if "1.html" in [request] {
                drop { percentage => 50 }
        }
}
 
 
output {
  elasticsearch {
      hosts => "localhost:9200"
      index => "cy-%{+YYYY.MM.dd}"
  }
}


반응형