Ingesting Nginx Logs into Doris

Preface

This was validated in the test environment six months ago; see: https://199604.com/3220

The overall flow is roughly the same as before: some of the SQL has been refined further, and Logstash on k8s now writes to Doris. This post records the configuration from the recent production rollout so I don't forget it again.

Procedure

The process follows the official documentation: https://doris.apache.org/zh-CN/docs/observability/log

Environment

For quick deployment, see: https://199604.com/3212

2× FE: 4c16g

3× BE: 8c21g (one shared with Doris Manager)

1× Doris Manager admin host

Doris version: 2.1.10

Optimizing FE and BE Configuration

参考:https://doris.apache.org/zh-CN/docs/3.0/observability/log#%E7%AC%AC-3-%E6%AD%A5%E4%BC%98%E5%8C%96-fe-%E5%92%8C-be-%E9%85%8D%E7%BD%AE

# FE tuning for logs
# enable single-replica load to speed up ingestion
enable_single_replica_load = true

max_running_txn_num_per_db = 10000
streaming_label_keep_max_second = 3600
label_keep_max_second = 7200
enable_round_robin_create_tablet = true
tablet_rebalancer_type = partition
autobucket_min_buckets = 10
max_backend_heartbeat_failure_tolerance_count = 10



# BE tuning for logs
write_buffer_size = 1073741824
max_tablet_version_num = 20000
# 1/4 of the CPU cores
max_cumu_compaction_threads = 2
inverted_index_compaction_enable = true
enable_segcompaction = false
enable_ordered_data_compaction = false
enable_compaction_priority_scheduling = false
total_permits_for_compaction_score = 200000
disable_storage_page_cache = true
inverted_index_searcher_cache_limit = 30%
inverted_index_cache_stale_sweep_time_sec = 3600
index_cache_entry_stay_time_after_lookup_s = 3600
enable_inverted_index_cache_on_cooldown = true
enable_write_index_searcher_cache = false
tablet_schema_cache_recycle_interval = 3600
segment_cache_capacity = 20000
inverted_index_ram_dir_enable = true
pipeline_executor_size = 4
doris_scanner_thread_pool_thread_num = 10
scan_thread_nice_value = 5
string_type_length_soft_limit_bytes = 10485760
trash_file_expire_time_sec = 300
path_gc_check_interval_second = 900
path_scan_interval_second = 900
# enable single-replica load to speed up ingestion
enable_single_replica_load = true

Since hot/cold tiered storage is not in use for now, the storage-related parameters were left untuned.
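
These are static settings in fe.conf / be.conf, but FE options can also be inspected (and most of them flipped) at runtime from a MySQL client, which is handy for verifying that the tuning actually took effect. A quick check, using config names from the block above:

```sql
-- check whether single-replica load is enabled on the FE
ADMIN SHOW FRONTEND CONFIG LIKE 'enable_single_replica_load';

-- most FE configs can also be changed at runtime (lost on restart
-- unless they are also written to fe.conf)
ADMIN SET FRONTEND CONFIG ("max_running_txn_num_per_db" = "10000");
```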

Creating the Table

Hot/cold tiered storage is not in use yet (no cold disks), so this DDL differs from the official docs in places; adjust to your own needs.

-- Change retention: ALTER TABLE logstash_nginx_log SET ("dynamic_partition.start" = "-180");
-- Rename the old table: ALTER TABLE logstash_nginx_log RENAME logstash_nginx_log_old;
-- Drop the old table: DROP TABLE logstash_nginx_log_old;

CREATE TABLE logstash_nginx_log
(
  `ts` DATETIME NOT NULL COMMENT 'timestamp',
  `args` TEXT COMMENT 'request args',
  `client_ip` varchar(128) COMMENT 'client IP',
  `real_client_ip` varchar(128) COMMENT 'real client IP',
  `domain` varchar(256) COMMENT 'domain',
  `file_dir` TEXT COMMENT 'file directory',
  `filetype` varchar(128) COMMENT 'file type',
  `geoip_city_name` TEXT COMMENT 'GeoIP city name',
  `geoip_coordinates` TEXT COMMENT 'GeoIP coordinates',
  `geoip_country_name` TEXT COMMENT 'GeoIP country name',
  `geoip_ip` varchar(128) COMMENT 'GeoIP IP',
  `geoip_location_lat` DOUBLE COMMENT 'GeoIP latitude',
  `geoip_location_lon` DOUBLE COMMENT 'GeoIP longitude',
  `geoip_region_name` TEXT COMMENT 'GeoIP region name',
  `host_name` varchar(256) COMMENT 'hostname',
  `http_user_agent` TEXT COMMENT 'HTTP user agent',
  `log_file_path` TEXT COMMENT 'log file path',
  `log_offset` TEXT COMMENT 'log offset',
  `protocol` varchar(128) COMMENT 'protocol',
  `referer` TEXT COMMENT 'referrer',
  `request_body` TEXT COMMENT 'request body',
  `request_length` int COMMENT 'request length',
  `request_method` varchar(64) COMMENT 'request method',
  `responsetime` FLOAT COMMENT 'response time',
  `server_ip` varchar(128) COMMENT 'server IP',
  `size` int COMMENT 'response size',
  `status` int COMMENT 'HTTP status code',
  `tags` TEXT COMMENT 'tags',
  `type` varchar(64) COMMENT 'type',
  `ua_device` varchar(128) COMMENT 'UA device',
  `ua_name` varchar(128) COMMENT 'UA name',
  `ua_os` varchar(128) COMMENT 'UA OS',
  `ua_os_full` varchar(256) COMMENT 'UA full OS info',
  `ua_os_name` varchar(128) COMMENT 'UA OS name',
  `ua_os_version` varchar(128) COMMENT 'UA OS version',
  `ua_version` varchar(128) COMMENT 'UA version',
  `upstreamhost` TEXT COMMENT 'upstream host',
  `upstreamtime` FLOAT COMMENT 'upstream response time',
  `url` TEXT COMMENT 'request URL',
  `xff` TEXT COMMENT 'X-Forwarded-For',
  INDEX idx_client_ip (client_ip) USING INVERTED,
  INDEX idx_real_client_ip (real_client_ip) USING INVERTED,
  INDEX idx_domain (domain) USING INVERTED,
  INDEX idx_geoip_city_name (geoip_city_name) USING INVERTED,
  INDEX idx_status (status) USING INVERTED,
  INDEX idx_upstreamhost (upstreamhost) USING INVERTED
)
ENGINE = OLAP
DUPLICATE KEY(`ts`)
PARTITION BY RANGE(`ts`) ()
DISTRIBUTED BY RANDOM BUCKETS 6
PROPERTIES (
  "compression" = "zstd", -- zstd for a better compression ratio
  "compaction_policy" = "time_series", -- compaction policy tuned for log/time-series workloads
  "dynamic_partition.enable" = "true", -- enable dynamic partitioning
  "dynamic_partition.create_history_partition" = "true", -- auto-create historical partitions
  "dynamic_partition.time_unit" = "DAY", -- scheduling unit for dynamic partitioning
  "dynamic_partition.prefix" = "p", -- name prefix for auto-created partitions
  "dynamic_partition.start" = "-180", -- start offset of dynamic partitions
  "dynamic_partition.end" = "1", -- end offset of dynamic partitions
  "dynamic_partition.buckets" = "6", -- buckets per auto-created partition; with RANDOM bucketing, roughly 2-3x the total disk count of the cluster
  "dynamic_partition.replication_num" = "2", -- not needed in compute-storage decoupled mode
  "replication_num" = "2" -- not needed in compute-storage decoupled mode
);
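
Once data lands, the dynamic partitions and inverted indexes can be exercised with plain SQL. A quick sanity query (table and column names from the DDL above):

```sql
-- Error breakdown by domain over the last day; the ts predicate prunes
-- partitions, and status/domain can use the inverted indexes
SELECT domain, status, COUNT(*) AS hits
FROM logstash_nginx_log
WHERE ts >= NOW() - INTERVAL 1 DAY
  AND status >= 500
GROUP BY domain, status
ORDER BY hits DESC
LIMIT 10;
```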

Collecting Logs

With nothing else changed from before, the Filebeat → Logstash versions are both 7.17.27 (the k8s Logstash image below is 7.17.28).

Filebeat Setup

Filebeat barely changes; the config is the same as the previous ELK setup. Just double-check the hosts entry under output.logstash.

Logstash stays in the pipeline because I need it to enrich each request with the client IP's city and similar GeoIP data.

[root@nginx109 filebeat]# cat filebeat.yml
name: "10.194.106.109"

tags: ["10.194.106.109","nginx"]

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - "/AppHome/nginx/logs/*.json"
  fields:
    filetype: nginx
  fields_under_root: true

  json.keys_under_root: true
  json.overwrite_keys: true
  json.add_error_key: true

output.logstash:
  hosts: [ "10.196.69.1:35044" ]
  # note: Filebeat's logstash output retries via backoff.* (retry.* is not a valid option here)
  backoff.init: 1s  # initial backoff after a failed connection
  backoff.max: 60s  # maximum backoff between retries


# NOTE: close_* / clean_inactive / ignore_older are per-input options and, in a
# real config, belong under the filebeat.inputs entry above.

# Close a file handle after no new data for this long (default 5m); 1m frees handles sooner
close_inactive: 1m

# Force-close a handle still open after 3h of harvesting; this is the key setting for files that never finish
close_timeout: 3h

# Default 0 means never clean: state for harvested files stays in the registry forever, so it keeps growing over time and can cause problems
clean_inactive: 72h

# When clean_inactive is set, ignore_older must be set too, and ignore_older < clean_inactive
ignore_older: 70h

# Limit CPU and memory usage
max_procs: 1 # pin to one core so Filebeat does not starve the business workload
queue.mem.events: 256 # events held in the memory queue awaiting send (default 4096)
queue.mem.flush.min_events: 128 # must be < queue.mem.events; raise to increase throughput (default 2048)

Logstash Setup

参考:https://doris.apache.org/zh-CN/docs/ecosystem/logstash

You can download the Logstash Doris output plugin from the official site, or build it from source yourself.

  • Official downloads
    • package without bundled dependencies: https://apache-doris-releases.oss-accelerate.aliyuncs.com/logstash-output-doris-1.0.0.gem
    • package with bundled dependencies: https://apache-doris-releases.oss-accelerate.aliyuncs.com/logstash-output-doris-1.0.0.zip

Installing the Plugin (Binary)

Standard install

${LOGSTASH_HOME} is the Logstash install directory; run its bin/logstash-plugin command to install the plugin:

${LOGSTASH_HOME}/bin/logstash-plugin install logstash-output-doris-1.0.0.gem

Validating logstash-output-doris-1.0.0.gem
Installing logstash-output-doris
Installation successful

The standard install automatically fetches the Ruby modules the plugin depends on, so it hangs when there is no network access. In that case, download the zip with bundled dependencies and do a fully offline install; note that the local file must be referenced with a file:// URI, as below.

Offline install
${LOGSTASH_HOME}/bin/logstash-plugin install file:///tmp/logstash-output-doris-1.0.0.zip

Installing file: logstash-output-doris-1.0.0.zip
Resolving dependencies.........................
Install successful
k8s Deployment

Only part of the YAML is shown:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: elk-logstash
  namespace: logging
spec:
  selector:
    matchLabels:
      app: logstash
  replicas: 2
  revisionHistoryLimit: 2
  template:
    metadata:
      labels:
        app: logstash
    spec:
      containers:
        - name: logstash
          image: reg-hub.gzeport.com/k8s-component/elk/logstash:7.17.28
          command: [ "/bin/sh", "-c" ]
          args:
            - |
              echo "Installing logstash-output-doris plugin..."
              /usr/share/logstash/bin/logstash-plugin install file:///usr/share/logstash/logstash-output-doris-1.0.0.zip
              if [ $? -ne 0 ]; then
                echo "Failed to install logstash-output-doris plugin. Exiting..."
                exit 1
              fi
              echo "logstash-output-doris plugin installed successfully."
              echo "Starting logstash..."
              /usr/share/logstash/bin/logstash -f /usr/share/logstash/pipeline/logstash.conf
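
Installing the plugin on every pod start works, but it depends on the zip being present in the image or a mounted volume, and it slows restarts. A common alternative is to bake the plugin into a custom image at build time (a sketch; image name and paths are taken from the Deployment above):

```dockerfile
FROM reg-hub.gzeport.com/k8s-component/elk/logstash:7.17.28
# copy the offline plugin package into the image and install it at build time
COPY logstash-output-doris-1.0.0.zip /tmp/
RUN /usr/share/logstash/bin/logstash-plugin install file:///tmp/logstash-output-doris-1.0.0.zip && \
    rm /tmp/logstash-output-doris-1.0.0.zip
```

With such an image, the container command can go back to plain `logstash -f …` with no install step in args.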

Logstash Configuration

Edit the ${LOGSTASH_HOME}/config/logstash.yml file:

http.host: "0.0.0.0"
log.level: warn
path.config: /opt/logstash-7.17.27/config/pipeline/
config.reload.automatic: true
config.reload.interval: 300s
#pipeline.batch.size: 500
#pipeline.batch.delay: 200
# large batches improve write efficiency when the output is Doris
pipeline.batch.size: 1000000
pipeline.batch.delay: 10000

Create the pipeline config file logstash_nginx_doris_log.conf:

mkdir -p /opt/logstash-7.17.27/config/pipeline/
touch logstash_nginx_doris_log.conf

# contents:
    input {
      beats {
        type => "beat-nginx-logs"
        port => 5044
        codec => json
        client_inactivity_timeout => 3600
      }
    }

    filter {
      if [type] == "beat-nginx-logs" {
        # nginx logs
        # prefer the left-most (real) client IP from X-Forwarded-For
        if [xff] != "" {
          ruby {
            code => "
              if event.get('xff')
                ips = event.get('xff').split(',').map(&:strip)
                event.set('real_client_ip', ips[0])
              end
            "
          }
        } else {
          mutate {
            add_field => { "real_client_ip" => "%{client_ip}" }
          }
        }

        # GeoIP lookup (always based on real_client_ip)
        geoip {
          target => "geoip"
          source => "real_client_ip"
          database => "/usr/share/logstash/GeoLite2-City.mmdb"
          add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
          add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
          remove_field => [
            "[geoip][latitude]", "[geoip][longitude]",
            "[geoip][country_code]", "[geoip][country_code2]", "[geoip][country_code3]",
            "[geoip][timezone]", "[geoip][continent_code]", "[geoip][region_code]"
          ]
        }

        mutate {
          convert => [ "size", "integer" ]
          convert => [ "status", "integer" ]
          convert => [ "responsetime", "float" ]
          convert => [ "upstreamtime", "float" ]
          convert => [ "[geoip][coordinates]", "float" ]
          # drop Filebeat fields we don't need; keep anything the ES output (or later logic) still relies on
          remove_field => [ "ecs","agent","cloud","@version","input" ]
        }

        # parse http_user_agent into client device/OS/browser versions
        useragent {
          source => "http_user_agent"
          target => "ua"
          # drop useragent fields we don't need
          remove_field => [ "[ua][minor]","[ua][major]","[ua][build]","[ua][patch]","[ua][os_minor]","[ua][os_major]" ]
        }

        # fill in defaults for missing values
        ruby {
          code => "
            # geoip defaults
            event.set('[geoip][ip]', event.get('[geoip][ip]') || '0.0.0.0')
            event.set('[geoip][city_name]', event.get('[geoip][city_name]') || 'Unknown')
            event.set('[geoip][coordinates]', event.get('[geoip][coordinates]') || [0.0, 0.0])
            event.set('[geoip][country_name]', event.get('[geoip][country_name]') || 'Unknown')
            event.set('[geoip][location][lat]', event.get('[geoip][location][lat]') || 0.0)
            event.set('[geoip][location][lon]', event.get('[geoip][location][lon]') || 0.0)
            event.set('[geoip][region_name]', event.get('[geoip][region_name]') || 'Unknown')
            # ua defaults
            event.set('[ua][device]', event.get('[ua][device]') || 'Other')
            event.set('[ua][name]', event.get('[ua][name]') || 'Other')
            event.set('[ua][os]', event.get('[ua][os]') || 'Other')
            event.set('[ua][os_full]', event.get('[ua][os_full]') || 'Other')
            event.set('[ua][os_name]', event.get('[ua][os_name]') || 'Other')
            event.set('[ua][os_version]', event.get('[ua][os_version]') || 'Other')
            event.set('[ua][version]', event.get('[ua][version]') || 'Other')
          "
        }

      }
    }

    output {
      if [type] == "beat-nginx-logs" {
        # output to Elasticsearch
        elasticsearch {
          hosts => "elk-elasticsearch:9200"
          user => "xxxx"
          password => "xxxx"
          index => "logstash-nginx-log-%{+yyyy.MM.dd}"
        }

        # output to Doris
        doris {
          #http_hosts => ["${DORIS_HTTP_HOSTS}"]
          http_hosts => ["http://a:8030","http://b:8030"]
          user => "${DORIS_USER}"
          password => "${DORIS_PASSWORD}"
          db => "${DORIS_DB}"
          table => "${DORIS_TABLE}"

          headers => {
            "format" => "json"
            "read_json_by_line" => "true"
            "load_to_single_tablet" => "true"
          }

          mapping => {
            "ts" => "%{@timestamp}"
            "args" => "%{args}"
            "client_ip" => "%{client_ip}"
            "real_client_ip" => "%{real_client_ip}"
            "domain" => "%{domain}"
            "file_dir" => "%{file_dir}"
            "filetype" => "%{filetype}"
            "geoip_ip" => "%{[geoip][ip]}"
            "geoip_city_name" => "%{[geoip][city_name]}"
            "geoip_coordinates" => "%{[geoip][coordinates]}"
            "geoip_country_name" => "%{[geoip][country_name]}"
            "geoip_location_lat" => "%{[geoip][location][lat]}"
            "geoip_location_lon" => "%{[geoip][location][lon]}"
            "geoip_region_name" => "%{[geoip][region_name]}"
            "host_name" => "%{[host][name]}"
            "http_user_agent" => "%{http_user_agent}"
            "log_file_path" => "%{[log][file][path]}"
            "log_offset" => "%{[log][offset]}"
            "protocol" => "%{protocol}"
            "referer" => "%{referer}"
            "request_body" => "%{request_body}"
            "request_length" => "%{request_length}"
            "request_method" => "%{request_method}"
            "responsetime" => "%{responsetime}"
            "server_ip" => "%{server_ip}"
            "size" => "%{size}"
            "status" => "%{status}"
            "tags" => "%{tags}"
            "type" => "%{type}"
            "ua_device" => "%{[ua][device]}"
            "ua_name" => "%{[ua][name]}"
            "ua_os" => "%{[ua][os]}"
            "ua_os_full" => "%{[ua][os_full]}"
            "ua_os_name" => "%{[ua][os_name]}"
            "ua_os_version" => "%{[ua][os_version]}"
            "ua_version" => "%{[ua][version]}"
            "upstreamhost" => "%{upstreamhost}"
            "upstreamtime" => "%{upstreamtime}"
            "url" => "%{url}"
            "xff" => "%{xff}"
          }
          log_request => true
          log_speed_interval => 10
        }

      } else {
        stdout { }
      }
    }

Run Logstash (on k8s, adapt the Deployment YAML instead):
./bin/logstash -f /opt/logstash-7.17.27/config/pipeline/logstash_nginx_doris_log.conf
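
The ruby filter above takes the left-most X-Forwarded-For entry as the real client IP (the remaining entries are proxies). That logic can be sanity-checked outside Logstash; the header value here is made up:

```shell
# left-most XFF entry = original client; later entries are intermediate proxies
xff="203.0.113.7, 10.0.0.1, 192.168.1.1"
real_client_ip=$(printf '%s' "$xff" | cut -d',' -f1 | tr -d ' ')
echo "$real_client_ip"
```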

Visual Log Analysis with the WebUI

Reference: https://cn.selectdb.com/docs/enterprise/enterprise-core-guide/selectdb-webui-guide

The WebUI includes a Kibana-Discover-style log search and analysis view, providing an intuitive, exploratory log-analysis experience.

References

1.https://doris.apache.org/zh-CN/docs/observability/log

2.https://cn.selectdb.com/docs/enterprise/enterprise-core-guide/selectdb-webui-guide

3.https://mp.weixin.qq.com/s/YB2_RxdNC21CdhuwX9f2qg
