安徽徐州网站建设公司,网站开发报价文件,微信网站建设平台,阿里巴巴网站建设1. logstash脚本编写#xff08;采用单文件对应单表实例#xff09;
新建脚本文件夹
cd /usr/local/logstash mkdir sql cd sql vim 表名称.conf #如: znyw_data_gkb_logstash.conf
建立文件夹#xff0c;保存资源文件更新Id
mkdir -p /data/logstash/data/last_r…1. logstash脚本编写采用单文件对应单表实例
新建脚本文件夹
cd /usr/local/logstash mkdir sql cd sql vim 表名称.conf #如: znyw_data_gkb_logstash.conf
建立文件夹保存资源文件更新Id
mkdir -p /data/logstash/data/last_run_metadata
脚本JDBC插件参数说明 增加新脚本步骤修改statement 查询执行SQL脚本对应数据库表可以通过代码工具生成。地址
增量同步字段建议使用更新时间字段tracking_column updateDate tracking_column_type timestamp类型成对出现
修改数据存储文件路径红色为索引名称每个脚本对应一个。last_run_metadata_path xxx_last_update_time.txt
同步频率可以通过在线表达式进行生成在线Cron表达式生成器
第一步要做的事情配置logstasht同步脚本文件内容如下
input {jdbc {type jdbc# 数据库连接地址jdbc_connection_string jdbc:mysql://172.168.9.131:3306/gffp_om?characterEncodingUTF-8autoReconnecttrueserverTimezoneCTT# 数据库连接账号密码jdbc_user znygjdbc_password hTORRp86!7# MySQL依赖包路径jdbc_driver_library /usr/local/logstash/mysql-connector-java-8.0.22.jar# the name of the driver class for mysqljdbc_driver_class com.mysql.cj.jdbc.Driver# 数据库重连尝试次数connection_retry_attempts 5# 判断数据库连接是否可用默认false不开启jdbc_validate_connection true# 数据库连接可用校验超时时间默认3600Sjdbc_validation_timeout 3600# 开启分页查询默认false不开启jdbc_paging_enabled true# 单次分页查询条数默认100000,若字段较多且更新频率较高建议调低此值jdbc_page_size 500# statement为查询数据sql如果sql较复杂建议配通过statement_filepath配置sql文件的存放路径# sql_last_value为内置的变量存放上次查询结果中最后一条数据tracking_column的值此处即为ModifyTime# statement_filepath mysql/jdbc.sqlstatement select * from xxx WHERE id :sql_last_value# 是否将字段名转换为小写默认true如果有数据序列化、反序列化需求建议改为falselowercase_column_names false# Value can be any of: fatal,error,warn,info,debug默认infosql_log_level warn## 是否记录上次执行结果true表示会将上次执行结果的tracking_column字段的值保存到last_run_metadata_path指定的文件中record_last_run true# 需要记录查询结果某字段的值时此字段为true否则默认tracking_column为timestamp的值use_column_value true# 需要记录的字段用于增量同步需是数据库字段tracking_column id# Value can be any of: numeric,timestampDefault value is numerictracking_column_type numeric# record_last_run上次数据存放位置last_run_metadata_path /data/logstash/data/last_run_metadata/_last_id.txt# 是否清除last_run_metadata_path的记录需要增量同步时此字段必须为falseclean_run false## 同步频率(分 时 天 月 年)默认每分钟同步一次schedule * * * * *}
}filter {}
output {elasticsearch {# 配置ES集群地址hosts [172.168.9.135:9200,172.168.9.136:9200,172.168.9.137:9200]# 索引名字必须小写index A# 数据唯一索引建议使用数据库KeyIDdocument_id %{id}}
}第二部要做的事情根据_mapping.json初始化模版在kibana中执行
PUT /_template/索引名称 与模板名称保持一致。
模板内容如下
PUT /_template/A_temp A
{A template: A_temp.json ,order: 1,
# 索引配置设置这里需要注意 分片数在索引创建的时候使用这个索引一旦创建了后就不能再修改想改只能删索引重建了副本数可以热更新其他参数可以关闭索引更新内容。settings: {number_of_shards: 5,number_of_replicas: 1,max_result_window: 65536,max_inner_result_window: 10000,translog.durability: request,translog.sync_interval: 3s,auto_expand_replicas: false,analysis.analyzer.default.type: ik_max_word,analysis.search_analyzer.default.type: ik_smart,shard.check_on_startup: false,codec: default,store.type: niofs},mappings: {#这个需要注意一下主要是做模板验证用的详细参考ES template模板相关内容。dynamic: true,dynamic_date_formats: [yyyy-MM-ddTHH:mm:ss.SSSZ,yyyy-MM-ddTHH:mm:ss.SSS0800,yyyy-MM-ddTHH:mm:ssZ,yyyy-MM-ddTHH:mm:ss0800,yyyy-MM-ddTHH:mm:ss,yyyy-MM-dd HH:mm:ss,yyyy-MM-dd],properties: {timestamp: {type: date},version: {type: integer},_class: {type: text,fields: {keyword: {type: keyword,ignore_above: 256}}},#这里的是对应数据库字段一一对于格式可以指定特殊的一些使用我提供了一个工具大家在文章中找一下。id: {type: long}}}
}
这里想重点说几个需要注意的地方
1.logstash同步脚本中index名称与索引名称、模板名称保持一致 看蓝色的A,三个在上面的模板文件和脚本文件中注意这三个A是给大家标记用的文件中不需要。
2.看索引模板的代码中红色代码块是需要注意的地方我在文件中写了注释注意这是json参数使用时把注释删掉。
2.数据同步
2.1单表同步
#编写一个单表实例测试如下启动日志会实时打印便于调试错误 bin/logstash -f test-new.conf
注意: ES模板名称与logstash脚本index名称保存一致 检查_mapping是否与template模板一致字段类型长度是否与模板一致同步完成后在kibana中执行
如GET idx_znyw_data_gkb_logstash/_mapping
此时会多出几个模板中没有定义select查询不包含的字段
有坑的地方重点标准一下
timestamp,version, _class, type(这里有个大坑type属于logstashmysql实体不能存在此字段官方也没有解决方案)
检查数据
检查时间类型等是否与mysql中数据同步一致
脚本同步时配置的时间字段包含具体时间信息的以UTC时间
与数据库保持一致一种操作是同步时8处理一种是不处理因为ES这块是UTC时间ES提供的JAVA版本API也是UTC处理这块时间会进行转换。
那么需要注意的地方是什么
8操作后所有界面上显示的时间均为转换后时区时间具体减价看服务所在的时区界面上显示的时间指的是Kibana或者ES HEAD查询出的时间那么需要注意的是什么呢在java中查询时你需要注意这个8操作。主要是查询参数但是得到的结果你会发现还是不准确。
我推荐的操作是怎么操作呢我建议不要在数据同步时做什么特殊处理这块特别绕一会8一会不8那么怎么操作呢我建议在VO对象中做格式转换使用注解进行view层的数据转换给用户显示正确的时间查询时别用vo对象做反射哦否则参数也会根据注解进行修改查询参数就不对了。 2.2多文件配置 修改配置文件,这里还可以修改path.config: “/map/es/soft/logstash-7.6.2/config/myconfig/*.conf” 不推荐这种方式但是注意一下这是一个大坑与机制有关我没有扒代码只是把自己的推断写下来logstash 本次任务未执行完成有异常会保存当前任务执行状态包括配置文件内容类似存储同步ID序列也就是说如果批量执行过程中有任务未完成此时logstash会把包括配置文件的状态一起保存。此时想修改批处理为单个执行修改配置文件后不起作用即便指定单个文件运行也会把所有的一起执行只能把同步脚本做迁移保留一个来运行或者删除其他脚本保留一个当记录的任务执行成功后此时会释放重新加载配置文件在生成环境中遇到了修改配置文件不起作用任务重新执行完logstash才重新加载配置文件
vim /usr/local/logstash/config/pipelines.yml 追加配置所有脚本文件 – pipeline.id: znyw_data_gkb_logstash path.config: “/usr/local/logstash/sql/znyw_data_gkb_logstash.conf” – pipeline.id: znyw_data_gkb_logstash1 path.config: “/usr/local/logstash/sql/znyw_data_gkb_logstash1.conf”
启动检查数据同步
nohup ./bin/logstash
tail -f /usr/local/logstash/nohup.out ps#查看执行实时情况更详细排错
tail -f /data/logstash/logs/logstash-plain.log #查看任务执行情况查看执行结果
3. 问题及解决方案
3.1″path.data” setting.
异常问题
Logstash could not be started because there is already another instance using the configured data directory. If you wish to run multiple instances, you must change the “path.data” setting.
查看是否存在正在运行的进程
ps aux|grep logstash
kill -9 73803
3.2、数据同步后ES没有数据 output.elasticsearch模块的index必须是全小写
3.3、增量同步后last_run_metadata_path文件内容不改变 如果lowercase_column_names配置的不是false那么tracking_column字段配置的必须是全小写。
3.4、提示找不到jdbc_driver_library
2032 com.mysql.jdbc.Driver not loaded.
Are you sure youve included the correct jdbc driver in :jdbc_driver_library? 检测配置的地址是否正确如果是linux环境注意路径分隔符是“/”而不是“\”。
3.5、数据丢失 statement配置的sql中如果比较字段使用的是大于“”可能存在数据丢失。
假设当同步完成后last_run_metadata_path存放的时间为2019-01-30 20:45:30而这时候新入库一条数据的更新时间也为2019-01-30 20:45:30那么这条数据将无法同步。
解决方案将比较字段使用 大于等于“”。
3.6、数据重复更新 上一个问题“数据丢失”提供的解决方案是比较字段使用“大于等于”但这时又会产生新的问题。
假设当同步完成后last_run_metadata_path存放的时间为2019-01-30 20:45:30而数据库中更新时间最大值也为2019-01-30 20:45:30那么这些数据将重复更新直到有更新时间更大的数据出现。
当上述特殊数据很多且长期没有新的数据更新时会导致大量的数据重复同步到ES。
何时会出现以上情况呢①比较字段非“自增”②比较字段是程序生成插入。
解决方案
①比较字段自增保证不重复或重复概率极小比如使用自增ID或者数据库的timestamp这样就能避免大部分异常情况了
②如果确实存在大量程序插入的数据其更新时间相同且可能长期无数据更新可考虑定期更新数据库中的一条测试数据避免最大值有大量数据。 3.7 logstash突然报Unable to connect to database. Tried 1 times
检查一下上服务部署的机器用命令
telnet 172.168.9.131 3306
若报错显示MySql Host is blocked because of many connection errors; unblock with ‘mysqladmin flush-hosts’
2、使用mysqladmin flush-hosts 命令清理一下hosts文件不知道mysqladmin在哪个目录下可以使用命令查找whereis mysqladmin
① 在查找到的目录下使用命令修改/usr/local/bin/mysqladmin flush-hosts -uroot -p 备注 其中端口号用户名密码都可以根据需要来添加和修改 配置有master/slave主从数据库的要把主库和从库都修改一遍的我就吃了这个亏明明很容易的几条命令结果折腾了大半天 第二步也可以在数据库中进行命令如下flush hosts;
4. 最后一定要做的事情
4.1 索引副本数配置
因环境节点数量不同所以特殊索引副本数量需要设置如下索引进行动态修改
idx_znyg_devicemodel idx_znyg_devicetype idx_znyg_sysdevtype idx_znyg_sysinfo
在kibana中执行如下代码修改副本数量参数为副本数量为节点数-1如下操作还可以修改索引settings相关属性除了分片数量不能动态修改外只有副本数量可以热更新其他属性需要关闭索引后进行修改。
PUT /idx_znyg_sysinfo/_settings { “index”:{ “number_of_replicas”: 2 } }
4.2服务健康状态检查任务配置
crontab -e 0 * * * /bin/sh /opt/logstash/logstash-7.1.1/logstash_health.sh
提供一个索引配置工具自己写的包括生产SQL语句显示表数据量生成ruby关于时间字段的脚步批量创建更新ES模板,但是模板需要事先写好。