当前位置: 首页 > news >正文

如何给一个网站做定时的更新南昌市科协网站

如何给一个网站做定时的更新,南昌市科协网站,网络广告策划内容,wordpress论坛用户一、背景 在Canal推送数据变更信息至MQ#xff08;消息队列#xff09;时#xff0c;我们遇到了特定问题#xff0c;尤其是当消息体的大小超过了MQ所允许的最大限制。这种限制导致数据推送过程受阻#xff0c;需要相应的调整或处理。 二、解决方法 采用Canal自定义客户…一、背景 在Canal推送数据变更信息至MQ消息队列时我们遇到了特定问题尤其是当消息体的大小超过了MQ所允许的最大限制。这种限制导致数据推送过程受阻需要相应的调整或处理。 二、解决方法 采用Canal自定义客户端的方式将接收到的消息进行压缩处理并将消息转换为MQ消息格式。经过处理的消息将被推送到原有的MQ主题与原有消息处理兼容从而在提升数据处理效率的同时确保既有的消费者逻辑得以保持不变实现平滑过渡与升级。 三、例子 !-- canal 客户端的集成 -- dependencygroupIdcom.alibaba.otter/groupIdartifactIdcanal.client/artifactIdversion1.1.5/version /dependency !-- 客户端通信 消息传递 -- dependencygroupIdcom.alibaba.otter/groupIdartifactIdcanal.protocol/artifactIdversion1.1.5/version /dependency package cn.ewan.skyeye.log.audit.support.canal;import cn.hutool.json.JSONUtil; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.FlatMessage; import com.alibaba.otter.canal.protocol.Message; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException;import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.net.InetSocketAddress; import java.util.*; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream;/*** Description:* Author: wulm* Date: 2024/6/14 10:11* Version:1.0*/ public class CustomCanalClient {private final CanalConnector connector;private final String FILTER_REGEX test_db.(test_table);public CustomCanalClient(String destination, String host, int port) {connector CanalConnectors.newSingleConnector(new InetSocketAddress(host, port),destination, , );connector.connect();connector.subscribe(FILTER_REGEX);}public void listen() throws InterruptedException {while (true) {// 一次获取的变更事件数量int batchSize 1000;Message message connector.getWithoutAck(batchSize);long batchId message.getId();EntryRowData[] entryRowData buildMessageData(message);if (entryRowData null) {continue;}ListFlatMessage flatMessages messageConverter(entryRowData, batchId);for (FlatMessage flatMessage : flatMessages){System.out.println(compress(flatMessage));}System.out.println(FlatMessage: JSONUtil.toJsonStr(flatMessages));// 确认消息已被处理connector.ack(batchId);// 暂停一秒避免循环过快Thread.sleep(1000);}}public static class EntryRowData {public CanalEntry.Entry entry;public CanalEntry.RowChange rowChange;}public static EntryRowData[] buildMessageData(Message message) {if (message.isRaw()) {ListByteString rawEntries message.getRawEntries();final EntryRowData[] datas new EntryRowData[rawEntries.size()];int i 0;for (ByteString byteString : rawEntries) {final int index i;try {CanalEntry.Entry entry CanalEntry.Entry.parseFrom(byteString);CanalEntry.RowChange rowChange CanalEntry.RowChange.parseFrom(entry.getStoreValue());datas[index] new EntryRowData();datas[index].entry entry;datas[index].rowChange rowChange;} catch (InvalidProtocolBufferException e) {throw new RuntimeException(e);}i;}return datas;} else {final EntryRowData[] datas new EntryRowData[message.getEntries().size()];int i 0;for (CanalEntry.Entry entry : message.getEntries()) {final int index i;try {CanalEntry.RowChange rowChange CanalEntry.RowChange.parseFrom(entry.getStoreValue());datas[index] new EntryRowData();datas[index].entry entry;datas[index].rowChange rowChange;} catch (InvalidProtocolBufferException e) {throw new RuntimeException(e);}i;}return datas;}}public static ListFlatMessage messageConverter(EntryRowData[] datas, long id) {ListFlatMessage flatMessages new ArrayList();for (EntryRowData entryRowData : datas) {CanalEntry.Entry entry entryRowData.entry;CanalEntry.RowChange rowChange entryRowData.rowChange;// 如果有分区路由,则忽略begin/end事件if (entry.getEntryType() CanalEntry.EntryType.TRANSACTIONBEGIN|| entry.getEntryType() CanalEntry.EntryType.TRANSACTIONEND) {continue;}// build flatMessageCanalEntry.EventType eventType rowChange.getEventType();FlatMessage flatMessage new FlatMessage(id);flatMessages.add(flatMessage);flatMessage.setDatabase(entry.getHeader().getSchemaName());flatMessage.setTable(entry.getHeader().getTableName());flatMessage.setIsDdl(rowChange.getIsDdl());flatMessage.setType(eventType.toString());flatMessage.setEs(entry.getHeader().getExecuteTime());flatMessage.setTs(System.currentTimeMillis());flatMessage.setSql(rowChange.getSql()); // flatMessage.setGtid(entry.getHeader().getGtid());if (!rowChange.getIsDdl()) {MapString, Integer sqlType new LinkedHashMap();MapString, String mysqlType new LinkedHashMap();ListMapString, String data new ArrayList();ListMapString, String old new ArrayList();SetString updateSet new HashSet();boolean hasInitPkNames false;for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {if (eventType ! CanalEntry.EventType.INSERT eventType ! CanalEntry.EventType.UPDATE eventType ! CanalEntry.EventType.DELETE) {continue;}MapString, String row new LinkedHashMap();ListCanalEntry.Column columns;if (eventType CanalEntry.EventType.DELETE) {columns rowData.getBeforeColumnsList();} else {columns rowData.getAfterColumnsList();}for (CanalEntry.Column column : columns) {if (!hasInitPkNames column.getIsKey()) {flatMessage.addPkName(column.getName());}sqlType.put(column.getName(), column.getSqlType());mysqlType.put(column.getName(), column.getMysqlType());if (column.getIsNull()) {row.put(column.getName(), null);} else {row.put(column.getName(), column.getValue());}// 获取update为true的字段if (column.getUpdated()) {updateSet.add(column.getName());}}hasInitPkNames true;if (!row.isEmpty()) {data.add(row);}if (eventType CanalEntry.EventType.UPDATE) {MapString, String rowOld new LinkedHashMap();for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {if (updateSet.contains(column.getName())) {if (column.getIsNull()) {rowOld.put(column.getName(), null);} else {rowOld.put(column.getName(), column.getValue());}}}// update操作将记录修改前的值old.add(rowOld);}}if (!sqlType.isEmpty()) {flatMessage.setSqlType(sqlType);}if (!mysqlType.isEmpty()) {flatMessage.setMysqlType(mysqlType);}if (!data.isEmpty()) {flatMessage.setData(data);}if (!old.isEmpty()) {flatMessage.setOld(old);}}}return flatMessages;}private static void printColumn(ListCanalEntry.Column columns) {for (CanalEntry.Column column : columns) {System.out.println(column.getName() : column.getValue());}}private String compress(FlatMessage flatMessage) {String decompressedJsonString null;try {String jsonString JSONUtil.toJsonStr(flatMessage);// 压缩JSON字符串ByteArrayOutputStream byteArrayOutputStream new ByteArrayOutputStream();GZIPOutputStream gzipOutputStream new GZIPOutputStream(byteArrayOutputStream);try {gzipOutputStream.write(jsonString.getBytes(UTF-8));} finally {gzipOutputStream.close();}byte[] compressedData byteArrayOutputStream.toByteArray();// 解压缩数据并还原为JSON字符串ByteArrayInputStream byteArrayInputStream new ByteArrayInputStream(compressedData);GZIPInputStream gzipInputStream new GZIPInputStream(byteArrayInputStream);ByteArrayOutputStream decompressedOutputStream new ByteArrayOutputStream();try {byte[] buffer new byte[1024];int len;while ((len gzipInputStream.read(buffer)) ! -1) {decompressedOutputStream.write(buffer, 0, len);}} finally {gzipInputStream.close();}decompressedJsonString decompressedOutputStream.toString(UTF-8);} catch (Exception e) {e.printStackTrace();}return decompressedJsonString;}public static void main(String[] args) throws InterruptedException {CustomCanalClient client new CustomCanalClient(example, 127.0.0.1, 11111);client.listen();} }
http://www.tj-hxxt.cn/news/132430.html

