当前位置: 首页 > news >正文

河北省建设集团有限公司网站做的网站怎样更新

河北省建设集团有限公司网站,做的网站怎样更新,免费做金融网站有哪些,WordPress附件图片弹窗目录 1、生产者消息发送流程 1.1、发送原理 2、异步发送 API 2.1、普通异步发送 2.2、带回调函数的异步发送 3、同步发送 API 4、生产者分区 4.1、分区的优势 4.2、生产者发送消息的分区策略 示例1#xff1a;将数据发往指定 partition 示例2#xff1a;有 key 的…目录 1、生产者消息发送流程 1.1、发送原理 2、异步发送 API 2.1、普通异步发送 2.2、带回调函数的异步发送 3、同步发送 API 4、生产者分区 4.1、分区的优势 4.2、生产者发送消息的分区策略 示例1将数据发往指定 partition  示例2有 key 的情况下将数据发送到Kafka 4.3、自定义分区器 5、生产者提高吞吐量 6、数据可靠性 7、数据去重 1、幂等性 8、生产者事务 1、事务原理 2、使用事务 9、数据的有序 注示例代码使用的语言是Python 1、生产者消息发送流程 1.1、发送原理 在消息发送的过程中涉及到了两个线程——main 线程和 Sender 线程。在 main 线程 中创建了一个双端队列 RecordAccumulator。main 线程将消息发送给 RecordAccumulator Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker。 参数说明 batch size只有数据积累到batch.size之后sender才会发送数据。默认16Klinger.ms如果数据迟迟未达到batch.sizesender等待linger.ms设置的时间到了之后在发送是数据。单位ms默认值为0ms表示没有延迟。acks 0生产者发送过来的数据不需要等待应答异步发送。1生产者发送过来的数据需要等待Leader收到后应该。-1all生产者发送过来的数据Leader和ISRIn-Sync Replicas队列里面所有的节点收齐数据后应答。注-1与all等价 2、异步发送 API 2.1、普通异步发送 示例创建 Kafka 生产者采用异步的方式发送到 Kafka Broker from kafka3 import KafkaProducerdef producer(topic: str, msg: str, partition0)::function: 生产者生产数据:param topic: 写入数据所在的topic:param msg: 写入的数据:param partition: 写入数据所在的分区:return:print(开始生产数据......)# 初始化生产者对象bootstrap_servers参数传入kafka集群# 将acks的值设为0acks0此方式也是异步的方式但是生产环境中不会这样使用因为存在数据丢失的风险# producer KafkaProducer(bootstrap_servers[170.22.70.174:9092, 170.22.70.178:9092, 170.22.70.179:9092], acks0)producer KafkaProducer(bootstrap_servers[170.22.70.174:9092, 170.22.70.178:9092, 170.22.70.179:9092])# 将发送消息转换成bytes类型编码使用utf-8future producer.send(topictopic, valuebytes(msg, utf-8), partitionpartition)producer.close()if __name__ __main__:msg this is profucer01topic firstproducer(topic, msg) 2.2、带回调函数的异步发送 回调函数会在 producer 收到 ack 时调用为异步调用该方法有两个参数分别是元 数据信息RecordMetadata和异常信息Exception如果 Exception 为 null说明消息发 送成功如果 Exception 不为 null说明消息发送失败。注意消息发送失败会自动重试不需要在回调函数中手动重试。 带回调函数的异步发送 回调函数会在 producer 收到 ack 时调用为异步调用该方法有两个参数分别是元数据信息RecordMetadata和异常信息Exception 如果 Exception 为 null说明消息发送成功如果 Exception 不为 null说明消息发送失败。from kafka3 import KafkaProducerdef producer(topic: str, msg: str, partition0)::function: 生产者生产数据:param topic: 写入数据所在的topic:param msg: 写入的数据:param partition: 写入数据所在的分区:return:print(开始生产数据......)# 定义发送成功的回调函数def on_send_success(record_metadata):print(消息成功发送到主题:, record_metadata.topic)print(分区:, record_metadata.partition)print(偏移量:, record_metadata.offset)# 定义发送失败的回调函数def on_send_error(excp):print(发送消息时出现错误:, excp)# 可以根据实际情况执行一些错误处理逻辑# 初始化生产者对象bootstrap_servers参数传入kafka集群producer KafkaProducer(bootstrap_servers[170.22.70.174:9092, 170.22.70.178:9092, 170.22.70.179:9092])# 将发送消息转换成bytes类型编码使用utf-8producer.send(topictopic, valuebytes(msg, utf-8), partitionpartition).add_callback(on_send_success).add_errback(on_send_error)producer.close() 3、同步发送 API 只需在异步发送的基础上再调用一下 get()方法即可。或者将acks的值设为allacksall此方式也是同步的方式。 from kafka3 import KafkaProducerdef producer(topic: str, msg: str, partition0)::function: 生产者生产数据:param topic: 写入数据所在的topic:param msg: 写入的数据:param partition: 写入数据所在的分区:return:print(开始生产数据......)# 初始化生产者对象bootstrap_servers参数传入kafka集群# 将acks的值设为allacksall此方式也是同步的方式.# producer KafkaProducer(bootstrap_servers[170.22.70.174:9092, 170.22.70.178:9092, 170.22.70.179:9092], acksall)producer KafkaProducer(bootstrap_servers[170.22.70.174:9092, 170.22.70.178:9092, 170.22.70.179:9092])# 将发送消息转换成bytes类型编码使用utf-8future producer.send(topictopic, valuebytes(msg, utf-8), partitionpartition)# 等待 Future 返回结果,设置超时时间为10秒future.get(timeout10)producer.close() 4、生产者分区 4.1、分区的优势 1、便于合理使用存储资源每个Partition在一个Broker上存储可以把海量的数据按照分区切割成一 块一块数据存储在多台Broker上。合理控制分区的任务可以实现负载均衡的效果。2、提高并行度生产者可以以分区为单位发送数据消费者可以以分区为单位进行消费数据。 4.2、生产者发送消息的分区策略 1、如果不指定分区会使用默认分区策略。默认分区策略如下 如果key存在的情况下将key的hash值与topic的partition进行取余得到partition值如果key不存在的情况下会随机选择一个分区 2、如果指明了分区那么将会把数据发送到指定分区 示例1将数据发往指定 partition  将所有数据发往分区 0 中。 # 指定分区 def producer_01(topic: str, msg: str, partition0)::function: 指定分区:param topic: 写入数据所在的topic:param msg: 写入的数据:param partition: 写入数据所在的分区:return:# 初始化生产者对象bootstrap_servers参数传入kafka集群producer KafkaProducer(bootstrap_servers[170.22.70.174:9092, 170.22.70.178:9092, 170.22.70.179:9092])# 将发送消息转换成bytes类型编码使用utf-8future producer.send(topictopic, valuebytes(msg, utf-8), partitionpartition)try:# 等待消息发送完成sendResult future.get(timeout10)print(f消息: {msg}\n所在的分区: {sendResult.partition}\n偏移量为: {sendResult.offset}\n)# 关闭生产producer.close()except KafkaError as e:print(f消息: {msg} 发送失败\n失败信息为: {e}\n)msg this is partition topic first for i in range(5):producer_01(topic, msgstr(i)) 示例2有 key 的情况下将数据发送到Kafka 没有指明 partition 值但有 key 的情况下将 key 的 hash 值与 topic 的 partition 数进行取 余得到 partition 值。 # 没有指明 partition 值但有 key 的情况下将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值。 def producer_02(topic: str, msg: str, key: str)::function: 指定分区:param topic: 写入数据所在的topic:param msg: 写入的数据:param key: 发送消息的key值:return:# 初始化生产者对象bootstrap_servers参数传入kafka集群producer KafkaProducer(bootstrap_servers[170.22.70.174:9092, 170.22.70.178:9092, 170.22.70.179:9092])# 将发送消息转换成bytes类型编码使用utf-8future producer.send(topictopic, keybytes(key, utf-8), valuebytes(msg, utf-8))try:# 等待消息发送完成sendResult future.get(timeout10)print(f消息: {msg}\n所在的分区: {sendResult.partition}\n偏移量为: {sendResult.offset}\n)# 关闭生产producer.close()except KafkaError as e:print(f消息: {msg} 发送失败\n失败信息为: {e}\n)msg this is partition topic first key a for i in range(5):producer_02(topic, msgstr(i), key) 4.3、自定义分区器 可以根据实际需要自定义实现分区器。示例自定义分区 发送过来的数据中如果包含 hello就发往 0 号分区不包含 hello就发往 1 号分区。 # 自定义分区 发送过来的数据中如果包含 hello就发往 0 号分区不包含 hello就发往 1 号分区。 def producer_03(topic: str, msg: str)::function: 自定义分区:param topic: 写入数据所在的topic:param msg: 写入的数据:return:# 自定义分区器def my_partitioner(msg):if hello in str(msg):return 0else:return 1# 初始化生产者对象bootstrap_servers参数传入kafka集群producer KafkaProducer(bootstrap_servers[170.22.70.174:9092, 170.22.70.178:9092, 170.22.70.179:9092])# 将发送消息转换成bytes类型编码使用utf-8future producer.send(topictopic, valuebytes(msg, utf-8), partitionmy_partitioner(msg))try:# 等待消息发送完成sendResult future.get(timeout10)print(f消息: {msg}\n所在的分区: {sendResult.partition}\n偏移量为: {sendResult.offset}\n)# 关闭生产producer.close()except KafkaError as e:print(f消息: {msg} 发送失败\n失败信息为: {e}\n)msg hello this is partition msg1 this is partition5、生产者提高吞吐量 实际工作中会根据实际的情况动态的调整生产者的吞吐量以适应实际需求调整吞吐量主要是通过调整以下参数实现 batch.size批次大小默认16klinger.ms等待时间修改为5-100mscompression.type压缩snappyRecordAccumulator缓冲区大小默认32m修改为64m 生产者提高吞吐量1、linger.ms等待时间修改为5-100ms2、compression.type压缩snappy3、RecordAccumulator缓冲区大小修改为64mfrom kafka3 import KafkaProducer from kafka3.errors import KafkaErrordef producer(topic: str, msg: str)::function: 生产者生产数据:param topic: 写入数据所在的topic:param msg: 写入的数据:return:# 初始化生产者对象bootstrap_servers参数传入kafka集群producer KafkaProducer(bootstrap_servers[170.22.70.174:9092, 170.22.70.178:9092, 170.22.70.179:9092],linger_ms5, # linger_ms设置为5mscompression_typesnappy, # 设置压缩类型为snappybuffer_memory64*1024*1024 # 设置缓冲区大小为64MB)# 将发送消息转换成bytes类型编码使用utf-8future producer.send(topictopic, valuebytes(msg, utf-8))try:# 等待消息发送完成sendResult future.get(timeout10)print(f消息: {msg}\n所在的分区: {sendResult.partition}\n偏移量为: {sendResult.offset}\n)# 关闭生产producer.close()except KafkaError as e:print(f消息: {msg} 发送失败\n失败信息为: {e}\n) 6、数据可靠性 说明数据的可靠性保证主要是通过acks的设置来保证的下面说明acks在不同取值下的数据可靠性情况 acks0时 因为生产者发送数据后就不管了所以当Leader或Follower发生异常时就会发生数据丢失。实际使用很少acks1时 因为生产者只需要等到Leader应答后就算完成本次发生了但是当Leader应答完成后还没有开始同步副本数据Leader此时挂掉新的Leader上线后并不会收到丢失数据因为生产者已经认为数据发送成功了这时就会发生数据丢失。实际使用一般用于传输普通日志acks-1时 因为生产者需要等到Leader和Follower都收到数据后才算完成本次数据传输所以可靠性高但是当分区副本只有1个或者ISR应答的最小副本设置为1此时和acks1时效果一样存在数据丢失的风险。实际使用对可靠性要求较高的场景中比如涉及到金钱相关的场景 综上分析要想使得数据完全可靠条件ACK级别设置为1 分区副本数大于等于2 ISR应答最小副本数大于等于2min.insync.replicas  参数保证 Python代码设置acks # acks取值0、1、all producer KafkaProducer(bootstrap_servers[170.22.70.174:9092, 170.22.70.178:9092, 170.22.70.179:9092], acks0) 7、数据去重 至少一次At Least Once ACK级别设置为-1 分区副本大于等于2 ISR里应答的最小副本数量大于等于2可以保证数据不丢失但是不能保证数据不重复。最多一次At Most Once ACK级别设置为0可以保证数据不重复但是不能保证数据不丢失。 那么如何保证数据只存储一次呢这就需要使用幂等性。 1、幂等性 1、幂等性 1、幂等性就是指Producer不论向Broker发送多少次重复数据Broker端都只会持久化一条保证了不重复。2、精确一次Exactly Once 幂等性 至少一次 ack-1 分区副本数2 ISR最小副本数量2 。 2、幂等性实现原理 具有PID, Partition, SeqNumber相同主键的消息提交时Broker只会持久化一条。 其 中PID是Kafka每次重启都会分配一个新的Partition 表示分区号Sequence Number 每次发送消息的序列号是单调自增的。 注意幂等性只能保证的是在单分区单会话内不重复。 3、使用幂等性 开启参数 enable_idempotence 默认为 truefalse 关闭。目前的 kafka3 库并不支持直接设置生产者的幂等性。在 Kafka 中启用幂等性需要使用 kafka-python 或其他支持 Kafka 协议的库。以下是使用 kafka-python 库设置生产者的幂等性的示例代码 from kafka import KafkaProducer# 创建 KafkaProducer 实例开启幂等性 producer KafkaProducer(bootstrap_servers127.0.0.1:9092,acksall, # 设置 acks 参数为 all要求所有副本都确认消息enable_idempotenceTrue ) 8、生产者事务 说明开启事务必须开启幂等性。 1、事务原理 存储事务信息的特殊主题__transaction_state_分区_Leader 默认有50个分区每个分区负责一部分事务。事务划分是根据transaction.id的hash值%50计算出该事物属于哪个分区。该分区Leader副本所在的broker节点即为这个transaction.id对应的Transaction Coordinator节点。 注意事项生产者在使用事务功能之前必须先自定义一个唯一的transaction.id。有了该transaction.id即使客户端挂掉了它重启之后也能继续处理未完成的事务。 2、使用事务 目前的 kafka3 库并不支持直接创建事务。Kafka 事务的支持需要使用 kafka-python 或其他支持 Kafka 协议的库。以下是使用 kafka-python 库创建事务的示例代码 from kafka import KafkaProducer from kafka.errors import KafkaError# 创建 KafkaProducer 实例开启事务 producer KafkaProducer(bootstrap_servers127.0.0.1:9092,enable_idempotenceTrue # 开启幂等性 )# 初始化事务 producer.init_transactions()# 开始事务 producer.begin_transaction()try:# 发送事务性消息for i in range(3):key bmy_keyvalue bmy_value_%d % iproducer.send(my_topic, keykey, valuevalue)# 提交事务producer.commit_transaction()except KafkaError as e:# 回滚事务producer.abort_transaction()print(f发送消息失败: {e})finally:# 关闭 KafkaProducer 实例producer.close() 9、数据的有序性 说明数据的有序性只能保证单分区有序分区与分区之间是无序的。 1、Kafka在1.x版本之前保证数据单分区有序条件如下 max.in.flight.requests.per.connection1 不需要开启幂等性 2、Kafka在1.x版本之后保证数据单分区有序条件如下 未开启幂等性 设置max.in.flight.requests.per.connection1开启幂等性 设置max.in.flight.requests.per.connection 小于等于5原因因为在Kafka1.x以后启用幂等性Kafka服务端会缓存生产者发来的最近5个request的元数据所以至少可以保证最近5个request的数据都是有序的。
文章转载自:
http://www.morning.jhqcr.cn.gov.cn.jhqcr.cn
http://www.morning.snxbf.cn.gov.cn.snxbf.cn
http://www.morning.dpjtn.cn.gov.cn.dpjtn.cn
http://www.morning.hqzmz.cn.gov.cn.hqzmz.cn
http://www.morning.fyxr.cn.gov.cn.fyxr.cn
http://www.morning.rhfh.cn.gov.cn.rhfh.cn
http://www.morning.w58hje.cn.gov.cn.w58hje.cn
http://www.morning.alwpc.cn.gov.cn.alwpc.cn
http://www.morning.gdgylp.com.gov.cn.gdgylp.com
http://www.morning.wlbwp.cn.gov.cn.wlbwp.cn
http://www.morning.yrmpz.cn.gov.cn.yrmpz.cn
http://www.morning.tlpgp.cn.gov.cn.tlpgp.cn
http://www.morning.sthgm.cn.gov.cn.sthgm.cn
http://www.morning.bkwd.cn.gov.cn.bkwd.cn
http://www.morning.ygbq.cn.gov.cn.ygbq.cn
http://www.morning.zmlnp.cn.gov.cn.zmlnp.cn
http://www.morning.rblqk.cn.gov.cn.rblqk.cn
http://www.morning.ylsxk.cn.gov.cn.ylsxk.cn
http://www.morning.xfyjn.cn.gov.cn.xfyjn.cn
http://www.morning.ylqb8.cn.gov.cn.ylqb8.cn
http://www.morning.dschz.cn.gov.cn.dschz.cn
http://www.morning.plqkz.cn.gov.cn.plqkz.cn
http://www.morning.kpwcx.cn.gov.cn.kpwcx.cn
http://www.morning.ggtgl.cn.gov.cn.ggtgl.cn
http://www.morning.scrnt.cn.gov.cn.scrnt.cn
http://www.morning.dbrdg.cn.gov.cn.dbrdg.cn
http://www.morning.zxrtt.cn.gov.cn.zxrtt.cn
http://www.morning.qbgff.cn.gov.cn.qbgff.cn
http://www.morning.wqpr.cn.gov.cn.wqpr.cn
http://www.morning.gwzfj.cn.gov.cn.gwzfj.cn
http://www.morning.qbfs.cn.gov.cn.qbfs.cn
http://www.morning.nmwgd.cn.gov.cn.nmwgd.cn
http://www.morning.fmkjx.cn.gov.cn.fmkjx.cn
http://www.morning.ddgl.com.cn.gov.cn.ddgl.com.cn
http://www.morning.pnmnl.cn.gov.cn.pnmnl.cn
http://www.morning.jqjnl.cn.gov.cn.jqjnl.cn
http://www.morning.rdlfk.cn.gov.cn.rdlfk.cn
http://www.morning.litao7.cn.gov.cn.litao7.cn
http://www.morning.pjrgb.cn.gov.cn.pjrgb.cn
http://www.morning.lxjcr.cn.gov.cn.lxjcr.cn
http://www.morning.xlwpz.cn.gov.cn.xlwpz.cn
http://www.morning.ypqwm.cn.gov.cn.ypqwm.cn
http://www.morning.tfpmf.cn.gov.cn.tfpmf.cn
http://www.morning.pqktp.cn.gov.cn.pqktp.cn
http://www.morning.qcztm.cn.gov.cn.qcztm.cn
http://www.morning.tsqpd.cn.gov.cn.tsqpd.cn
http://www.morning.gsqw.cn.gov.cn.gsqw.cn
http://www.morning.duckgpt.cn.gov.cn.duckgpt.cn
http://www.morning.lkfhk.cn.gov.cn.lkfhk.cn
http://www.morning.qkpzq.cn.gov.cn.qkpzq.cn
http://www.morning.kkgbs.cn.gov.cn.kkgbs.cn
http://www.morning.gccdr.cn.gov.cn.gccdr.cn
http://www.morning.plfy.cn.gov.cn.plfy.cn
http://www.morning.lfdzr.cn.gov.cn.lfdzr.cn
http://www.morning.yrjym.cn.gov.cn.yrjym.cn
http://www.morning.wfpmt.cn.gov.cn.wfpmt.cn
http://www.morning.czgtt.cn.gov.cn.czgtt.cn
http://www.morning.rbrhj.cn.gov.cn.rbrhj.cn
http://www.morning.pwfwk.cn.gov.cn.pwfwk.cn
http://www.morning.pjqxk.cn.gov.cn.pjqxk.cn
http://www.morning.kbgzj.cn.gov.cn.kbgzj.cn
http://www.morning.jstggt.cn.gov.cn.jstggt.cn
http://www.morning.xqffq.cn.gov.cn.xqffq.cn
http://www.morning.fyglg.cn.gov.cn.fyglg.cn
http://www.morning.rkck.cn.gov.cn.rkck.cn
http://www.morning.brwwr.cn.gov.cn.brwwr.cn
http://www.morning.yktr.cn.gov.cn.yktr.cn
http://www.morning.zcnfm.cn.gov.cn.zcnfm.cn
http://www.morning.nggbf.cn.gov.cn.nggbf.cn
http://www.morning.ppqjh.cn.gov.cn.ppqjh.cn
http://www.morning.lskyz.cn.gov.cn.lskyz.cn
http://www.morning.yrbp.cn.gov.cn.yrbp.cn
http://www.morning.bhpjc.cn.gov.cn.bhpjc.cn
http://www.morning.rbxsk.cn.gov.cn.rbxsk.cn
http://www.morning.nlysd.cn.gov.cn.nlysd.cn
http://www.morning.bsbcp.cn.gov.cn.bsbcp.cn
http://www.morning.hjrjr.cn.gov.cn.hjrjr.cn
http://www.morning.bydpr.cn.gov.cn.bydpr.cn
http://www.morning.hphfy.cn.gov.cn.hphfy.cn
http://www.morning.lffgs.cn.gov.cn.lffgs.cn
http://www.tj-hxxt.cn/news/240765.html

