环境设计专业资料网站,沧州网站建设制作,网站如何做三端适配,在淘宝上做的网站要转出作者#xff1a;来自 Elastic Andre Luiz
使用 Apache Camel 将数据提取到 Elasticsearch 的过程将搜索引擎的稳健性与集成框架的灵活性相结合。在本文中#xff0c;我们将探讨 Apache Camel 如何简化和优化将数据提取到 Elasticsearch。为了说明此功能#xff0c;我们将实…作者来自 Elastic Andre Luiz
使用 Apache Camel 将数据提取到 Elasticsearch 的过程将搜索引擎的稳健性与集成框架的灵活性相结合。在本文中我们将探讨 Apache Camel 如何简化和优化将数据提取到 Elasticsearch。为了说明此功能我们将实现一个入门应用程序逐步演示如何配置和使用 Apache Camel 将数据发送到 Elasticsearch。 什么是 Apache Camel
Apache Camel 是一个开源集成框架可简化不同系统的连接使开发人员可以专注于业务逻辑而不必担心系统通信的复杂性。Camel 的核心概念是 “routes - 路由”它定义了消息从源到目的地所遵循的路径可能包括转换、验证和过滤等中间步骤。 Apache Camel 架构 Camel 使用 “components- 组件” 连接不同的系统和协议例如数据库和消息传递服务并使用 “endpoints- 端点” 表示消息的入口点和出口点。这些概念提供了模块化和灵活的设计使配置和管理复杂集成变得更加容易高效且可扩展。 使用 Elasticsearch 和 Apache Camel
我们将演示如何配置一个简单的 Java 应用程序该应用程序使用 Apache Camel 将数据导入 Elasticsearch 集群。还将介绍使用 Apache Camel 中定义的路由在 Elasticsearch 中创建、更新和删除数据的过程。 1. 添加依赖项
配置此集成的第一步是将必要的依赖项添加到项目的 pom.xml 文件中。这将包括 Apache Camel 和 Elasticsearch 库。我们将使用新的 Java API 客户端库因此我们必须导入 camel-elasticsearch 组件并且版本必须与 camel-core 库相同。
如果你想使用 Java 低级 Rest 客户端则必须使用 Elasticsearch 低级 Rest 客户端组件。
dependencygroupIdorg.apache.camel/groupIdartifactIdcamel-core/artifactIdversion4.7.0/version
/dependencydependencygroupIdorg.apache.camel/groupIdartifactIdcamel-elasticsearch/artifactIdversion4.7.0/version
/dependencydependencygroupIdorg.apache.camel/groupIdartifactIdcamel-jackson/artifactIdversion4.7.0/version
/dependencydependencygroupIdco.elastic.clients/groupIdartifactIdelasticsearch-java/artifactIdversion8.14.3/version
/dependency2. 配置和运行 Camel 上下文
配置首先使用 DefaultCamelContext 类创建一个新的 Camel 上下文该类是定义和执行路由的基础。接下来我们配置 Elasticsearch 组件这将允许 Apache Camel 与 Elasticsearch 集群交互。ESlasticsearchComponent 实例配置为连接到地址 localhost:9200这是本地 Elasticsearch 集群的默认地址。对于需要身份验证的环境设置你应该阅读有关如何配置组件和启用基本身份验证的文档称为 “Configure the component and enable basic authentication - 配置组件和启用基本身份验证”。
public class ESComponent {public static ElasticsearchComponent getInstance() {var elasticsearch new ElasticsearchComponent();elasticsearch.setHostAddresses(localhost:9200);return elasticsearch;}public static String getName() {return elasticsearch;}
}然后将该组件添加到 Camel 上下文中使得定义的路由能够使用该组件在 Elasticsearch 中执行操作。
try (var context new DefaultCamelContext()) {context.addComponent(ESComponent.getName(), ESComponent.getInstance());context.addRoutes(new OperationBulkRoute());context.start();
}随后将路由添加到上下文中。我们将创建用于批量索引、更新和删除文档的路由。 3. 配置 Camel 路由 数据索引
我们将配置的第一个路由用于数据索引。我们将使用包含电影目录的 JSON 文件。路由将配置为读取位于 src/main/resources/movies.json 的文件将 JSON 内容反序列化为 Java 对象然后应用聚合策略将多条消息合并为一条从而允许在 Elasticsearch 中进行批量操作。配置了每条消息 500 个项目的大小即批量将一次索引 500 部电影。
路由 Elasticsearch 操作 bulk
String URI_BULK_OPERATION String.format(elasticsearch://elasticsearch?operation%sindexName%s,IndexOperationConfig.BULK_OPERATION,INDEX_NAME);public class OperationBulkRoute extends RouteBuilder {private static final Log log LogFactory.getLog(OperationBulkRoute.class);private static final int BULK_SIZE 500;Overridepublic void configure() {from(file:src/main/resources?fileNamemovies.jsonnooptrue).routeId(route-bulk-ingest).unmarshal().json().split(body()).aggregate(constant(true), new BulkAggregationStrategy()).completionSize(BULK_SIZE).to(URI_BULK_OPERATION).process(exchange - {var body exchange.getIn().getBody(String.class);log.info(String.format(Response: %s, body));}).end();}
}这批文档将被发送到 Elasticsearch 的批量操作端点。这种方法可确保处理大量数据时的效率和速度。 数据更新
下一个路由是更新文档。我们在上一步中索引了一些电影现在我们将创建新的路由通过参考代码搜索文档然后更新评级字段。
我们设置了一个 Camel 上下文 (DefaultCamelContext)其中注册了一个 Elasticsearch 组件并添加了一个自定义路由 IngestionRoute。操作首先通过 ProducerTemplate 发送文档代码然后从 direct:update-ingestion 端点启动路由。
try (var context new DefaultCamelContext()) {context.addComponent(ESComponent.getName(), ESComponent.getInstance());context.addRoutes(new IngestionRoute());context.start();ProducerTemplate producerTemplate context.createProducerTemplate();producerTemplate.sendBody(direct:update-ingestion, documentCode);Thread.sleep(5000);
}接下来我们有 IngestionRoute它是此流程的输入端点。该路由执行几个流水线操作。首先在 Elasticsearch 中进行搜索以按代码 (direct:search-by-id) 定位文档其中 SearchByCodeProcessor 根据代码组装查询。然后检索到的文档由 UpdateRatingProcessor 处理它将结果转换为 Movie 对象将电影评级movie rating更新为特定值并准备将更新后的文档发送回 Elasticsearch 进行更新。
public class IngestionRoute extends RouteBuilder {private static final Log log LogFactory.getLog(IngestionRoute.class);Overridepublic void configure() throws Exception {from(direct:update-ingestion).pipeline().to(direct:search-by-id).to(URI_SEARCH_OPERATION).to(direct:update-rating).to(URI_UPDATE_OPERATION).process(exchange - {var body exchange.getIn().getBody(String.class);log.info(String.format(Response: %s, body));}).end();from(direct:search-by-id).process(new SearchByCodeProcessor());from(direct:update-rating).process(new UpdateRatingProcessor());}
}SearchByCodeProcessor 处理器仅配置为执行搜索查询
public class SearchByCodeProcessor implements Processor {Overridepublic void process(Exchange exchange) throws Exception {var code exchange.getIn().getBody();String query {\n \query\: {\n \term\: {\n \code\: {\n \value\: code \n }\n }\n }\n };exchange.setProperty(document_code, code);exchange.getIn().setBody(query);}
}UpdateRatingProcessor 处理器负责更新评级字段。
public class UpdateRatingProcessor implements Processor {private final ObjectMapper objectMapper;public UpdateRatingProcessor() {this.objectMapper new ObjectMapper();this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);}Overridepublic void process(Exchange exchange) throws Exception {HitsMetadata response exchange.getIn().getBody(HitsMetadata.class);var code Long.parseLong(exchange.getProperty(document_code).toString());if (response ! null response.hits() ! null) {var documents parseToMovies(response);var optionalMovie documents.stream().filter(document - code (document.getSource().getCode())).findAny();optionalMovie.ifPresent(document - {document.getSource().setRating(13.0);MapString, Object updateMap new HashMap();updateMap.put(doc, document.getSource());exchange.getIn().setHeader(indexId, document.getId());exchange.getIn().setBody(updateMap);});}}数据删除
最后配置删除文档的路由。在这里我们将使用文档的 ID 删除文档。在 Elasticsearch 中要删除文档我们需要知道文档标识符、存储文档的索引并执行删除请求。在 Apache Camel 中我们将通过创建新路由来执行此操作如下所示。
路由从 direct:op-delete 端点开始该端点作为入口点。当需要删除文档时将在消息正文中收到其标识符 (_id)。然后路由使用 simple(${body}) 将 indexId 标头设置为该标识符的值这会从消息正文中提取 _id。
public class OperationDeleteRoute extends RouteBuilder {private static final Log log LogFactory.getLog(OperationDeleteRoute.class);Overridepublic void configure() {from(direct:op-delete).routeId(route-delete).setHeader(indexId, simple(${body})).to(URI_DELETE_OPERATION).process(exchange - {var body exchange.getIn().getBody(String.class);log.info(String.format(Response: %s, body));}).end();;}
}String URI_DELETE_OPERATION String.format(elasticsearch://elasticsearch?operation%sindexName%s,IndexOperationConfig.DELETE_OPERATION,INDEX_NAME);最后消息被定向到URI_DELETE_OPERATION指定的端点该端点连接到 Elasticsearch 以执行相应索引中的文档删除操作。 现在我们已经创建了路由我们可以创建一个 Camel 上下文DefaultCamelContext它被配置为包含 Elasticsearch 组件。
try (var context new DefaultCamelContext()) {context.addComponent(ESComponent.getName(), ESComponent.getInstance());context.addRoutes(new OperationDeleteRoute());context.start();ProducerTemplate producerTemplate context.createProducerTemplate();producerTemplate.sendBody(direct:op-delete, documentId);
}接下来将 OperationDeleteRoute 类定义的删除路由delete route添加到上下文中。初始化上下文后使用 ProducerTemplate 将应删除的文档的标识符传递给 direct:op-delete 端点从而触发删除路由。 结论
Apache Camel 与 Elasticsearch 之间的集成允许实现强大而高效的数据提取利用 Camel 的灵活性来定义可以处理不同数据操作场景例如索引、更新和删除的路由。通过此设置你可以以可扩展的方式编排和自动化复杂流程确保你的数据在 Elasticsearch 中得到有效管理。此示例演示了如何将这些工具一起使用来创建高效且适应性强的数据提取解决方案。 参考资料
Apache CamelApache Camel 架构聚合 Apache Camel文件组件Elasticsearch 组件 准备好自己尝试一下了吗开始免费试用。 想要获得 Elastic 认证吗了解下一期 Elasticsearch 工程师培训何时开始 原文https://www.elastic.co/search-labs/blog/elasticsearch-apache-camel-ingest-data 文章转载自: http://www.morning.rwbh.cn.gov.cn.rwbh.cn http://www.morning.lhhdy.cn.gov.cn.lhhdy.cn http://www.morning.qrdkk.cn.gov.cn.qrdkk.cn http://www.morning.rrqbm.cn.gov.cn.rrqbm.cn http://www.morning.yqpck.cn.gov.cn.yqpck.cn http://www.morning.cgbgc.cn.gov.cn.cgbgc.cn http://www.morning.mzcsp.cn.gov.cn.mzcsp.cn http://www.morning.yfcyh.cn.gov.cn.yfcyh.cn http://www.morning.kqnwy.cn.gov.cn.kqnwy.cn http://www.morning.rjkfj.cn.gov.cn.rjkfj.cn http://www.morning.mbbgk.com.gov.cn.mbbgk.com http://www.morning.nbiotank.com.gov.cn.nbiotank.com http://www.morning.nzmhk.cn.gov.cn.nzmhk.cn http://www.morning.qxwgx.cn.gov.cn.qxwgx.cn http://www.morning.tbplf.cn.gov.cn.tbplf.cn http://www.morning.dcdhj.cn.gov.cn.dcdhj.cn http://www.morning.yfpnl.cn.gov.cn.yfpnl.cn http://www.morning.itvsee.com.gov.cn.itvsee.com http://www.morning.kqnwy.cn.gov.cn.kqnwy.cn http://www.morning.rjxwq.cn.gov.cn.rjxwq.cn http://www.morning.mkygc.cn.gov.cn.mkygc.cn http://www.morning.dxzcr.cn.gov.cn.dxzcr.cn http://www.morning.mfbzr.cn.gov.cn.mfbzr.cn http://www.morning.wdlyt.cn.gov.cn.wdlyt.cn http://www.morning.kdgcx.cn.gov.cn.kdgcx.cn http://www.morning.mqxzh.cn.gov.cn.mqxzh.cn http://www.morning.drkk.cn.gov.cn.drkk.cn http://www.morning.zffn.cn.gov.cn.zffn.cn http://www.morning.klyyd.cn.gov.cn.klyyd.cn http://www.morning.hhfwj.cn.gov.cn.hhfwj.cn http://www.morning.cxryx.cn.gov.cn.cxryx.cn http://www.morning.kmldm.cn.gov.cn.kmldm.cn http://www.morning.sgnxl.cn.gov.cn.sgnxl.cn http://www.morning.gkmwk.cn.gov.cn.gkmwk.cn http://www.morning.xbbrh.cn.gov.cn.xbbrh.cn http://www.morning.bnrff.cn.gov.cn.bnrff.cn http://www.morning.zpnfc.cn.gov.cn.zpnfc.cn http://www.morning.mbfkt.cn.gov.cn.mbfkt.cn http://www.morning.nwzcf.cn.gov.cn.nwzcf.cn http://www.morning.hphrz.cn.gov.cn.hphrz.cn http://www.morning.zdzgf.cn.gov.cn.zdzgf.cn http://www.morning.mspkz.cn.gov.cn.mspkz.cn http://www.morning.ygwbg.cn.gov.cn.ygwbg.cn http://www.morning.hpkgm.cn.gov.cn.hpkgm.cn http://www.morning.rkfh.cn.gov.cn.rkfh.cn http://www.morning.gyqnp.cn.gov.cn.gyqnp.cn http://www.morning.kqfdrqb.cn.gov.cn.kqfdrqb.cn http://www.morning.cwznh.cn.gov.cn.cwznh.cn http://www.morning.rdlfk.cn.gov.cn.rdlfk.cn http://www.morning.qlpq.cn.gov.cn.qlpq.cn http://www.morning.hwljx.cn.gov.cn.hwljx.cn http://www.morning.rswfj.cn.gov.cn.rswfj.cn http://www.morning.jyznn.cn.gov.cn.jyznn.cn http://www.morning.zrkws.cn.gov.cn.zrkws.cn http://www.morning.qnxzx.cn.gov.cn.qnxzx.cn http://www.morning.lmmh.cn.gov.cn.lmmh.cn http://www.morning.rglzy.cn.gov.cn.rglzy.cn http://www.morning.ljcf.cn.gov.cn.ljcf.cn http://www.morning.hhpkb.cn.gov.cn.hhpkb.cn http://www.morning.ppbrq.cn.gov.cn.ppbrq.cn http://www.morning.rwyd.cn.gov.cn.rwyd.cn http://www.morning.gnghp.cn.gov.cn.gnghp.cn http://www.morning.drggr.cn.gov.cn.drggr.cn http://www.morning.zrbpx.cn.gov.cn.zrbpx.cn http://www.morning.fslrx.cn.gov.cn.fslrx.cn http://www.morning.xrct.cn.gov.cn.xrct.cn http://www.morning.ftzll.cn.gov.cn.ftzll.cn http://www.morning.zkdmk.cn.gov.cn.zkdmk.cn http://www.morning.bwznl.cn.gov.cn.bwznl.cn http://www.morning.zmyhn.cn.gov.cn.zmyhn.cn http://www.morning.jnbsx.cn.gov.cn.jnbsx.cn http://www.morning.lyhrg.cn.gov.cn.lyhrg.cn http://www.morning.knjj.cn.gov.cn.knjj.cn http://www.morning.tqbqb.cn.gov.cn.tqbqb.cn http://www.morning.mnwsy.cn.gov.cn.mnwsy.cn http://www.morning.hxcrd.cn.gov.cn.hxcrd.cn http://www.morning.kzxlc.cn.gov.cn.kzxlc.cn http://www.morning.sjpbh.cn.gov.cn.sjpbh.cn http://www.morning.nstml.cn.gov.cn.nstml.cn http://www.morning.jlnlr.cn.gov.cn.jlnlr.cn