Ingesting Nginx Logs into Doris
Preface
This was already validated in a test environment six months ago; see https://199604.com/3220 for that write-up.
The overall flow this time is the same as before: some of the SQL has been further refined, and Logstash now runs on k8s and writes to Doris. This post records the configuration from the recent production rollout, so I don't forget it again.
Procedure
The process follows the official documentation: https://doris.apache.org/zh-CN/docs/observability/log
Environment Preparation
Reference: https://199604.com/3212
Quick Deployment
2 FE nodes, 4c16g each
3 BE nodes, 8c21g each (one of them shared with Doris Manager)
1 Doris Manager admin machine
Doris version: 2.1.10
Optimize FE and BE Configuration
参考:https://doris.apache.org/zh-CN/docs/3.0/observability/log#%E7%AC%AC-3-%E6%AD%A5%E4%BC%98%E5%8C%96-fe-%E5%92%8C-be-%E9%85%8D%E7%BD%AE
# log-oriented FE configuration tuning
# Enable single-replica load to improve load performance
enable_single_replica_load = true
max_running_txn_num_per_db = 10000
streaming_label_keep_max_second = 3600
label_keep_max_second = 7200
enable_round_robin_create_tablet = true
tablet_rebalancer_type = partition
autobucket_min_buckets = 10
max_backend_heartbeat_failure_tolerance_count = 10
# end of log-oriented FE configuration tuning
# log-oriented BE configuration tuning
write_buffer_size = 1073741824
max_tablet_version_num = 20000
# 1/4 of the CPU cores
max_cumu_compaction_threads = 2
inverted_index_compaction_enable = true
enable_segcompaction = false
enable_ordered_data_compaction = false
enable_compaction_priority_scheduling = false
total_permits_for_compaction_score = 200000
disable_storage_page_cache = true
inverted_index_searcher_cache_limit = 30%
inverted_index_cache_stale_sweep_time_sec = 3600
index_cache_entry_stay_time_after_lookup_s = 3600
enable_inverted_index_cache_on_cooldown = true
enable_write_index_searcher_cache = false
tablet_schema_cache_recycle_interval = 3600
segment_cache_capacity = 20000
inverted_index_ram_dir_enable = true
pipeline_executor_size = 4
doris_scanner_thread_pool_thread_num = 10
scan_thread_nice_value = 5
string_type_length_soft_limit_bytes = 10485760
trash_file_expire_time_sec = 300
path_gc_check_interval_second = 900
path_scan_interval_second = 900
# Enable single-replica load to improve load performance
enable_single_replica_load = true
# end of log-oriented BE configuration tuning
Hot/cold tiered storage is still only under consideration here, so the storage-related parameters were left untuned.
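These items live in fe.conf / be.conf and take effect after a restart; most FE items can also be flipped at runtime. Below is a quick sanity check, a sketch only, assuming default ports (FE MySQL port 9030, BE webserver port 8040), placeholder hostnames, and that your BE build exposes the /api/show_config endpoint:
# Apply an FE item at runtime (not persisted; still write it to fe.conf)
mysql -h <fe_host> -P 9030 -uroot -e \
  "ADMIN SET FRONTEND CONFIG ('enable_single_replica_load' = 'true');"
# Confirm the FE picked it up
mysql -h <fe_host> -P 9030 -uroot -e \
  "ADMIN SHOW FRONTEND CONFIG LIKE 'enable_single_replica_load';"
# BE config is exposed over its webserver port
curl -s http://<be_host>:8040/api/show_config | grep -i single_replica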
Creating the Table
Hot/cold tiered storage is under consideration but cold disks are not yet in use, so this table definition differs from the official documentation; judge for yourself what fits your setup.
-- Change the retention window: ALTER TABLE logstash_nginx_log SET ("dynamic_partition.start" = "-180");
-- Rename the old table: ALTER TABLE logstash_nginx_log RENAME logstash_nginx_log_old;
-- Drop the old table: DROP TABLE logstash_nginx_log_old;
CREATE TABLE logstash_nginx_log
(
    `ts` DATETIME NOT NULL COMMENT 'timestamp',
    `args` TEXT COMMENT 'request arguments',
    `client_ip` varchar(128) COMMENT 'client IP',
    `real_client_ip` varchar(128) COMMENT 'real client IP',
    `domain` varchar(256) COMMENT 'domain',
    `file_dir` TEXT COMMENT 'file directory',
    `filetype` varchar(128) COMMENT 'file type',
    `geoip_city_name` TEXT COMMENT 'GeoIP city name',
    `geoip_coordinates` TEXT COMMENT 'GeoIP coordinates',
    `geoip_country_name` TEXT COMMENT 'GeoIP country name',
    `geoip_ip` varchar(128) COMMENT 'GeoIP IP',
    `geoip_location_lat` DOUBLE COMMENT 'GeoIP latitude',
    `geoip_location_lon` DOUBLE COMMENT 'GeoIP longitude',
    `geoip_region_name` TEXT COMMENT 'GeoIP region name',
    `host_name` varchar(256) COMMENT 'host name',
    `http_user_agent` TEXT COMMENT 'HTTP user agent',
    `log_file_path` TEXT COMMENT 'log file path',
    `log_offset` TEXT COMMENT 'log offset',
    `protocol` varchar(128) COMMENT 'protocol',
    `referer` TEXT COMMENT 'referer',
    `request_body` TEXT COMMENT 'request body',
    `request_length` int COMMENT 'request length',
    `request_method` varchar(64) COMMENT 'request method',
    `responsetime` FLOAT COMMENT 'response time',
    `server_ip` varchar(128) COMMENT 'server IP',
    `size` int COMMENT 'response size',
    `status` int COMMENT 'HTTP status code',
    `tags` TEXT COMMENT 'tags',
    `type` varchar(64) COMMENT 'type',
    `ua_device` varchar(128) COMMENT 'UA device',
    `ua_name` varchar(128) COMMENT 'UA name',
    `ua_os` varchar(128) COMMENT 'UA operating system',
    `ua_os_full` varchar(256) COMMENT 'UA full operating system info',
    `ua_os_name` varchar(128) COMMENT 'UA operating system name',
    `ua_os_version` varchar(128) COMMENT 'UA operating system version',
    `ua_version` varchar(128) COMMENT 'UA version',
    `upstreamhost` TEXT COMMENT 'upstream host',
    `upstreamtime` FLOAT COMMENT 'upstream response time',
    `url` TEXT COMMENT 'request URL',
    `xff` TEXT COMMENT 'X-Forwarded-For',
    INDEX idx_client_ip (client_ip) USING INVERTED,
    INDEX idx_real_client_ip (real_client_ip) USING INVERTED,
    INDEX idx_domain (domain) USING INVERTED,
    INDEX idx_geoip_city_name (geoip_city_name) USING INVERTED,
    INDEX idx_status (status) USING INVERTED,
    INDEX idx_upstreamhost (upstreamhost) USING INVERTED
)
ENGINE = OLAP
DUPLICATE KEY(`ts`)
PARTITION BY RANGE(`ts`) ()
DISTRIBUTED BY RANDOM BUCKETS 6
PROPERTIES (
    "compression" = "zstd", -- zstd compression for a better compression ratio
    "compaction_policy" = "time_series", -- compaction policy optimized for log/time-series workloads
    "dynamic_partition.enable" = "true", -- enable the dynamic partition feature
    "dynamic_partition.create_history_partition" = "true", -- automatically create historical partitions
    "dynamic_partition.time_unit" = "DAY", -- scheduling unit for dynamic partitioning
    "dynamic_partition.prefix" = "p", -- name prefix for dynamically created partitions
    "dynamic_partition.start" = "-180", -- start offset of dynamic partitions
    "dynamic_partition.end" = "1", -- end offset of dynamic partitions
    "dynamic_partition.buckets" = "6", -- buckets per dynamically created partition; with RANDOM bucketing, roughly 2-3x the total number of disks in the cluster is a reasonable value
    "dynamic_partition.replication_num" = "2", -- not needed in compute-storage decoupled mode
    "replication_num" = "2" -- not needed in compute-storage decoupled mode
);
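Before pointing any traffic at the table, it's worth confirming dynamic partitioning is active. A minimal check from the MySQL client, assuming a placeholder FE host and database name:
# List tables with dynamic partitioning enabled in this database
mysql -h <fe_host> -P 9030 -uroot -D <your_db> -e "SHOW DYNAMIC PARTITION TABLES;"
# With start=-180, end=1 and create_history_partition=true, the historical
# daily partitions plus tomorrow's should already exist
mysql -h <fe_host> -P 9030 -uroot -D <your_db> -e \
  "SHOW PARTITIONS FROM logstash_nginx_log;" | wc -l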
Collecting Logs
Nothing has changed from before: the pipeline is filebeat --> logstash, both at version 7.17.27.
Integrating Filebeat
Filebeat itself matters little here, so its configuration is identical to the previous ELK setup; just check the hosts setting under output.logstash. Logstash stays in the pipeline because I need it to resolve the city (and similar GeoIP attributes) of visiting IP addresses.
[root@nginx109 filebeat]# cat filebeat.yml
name: "10.194.106.109"
tags: ["10.194.106.109","nginx"]
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - "/AppHome/nginx/logs/*.json"
  fields:
    filetype: nginx
  fields_under_root: true
  json.keys_under_root: true
  json.overwrite_keys: true
  json.add_error_key: true
  # How long after the last new log line before the file handle is closed;
  # the default is 5 minutes, 1m releases handles faster
  close_inactive: 1m
  # Force-close a handle that has been transferring for 3h without finishing;
  # this setting is the key point for the stuck-file-handle problem
  close_timeout: 3h
  # Should be set: the default 0 means never clean up, i.e. entries for files
  # already collected stay in the registry forever; after running a while the
  # registry grows and can cause problems
  clean_inactive: 72h
  # With clean_inactive set, ignore_older must also be set, and
  # ignore_older < clean_inactive must hold
  ignore_older: 70h
output.logstash:
  hosts: [ "10.196.69.1:35044" ]
  # Retry settings (the logstash output uses max_retries / backoff.*)
  max_retries: 5     # maximum number of retries
  backoff.init: 1s   # initial retry interval
  backoff.max: 60s   # maximum retry interval
# Limit CPU and memory resources
max_procs: 1                     # cap at one CPU core to avoid starving the business workload
queue.mem.events: 256            # events held in the in-memory queue awaiting send (default 4096)
queue.mem.flush.min_events: 128  # smaller than queue.mem.events; raising it increases throughput (default 2048)
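Before rolling this out, filebeat can validate both the file itself and connectivity to Logstash; these subcommands exist in filebeat 7.x (run from the directory holding filebeat.yml, or pass a full path to -c):
# Validate the YAML and option names
filebeat test config -c filebeat.yml
# Check that the logstash output (10.196.69.1:35044) is reachable
filebeat test output -c filebeat.yml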
Integrating Logstash
Reference:
https://doris.apache.org/zh-CN/docs/ecosystem/logstash
The Logstash Doris output plugin can be downloaded from the official site or built from source.
- Download from the official site
  - Package without dependencies: https://apache-doris-releases.oss-accelerate.aliyuncs.com/logstash-output-doris-1.0.0.gem
  - Package with dependencies: https://apache-doris-releases.oss-accelerate.aliyuncs.com/logstash-output-doris-1.0.0.zip
Installing the Plugin (Binary)
Normal Installation
${LOGSTASH_HOME} is the Logstash installation directory; run its bin/logstash-plugin command to install the plugin:
${LOGSTASH_HOME}/bin/logstash-plugin install logstash-output-doris-1.0.0.gem
Validating logstash-output-doris-1.0.0.gem
Installing logstash-output-doris
Installation successful
Normal installation automatically pulls in the Ruby modules the plugin depends on; without network access it will hang and never complete. In that case, download the zip package that bundles the dependencies and do a fully offline install. Note that file:// must be used to point at the local filesystem, as configured below.
Offline Installation
${LOGSTASH_HOME}/bin/logstash-plugin install file:///tmp/logstash-output-doris-1.0.0.zip
Installing file: logstash-output-doris-1.0.0.zip
Resolving dependencies.........................
Install successful
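Either way, you can confirm the plugin registered with Logstash before wiring up the pipeline:
${LOGSTASH_HOME}/bin/logstash-plugin list --verbose | grep doris
# expected output similar to (version may differ):
# logstash-output-doris (1.0.0)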
k8s Deployment
Only part of the YAML file is shown:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: elk-logstash
  namespace: logging
spec:
  selector:
    matchLabels:
      app: logstash
  replicas: 2
  revisionHistoryLimit: 2
  template:
    metadata:
      labels:
        app: logstash
    spec:
      containers:
      - name: logstash
        image: reg-hub.gzeport.com/k8s-component/elk/logstash:7.17.28
        command: [ "/bin/sh", "-c" ]
        args:
        - |
          echo "Installing logstash-output-doris plugin..."
          /usr/share/logstash/bin/logstash-plugin install file:///usr/share/logstash/logstash-output-doris-1.0.0.zip
          if [ $? -ne 0 ]; then
            echo "Failed to install logstash-output-doris plugin. Exiting..."
            exit 1
          fi
          echo "logstash-output-doris plugin installed successfully."
          echo "Starting logstash..."
          /usr/share/logstash/bin/logstash -f /usr/share/logstash/pipeline/logstash.conf
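This assumes the plugin zip is already baked into the image or mounted at /usr/share/logstash/. Once the manifest is applied, a couple of kubectl checks (namespace and name as above) confirm the install-then-start sequence ran:
kubectl -n logging rollout status deployment/elk-logstash
# Look for the "installed successfully" / "Starting logstash..." echoes
kubectl -n logging logs deployment/elk-logstash | grep -Ei 'doris|starting logstash'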
Logstash Configuration
Edit the ${LOGSTASH_HOME}/config/logstash.yml file:
http.host: "0.0.0.0"
log.level: warn
path.config: /opt/logstash-7.17.27/config/pipeline/
config.reload.automatic: true
config.reload.interval: 300s
#pipeline.batch.size: 500
#pipeline.batch.delay: 200
pipeline.batch.size: 1000000
pipeline.batch.delay: 10000
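The oversized pipeline.batch.size / pipeline.batch.delay follow the Doris Logstash docs' advice to batch aggressively for Stream Load throughput (the commented values are the more ES-typical defaults). Since config.reload.automatic is on, it's worth syntax-checking pipeline files before dropping them into path.config; --config.test_and_exit is a standard Logstash flag (install path assumed to match path.config above):
/opt/logstash-7.17.27/bin/logstash \
  -f /opt/logstash-7.17.27/config/pipeline/ \
  --config.test_and_exit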
Pipeline file logstash_nginx_doris_log.conf
mkdir -p /opt/logstash-7.17.27/config/pipeline/
touch logstash_nginx_doris_log.conf
# contents as follows:
input {
  beats {
    type => "beat-nginx-logs"
    port => 5044
    codec => json
    client_inactivity_timeout => 3600
  }
}
filter {
  if [type] == "beat-nginx-logs" {
    # nginx logs
    # Prefer the left-most (real) client IP from X-Forwarded-For
    if [xff] != "" {
      ruby {
        code => "
          if event.get('xff')
            ips = event.get('xff').split(',').map(&:strip)
            event.set('real_client_ip', ips[0])
          end
        "
      }
    } else {
      mutate {
        add_field => { "real_client_ip" => "%{client_ip}" }
      }
    }
    # GeoIP lookup (always based on real_client_ip)
    geoip {
      target => "geoip"
      source => "real_client_ip"
      database => "/usr/share/logstash/GeoLite2-City.mmdb"
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
      remove_field => [
        "[geoip][latitude]", "[geoip][longitude]",
        "[geoip][country_code]", "[geoip][country_code2]", "[geoip][country_code3]",
        "[geoip][timezone]", "[geoip][continent_code]", "[geoip][region_code]"
      ]
    }
    mutate {
      convert => [ "size", "integer" ]
      convert => [ "status", "integer" ]
      convert => [ "responsetime", "float" ]
      convert => [ "upstreamtime", "float" ]
      convert => [ "[geoip][coordinates]", "float" ]
      # Drop Filebeat fields we don't need; before removing anything, make sure
      # the ES output doesn't still rely on it, or later conditionals will break
      remove_field => [ "ecs","agent","cloud","@version","input" ]
    }
    # Parse http_user_agent to identify the client device, OS and version
    useragent {
      source => "http_user_agent"
      target => "ua"
      # drop useragent fields we don't need
      remove_field => [ "[ua][minor]","[ua][major]","[ua][build]","[ua][patch]","[ua][os_minor]","[ua][os_major]" ]
    }
    # Handle null values and set defaults
    ruby {
      code => "
        # default missing geoip fields
        event.set('[geoip][ip]', event.get('[geoip][ip]') || '0.0.0.0')
        event.set('[geoip][city_name]', event.get('[geoip][city_name]') || 'Unknown')
        event.set('[geoip][coordinates]', event.get('[geoip][coordinates]') || [0.0, 0.0])
        event.set('[geoip][country_name]', event.get('[geoip][country_name]') || 'Unknown')
        event.set('[geoip][location][lat]', event.get('[geoip][location][lat]') || 0.0)
        event.set('[geoip][location][lon]', event.get('[geoip][location][lon]') || 0.0)
        event.set('[geoip][region_name]', event.get('[geoip][region_name]') || 'Unknown')
        # default missing ua fields
        event.set('[ua][device]', event.get('[ua][device]') || 'Other')
        event.set('[ua][name]', event.get('[ua][name]') || 'Other')
        event.set('[ua][os]', event.get('[ua][os]') || 'Other')
        event.set('[ua][os_full]', event.get('[ua][os_full]') || 'Other')
        event.set('[ua][os_name]', event.get('[ua][os_name]') || 'Other')
        event.set('[ua][os_version]', event.get('[ua][os_version]') || 'Other')
        event.set('[ua][version]', event.get('[ua][version]') || 'Other')
      "
    }
  }
}
output {
  if [type] == "beat-nginx-logs" {
    # output to Elasticsearch
    elasticsearch {
      hosts => "elk-elasticsearch:9200"
      user => "xxxx"
      password => "xxxx"
      index => "logstash-nginx-log-%{+yyyy.MM.dd}"
    }
    # output to Doris
    doris {
      #http_hosts => ["${DORIS_HTTP_HOSTS}"]
      http_hosts => ["http://a:8030","http://b:8030"]
      user => "${DORIS_USER}"
      password => "${DORIS_PASSWORD}"
      db => "${DORIS_DB}"
      table => "${DORIS_TABLE}"
      headers => {
        "format" => "json"
        "read_json_by_line" => "true"
        "load_to_single_tablet" => "true"
      }
      mapping => {
        "ts" => "%{@timestamp}"
        "args" => "%{args}"
        "client_ip" => "%{client_ip}"
        "real_client_ip" => "%{real_client_ip}"
        "domain" => "%{domain}"
        "file_dir" => "%{file_dir}"
        "filetype" => "%{filetype}"
        "geoip_ip" => "%{[geoip][ip]}"
        "geoip_city_name" => "%{[geoip][city_name]}"
        "geoip_coordinates" => "%{[geoip][coordinates]}"
        "geoip_country_name" => "%{[geoip][country_name]}"
        "geoip_location_lat" => "%{[geoip][location][lat]}"
        "geoip_location_lon" => "%{[geoip][location][lon]}"
        "geoip_region_name" => "%{[geoip][region_name]}"
        "host_name" => "%{[host][name]}"
        "http_user_agent" => "%{http_user_agent}"
        "log_file_path" => "%{[log][file][path]}"
        "log_offset" => "%{[log][offset]}"
        "protocol" => "%{protocol}"
        "referer" => "%{referer}"
        "request_body" => "%{request_body}"
        "request_length" => "%{request_length}"
        "request_method" => "%{request_method}"
        "responsetime" => "%{responsetime}"
        "server_ip" => "%{server_ip}"
        "size" => "%{size}"
        "status" => "%{status}"
        "tags" => "%{tags}"
        "type" => "%{type}"
        "ua_device" => "%{[ua][device]}"
        "ua_name" => "%{[ua][name]}"
        "ua_os" => "%{[ua][os]}"
        "ua_os_full" => "%{[ua][os_full]}"
        "ua_os_name" => "%{[ua][os_name]}"
        "ua_os_version" => "%{[ua][os_version]}"
        "ua_version" => "%{[ua][version]}"
        "upstreamhost" => "%{upstreamhost}"
        "upstreamtime" => "%{upstreamtime}"
        "url" => "%{url}"
        "xff" => "%{xff}"
      }
      log_request => true
      log_speed_interval => 10
    }
  } else {
    stdout { }
  }
}
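The doris output is a wrapper around Doris Stream Load, so connectivity and the header set can be sanity-checked outside Logstash with curl. A sketch, assuming one FE at http://a:8030, the same environment variables as above, and a hand-written one-line JSON sample (columns absent from the JSON load as NULL; --location-trusted is required because the FE redirects the load to a BE):
echo '{"ts":"2025-01-01 00:00:00","client_ip":"1.2.3.4","status":200}' > sample.json
curl --location-trusted -u ${DORIS_USER}:${DORIS_PASSWORD} \
  -H "format:json" -H "read_json_by_line:true" \
  -H "load_to_single_tablet:true" \
  -T sample.json \
  http://a:8030/api/${DORIS_DB}/${DORIS_TABLE}/_stream_load
# a JSON response with "Status": "Success" means the table is ready for the plugin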
Run Logstash (on k8s, adjust the corresponding YAML and let the Deployment start it)
./bin/logstash -f /opt/logstash-7.17.27/config/pipeline/logstash_nginx_doris_log.conf
Visual Log Analysis with the WebUI
Reference:
https://cn.selectdb.com/docs/enterprise/enterprise-core-guide/selectdb-webui-guide
The WebUI includes a Kibana-Discover-style log search and analysis view, offering an intuitive, easy-to-use, exploratory log analysis experience.
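The same data is of course queryable straight from the MySQL client too. Two illustrative queries with placeholder host/database names; note the inverted indexes were created without a tokenizer, so they accelerate equality and term lookups rather than tokenized full-text search:
# Status-code rollup over the last hour
mysql -h <fe_host> -P 9030 -uroot -D <your_db> -e "
  SELECT status, count(*) AS cnt
  FROM logstash_nginx_log
  WHERE ts >= now() - INTERVAL 1 HOUR
  GROUP BY status ORDER BY cnt DESC;"
# Point lookup accelerated by the idx_client_ip inverted index
mysql -h <fe_host> -P 9030 -uroot -D <your_db> -e "
  SELECT ts, domain, status, url
  FROM logstash_nginx_log
  WHERE client_ip = '1.2.3.4'
  ORDER BY ts DESC LIMIT 10;"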
References
1. https://doris.apache.org/zh-CN/docs/observability/log
2. https://cn.selectdb.com/docs/enterprise/enterprise-core-guide/selectdb-webui-guide
3. https://mp.weixin.qq.com/s/YB2_RxdNC21CdhuwX9f2qg