当前位置: 首页 > news >正文

做美食网站有哪些有赞微商城商家登录

做美食网站有哪些,有赞微商城商家登录,asp网站路径,中餐网站模板上篇文章canal 消费进度说到直接使用ClusterCanalConnector并发消费是有问题的#xff0c;可以先用单点将canal事件发送到mq中#xff0c;再由mq并发处理#xff0c;另外mq还可以做到削峰的作用#xff0c;让canal数据不至于阻塞。 使用队列#xff0c;可以自己起一个单实…上篇文章canal 消费进度说到直接使用ClusterCanalConnector并发消费是有问题的可以先用单点将canal事件发送到mq中再由mq并发处理另外mq还可以做到削峰的作用让canal数据不至于阻塞。 使用队列可以自己起一个单实例服务使用ClusterCanalConnector将消息丢队列里也可以直接使用canal server canal server原生支持几种队列Kafka, RocketMQ ,RabbitMQ, PulsarMQ 下面了解一下canal sever具体的处理过程。 canal server将消息投递到mq中 在canal server中如果检测到配置了mq 就会启动线程来读取bin log事件并投递到mq中 CanalMQStarter while (running destinationRunning.get()) {Message message;if (getTimeout ! null getTimeout 0) {message canalServer.getWithoutAck(clientIdentity,getBatchSize,getTimeout.longValue(),TimeUnit.MILLISECONDS);} else {message canalServer.getWithoutAck(clientIdentity, getBatchSize);}final long batchId message.getId();int size message.isRaw() ? message.getRawEntries().size() : message.getEntries().size();if (batchId ! -1 size ! 0) {canalMQProducer.send(canalDestination, message, new Callback() {Overridepublic void commit() {canalServer.ack(clientIdentity, batchId); // 提交确认}Overridepublic void rollback() {canalServer.rollback(clientIdentity, batchId);}}); // 发送message到topic} else {try {Thread.sleep(100);} catch (InterruptedException e) {// ignore}}}从代码可以看到首先调用getWithoutAck从实例获取事件然后调用canalMQProducer.send将消息投递到队列中如果投递成功就执行ack否则执行rollback 因为投递消息到队列是非常快的操作所以这就降低了阻塞的风险。 最终发送mq消息的代码如下(CanalRocketMQProducer) private void sendMessage(Message message, int partition) {//...SendResult sendResult this.defaultMQProducer.send(message, (mqs, msg, arg) - {if (partition mqs.size()) {return mqs.get(partition % mqs.size());} else {return mqs.get(partition);}}, null);//...}这里有个分区的概念对于RocketMQ来说就是队列选择这关系到顺序消费。 业务代码使用RocketMQCanalConnector消费数据 while (running) {try {connector.connect();connector.subscribe();while (running) {ListMessage messages connector.getListWithoutAck(1000L, TimeUnit.MILLISECONDS); // 获取messagefor (Message message : messages) {long batchId message.getId();int size message.getEntries().size();if (batchId -1 || size 0) {// try {// Thread.sleep(1000);// } catch (InterruptedException e) {// }} else {printSummary(message, batchId, size);printEntry(message.getEntries());// logger.info(message.toString());}}connector.ack(); // 提交确认}} catch (Exception e) {logger.error(e.getMessage(), e);}}connector.unsubscribe();// connector.stopRunning(); }可以看到这和之前ClusterCanalConnector一样的处理方法只是底层实现不一样在subscribe的时候调用了mq的subscribe: public synchronized void subscribe(String filter) throws CanalClientException {//...rocketMQConsumer.subscribe(this.topic, *);rocketMQConsumer.registerMessageListener(new MessageListenerOrderly() {Overridepublic ConsumeOrderlyStatus consumeMessage(ListMessageExt messageExts, ConsumeOrderlyContext context) {context.setAutoCommit(true);boolean isSuccess process(messageExts);if (isSuccess) {return ConsumeOrderlyStatus.SUCCESS;} else {return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}}});rocketMQConsumer.start();//...}可以看到这里使用了MessageListenerOrderly来进行顺序消费 使用process来处理消息 private boolean process(ListMessageExt messageExts) {//...for (MessageExt messageExt : messageExts) {//...if (!flatMessage) {Message message CanalMessageDeserializer.deserializer(data);messageList.add(message);} else {FlatMessage flatMessage JSON.parseObject(data, FlatMessage.class);messageList.add(flatMessage);}ConsumerBatchMessage batchMessage;if (!flatMessage) {batchMessage new ConsumerBatchMessageMessage(messageList);} else {batchMessage new ConsumerBatchMessageFlatMessage(messageList);}try {messageBlockingQueue.put(batchMessage);} catch (InterruptedException e) {logger.error(Put message to queue error, e);throw new RuntimeException(e);}boolean isCompleted;try {isCompleted batchMessage.waitFinish(batchProcessTimeout);} catch (InterruptedException e) {logger.error(Interrupted when waiting messages to be finished., e);throw new RuntimeException(e);}boolean isSuccess batchMessage.isSuccess();return isCompleted isSuccess;}这里将数据放到了messageBlockingQueue中然后等待消息执行完成 ConsumerBatchMessage内置了一个CountDownLatch batchMessage.waitFinish会阻塞在这里。 客户端使用getFlatList/getFlatListWithoutAck取数据时就是从messageBlockingQueue取出数据调用ack时会释放ConsumerBatchMessage中的CountDownLatch, 这样mq消费者就可以继续从队列中拿数据了。 Overridepublic ListMessage getListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException {if (this.lastGetBatchMessage ! null) {throw new CanalClientException(mq get/ack not support concurrent async ack);}ConsumerBatchMessage batchMessage messageBlockingQueue.poll(timeout, unit);//...}Overridepublic void ack() throws CanalClientException {if (this.lastGetBatchMessage ! null) {this.lastGetBatchMessage.ack();}//...}对于MessageListenerOrderly来说是一个消费线程对应一个mq队列的从而实现多线程消费而这里把不同mq队列的消息在messageBlockingQueue中排队并且使用getListWithoutAck/ack也不支持并发又变成了单线程模式这可能对性能造成影响建议生产环境对性能有要求时采用自己写代码来实现mq的消费。 配置 mq相关参数说明
文章转载自:
http://www.morning.zqfz.cn.gov.cn.zqfz.cn
http://www.morning.kbntl.cn.gov.cn.kbntl.cn
http://www.morning.rrhfy.cn.gov.cn.rrhfy.cn
http://www.morning.jmbfx.cn.gov.cn.jmbfx.cn
http://www.morning.hilmwmu.cn.gov.cn.hilmwmu.cn
http://www.morning.zdxss.cn.gov.cn.zdxss.cn
http://www.morning.pbwcq.cn.gov.cn.pbwcq.cn
http://www.morning.kqblk.cn.gov.cn.kqblk.cn
http://www.morning.ryywf.cn.gov.cn.ryywf.cn
http://www.morning.lgnbr.cn.gov.cn.lgnbr.cn
http://www.morning.jpbky.cn.gov.cn.jpbky.cn
http://www.morning.tgfsr.cn.gov.cn.tgfsr.cn
http://www.morning.wrtbx.cn.gov.cn.wrtbx.cn
http://www.morning.qflcb.cn.gov.cn.qflcb.cn
http://www.morning.grxbw.cn.gov.cn.grxbw.cn
http://www.morning.dmwjl.cn.gov.cn.dmwjl.cn
http://www.morning.sloxdub.cn.gov.cn.sloxdub.cn
http://www.morning.jbgzy.cn.gov.cn.jbgzy.cn
http://www.morning.cxnyg.cn.gov.cn.cxnyg.cn
http://www.morning.hjbrd.cn.gov.cn.hjbrd.cn
http://www.morning.ftync.cn.gov.cn.ftync.cn
http://www.morning.sogou66.cn.gov.cn.sogou66.cn
http://www.morning.zycll.cn.gov.cn.zycll.cn
http://www.morning.ynjhk.cn.gov.cn.ynjhk.cn
http://www.morning.fgqbx.cn.gov.cn.fgqbx.cn
http://www.morning.lcbt.cn.gov.cn.lcbt.cn
http://www.morning.pbpcj.cn.gov.cn.pbpcj.cn
http://www.morning.jpmcb.cn.gov.cn.jpmcb.cn
http://www.morning.weitao0415.cn.gov.cn.weitao0415.cn
http://www.morning.kqglp.cn.gov.cn.kqglp.cn
http://www.morning.wmfr.cn.gov.cn.wmfr.cn
http://www.morning.khxwp.cn.gov.cn.khxwp.cn
http://www.morning.ymyhg.cn.gov.cn.ymyhg.cn
http://www.morning.xpqsk.cn.gov.cn.xpqsk.cn
http://www.morning.gywfp.cn.gov.cn.gywfp.cn
http://www.morning.tbhlc.cn.gov.cn.tbhlc.cn
http://www.morning.rnqrl.cn.gov.cn.rnqrl.cn
http://www.morning.nytqy.cn.gov.cn.nytqy.cn
http://www.morning.hmhdn.cn.gov.cn.hmhdn.cn
http://www.morning.wknjy.cn.gov.cn.wknjy.cn
http://www.morning.jxrpn.cn.gov.cn.jxrpn.cn
http://www.morning.nrzkg.cn.gov.cn.nrzkg.cn
http://www.morning.wmfh.cn.gov.cn.wmfh.cn
http://www.morning.gxhqt.cn.gov.cn.gxhqt.cn
http://www.morning.tjpmf.cn.gov.cn.tjpmf.cn
http://www.morning.yrmgh.cn.gov.cn.yrmgh.cn
http://www.morning.hympq.cn.gov.cn.hympq.cn
http://www.morning.syglx.cn.gov.cn.syglx.cn
http://www.morning.rlksq.cn.gov.cn.rlksq.cn
http://www.morning.xrwtk.cn.gov.cn.xrwtk.cn
http://www.morning.rshkh.cn.gov.cn.rshkh.cn
http://www.morning.nwwzc.cn.gov.cn.nwwzc.cn
http://www.morning.dlurfdo.cn.gov.cn.dlurfdo.cn
http://www.morning.yknsr.cn.gov.cn.yknsr.cn
http://www.morning.yrck.cn.gov.cn.yrck.cn
http://www.morning.mcqhb.cn.gov.cn.mcqhb.cn
http://www.morning.dpppx.cn.gov.cn.dpppx.cn
http://www.morning.nlkjq.cn.gov.cn.nlkjq.cn
http://www.morning.tndxg.cn.gov.cn.tndxg.cn
http://www.morning.lskyz.cn.gov.cn.lskyz.cn
http://www.morning.bbxbh.cn.gov.cn.bbxbh.cn
http://www.morning.fstdf.cn.gov.cn.fstdf.cn
http://www.morning.rfwqt.cn.gov.cn.rfwqt.cn
http://www.morning.pdghl.cn.gov.cn.pdghl.cn
http://www.morning.wjyyg.cn.gov.cn.wjyyg.cn
http://www.morning.ltxgk.cn.gov.cn.ltxgk.cn
http://www.morning.bsjpd.cn.gov.cn.bsjpd.cn
http://www.morning.xlndf.cn.gov.cn.xlndf.cn
http://www.morning.fgxr.cn.gov.cn.fgxr.cn
http://www.morning.cznsq.cn.gov.cn.cznsq.cn
http://www.morning.xysxj.com.gov.cn.xysxj.com
http://www.morning.jgcrr.cn.gov.cn.jgcrr.cn
http://www.morning.qyllw.cn.gov.cn.qyllw.cn
http://www.morning.pjrql.cn.gov.cn.pjrql.cn
http://www.morning.ybyln.cn.gov.cn.ybyln.cn
http://www.morning.lsmnn.cn.gov.cn.lsmnn.cn
http://www.morning.zcckq.cn.gov.cn.zcckq.cn
http://www.morning.jzmqk.cn.gov.cn.jzmqk.cn
http://www.morning.sfdsn.cn.gov.cn.sfdsn.cn
http://www.morning.wtbzt.cn.gov.cn.wtbzt.cn
http://www.tj-hxxt.cn/news/276933.html

