Notes on Migrating Nginx Logs from Elasticsearch to Doris
Preface
In production, Nginx logs are currently collected and transformed with Filebeat + Logstash into Elasticsearch, then visualized with Grafana. For deployment details see: ELK - Analyzing Nginx Logs with Grafana Visualization
A few problems have emerged: the data stored in ES is not compressed for now, and the learning cost for other colleagues keeps growing (at the moment I am the only maintainer). So I started looking at replacing ES. Along the way I also evaluated ClickHouse, but after reading Doris's white paper on log storage and analysis, I decided to try migrating from ES to Doris. After a few days of research and hands-on verification, it turned out to be feasible. Better yet, ingestion only requires a small change to the Logstash configuration, so the blast radius is very small.
Procedure
The process follows the official documentation:
https://doris.apache.org/zh-CN/docs/log-storage-analysis
Environment Preparation
Reference: https://199604.com/3212
Quick Deployment
For confidentiality reasons I can only show the test environment: 1 FE (2c6g), 3 BEs (2c4g each, one sharing a host with the FE), and 1 Doris Manager machine.
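With the machines up, it is worth sanity-checking the cluster from a MySQL client connected to the FE (port 9030 by default) before going further. A minimal check:
-- Confirm the FE is alive and all three BEs have registered (Alive should be true)
SHOW FRONTENDS;
SHOW BACKENDS;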
Optimizing FE and BE Configuration
Reference:
https://doris.apache.org/zh-CN/docs/log-storage-analysis#%E7%AC%AC-3-%E6%AD%A5%E4%BC%98%E5%8C%96-fe-%E5%92%8C-be-%E9%85%8D%E7%BD%AE
Since hot/cold tiered storage is not being considered for now, the storage-related parameters were left untuned.
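For reference, FE parameters can be inspected and changed online with ADMIN statements. Below is a minimal sketch touching max_running_txn_num_per_db, one of the ingestion-related knobs the official guide covers; the value 10000 is an assumption to validate against your own load, and the change does not survive an FE restart unless it is also written to fe.conf:
-- Inspect the current per-database running-transaction limit
ADMIN SHOW FRONTEND CONFIG LIKE 'max_running_txn_num_per_db';
-- Raise it so frequent stream loads are not rejected (assumed value)
ADMIN SET FRONTEND CONFIG ("max_running_txn_num_per_db" = "10000");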
Creating the Table
Since hot/cold tiered storage is not being considered for now and cloud disks are not in use, the schema below deviates from the official documentation in places; judge for yourself what fits your setup.
CREATE DATABASE log_db;
USE log_db;
CREATE TABLE logstash_nginx_log
(
`ts` DATETIME,
`args` TEXT,
`client_ip` TEXT,
`domain` TEXT,
`file_dir` TEXT,
`filetype` TEXT,
`geoip.city_name` TEXT,
`geoip.coordinates` TEXT,
`geoip.country_name` TEXT,
`geoip.ip` TEXT,
`geoip.location.lat` DOUBLE,
`geoip.location.lon` DOUBLE,
`geoip.region_name` TEXT,
`host.name` TEXT,
`http_user_agent` TEXT,
`log.file.path` TEXT,
`log.offset` TEXT,
`protocol` TEXT,
`referer` TEXT,
`request_body` TEXT,
`request_length` INT,
`request_method` TEXT,
`responsetime` FLOAT,
`server_ip` TEXT,
`size` INT,
`status` INT,
`tags` TEXT,
`type` TEXT,
`ua.device` TEXT,
`ua.name` TEXT,
`ua.os` TEXT,
`ua.os_full` TEXT,
`ua.os_name` TEXT,
`ua.os_version` TEXT,
`ua.version` TEXT,
`upstreamhost` TEXT,
`upstreamtime` FLOAT,
`url` TEXT,
`xff` TEXT
)
ENGINE = OLAP
DUPLICATE KEY(`ts`)
PARTITION BY RANGE(`ts`) ()
DISTRIBUTED BY RANDOM BUCKETS 30
PROPERTIES (
"compression" = "zstd", -- 使用 zstd 压缩算法提高数据压缩率。
"compaction_policy" = "time_series", -- 策略是为日志、时序等场景优化的策略
"dynamic_partition.enable" = "true", -- 是否开启动态分区特性
"dynamic_partition.create_history_partition" = "true", -- 自动建立历史分区
"dynamic_partition.time_unit" = "DAY", -- 动态分区调度的单位
"dynamic_partition.prefix" = "p", -- 动态创建的分区名前缀。
"dynamic_partition.start" = "-100", -- 动态分区的起始偏移
"dynamic_partition.end" = "1", -- 动态分区的结束偏移
"dynamic_partition.buckets" = "30", -- 动态创建的分区所对应的分桶数量。
"dynamic_partition.replication_num" = "2", -- 存算分离不需要
"replication_num" = "2" -- 存算分离不需要
);
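Once the table exists, a quick way to confirm the dynamic partition scheduler is working is to list the partitions (named with the p prefix configured above) and the scheduling state:
-- Daily partitions created by the dynamic partition scheduler
SHOW PARTITIONS FROM log_db.logstash_nginx_log;
-- Dynamic partition scheduling state for tables in this database
SHOW DYNAMIC PARTITION TABLES FROM log_db;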
Collecting Logs
The Filebeat and Logstash versions used below are both 7.17.27.
Integrating Filebeat
Filebeat is barely affected and keeps the same configuration as under the old ELK setup; just double-check the hosts entry under output.logstash.
name: "192.168.111.105"
tags: ["192.168.111.105","nginx"]
filebeat.inputs:
- type: log
enabled: true
paths:
- "/AppHome/components/nginx/logs/access.log"
fields:
filetype: nginx
fields_under_root: true
json.keys_under_root: true
json.overwrite_keys: true
json.add_error_key: true
output.logstash:
hosts: ["192.168.111.150:5044"]
# How long to wait after the last new log line before closing the file handle; default is 5 minutes, set to 1 minute to release handles faster
close_inactive: 1m
# Force-close the file handle if a file is still being transferred after 3h; this option is the key to preventing handles from being held indefinitely
close_timeout: 3h
# This should also be set: the default 0 means never clean up, i.e. entries for already-collected files stay in the registry forever, so the registry grows over time and may cause problems
clean_inactive: 72h
# Once clean_inactive is set, ignore_older must also be set, and ignore_older < clean_inactive must hold
ignore_older: 70h
# Limit CPU and memory usage
max_procs: 1 # limit Filebeat to one CPU core to avoid starving business workloads
queue.mem.events: 256 # number of events held in the in-memory queue awaiting dispatch (default 4096)
queue.mem.flush.min_events: 128 # must be less than queue.mem.events; raising it can improve throughput (default 2048)
Run: /AppHome/filebeat-7.17.27-linux-x86_64/filebeat -e -c filebeat.yml
Integrating Logstash
Reference:
https://doris.incubator.apache.org/zh-CN/docs/ecosystem/logstash
You can download the Logstash Doris output plugin from the official site, or build it from source yourself.
- Download from the official site
- Package without bundled dependencies: https://apache-doris-releases.oss-accelerate.aliyuncs.com/logstash-output-doris-1.0.0.gem
- Package with bundled dependencies: https://apache-doris-releases.oss-accelerate.aliyuncs.com/logstash-output-doris-1.0.0.zip
Installing the Plugin
- Standard installation
${LOGSTASH_HOME} is the Logstash installation directory; run the bin/logstash-plugin command under it to install the plugin:
${LOGSTASH_HOME}/bin/logstash-plugin install logstash-output-doris-1.0.0.gem
Validating logstash-output-doris-1.0.0.gem
Installing logstash-output-doris
Installation successful
Standard installation automatically fetches the Ruby modules the plugin depends on, so it will hang and never finish when there is no network access. In that case, download the zip package with bundled dependencies for a fully offline installation; note that file:// must be used to reference the local filesystem.
- Offline installation
${LOGSTASH_HOME}/bin/logstash-plugin install file:///tmp/logstash-output-doris-1.0.0.zip
Installing file: logstash-output-doris-1.0.0.zip
Resolving dependencies.........................
Install successful
Logstash Configuration
Edit the ${LOGSTASH_HOME}/config/logstash.yml file:
http.host: "0.0.0.0"
path.config: /opt/logstash-7.17.27/config/pipeline/
config.reload.automatic: true
config.reload.interval: 300s
#pipeline.batch.size: 500
#pipeline.batch.delay: 200
pipeline.batch.size: 1000000
pipeline.batch.delay: 10000
path.config: /opt/logstash-7.17.27/config/pipeline/ points to a directory under the Logstash installation; adjust it to your own path.
Create the new pipeline config file logstash_nginx_doris_log.conf:
mkdir -p /opt/logstash-7.17.27/config/pipeline/
touch logstash_nginx_doris_log.conf
# contents as follows:
input {
beats {
type => "beat-nginx-logs"
port => 5044
codec => json
client_inactivity_timeout => 3600
}
}
filter {
if [type] == "beat-nginx-logs" {
# nginx logs
if [xff] != "" {
geoip {
target => "geoip"
source => "xff"
database => "/usr/share/logstash/GeoLite2-City.mmdb"
add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
remove_field => ["[geoip][latitude]", "[geoip][longitude]", "[geoip][country_code]", "[geoip][country_code2]", "[geoip][country_code3]", "[geoip][timezone]", "[geoip][continent_code]", "[geoip][region_code]"]
}
} else {
geoip {
target => "geoip"
source => "client_ip"
database => "/usr/share/logstash/GeoLite2-City.mmdb"
add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
remove_field => ["[geoip][latitude]", "[geoip][longitude]", "[geoip][country_code]", "[geoip][country_code2]", "[geoip][country_code3]", "[geoip][timezone]", "[geoip][continent_code]", "[geoip][region_code]"]
}
}
mutate {
convert => [ "size", "integer" ]
convert => [ "status", "integer" ]
convert => [ "responsetime", "float" ]
convert => [ "[geoip][coordinates]", "float" ]
remove_field => [ "ecs", "agent", "cloud", "@version", "input" ]
}
useragent {
source => "http_user_agent"
target => "ua"
remove_field => [ "[ua][minor]", "[ua][major]", "[ua][build]", "[ua][patch]", "[ua][os_minor]", "[ua][os_major]" ]
}
# handle missing values and set defaults
ruby {
code => "
event.set('[geoip][ip]', event.get('[geoip][ip]') || '')
event.set('[geoip][city_name]', event.get('[geoip][city_name]') || 'Unknown')
event.set('[geoip][coordinates]', event.get('[geoip][coordinates]') || [0.0, 0.0])
event.set('[geoip][country_name]', event.get('[geoip][country_name]') || 'Unknown')
event.set('[geoip][location][lat]', event.get('[geoip][location][lat]') || 0.0)
event.set('[geoip][location][lon]', event.get('[geoip][location][lon]') || 0.0)
event.set('[geoip][region_name]', event.get('[geoip][region_name]') || 'Unknown')
"
}
# Handle missing values and set defaults (mutate-based alternative, kept commented for reference)
# if ![geoip][ip] {
# mutate {
# add_field => { "[geoip][ip]" => "" }
# }
# }
# if ![geoip][city_name] {
# mutate {
# add_field => { "[geoip][city_name]" => "Unknown" }
# }
# }
# if ![geoip][coordinates] {
# mutate {
# add_field => { "[geoip][coordinates]" => [0.0, 0.0] }
# }
# }
# if ![geoip][country_name] {
# mutate {
# add_field => { "[geoip][country_name]" => "Unknown" }
# }
# }
# if ![geoip][location][lat] {
# mutate {
# add_field => { "[geoip][location][lat]" => 0.0 }
# }
# }
# if ![geoip][location][lon] {
# mutate {
# add_field => { "[geoip][location][lon]" => 0.0 }
# }
# }
# if ![geoip][region_name] {
# mutate {
# add_field => { "[geoip][region_name]" => "Unknown" }
# }
# }
}
}
output {
if [type] == "beat-nginx-logs" {
doris {
http_hosts => ["http://192.168.111.151:8030"]
user => "root"
password => "root@123"
db => "log_db"
table => "logstash_nginx_log"
headers => {
"format" => "json"
"read_json_by_line" => "true"
"load_to_single_tablet" => "true"
}
mapping => {
"ts" => "%{@timestamp}"
"args" => "%{args}"
"client_ip" => "%{client_ip}"
"domain" => "%{domain}"
"file_dir" => "%{file_dir}"
"filetype" => "%{filetype}"
"geoip.ip" => "%{[geoip][ip]}"
"geoip.city_name" => "%{[geoip][city_name]}"
"geoip.coordinates" => "%{[geoip][coordinates]}"
"geoip.country_name" => "%{[geoip][country_name]}"
"geoip.location.lat" => "%{[geoip][location][lat]}"
"geoip.location.lon" => "%{[geoip][location][lon]}"
"geoip.region_name" => "%{[geoip][region_name]}"
"host.name" => "%{[host][name]}"
"http_user_agent" => "%{http_user_agent}"
"log.file.path" => "%{[log][file][path]}"
"log.offset" => "%{[log][offset]}"
"protocol" => "%{protocol}"
"referer" => "%{referer}"
"request_body" => "%{request_body}"
"request_length" => "%{request_length}"
"request_method" => "%{request_method}"
"responsetime" => "%{responsetime}"
"server_ip" => "%{server_ip}"
"size" => "%{size}"
"status" => "%{status}"
"tags" => "%{tags}"
"type" => "%{type}"
"ua.device" => "%{[ua][device]}"
"ua.name" => "%{[ua][name]}"
"ua.os" => "%{[ua][os]}"
"ua.os_full" => "%{[ua][os_full]}"
"ua.os_name" => "%{[ua][os_name]}"
"ua.os_version" => "%{[ua][os_version]}"
"ua.version" => "%{[ua][version]}"
"upstreamhost" => "%{upstreamhost}"
"upstreamtime" => "%{upstreamtime}"
"url" => "%{url}"
"xff" => "%{xff}"
}
log_request => true
log_speed_interval => 10
}
} else {
stdout { }
}
}
Running Logstash
./bin/logstash -f /opt/logstash-7.17.27/config/pipeline/logstash_nginx_doris_log.conf
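With Logstash running and Filebeat shipping, verify on the Doris side that data is arriving; a simple sanity check from a MySQL client:
-- The row count should keep growing as events stream in
SELECT COUNT(*) FROM log_db.logstash_nginx_log;
-- Peek at the most recent events
SELECT ts, client_ip, status, url FROM log_db.logstash_nginx_log ORDER BY ts DESC LIMIT 10;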
Querying and Analyzing Logs
Visual log analysis with the WebUI
The WebUI includes a Kibana-Discover-style log search and analysis interface, providing intuitive, easy-to-use exploratory log analysis.
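The same data is also open to plain SQL from any MySQL client. A couple of sketches of typical Nginx analyses (field names follow the table above; the time windows are arbitrary examples):
-- Status code distribution over the last 24 hours
SELECT status, COUNT(*) AS cnt
FROM log_db.logstash_nginx_log
WHERE ts >= DATE_SUB(NOW(), INTERVAL 1 DAY)
GROUP BY status
ORDER BY cnt DESC;
-- Top 10 client IPs by request volume over the last hour
SELECT client_ip, COUNT(*) AS pv
FROM log_db.logstash_nginx_log
WHERE ts >= DATE_SUB(NOW(), INTERVAL 1 HOUR)
GROUP BY client_ip
ORDER BY pv DESC
LIMIT 10;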
With that, the replacement is complete: the ELK pipeline has been migrated to Filebeat + Logstash ingesting into Doris, and the WebUI takes over Kibana Discover's role for log search and analysis.
References
1. https://doris.incubator.apache.org/zh-CN/docs/ecosystem/logstash
2. https://doris.apache.org/zh-CN/docs/log-storage-analysis