搭建网站干什么,河北网站推广优化,北京公司注册地址政策,wordpress 织梦什么是ELK#xff1f;
ELK是Elasticsearch、Logstash、Kibana三大开源框架首字母大写简称(后来出现的filebeat属于beats家族中的一员#xff0c;可以用来替代logstash的数据收集功能#xff0c;比较轻量级)#xff0c;也被称为Elastic Stack。 Filebeat
Filebeat是用于转…什么是ELK
ELK是Elasticsearch、Logstash、Kibana三大开源框架首字母大写简称(后来出现的filebeat属于beats家族中的一员可以用来替代logstash的数据收集功能比较轻量级)也被称为Elastic Stack。 Filebeat
Filebeat是用于转发和集中日志数据的轻量级传送工具。Filebeat监视您指定的日志文件或位置收集日志事件并将它们转发到Elasticsearch或 Logstash进行索引。Filebeat的工作方式如下启动Filebeat时它将启动一个或多个输入这些输入将在为日志数据指定的位置中查找。对于Filebeat所找到的每个日志Filebeat都会启动收集器。每个收集器都读取单个日志以获取新内容并将新日志数据发送到libbeatlibbeat将聚集事件并将聚集的数据发送到为Filebeat配置的输出。
Logstash
Logstash是免费且开放的服务器端数据处理管道能够从多个来源采集数据转换数据然后将数据发送到您最喜欢的“存储库”中。Logstash能够动态地采集、转换和传输数据不受格式或复杂度的影响。利用Grok从非结构化数据中派生出结构从IP地址解码出地理坐标匿名化或排除敏感字段并简化整体处理过程。
ElasticSearch
Elasticsearch是Elastic Stack核心的分布式搜索和分析引擎,是一个基于Lucene、分布式、通过Restful方式进行交互的近实时搜索平台框架。Elasticsearch为所有类型的数据提供近乎实时的搜索和分析。无论您是结构化文本还是非结构化文本数字数据或地理空间数据Elasticsearch都能以支持快速搜索的方式有效地对其进行存储和索引。
Kibana
Kibana是一个针对Elasticsearch的开源分析及可视化平台用来搜索、查看交互存储在Elasticsearch索引中的数据。使用Kibana可以通过各种图表进行高级数据分析及展示。并且可以为 Logstash 和 ElasticSearch 提供的日志分析友好的 Web 界面可以汇总、分析和搜索重要数据日志。还可以让海量数据更容易理解。它操作简单基于浏览器的用户界面可以快速创建仪表板dashboard实时显示Elasticsearch查询动态。
Logstash入门
使用Docker-Compose启动Logstash服务其中docker-compose.yml文件如下 version: 3.1 # 服务配置 services: logstash: container_name: logstash-7.17.0 image: docker.elastic.co/logstash/logstash:7.17.0 volumes: - ./logstash/data:/usr/share/logstash/data - ./logstash/pipeline:/usr/share/logstash/pipeline networks: - elk_net # 网络配置 networks: elk_net: driver: bridge 示例配置1标准输入、输出
每隔10秒输出字符串Hello from Logstash! input { heartbeat { interval 10 message Hello from Logstash! } } output { stdout { codec rubydebug } } 示例配置2读取文件
读取文件内容(文件的最后一行将不会读取)示例文件为test.log hello world! From Shanghai To Beijing this is a log for test in logstash! 配置文件如下 input { file { path /usr/share/logstash/data/test.log start_position beginning } } output { stdout { codec rubydebug } } 读取结果如下顺序已打乱 示例配置3Grok插件
Grok为正则表达式Logstashv4.4.3已内置120种表达式参考网址为https://github.com/logstash-plugins/logstash-patterns-core/tree/main/patterns.
读取Nginx日志文件并使用Grok进行过滤。Nginx文件示例如下 112.195.209.90 - - [20/Feb/2018:12:12:14 0800] GET / HTTP/1.1 200 190 - Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Mobile Safari/537.36 - 配置文件如下 input { file { path /usr/share/logstash/data/test.log start_position beginning } } filter { grok { match { message %{COMBINEDAPACHELOG} } } } output { stdout { codec rubydebug } } 解析结果为 Kibana在Dev Tools中内置了Grok Debugger(调试器) 示例解析配置4多行读取插件 multiline
对 multiline 插件来说有三个设置比较重要negate、pattern 和 what。 • pattern: 类型是string要匹配的正则表达式 • negate: 类型是boolean默认false否定正则表达式 • what: 必须设置可以为 previous 或 next 如果正则表达式匹配了那么该事件是属于下一个或是前一个事件
multiline插件可以多行读取。示例文件内容如下(注意最后一行为空行) [Aug/08/08 14:54:03] hello world [Aug/08/09 14:54:04] hello logstash hello best practice hello raochenlin [Aug/08/10 14:54:05] the end 配置文件 input { file { path /usr/share/logstash/data/test.log start_position beginning codec multiline { pattern ^\[ negate true what previous } } } output { stdout { codec rubydebug } } 解析结果如下 { message [Aug/08/08 14:54:03] hello world, timestamp 2023-12-23T10:41:20.884Z, path /usr/share/logstash/data/test.log, host b62820accf76, version 1 } { message [Aug/08/09 14:54:04] hello logstash\n hello best practice\n hello raochenlin, path /usr/share/logstash/data/test.log, tags [ [0] multiline ], timestamp 2023-12-23T10:44:24.846Z, host b62820accf76, version 1 } 由于参数what设置为previous因此只解析出两条数据。当what设置为next时可解析出三条数据但解析结果有变化如下 { message [Aug/08/08 14:54:03] hello world, path /usr/share/logstash/data/test.log, timestamp 2023-12-23T10:49:23.395Z, host 492dfb254e78, version 1 } { message hello best practice\n hello raochenlin\n[Aug/08/10 14:54:05] the end, timestamp 2023-12-23T10:49:23.415Z, path /usr/share/logstash/data/test.log, host 492dfb254e78, version 1, tags [ [0] multiline ] } { message [Aug/08/09 14:54:04] hello logstash, path /usr/share/logstash/data/test.log, timestamp 2023-12-23T10:49:23.414Z, host 492dfb254e78, version 1 } ELK搭建简单示例
结合Logstash, ElasticSearch与Kibana将data文件夹中的以log结尾的文件逐行导入至ElasticSearch中。
logstash.conf配置如下 input { file { path /usr/share/logstash/data/*.log start_position beginning } } output { stdout { codec rubydebug } elasticsearch { hosts [http://elasticsearch:9200] index test_log action index } } docker-compose.yml文件如下 version: 3.1 # 服务配置 services: logstash: container_name: logstash-7.17.0 image: docker.elastic.co/logstash/logstash:7.17.0 volumes: - ./logstash/config/logstash.yml:/usr/share/logstash/logstash.yml - ./logstash/data:/usr/share/logstash/data - ./logstash/pipeline:/usr/share/logstash/pipeline networks: - elk_net depends_on: - elasticsearch elasticsearch: container_name: elasticsearch-7.17.0 image: elasticsearch:7.17.0 environment: - ES_JAVA_OPTS-Xms1024m -Xmx1024m - http.host0.0.0.0 - node.nameelastic01 - cluster.namecluster_elasticsearch - discovery.typesingle-node ports: - 9200:9200 - 9300:9300 volumes: - ./es/plugins:/usr/share/elasticsearch/plugins - ./es/data:/usr/share/elasticsearch/data networks: - elk_net kibana: container_name: kibana-7.17.0 image: kibana:7.17.0 ports: - 5601:5601 networks: - elk_net depends_on: - elasticsearch # 网络配置 networks: elk_net: driver: bridge ELK日志系统实战
我们来设计这样一个日志系统其中Logstash可将Flask运行过程中的日志进行收集并导入至ElasticSearch中再使用Kibana进行数据分析。
Flask服务
使用Flask构建简单的web服务代码如下
# -*- coding: utf-8 -*-
# file: server.py
# time: 2023/12/23 19:17
import time
import random
from flask import Flask, Response
import logginglogging.basicConfig(filename../logstash/data/flask.log,levellogging.DEBUG,format%(asctime)s-%(filename)s-%(funcName)s-%(levelname)s-%(message)s)
logger logging.getLogger()app Flask(elk_test)app.route(/)
def index():t1 time.time()logger.info(fapi_endpoint: /, status: 200, cost_time: {(time.time() - t1) * 1000})return Hello index, 200app.route(/io_task)
def io_task():t1 time.time()time.sleep(2)logger.info(fapi_endpoint: /io_task, status: 200, cost_time: {(time.time() - t1) * 1000})return IO bound task finish!, 200app.route(/cpu_task)
def cpu_task():t1 time.time()for i in range(10000):n i*i*ilogger.info(fapi_endpoint: /cpu_task, status: 200, cost_time: {(time.time() - t1) * 1000})return CPU bound task finish!, 200app.route(/random_sleep)
def random_sleep():t1 time.time()time.sleep(random.randint(0, 5))logger.info(fapi_endpoint: /random_sleep, status: 200, cost_time: {(time.time() - t1) * 1000})return random sleep, 200app.route(/random_status)
def random_status():t1 time.time()status_code random.choice([200] * 6 [300, 400, 400, 500])logger.info(fapi_endpoint: /random_status, status: {status_code}, cost_time: {(time.time() - t1) * 1000})return Response(random status, statusstatus_code)if __name__ __main__:app.run(host0.0.0.0, port5000, debugTrue)
使用下面的shell脚本进行HTTP请求模拟 TIMES5 for i in $(eval echo {1..$TIMES}) do siege -c 1 -r 10 http://localhost:5000/ siege -c 1 -r 5 http://localhost:5000/io_task siege -c 1 -r 5 http://localhost:5000/cpu_task siege -c 1 -r 3 http://localhost:5000/random_sleep siege -c 1 -r 10 http://localhost:5000/random_status sleep 5 done 日志记录在flask.log文件中。
ELK搭建
对上述日志搭建ELKdocker-compose.yml同上述ELK搭建简单示例logstash.conf改动如下 input { file { path /usr/share/logstash/data/flask.log start_position beginning } } filter { # 只对cost_time所在列进行解析 if cost_time in [message] { grok { match { message %{TIMESTAMP_ISO8601:request_finish_time}-%{WORD:script}.py-%{WORD:module}-%{LOGLEVEL:loglevel}-api_endpoint: %{DATA:api_endpoint}, status: %{NUMBER:status:int}, cost_time: %{NUMBER:cost_time:float} } } # 使用mutate过滤器替换字符 mutate { # 替换空格为T gsub [ request_finish_time, , T ] # 替换逗号为点 gsub [ request_finish_time, ,, . ] } # 使用date过滤器解析和格式化日期 date { match [ request_finish_time, ISO8601 ] } } else { drop { } } } output { stdout { codec rubydebug } elasticsearch { hosts [http://elasticsearch:9200] index flask_log action index } } 只对有cost_time所在的行进行解析其它行丢弃导入至ElasticSearch中的flask_log这个索引中。
数据分析
对上述的五个API Endpoint进行请求占比分析饼图如下 同时对cost_time进行数据分析其平均值90, 95, 99分位数如下表 上述的日志记录方式还有待改进比如记录程序报错信息使用json字段解析而不是Grok表达式会更容易些。