相关文章:

  • 三合一网站有必要吗离婚在线律师
  • 合肥高端网站建设公司哪家好网站备案 取消接入
  • 简单的购物网站制作昆明seo怎么做
  • 友情链接站长平台制作宣传片视频
  • 网站开发进阶实训报告直接在原备案号下增加新网站
  • 做布料的著名网站广州地铁官网
  • 网站开发googlevue cdn做的网站
  • 制作个人网站教程怎么分析网页的布局
  • 要建立网站和账号违法违规行为数据库和什么黑名单怎么在虚拟空间做两个网站
  • 学校网站开发的背景网站建设代码题
  • 网站怎么做登陆17网站一起做网店潮汕
  • 烟台怎么做网站建设网站花多少钱
  • 哪家房屋设计公司网站新北做网站
  • 焦作网站建设哪家便宜外链工具在线
  • 百度云做网站空间做AMC12的题的网站
  • 如何制作收费网站响应式网页设计名词解释
  • 天津做网站的费用东港网站建设
  • 做国外的营销的网站如何更改网站关键词
  • 网站软件下载wordpress movie
  • 压铸东莞网站建设苏宁易购网站建设情况
  • 站长工具天美传媒深圳个人网站制作
  • 站外推广营销方案建筑人才网 珠海
  • 内江市网站建设网站获取访问者qq号码
  • 网站域名怎么申请阜南网站建设公司
  • 网站建设项目规划书案例做网站为什么要买服务器
  • 百度的合作网站有哪些怎么做网站搜索框搜索
  • 建设部网站资质查询6wordpress 用户登录ip
  • 怎么推广公司的网站记事本做网站的代码
  • 网站优化哪个公司好晋中企业网站建设
  • 做速卖通的素材有哪些网站建设125摩托车价格及图片