• 搜索
  • 夜间模式
    ©2026  依刻学习 Theme by OneBlog

    依刻学习博客

    搜索
    标签
  • 首页>
  • 学习的一天>
  • 正文
  • ELFK实践(1)logstash组件

    2025年03月13日 14 阅读 0 评论 5571 字

    前言

    基于之前搭建的ELFK集群,继续学习logstash的使用

    环境准备
    1.本次仅使用一个节点完成
    2.已经完成nginx日志的json格式修改,示例如下

    {"@timestamp":"2025-03-12T17:54:07+08:00","host":"192.168.179.112","clientip":"192.168.179.1","SendBytes":0,"responsetime":0.000,"upstreamtime":"-","upstreamhost":"-","http_host":"192.168.179.112","uri":"/index.html","domain":"192.168.179.112","xff":"-","referer":"-","tcp_xff":"-","http_user_agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36 Edg/134.0.0.0","status":"304"}
    {"@timestamp":"2025-03-13T10:43:25+08:00","host":"192.168.179.112","clientip":"192.168.179.1","SendBytes":3650,"responsetime":0.000,"upstreamtime":"-","upstreamhost":"-","http_host":"192.168.179.112","uri":"/404.html","domain":"192.168.179.112","xff":"-","referer":"http://192.168.179.112/","tcp_xff":"-","http_user_agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36 Edg/134.0.0.0","status":"404"}

    3.模拟程序日志示例如下

    INFO 2025-03-12 15:58:34 [generate_log] - DAU|3758|查看订单|0|28736.17 
    INFO 2025-03-12 15:58:39 [generate_log] - DAU|5966|提交订单|0|15252.32 
    INFO 2025-03-12 16:41:38 [generate_log] - DAU|4363|搜索|1|23269.91 
    INFO 2025-03-12 16:41:42 [generate_log] - DAU|7485|搜索|1|15301.54 
    INFO 2025-03-12 16:41:46 [generate_log] - DAU|8880|查看订单|1|17668.16 
    INFO 2025-03-12 16:41:48 [generate_log] - DAU|1622|评论商品|1|19865.87 
    INFO 2025-03-12 16:41:49 [generate_log] - DAU|1420|评论商品|1|18647.24 

    架构图

    要求

    1. 对于json格式的access.log,要求分析使用设备,客户端IP地址的所属城市

      • 分析设备 --logstash.filter.useragent
        所属城市 使用remove_field进行过滤
    2. 对于apps.log,要求分析出价格(可以排序),ID(可以排序),是否是svip,并且使用日志中的时间排序

      • 价格 -- logstash.filter.mutate(转成数字)
      • ID -- logstash.filter.mutate(转成数字)
      • svip -- logstash.filter.mutate(转成布尔)
      • 日志时间 -- logstash.filter.date 过滤出时间
    3. 其他

      • 删除不常用的字段,保持美观

    实现

    1.filebeat.yaml

    filebeat.inputs:
    - type: filestream
      paths:
       #加上*,包括轮换日志
        - /var/log/nginx/access.log*
      parsers:
      - ndjson:
        keys_under_root: true
    #output.console:
    #  pretty: true
    output.logstash:
      hosts: ["192.168.179.112:8888"]

    2.logstah.json

    input {
      beats {
        type => "nginx-json-log"
        port => 8888
      }
      tcp {
        type => "app-log"
        port => 9999
      }
    }
    
    filter {
      #通用处理
      mutate {
        #"@timestamp"最好不要删除,因为没有时间会导致索引模式无法选择@timestamp时间前缀甚至导致无法创建索引模式
        remove_field => [  "agent", "host", "@version", "ecs", "tags","input", "log" ]
      }
    
    ##########################
    
      #对type判断后进行处理
      if [type] == "nginx-json-log" {
        
        #分析客户端设备
        useragent {
          source => "http_user_agent"
          target => "客户端"
        }
        #分析客户端IP
        geoip {
          source => "clientip"
    #这里的country_name是顶级字段geoip的下级字段,必须从geoip中才能引用country_name,不能直接引用country_name
          add_field => { "COUNTRY" => "%{[geoip][country_name]}" }
          remove_field => [ "geoip" ]    
        }
    
    #############################
    
      } else if [type] == "app-log" {
        #经测试 data无法直接匹配该内容的日期与时间,猜可能需要将日期与时间单独放一个字段
        #第一次分割日志
        mutate {
          split => { "message" => " " }
          add_field => { "DAY" => "%{[message][2]}" }
          add_field => { "TIME" => "%{[message][3]}" }
          add_field => { "INFO" => "%{[message][4]}" }
          add_field => { "timestamp" => "%{DAY}:%{TIME}" }
        }    
        mutate {
          #经测试,无法直接通过message[5]提取字段
          #第二次分割日志
          split => { "INFO" => "|" }
          add_field => { "NAME" => "%{[INFO][0]}" }
          add_field => { "ID" => "%{[INFO][5]}" }
          add_field => { "COMMAND" => "%{[INFO][6]}" }
          add_field => { "ISVIP" => "%{[INFO][7]}" }
          add_field => { "PRICE" => "%{[INFO][8]}" }
        }
        date {
          match => [ "timestamp", "yyyy-MM-dd:HH:mm:ss" ]
          target => "@timestamp"
        }
    
          
        mutate {
          convert => {
            "ID" => "integer"
            "ISVIP" => "boolean"
            "PRICE" => "float"
          }
        }
        #移除INFO与message
        mutate {
          remove_field => [ "INFO" ,"message","timestamp" ]
    
        }
      }
    }
    
    output {
    #stdout { }
    #注意,不可以直接在组件内进行if判断,只能在组件外进行判断
      if [type] == "nginx-json-log" {
        elasticsearch {
          hosts => ["192.168.179.111:9200","192.168.179.112:9200","192.168.179.113:9200"]
          index => "nginx-%{+YYYY.MM.dd}"
        }   
      } else if [type] == "app-log" {
          elasticsearch {
            hosts => ["192.168.179.111:9200","192.168.179.112:9200","192.168.179.113:9200"]
            index => "app-%{+YYYY.MM.dd}"
          }
        }
    }

    3.测试

    1.nginx访问日志access.log
    浏览器访问本机80端口
    2.app.log日志
    #模仿程序通过tcp传输日志
    cat   /tmp/app.log |nc 192.168.179.112  9999
    

    4.结果
    以下是输出到终端的测试内容

    kibana展示

    5.关键处理

    • 关于date如何处理日期
      发现date无法直接过滤日志中的日期与时间,所以我先分割然后将分割后的日期时间组合起来再匹配,然后覆盖@timestamp
    • 关于country_name的提前
      观察到country_name是geoip的下级字段,因此无法直接提取,需要使用geoip才能引用
    • 关于if的使用
      if判断不可以再组件中使用,只能再组件外部进行判断
    • 关于type的使用
      type的作用与他的中文含义有些区别,或者他应该是标签的含义才对,用来区分输入的内容,适合结合if对输入的不同内容进行不同的处理
      所有传过来的字段其实都可以进行判断,那么就应该可以在filebeat中添加tag或者field,然后直接在logstahs对tag或field进行判断也是可以的

    补充

    geoip模块无法解析内网ip即192.168开头的ip,会报错,建议批量改成公网IP

    本文著作权归作者 [ wymm ] 享有,未经作者书面授权,禁止转载,封面图片来源于 [ 互联网 ] ,本文仅供个人学习、研究和欣赏使用。如有异议,请联系博主及时处理。
    取消回复

    发表留言
    回复

    Copyright©2026  All Rights Reserved.  Load:0.023 s
    Theme by OneBlog V3.6.5
    夜间模式

    开源不易,请尊重作者版权,保留基本的版权信息。