成功的网站应该具备哪些要素,凡客诚品网上购物,龙岩市城乡建设局网站进不去,网站关于我们的页面作者#xff1a;来自 Elastic Andre Luiz 将 Apache Kafka 与 Elasticsearch 集成的分步指南#xff0c;以便使用 Python、Docker Compose 和 Kafka Connect 实现高效的数据提取、索引和可视化。 在本文中#xff0c;我们将展示如何将 Apache Kafka 与 Elasticsearch 集成以…作者来自 Elastic Andre Luiz 将 Apache Kafka 与 Elasticsearch 集成的分步指南以便使用 Python、Docker Compose 和 Kafka Connect 实现高效的数据提取、索引和可视化。 在本文中我们将展示如何将 Apache Kafka 与 Elasticsearch 集成以进行数据提取和索引。我们将概述 Kafka、其生产者producers和消费者consumers的概念并创建一个日志索引其中将通过 Apache Kafka 接收和索引消息。该项目以 Python 实现代码可在 GitHub 上找到。 先决条件
Docker 和 Docker Compose确保你的机器上安装了 Docker 和 Docker Compose。Python 3.x运行生产者和消费者脚本。 Apache Kafka 简介
Apache Kafka 是一个分布式流媒体平台具有高可扩展性和可用性以及容错能力。在 Kafka 中数据管理通过主要组件进行
Broker/代理负责在生产者和消费者之间存储和分发消息。Zookeeper管理和协调 Kafka 代理控制集群的状态、分区领导者和消费者信息。Topics/主题发布和存储数据以供使用的渠道。Consumers 及 Producers/消费者和生产者生产者向主题发送数据而消费者则检索该数据。 这些组件共同构成了 Kafka 生态系统为数据流提供了强大的框架。 项目结构
为了理解数据提取过程我们将其分为几个阶段
基础设施配置/Infrastructure Provisioning设置 Docker 环境以支持 Kafka、Elasticsearch 和 Kibana。创建生产者/Producer Creation实现 Kafka 生产者将数据发送到日志主题。创建消费者/Consumer Creation开发 Kafka 消费者以读取和索引 Elasticsearch 中的消息。提取验证/Ingestion Validation验证和确认已发送和已使用的数据。 使用 Docker Compose 进行基础设施配置
我们利用 Docker Compose 来配置和管理必要的服务。下面你将找到 Docker Compose 代码它设置了 Apache Kafka、Elasticsearch 和 Kibana 集成所需的每项服务确保数据提取过程。
docker-compose.yml
version: 3services:zookeeper:image: confluentinc/cp-zookeeper:latestcontainer_name: zookeeperenvironment:ZOOKEEPER_CLIENT_PORT: 2181kafka:image: confluentinc/cp-kafka:latestcontainer_name: kafkadepends_on:- zookeeperports:- 9092:9092- 9094:9094environment:KAFKA_BROKER_ID: 1KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST:${HOST_IP}:9092KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXTKAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXTKAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1elasticsearch:image: docker.elastic.co/elasticsearch/elasticsearch:8.15.1container_name: elasticsearch-8.15.1environment:- node.nameelasticsearch- xpack.security.enabledfalse- discovery.typesingle-node- ES_JAVA_OPTS-Xms512m -Xmx512mvolumes:- ./elasticsearch:/usr/share/elasticsearch/dataports:- 9200:9200kibana:image: docker.elastic.co/kibana/kibana:8.15.1container_name: kibana-8.15.1ports:- 5601:5601environment:ELASTICSEARCH_URL: http://elasticsearch:9200ELASTICSEARCH_HOSTS: [http://elasticsearch:9200]
你可以直接从 Elasticsearch Labs GitHub repo 访问该文件。 使用 Kafka 生产器发送数据
生产器负责将消息发送到日志主题。通过批量发送消息可以提高网络使用效率允许使用 batch_size 和 linger_ms 设置进行优化这两个设置分别控制批次的数量和延迟。配置 acksall 可确保消息持久存储这对于重要的日志数据至关重要。
producer KafkaProducer(bootstrap_servers[localhost:9092], # Specifies the Kafka server to connectvalue_serializerlambda x: json.dumps(x).encode(utf-8), # Serializes data as JSON and encodes it to UTF-8 before sendingbatch_size16384, # Sets the maximum batch size in bytes (here, 16 KB) for buffered messages before sendinglinger_ms10, # Sets the maximum delay (in milliseconds) before sending the batchacksall # Specifies acknowledgment level; all ensures message durability by waiting for all replicas to acknowledge
)def generate_log_message():levels [INFO, WARNING, ERROR, DEBUG]messages [User login successful,User login failed,Database connection established,Database connection failed,Service started,Service stopped,Payment processed,Payment failed]log_entry {level: random.choice(levels),message: random.choice(messages),timestamp: time.time()}return log_entrydef send_log_batches(topic, num_batches5, batch_size10):for i in range(num_batches):logger.info(fSending batch {i 1}/{num_batches})for _ in range(batch_size):log_message generate_log_message()producer.send(topic, valuelog_message)producer.flush()if __name__ __main__:topic logssend_log_batches(topic)producer.close()
当启动 producer 的时候会批量的向 topic 发送消息如下图
INFO:kafka.conn:Set configuration …
INFO:log_producer:Sending batch 1/5
INFO:log_producer:Sending batch 2/5
INFO:log_producer:Sending batch 3/5
INFO:log_producer:Sending batch 4/5 使用 Kafka Consumer 消费和索引数据
Consumer 旨在高效处理消息消费来自日志主题的批次并将其索引到 Elasticsearch 中。使用 auto_offset_resetlatest可确保 Consumer 开始处理最新消息忽略较旧的消息max_poll_records10 将批次限制为 10 条消息。使用 fetch_max_wait_ms2000Consumer 最多等待 2 秒以积累足够的消息然后再处理批次。
在其主循环中Consumer 消费日志消息、处理并将每个批次索引到 Elasticsearch 中确保持续的数据摄取。
consumer KafkaConsumer(logs, bootstrap_servers[localhost:9092],auto_offset_resetlatest, # Ensures reading from the latest offset if the group has no offset storedenable_auto_commitTrue, # Automatically commits the offset after processinggroup_idlog_consumer_group, # Specifies the consumer group to manage offset trackingmax_poll_records10, # Maximum number of messages per batchfetch_max_wait_ms2000 # Maximum wait time to form a batch (in ms)
)def create_bulk_actions(logs):for log in logs:yield {_index: logs,_source: {level: log[level],message: log[message],timestamp: log[timestamp]}}if __name__ __main__:try:print(Starting message processing…)while True:messages consumer.poll(timeout_ms1000) # Poll receive messages# process each batch messagesfor _, records in messages.items():logs [json.loads(record.value) for record in records]bulk_actions create_bulk_actions(logs)response helpers.bulk(es, bulk_actions)print(fIndexed {response[0]} logs.)except Exception as e:print(fErro: {e})finally:consumer.close()print(fFinish) 在 Kibana 中可视化数据
借助 Kibana我们可以探索和验证从 Kafka 提取并在 Elasticsearch 中编入索引的数据。通过访问 Kibana 中的开发工具你可以查看已编入索引的消息并确认数据符合预期。例如如果我们的 Kafka 生产者发送了 5 个批次每个批次 10 条消息我们应该在索引中看到总共 50 条记录。
要验证数据你可以在 Dev Tools 部分使用以下查询
GET /logs/_search
{query: {match_all: {}}
}
相应 此外Kibana 还提供了创建可视化和仪表板的功能可帮助使分析更加直观和具有交互性。下面你可以看到我们创建的一些仪表板和可视化示例它们以各种格式展示了数据增强了我们对所处理信息的理解。 使用 Kafka Connect 进行数据提取
Kafka Connect 是一种旨在促进数据源和目标接收器之间的集成的服务例如数据库或文件系统。它使用预定义的连接器来自动处理数据移动。在我们的例子中Elasticsearch 充当数据接收器。 使用 Kafka Connect我们可以简化数据提取过程无需手动将数据提取工作流实施到 Elasticsearch 中。借助适当的连接器Kafka Connect 允许将发送到 Kafka 主题的数据直接在 Elasticsearch 中编入索引只需进行最少的设置无需额外编码。 使用 Kafka Connect
要实现 Kafka Connect我们将 kafka-connect 服务添加到我们的 Docker Compose 设置中。此配置的一个关键部分是安装 Elasticsearch 连接器它将处理数据索引。
配置服务并创建 Kafka Connect 容器后将需要一个 Elasticsearch 连接器的配置文件。此文件定义基本参数例如
connection.urlElasticsearch 的连接 URL。topics连接器将监视的 Kafka 主题在本例中为 “logs”。type.nameElasticsearch 中的文档类型通常为 _doc。value.converter将 Kafka 消息转换为 JSON 格式。value.converter.schemas.enable指定是否应包含架构。schema.ignore 和 key.ignore在索引期间忽略 Kafka 架构和键的设置。
以下是在 Kafka Connect 中创建 Elasticsearch 连接器的 curl 命令
curl --location {{url}}/connectors \
--header Content-Type: application/json \
--data {name: elasticsearch-sink-connector,config: {connector.class: io.confluent.connect.elasticsearch.ElasticsearchSinkConnector,topics: logs,connection.url: http://elasticsearch:9200,type.name: _doc,value.converter: org.apache.kafka.connect.json.JsonConverter,value.converter.schemas.enable: false,schema.ignore: true,key.ignore: true}
}
通过此配置Kafka Connect 将自动开始提取发送到 “logs” 主题的数据并在 Elasticsearch 中对其进行索引。这种方法允许完全自动化的数据提取和索引而无需额外的编码从而简化整个集成过程。 结论
集成 Kafka 和 Elasticsearch 为实时数据提取和分析创建了一个强大的管道。本指南提供了一种构建强大数据提取架构的基础方法并在 Kibana 中实现无缝可视化和分析以适应未来更复杂的要求。
此外使用 Kafka Connect 使 Kafka 和 Elasticsearch 之间的集成更加简化无需额外的代码来处理和索引数据。Kafka Connect 使发送到特定主题的数据能够以最少的配置自动在 Elasticsearch 中编入索引。
想要获得 Elastic 认证了解下一次 Elasticsearch 工程师培训的时间
Elasticsearch 包含许多新功能可帮助你为你的用例构建最佳搜索解决方案。深入了解我们的示例笔记本以了解更多信息开始免费云试用或立即在本地机器上试用 Elastic。 原文How to ingest data to Elasticsearch through Kafka - Elasticsearch Labs 文章转载自: http://www.morning.rfldz.cn.gov.cn.rfldz.cn http://www.morning.qnywy.cn.gov.cn.qnywy.cn http://www.morning.qhydkj.com.gov.cn.qhydkj.com http://www.morning.wwznd.cn.gov.cn.wwznd.cn http://www.morning.fjzlh.cn.gov.cn.fjzlh.cn http://www.morning.rgsnk.cn.gov.cn.rgsnk.cn http://www.morning.ppllj.cn.gov.cn.ppllj.cn http://www.morning.jqmmf.cn.gov.cn.jqmmf.cn http://www.morning.skcmt.cn.gov.cn.skcmt.cn http://www.morning.lbgfz.cn.gov.cn.lbgfz.cn http://www.morning.xesrd.com.gov.cn.xesrd.com http://www.morning.wnbqy.cn.gov.cn.wnbqy.cn http://www.morning.tbkqs.cn.gov.cn.tbkqs.cn http://www.morning.gkjyg.cn.gov.cn.gkjyg.cn http://www.morning.lonlie.com.gov.cn.lonlie.com http://www.morning.nktgj.cn.gov.cn.nktgj.cn http://www.morning.xplng.cn.gov.cn.xplng.cn http://www.morning.gcqdp.cn.gov.cn.gcqdp.cn http://www.morning.qgcfb.cn.gov.cn.qgcfb.cn http://www.morning.xdpjs.cn.gov.cn.xdpjs.cn http://www.morning.xcjbk.cn.gov.cn.xcjbk.cn http://www.morning.gqwpl.cn.gov.cn.gqwpl.cn http://www.morning.tpqzs.cn.gov.cn.tpqzs.cn http://www.morning.qfdyt.cn.gov.cn.qfdyt.cn http://www.morning.btjyp.cn.gov.cn.btjyp.cn http://www.morning.kyytt.cn.gov.cn.kyytt.cn http://www.morning.glnxd.cn.gov.cn.glnxd.cn http://www.morning.lwygd.cn.gov.cn.lwygd.cn http://www.morning.pwdgy.cn.gov.cn.pwdgy.cn http://www.morning.rmxgk.cn.gov.cn.rmxgk.cn http://www.morning.thbkc.cn.gov.cn.thbkc.cn http://www.morning.khlxd.cn.gov.cn.khlxd.cn http://www.morning.zztkt.cn.gov.cn.zztkt.cn http://www.morning.tnktt.cn.gov.cn.tnktt.cn http://www.morning.rmfw.cn.gov.cn.rmfw.cn http://www.morning.mrckk.cn.gov.cn.mrckk.cn http://www.morning.yznsx.cn.gov.cn.yznsx.cn http://www.morning.dhqg.cn.gov.cn.dhqg.cn http://www.morning.tqdqc.cn.gov.cn.tqdqc.cn http://www.morning.dnqpq.cn.gov.cn.dnqpq.cn http://www.morning.kqpq.cn.gov.cn.kqpq.cn http://www.morning.ngpdk.cn.gov.cn.ngpdk.cn http://www.morning.jfbbq.cn.gov.cn.jfbbq.cn http://www.morning.cyysq.cn.gov.cn.cyysq.cn http://www.morning.yhglt.cn.gov.cn.yhglt.cn http://www.morning.pmftz.cn.gov.cn.pmftz.cn http://www.morning.lsfrc.cn.gov.cn.lsfrc.cn http://www.morning.rahllp.com.gov.cn.rahllp.com http://www.morning.lqjpb.cn.gov.cn.lqjpb.cn http://www.morning.rckmz.cn.gov.cn.rckmz.cn http://www.morning.zmzdx.cn.gov.cn.zmzdx.cn http://www.morning.fsbns.cn.gov.cn.fsbns.cn http://www.morning.mcjxq.cn.gov.cn.mcjxq.cn http://www.morning.qckwj.cn.gov.cn.qckwj.cn http://www.morning.qyfrd.cn.gov.cn.qyfrd.cn http://www.morning.kxymr.cn.gov.cn.kxymr.cn http://www.morning.flchj.cn.gov.cn.flchj.cn http://www.morning.dfltx.cn.gov.cn.dfltx.cn http://www.morning.bfsqz.cn.gov.cn.bfsqz.cn http://www.morning.nsrtvu.com.gov.cn.nsrtvu.com http://www.morning.pdmsj.cn.gov.cn.pdmsj.cn http://www.morning.hwnnm.cn.gov.cn.hwnnm.cn http://www.morning.kwwkm.cn.gov.cn.kwwkm.cn http://www.morning.gkmwk.cn.gov.cn.gkmwk.cn http://www.morning.ldqrd.cn.gov.cn.ldqrd.cn http://www.morning.ysnbq.cn.gov.cn.ysnbq.cn http://www.morning.mwwnz.cn.gov.cn.mwwnz.cn http://www.morning.wxccm.cn.gov.cn.wxccm.cn http://www.morning.rfmzs.cn.gov.cn.rfmzs.cn http://www.morning.pgggs.cn.gov.cn.pgggs.cn http://www.morning.tqldj.cn.gov.cn.tqldj.cn http://www.morning.kqyyq.cn.gov.cn.kqyyq.cn http://www.morning.plqkz.cn.gov.cn.plqkz.cn http://www.morning.kdgcx.cn.gov.cn.kdgcx.cn http://www.morning.rykmf.cn.gov.cn.rykmf.cn http://www.morning.pxbrg.cn.gov.cn.pxbrg.cn http://www.morning.ntqnt.cn.gov.cn.ntqnt.cn http://www.morning.mkyxp.cn.gov.cn.mkyxp.cn http://www.morning.c7493.cn.gov.cn.c7493.cn http://www.morning.mghgl.cn.gov.cn.mghgl.cn