前言
基于之前搭建的ELFK集群,继续学习logstash的使用
环境准备
1.本次仅使用一个节点完成
2.已经完成nginx日志的json格式修改,示例如下
{"@timestamp":"2025-03-12T17:54:07+08:00","host":"192.168.179.112","clientip":"192.168.179.1","SendBytes":0,"responsetime":0.000,"upstreamtime":"-","upstreamhost":"-","http_host":"192.168.179.112","uri":"/index.html","domain":"192.168.179.112","xff":"-","referer":"-","tcp_xff":"-","http_user_agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36 Edg/134.0.0.0","status":"304"}
{"@timestamp":"2025-03-13T10:43:25+08:00","host":"192.168.179.112","clientip":"192.168.179.1","SendBytes":3650,"responsetime":0.000,"upstreamtime":"-","upstreamhost":"-","http_host":"192.168.179.112","uri":"/404.html","domain":"192.168.179.112","xff":"-","referer":"http://192.168.179.112/","tcp_xff":"-","http_user_agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36 Edg/134.0.0.0","status":"404"}3.模拟程序日志示例如下
INFO 2025-03-12 15:58:34 [generate_log] - DAU|3758|查看订单|0|28736.17
INFO 2025-03-12 15:58:39 [generate_log] - DAU|5966|提交订单|0|15252.32
INFO 2025-03-12 16:41:38 [generate_log] - DAU|4363|搜索|1|23269.91
INFO 2025-03-12 16:41:42 [generate_log] - DAU|7485|搜索|1|15301.54
INFO 2025-03-12 16:41:46 [generate_log] - DAU|8880|查看订单|1|17668.16
INFO 2025-03-12 16:41:48 [generate_log] - DAU|1622|评论商品|1|19865.87
INFO 2025-03-12 16:41:49 [generate_log] - DAU|1420|评论商品|1|18647.24 架构图
要求
对于json格式的access.log,要求分析使用设备,客户端IP地址的所属城市
- 分析设备 --logstash.filter.useragent
所属城市 使用remove_field进行过滤
- 分析设备 --logstash.filter.useragent
对于apps.log,要求分析出价格(可以排序),ID(可以排序),是否是svip,并且使用日志中的时间排序
- 价格 -- logstash.filter.mutate(转成数字)
- ID -- logstash.filter.mutate(转成数字)
- svip -- logstash.filter.mutate(转成布尔)
- 日志时间 -- logstash.filter.date 过滤出时间
其他
- 删除不常用的字段,保持美观
实现
1.filebeat.yaml
filebeat.inputs:
- type: filestream
paths:
#加上*,包括轮换日志
- /var/log/nginx/access.log*
parsers:
- ndjson:
keys_under_root: true
#output.console:
# pretty: true
output.logstash:
hosts: ["192.168.179.112:8888"]2.logstah.json
input {
beats {
type => "nginx-json-log"
port => 8888
}
tcp {
type => "app-log"
port => 9999
}
}
filter {
#通用处理
mutate {
#"@timestamp"最好不要删除,因为没有时间会导致索引模式无法选择@timestamp时间前缀甚至导致无法创建索引模式
remove_field => [ "agent", "host", "@version", "ecs", "tags","input", "log" ]
}
##########################
#对type判断后进行处理
if [type] == "nginx-json-log" {
#分析客户端设备
useragent {
source => "http_user_agent"
target => "客户端"
}
#分析客户端IP
geoip {
source => "clientip"
#这里的country_name是顶级字段geoip的下级字段,必须从geoip中才能引用country_name,不能直接引用country_name
add_field => { "COUNTRY" => "%{[geoip][country_name]}" }
remove_field => [ "geoip" ]
}
#############################
} else if [type] == "app-log" {
#经测试 data无法直接匹配该内容的日期与时间,猜可能需要将日期与时间单独放一个字段
#第一次分割日志
mutate {
split => { "message" => " " }
add_field => { "DAY" => "%{[message][2]}" }
add_field => { "TIME" => "%{[message][3]}" }
add_field => { "INFO" => "%{[message][4]}" }
add_field => { "timestamp" => "%{DAY}:%{TIME}" }
}
mutate {
#经测试,无法直接通过message[5]提取字段
#第二次分割日志
split => { "INFO" => "|" }
add_field => { "NAME" => "%{[INFO][0]}" }
add_field => { "ID" => "%{[INFO][5]}" }
add_field => { "COMMAND" => "%{[INFO][6]}" }
add_field => { "ISVIP" => "%{[INFO][7]}" }
add_field => { "PRICE" => "%{[INFO][8]}" }
}
date {
match => [ "timestamp", "yyyy-MM-dd:HH:mm:ss" ]
target => "@timestamp"
}
mutate {
convert => {
"ID" => "integer"
"ISVIP" => "boolean"
"PRICE" => "float"
}
}
#移除INFO与message
mutate {
remove_field => [ "INFO" ,"message","timestamp" ]
}
}
}
output {
#stdout { }
#注意,不可以直接在组件内进行if判断,只能在组件外进行判断
if [type] == "nginx-json-log" {
elasticsearch {
hosts => ["192.168.179.111:9200","192.168.179.112:9200","192.168.179.113:9200"]
index => "nginx-%{+YYYY.MM.dd}"
}
} else if [type] == "app-log" {
elasticsearch {
hosts => ["192.168.179.111:9200","192.168.179.112:9200","192.168.179.113:9200"]
index => "app-%{+YYYY.MM.dd}"
}
}
}3.测试
1.nginx访问日志access.log
浏览器访问本机80端口
2.app.log日志
#模仿程序通过tcp传输日志
cat /tmp/app.log |nc 192.168.179.112 9999
5.关键处理
- 关于date如何处理日期
发现date无法直接过滤日志中的日期与时间,所以我先分割然后将分割后的日期时间组合起来再匹配,然后覆盖@timestamp - 关于country_name的提前
观察到country_name是geoip的下级字段,因此无法直接提取,需要使用geoip才能引用 - 关于if的使用
if判断不可以再组件中使用,只能再组件外部进行判断 - 关于type的使用
type的作用与他的中文含义有些区别,或者他应该是标签的含义才对,用来区分输入的内容,适合结合if对输入的不同内容进行不同的处理
所有传过来的字段其实都可以进行判断,那么就应该可以在filebeat中添加tag或者field,然后直接在logstahs对tag或field进行判断也是可以的
补充
geoip模块无法解析内网ip即192.168开头的ip,会报错,建议批量改成公网IP
本文著作权归作者 [ wymm ] 享有,未经作者书面授权,禁止转载,封面图片来源于 [ 互联网 ] ,本文仅供个人学习、研究和欣赏使用。如有异议,请联系博主及时处理。