相关文章:

  • 门户网站建设服务收费工业设计 做自己的网站 知乎
  • 信息产业部icp备案中心网站新乡商城网站建设哪家好
  • 合肥的网站建设公司哪家好下述不属于网页制作工具
  • 仪器仪表网站制作徐州优化网站建设
  • 单页网站网址建筑必看六个网站
  • 霸州网站开发网站建设中英语如何说
  • 凯里网站建设哪家专业有个找人做任务赚返佣的网站
  • 设计素材网站哪个最好用wordpress后台二次开发
  • 设计得好的美食网站wordpress电影分享
  • jsp网站开发实例教程网络管理系统密码
  • 做那种事情的网站青岛科技网站建设
  • 学校网站建设成功织梦网站图片代码
  • 移动互联与网站开发编辑网站教程
  • 公章电子版在线制作网站四川建设工程信息网官网
  • 性价比最高网站建设电话正规的网站建设公
  • 网站 设计 工具网站建设中英版
  • 中恒建设职业技术培训学校网站网络营销首先要做什么
  • 学ui设计网站微转app是用网站做的吗
  • 大学生网站建设例题答案设计好的网站
  • 网站排名优化化快排优化广告设计有哪些
  • 数据库网站建设教程免费域名备案
  • 大型网站过程网站初期内容
  • 海南省建设网站首页网站流量的重要性
  • 做网站项目主要技术北京市网上服务平台登录
  • 西安百度网站快速优化网站用小程序
  • 网站seo关键词排名优化wordpress 四亩
  • 教你做文案的网站推荐网站设计资源
  • 青州网站建设青州上海天华建筑设计有限公司合肥分公司
  • 免费做苗木网站软件开发项目经理职责
  • designer怎么做网站企业邮箱与个人邮箱有什么区别