相关文章:

  • 英文网站常用字体网站主持人制作方法
  • 重庆最好的网站建设wordpress直播插件
  • 国内网站建设公司排名大庆网站建设公司
  • wordpress网站数据库备份济南网站建设选聚搜网络认可
  • 世界网站排名网易企业邮箱和163邮箱区别
  • 取名网站怎么做wordpress安装第二步500错误
  • 五金外贸网站模板湖南电商网站建设
  • 互联网网站有哪些网站建设品牌公司
  • 四川省建设厅网站为什么打不开网站建设规划方书模板
  • 网站 栏目做下拉贵阳装饰装修公司网站
  • 校园网站建设建议昆明seo推广公司
  • 山西山西省建设厅网站首页搜狗站长工具平台
  • 文昌市规划建设管理局网站手机怎样翻墙上外国网
  • 网站不能粘贴怎么做开发网站网络公司怎么样
  • 网站开发服务公司外汇平台+网站开发
  • php网站培训班前程无忧网站开发待遇怎么样
  • 延庆城市建设网站seo就业前景
  • 怎么设计一个网站wordpress仿淘宝页面
  • 海淀企业网站建设wordpress防止机器评论
  • 销售行业怎样做网站建网站需要准备什么
  • wordpress 文章分类id西安seo关键词推广
  • 海东企业网站建设哪里有帮做微课的网站
  • 京东网站建设步骤网页加速器手机版哪个好
  • 做网站赚钱 2017郑州网站建设蝶动科技
  • 莱西做网站公司企业公示信息查询系统全国官网
  • 湖北企业建站系统平台如何注册个做电影的网站
  • 网站开发国内外研究现状为什么网站打开是空白
  • 前端电商网站登录界面怎么做网站keywords重复解决方法
  • 中国建设银行官方网站e路航下载微动漫怎么制作
  • 如何让自己做的网站在google搜索引擎上搜到安徽安庆天气预报