校园网站开发的目的,网站蓝色配色,零售管理系统哪个软件好,新手做网站教程学习目标#xff1a;三栖合一架构师
本文是《大数据Flink学习圣经》 V1版本#xff0c;是 《尼恩 大数据 面试宝典》姊妹篇。
这里特别说明一下#xff1a;《尼恩 大数据 面试宝典》5个专题 PDF 自首次发布以来#xff0c; 已经汇集了 好几百题#xff0c;大量的大厂面试…学习目标三栖合一架构师
本文是《大数据Flink学习圣经》 V1版本是 《尼恩 大数据 面试宝典》姊妹篇。
这里特别说明一下《尼恩 大数据 面试宝典》5个专题 PDF 自首次发布以来 已经汇集了 好几百题大量的大厂面试干货、正货 。 《尼恩 大数据 面试宝典》面试题集合 将变成大数据学习和面试的必读书籍。
于是尼恩架构团队 趁热打铁推出 《大数据Flink学习圣经》《大数据HBASE学习圣经》
《大数据Flink学习圣经》 后面会不断升级不断 迭代 变成大数据领域 学习和面试的必读书籍
最终帮助大家成长为 三栖合一架构师进大厂拿高薪。 《尼恩 架构笔记》《尼恩高并发三部曲》《尼恩Java面试宝典》的PDF请到公号【技术自由圈】取 “Java大数据” 双栖架构成功案例
成功案例1
惊天大逆袭失业4个月3年小伙1个月喜提架构Offer而且是大龄跨行超级牛
成功案例2
极速拿offer阿里P6被裁后极速上岸1个月内喜提2优质offer(含滴滴) 文章目录 学习目标三栖合一架构师“Java大数据” 双栖架构成功案例1. 什么是Flink大数据分布式计算分布式存储分布式存储分布式文件系统 批处理和流处理批处理Batch Processing流处理Stream Processing 开源大数据技术HadoopYARNYet Another Resource NegotiatorSparkFlink 2. Flink 部署Master-Worker架构兼容性 Standalone集群的特点Standalone集群的部署方式Docker部署flink简单集群1、在服务器创建flink目录2、docker-compose.yml脚本创建3、启动flink4、浏览器上查看页面dashboard 3. Flink快速应用实操1 单词统计案例批数据1.1 需求1.2 代码实现 实操2 单词统计案例流数据2.1 需求2.2 代码实现 Flink程序开发的流程总结 4. Flink分布式架构与核心组件Flink作业提交过程Flink核心组件Flink组件栈作业执行阶段 5. Flink 开发开发环境搭建Flink连接器Source基于本地集合的Source基于文件的Source读取本地文件读取HDFS文件数据读取CSV文件数据读取压缩文件遍历目录读取kafka 自定义source Sink基于本地集合的sink基于文件的sink将数据写入本地文件将数据写入HDFS Flink APImapflatMapmapPartitionfilterreducereduceGroupaggregatedistinctjoinunionconnectrebalancehashPartitionsortPartition 窗口时间概念窗口程序滚动窗口滑动窗口会话窗口基于数量窗口 触发器清除器 6. Flink程序本地执行和集群执行6.1. 本地执行6.2. 集群执行 7. Flink的广播变量8. Flink的累加器9. Flink的分布式缓存说在后面作者介绍参考推荐阅读 1. 什么是Flink
大数据
大数据Big Data是指规模庞大、结构多样且速度快速增长的数据集合。这些数据集合通常包含传统数据库管理系统无法有效处理的数据具有高度的复杂性和挑战性。大数据的主要特点包括三个维度**三V**即Volume数据量大、Variety数据多样性、Velocity数据速度。
数据量大Volume大数据的最明显特征之一是其庞大的数据量。传统的数据处理方法和工具在处理这种规模的数据时可能会变得低效或不可行。数据多样性Variety大数据不仅包括结构化数据如表格数据还包括半结构化数据如JSON、XML和非结构化数据如文本、图像、音频、视频等。这些数据可能来自不同的源头和不同的格式。数据速度快Velocity大数据往往以高速率产生、流动和累积。这要求数据处理系统能够实时或近实时地处理数据以便从中获取有价值的信息。
分布式计算
随着计算机技术的发展和数据规模的增大单台计算机的处理能力和存储容量逐渐变得有限无法满足大数据处理的要求。为了应对这一挑战分布式计算应运而生它利用多台计算机组成集群将计算任务分割成多个子任务并在不同的计算节点上并行执行从而提高计算效率和处理能力。
分布式计算的核心思想是将大问题划分为小问题将任务分发给多个计算节点并行执行最后将结果合并得到最终的解。这种方式有效地解决了单台计算机无法处理大规模数据和高并发计算的问题。同时分布式计算还具有良好的可扩展性可以根据数据量的增加灵活地扩展集群规模以应对不断增长的数据挑战。
分布式计算的概念听起来很高深其背后的思想却十分朴素即分而治之又称为分治法Divide and Conquer。分治法是一种解决问题的算法设计策略它将一个问题分解成多个相同或相似的子问题然后分别解决这些子问题最后将子问题的解合并起来得到原问题的解。分治法常用于解决复杂问题尤其是在大数据处理中可以将大规模的数据集合分割成更小的部分然后分别处理这些部分最后合并结果。
在处理大数据问题时可以使用分治法的思想来提高效率和可扩展性以下是一些应用分治法处理大数据问题的示例 MapReduce 模式分治法的经典应用是 MapReduce 模式它将大规模的数据集合分为多个小块每个小块由不同的计算节点进行处理然后将结果合并。这种方法适用于批处理任务如数据清洗、转换、聚合等。 并行计算将大规模的计算任务分解成多个小任务分配给不同的计算节点并行处理最后合并结果。这适用于需要大量计算的问题如数值模拟、图算法等。 分布式排序将大规模数据集合分割成多个部分每个部分在不同的计算节点上进行排序然后使用合并排序算法将这些有序部分合并为整体有序的数据集合。 分区和分片在分布式存储系统中可以将数据分区和分片存储在不同的节点上通过分区键或哈希函数将数据分配到不同的存储节点上从而实现数据的分布式存储和管理。 分布式机器学习将大规模的机器学习任务分解成多个子任务在分布式计算环境中分别进行训练然后合并模型参数如分布式随机梯度下降算法。 数据分割和合并对于需要频繁访问的大数据集合可以将数据分割成多个小块每个小块存储在不同的存储节点上然后根据需要进行合并以减少数据访问的开销。 分治法在大数据处理中的应用不仅有助于提高处理效率还可以充分利用分布式计算和存储资源从而更好地应对大数据量和复杂性。然而在应用分治法时需要考虑合适的数据分割策略、任务调度、结果合并等问题以确保分治法的正确性和性能。
然而分布式计算也带来了一些挑战如数据一致性、通信开销、任务调度等问题需要综合考虑各种因素来设计和优化分布式系统。同时分布式计算也需要开发者具备分布式系统设计和调优的知识和技能以确保系统的性能和稳定性。
分布式存储
当数据量巨大且单机存储已无法满足需求时分布式存储和分布式文件系统成为处理大数据的关键技术。下面我会详细介绍分布式存储和分布式文件系统的概念、特点和常见的实现。
分布式存储
分布式存储是将数据分散存储在多个节点上以提供高容量、高性能、高可靠性和可扩展性的数据存储解决方案。每个节点都可以通过网络访问数据并且多个节点协同工作来处理数据请求。分布式存储的核心目标是解决单机存储的瓶颈同时提供高可靠性和可用性。
分布式存储的特点包括
横向扩展性可以通过增加节点来扩展存储容量和性能适应不断增长的数据量和负载。高可靠性和容错性数据在多个节点上冗余存储当某个节点出现故障时数据依然可用不会丢失。数据分布和复制数据按照一定策略分布在不同节点上数据的复制确保了数据的可用性和容错性。并发访问和高性能支持多个客户端同时访问数据实现高并发和更好的性能。灵活的数据模型支持多种数据类型和访问方式如文件系统、对象存储、键值存储等。
分布式文件系统
分布式文件系统是一种特殊类型的分布式存储主要用于存储和管理文件数据。它提供了类似于传统单机文件系统的接口但是在底层实现上数据被分散存储在多个节点上。分布式文件系统能够自动处理数据的分布、复制、一致性和故障恢复等问题。
常见的分布式文件系统特点包括
命名空间和路径分布式文件系统通过路径来访问文件类似于传统文件系统的目录结构。数据分布和复制文件被切分成块并分散存储在多个节点上同时进行数据复制以实现冗余和高可用性。一致性和数据一致性模型分布式文件系统需要保证数据的一致性不同节点上的数据副本需要保持同步。访问控制和权限管理提供用户和应用程序访问控制和权限管理功能确保数据安全性。高性能分布式文件系统通常优化了数据的读写性能以满足大数据场景的需求。扩展性可以通过增加节点来扩展存储容量和性能。
常见的分布式文件系统包括
Hadoop HDFSHadoop Distributed File SystemHadoop生态系统中的分布式文件系统适用于大数据存储。Ceph开源的分布式存储系统提供块存储、文件系统和对象存储。GlusterFS开源的分布式文件系统可以线性扩展存储容量和性能。
总之分布式存储和分布式文件系统在大数据时代扮演着重要角色帮助我们存储、管理和访问海量的数据解决了传统单机存储无法应对的挑战。
批处理和流处理
批处理和流处理是大数据处理领域中常见的两种数据处理模式用于不同类型的数据处理需求。下面将详细介绍这两种模式并给出相关的应用场景示例。
批处理Batch Processing
批处理是指将一批数据集合在一起在一个固定的时间间隔内对这批数据进行处理和分析。批处理通常适用于数据量较大、处理周期较长、要求高一致性的场景。
特点
数据被集中处理适合周期性分析和报告生成。数据被切分成小块每个小块在一个作业中被处理。数据处理时间较长不适合实时性要求高的场景。
应用场景示例
离线数据分析对历史数据进行分析从中发现趋势、模式和规律用于业务决策。例如销售数据分析、用户行为分析。批量推荐系统基于用户历史行为数据定期生成推荐结果。例如电影推荐、商品推荐。数据清洗和预处理对大规模数据进行清洗、过滤和预处理提高数据质量和可用性。例如清理无效数据、填充缺失值。大规模ETLExtract, Transform, Load将数据从源系统中抽取出来经过转换和加工后加载到目标系统。例如数据仓库的构建。
流处理Stream Processing
流处理是指在数据生成的时候立即进行处理实现数据的实时处理和分析。流处理通常适用于数据实时性要求高、需要快速响应的场景。
特点
数据是实时流动的需要快速处理和响应。数据是持续不断地到达需要实时计算和分析。可能会遇到延迟和数据乱序等问题。
应用场景示例
实时监控和告警对实时数据进行监控和分析及时发现异常并触发告警。例如网络流量监控、系统性能监控。实时数据分析对流式数据进行实时分析从中提取有价值的信息。例如实时点击流分析、实时市场行情分析。实时推荐系统基于用户实时行为数据实时生成推荐结果。例如新闻推荐、广告推荐。实时数据仓库构建实时数据仓库将实时数据集成、加工和分析。例如实时销售数据分析、实时用户行为分析。
总之批处理和流处理分别适用于不同类型的数据处理需求根据业务需求和实时性要求选择合适的处理模式。
开源大数据技术 当谈论大数据处理时Hadoop、YARN、Spark和Flink都是重要的技术。它们都属于大数据领域的分布式计算框架但在功能和使用方式上有所不同。
Hadoop
Hadoop是一个开源的分布式存储和计算框架最初由Apache开发用于处理大规模数据集。Hadoop的核心组件包括 Hadoop Distributed File SystemHDFSHDFS是一种分布式文件系统用于存储大规模数据。它将数据分成多个块并将这些块分散存储在集群中的不同节点上。HDFS支持高可靠性、冗余存储和数据复制。 MapReduceMapReduce是Hadoop的计算模型用于处理分布式数据。它将计算任务分成Map和Reduce两个阶段分布在集群中的节点上并行执行。Map阶段负责数据的拆分和处理Reduce阶段负责数据的汇总和计算。
YARNYet Another Resource Negotiator
YARN是Hadoop的资源管理器它负责集群资源的管理和分配。YARN将集群资源划分为容器Containers并分配给不同的应用程序。这种资源的隔离和管理允许多个应用程序同时在同一个Hadoop集群上运行从而提高了资源利用率和集群的多租户能力。
Spark
Apache Spark是一个通用的分布式计算引擎旨在提供高性能、易用性和多功能性。与传统的Hadoop MapReduce相比Spark具有更快的执行速度因为它将数据加载到内存中并进行内存计算。Spark支持多种计算模式包括批处理、交互式查询、流处理和机器学习。
Spark的主要特点和组件包括 RDDResilient Distributed DatasetRDD是Spark的核心数据抽象表示分布式的数据集。RDD支持并行操作和容错性可以在计算过程中重新计算丢失的分区。 Spark SQLSpark SQL是用于处理结构化数据的组件支持SQL查询和操作。它能够将RDD和传统的数据源如Hive无缝集成。 Spark StreamingSpark Streaming是用于处理实时流数据的模块支持微批处理模式。它能够将实时数据流分割成小批次并进行处理。 MLlibMLlib是Spark的机器学习库提供了常见的机器学习算法和工具用于训练和评估模型。 GraphXGraphX是Spark的图计算库用于处理图数据和图算法。
Flink
Apache Flink是一个流式处理引擎和分布式批处理框架具有低延迟、高吞吐量和容错性。Flink支持流批一体化能够实现实时流处理和批处理作业的无缝切换。它的核心特点包括
DataStream APIFlink的DataStream API用于处理实时流数据支持事件时间处理、窗口操作和状态管理。它能够处理高吞吐量的实时数据流。DataSet APIFlink的DataSet API用于批处理作业类似于Hadoop的MapReduce。它支持丰富的操作符和优化技术。Stateful Stream ProcessingFlink支持有状态的流式处理可以在处理过程中保存和管理状态。这对于实现复杂的数据处理逻辑很有用。Event Time ProcessingFlink支持事件时间处理能够处理乱序事件并准确计算窗口操作的结果。Table API和SQLFlink提供了Table API和SQL查询使开发人员可以使用类似SQL的语法来查询和分析数据。可以连接大数据生态圈各类组件包括Kafka、Elasticsearch、JDBC、HDFS和Amazon S3可以运行在Kubernetes、YARN、Mesos和独立Standalone集群上。
Flink在流处理上的几个主要优势如下: 真正的流计算引擎Flink具有更好的streaming计算模型,可以进行非常高效的状态运算和窗口操作。Spark Streaming仍然是微批处理引擎。 更低延迟Flink可以实现毫秒级的低延迟处理,而Spark Streaming延迟较高。 更好的容错机制Flink支持更细粒度的状态管理和检查点机制,可以实现精确一次的状态一致性语义。Spark较难做到确保exactly once。 支持有限数据流和无限数据流Flink可处理有开始和结束的有限数据流,也能处理无限不断增长的数据流。Spark Streaming更适合有限数据集。 更易统一批处理和流处理Flink提供了DataStream和DataSet API,可以轻松统一批处理和流处理。Spark需要联合Spark SQL使用。 更优秀的内存管理Flink具有自己的内存管理,可以根据不同查询优化内存使用。Spark依赖Hadoop YARN进行资源调度。 更高性能在部分场景下,Flink拥有比Spark Streaming更高的吞吐和低的延迟。
总体来说Flink作为新一代流处理引擎在延迟、容错、易用性方面优于Spark Streaming。但Spark生态更加完善,也在努力减小与Flink的差距。需要根据具体场景选择最优的框架。
总的来说Flink在流处理领域的优势主要体现在事件时间处理、低延迟、精确一次语义和状态管理等方面。这些特性使得Flink在处理实时流数据时能够更好地满足复杂的业务需求特别是对于需要高准确性和可靠性的应用场景。
2. Flink 部署
Apache Flink在1.7版本中进行了重大的架构重构引入了Master-Worker架构这使得Flink能够更好地适应不同的集群基础设施包括Standalone、Hadoop YARN和Kubernetes等。下面会详细介绍一下Flink 1.7版本引入的Master-Worker架构以及其在不同集群基础设施中的适应性。
Master-Worker架构
Flink 1.7版本中引入的Master-Worker架构是为了解决之前版本中存在的一些问题如资源管理、高可用性等。在这个架构中Flink将任务管理和资源管理分离引入了JobManager和ResourceManager两个主要角色。 JobManager负责接受和调度任务维护任务的状态和元数据信息还负责处理容错机制。JobManager分为两种JobManager高可用模式和StandaloneJobManager非高可用模式。 ResourceManager负责管理集群中的资源包括分配任务的资源、维护资源池等。
这种架构的优势在于解耦任务的管理和资源的管理使得Flink能够更好地适应不同的集群环境和基础设施。
兼容性
Flink的Master-Worker架构设计使其能够兼容几乎所有主流信息系统的基础设施包括 Standalone集群在Standalone模式下Flink的JobManager和ResourceManager都运行在同一个进程中适用于简单的开发和测试场景。 Hadoop YARN集群Flink可以部署在现有的Hadoop YARN集群上通过ResourceManager与YARN ResourceManager进行交互实现资源管理。 Kubernetes集群Flink还支持在Kubernetes集群中部署通过Kubernetes提供的资源管理能力来管理任务和资源。
这种兼容性使得Flink可以灵活地在不同的集群环境中运行满足不同场景下的需求。
总之Flink在1.7版本中引入的Master-Worker架构使其在资源管理、高可用性等方面有了更好的表现同时也使得Flink能够更好地适应各种不同的集群基础设施包括Standalone、Hadoop YARN和Kubernetes等。这为Flink的部署和使用带来了更多的灵活性和选择性。
Standalone集群是Apache Flink中一种简单的部署模式适用于开发、测试和小规模应用场景。下面我将详细介绍Standalone集群的特点以及部署方式。
Standalone集群的特点 简单部署Standalone集群是Flink的最简单部署模式之一不需要依赖其他集群管理工具可以在单个机器上部署。 资源共享Standalone集群中的JobManager和TaskManager共享同一份资源例如内存和CPU。这使得资源管理相对简单但也可能在资源竞争时影响任务的性能。 适用于开发和测试Standalone集群适用于开发和测试阶段可以在本地机器上模拟Flink集群环境方便开发人员进行调试和测试。 不支持高可用性Standalone集群默认情况下不支持高可用性即不具备故障恢复和任务迁移的能力。如果需要高可用性可以通过运行多个JobManager实例来实现。
Standalone集群的部署方式 安装Flink首先需要下载并安装Flink。可以从官方网站下载预编译的二进制文件解压到指定目录。也可以从以下网站下载 apache-flink安装包下载_开源镜像站-阿里云 (aliyun.com) 配置Flink进入Flink的安装目录修改conf/flink-conf.yaml配置文件。主要配置项包括jobmanager.rpc.address和taskmanager.numberOfTaskSlots等。 启动JobManager打开终端进入Flink安装目录执行以下命令启动JobManager ./bin/start-cluster.sh启动TaskManager打开终端进入Flink安装目录执行以下命令启动TaskManager ./bin/taskmanager.sh start提交作业使用Flink客户端工具提交作业。可以使用以下命令提交JAR文件中的作业 ./bin/flink run -c your.main.Class ./path/to/your.jar停止集群可以使用以下命令停止整个Standalone集群 ./bin/stop-cluster.sh总之Standalone集群是一个简单且易于部署的Flink集群模式适用于开发、测试和小规模应用场景。然而由于其资源共享和不支持高可用性的特点不适合部署在生产环境中。
下面提供利用Docker部署flink standalone简单集群。
Docker部署flink简单集群
Flink程序可以作为集群内的分布式系统运行也可以以独立模式或在YARN、Mesos、基于Docker的环境和其他资源管理框架下进行部署。
1、在服务器创建flink目录
mkdir flink目录的结构如下 2、docker-compose.yml脚本创建
docker 容器的编排文件具体如下 3、启动flink
1后台运行
一般推荐生产环境下使用该选项。
docker-compose up -d2前台运行
控制台将会同时打印所有容器的输出信息可以很方便进行调试。
docker-compose up4、浏览器上查看页面dashboard
访问web界面
http://cdh1:8081/3. Flink快速应用
通过一个单词统计的案例快速上手应用Flink进行流处理Streaming和批处理Batch
实操1 单词统计案例批数据
1.1 需求
统计一个文件中各个单词出现的次数把统计结果输出到文件
步骤 1、读取数据源 2、处理数据源
a、将读到的数据源文件中的每一行根据空格切分
b、将切分好的每个单词拼接1
c、根据单词聚合将相同的单词放在一起
d、累加相同的单词单词后面的1进行累加
3、保存处理结果
1.2 代码实现
引入依赖
!--flink核心包--
dependencygroupIdorg.apache.flink/groupIdartifactIdflink-java/artifactIdversion1.7.2/version
/dependency
!--flink流处理包--
dependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java_2.12/artifactIdversion1.7.2/versionscopeprovided/scope
/dependency Java程序
package com.crazymaker.bigdata.wordcount.batch;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;/*** 1、读取数据源* 2、处理数据源* a、将读到的数据源文件中的每一行根据空格切分* b、将切分好的每个单词拼接1* c、根据单词聚合将相同的单词放在一起* d、累加相同的单词单词后面的1进行累加* 3、保存处理结果*/
public class WordCountJavaBatch {public static void main(String[] args) throws Exception {String inputPathD:\\data\\input\\hello.txt;String outputPathD:\\data\\output\\hello.txt;//获取flink的运行环境ExecutionEnvironment executionEnvironment ExecutionEnvironment.getExecutionEnvironment();DataSetString text executionEnvironment.readTextFile(inputPath);FlatMapOperatorString, Tuple2String, Integer wordOndOnes text.flatMap(new SplitClz());//0代表第1个元素UnsortedGroupingTuple2String, Integer groupedWordAndOne wordOndOnes.groupBy(0);//1代表第1个元素AggregateOperatorTuple2String, Integer out groupedWordAndOne.sum(1);out.writeAsCsv(outputPath, \n, ).setParallelism(1);//设置并行度executionEnvironment.execute();//人为调用执行方法}static class SplitClz implements FlatMapFunctionString,Tuple2String,Integer{public void flatMap(String s, CollectorTuple2String, Integer collector) throws Exception {String[] s1 s.split( );for (String word:s1) {collector.collect(new Tuple2String,Integer(word,1));//发送到下游}}}
}源文件的内容 统计的结果 实操2 单词统计案例流数据
nc
netcat:
flink开发时候经常用socket作为source使用linux/mac环境开发可以在终端中开启 nc -l 9000(开启netcat程序作为服务端发送数据)
nc是netcat的缩写有着网络界的瑞士军刀美誉。因为它短小精悍、功能实用被设计为一个简单、可靠的网络工具。
nc作用
数据传输文件传输机器之间网络测速
2.1 需求
Socket模拟实时发送单词
使用Flink实时接收数据对指定时间窗口内如5s的数据进行聚合统计每隔1s汇总计算一次并且把时间窗口内计算结果打印出来。
2.2 代码实现
package com.crazymaker.bigdata.wordcount.stream;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;/*** Socket模拟实时发送单词使用Flink实时接收数据*/
public class WordCountStream {public static void main(String[] args) throws Exception {// 监听的ip和端口号以main参数形式传入约定第一个参数为ip第二个参数为端口
// String ip args[0];String ip 127.0.0.1;
// int port Integer.parseInt(args[1]);int port 9000;// 获取Flink流执行环境StreamExecutionEnvironment streamExecutionEnvironment StreamExecutionEnvironment.getExecutionEnvironment();// 获取socket输入数据DataStreamSourceString textStream streamExecutionEnvironment.socketTextStream(ip, port, \n);SingleOutputStreamOperatorTuple2String, Long tuple2SingleOutputStreamOperator textStream.flatMap(new FlatMapFunctionString, Tuple2String, Long() {public void flatMap(String s, CollectorTuple2String, Long collector) throws Exception {String[] splits s.split(\\s);for (String word : splits) {collector.collect(Tuple2.of(word, 1l));}}});SingleOutputStreamOperatorTuple2String, Long word tuple2SingleOutputStreamOperator.keyBy(0).sum(1);// 打印数据word.print();// 触发任务执行streamExecutionEnvironment.execute(wordcount stream process);}
}Flink程序开发的流程总结
Flink程序开发的流程总结如下
1获得一个执行环境
2加载/创建初始化数据
3指定数据操作的算子
4指定结果数据存放位置
5调用execute()触发执行程序
注意Flink程序是延迟计算的只有最后调用execute()方法的时候才会真正触发执行程序
4. Flink分布式架构与核心组件
Flink作业提交过程
standalone模式下的作业提交过程如下 在一个作业提交前Master和TaskManager等进程需要先被启动。
我们可以在Flink主目录中执行脚本来启动这些进程
bin/start-cluster.sh。
Master和TaskManager被启动后TaskManager需要将自己注册给Master中的ResourceManager。
这个初始化和资源注册过程发生在单个作业提交前我们称之为第0步。
接下来我们将逐步解析Flink作业的提交过程具体步骤如下
① 用户编写应用程序代码并使用Flink客户端Client提交该作业。通常这些程序会使用Java或Scala语言编写并调用Flink API 构建出逻辑视图。这些代码以及相关配置文件被编译并打包然后被提交至Master节点的Dispatcher形成一个应用作业Application。
② Dispatcher接收到提交的作业后会启动一个JobManager该JobManager负责协调这个作业的各项任务。
③ JobManager向ResourceManager申请所需的作业资源这些资源可能包括CPU、内存等。
④ 由于在前面的步骤中TaskManager已经向ResourceManager注册了可供使用的资源这时处于空闲状态的TaskManager将被分配给JobManager。
⑤ JobManager将用户作业中的逻辑视图转化为物理执行图如图3-3所示该图显示了作业被并行化后的执行过程。JobManager将计算任务分配并部署到多个TaskManager上。此时一个Flink作业正式开始执行。
在计算任务执行过程中TaskManager可能会与其他TaskManager交换数据使用特定的数据交换策略。同时TaskManager还会将任务的状态信息传递给JobManager这些状态信息包括任务的启动、执行和终止状态以及快照的元数据等。
Flink核心组件
在这个作业提交流程的基础上我们可以更详细地介绍涉及的各个组件的功能和角色 Client客户端用户通常使用Flink提供的客户端工具如位于Flink主目录下的bin目录中的命令行工具来提交作业。客户端会对用户提交的Flink作业进行预处理并将作业提交到Flink集群中。在提交作业时客户端需要配置一些必要的参数例如使用Standalone集群还是YARN集群等。整个作业会被打包成一个JAR文件DataStream API会被转换成一个JobGraph该图类似于逻辑视图如图3-2所示。 Dispatcher调度器Dispatcher可以接收多个作业每次接收作业时会为该作业分配一个JobManager。Dispatcher通过提供表述性状态转移REST式的接口使用超文本传输协议HTTP来对外提供服务。 JobManager作业管理器JobManager是单个Flink作业的协调者。每个作业都有一个对应的JobManager负责管理。JobManager将客户端提交的JobGraph转化为ExecutionGraph该图类似于并行物理执行图如图3-3所示。JobManager会向ResourceManager申请所需的资源。一旦获取足够的资源JobManager会将ExecutionGraph及其计算任务分发到多个TaskManager上。此外JobManager还管理多个TaskManager包括收集作业状态信息、生成检查点、必要时进行故障恢复等。 ResourceManager资源管理器Flink可以在Standalone、YARN、Kubernetes等环境中部署而不同环境对计算资源的管理模式有所不同。为了解决资源分配问题Flink引入了ResourceManager模块。在Flink中计算资源的基本单位是TaskManager上的任务槽位Slot。ResourceManager的主要职责是从资源提供方如YARN获取计算资源。当JobManager需要计算资源时ResourceManager会将空闲的Slot分配给JobManager。在计算任务结束后ResourceManager会回收这些空闲Slot。 TaskManager任务管理器TaskManager是实际执行计算任务的节点。一般来说一个Flink作业会分布在多个TaskManager上执行每个TaskManager提供一定数量的Slot。当一个TaskManager启动后相关的Slot信息会被注册到ResourceManager中。当Flink作业提交后ResourceManager会将空闲的Slot分配给JobManager。一旦JobManager获取了空闲Slot它会将具体的计算任务部署到这些Slot上并在这些Slot上执行。在执行过程中TaskManager可能需要与其他TaskManager进行数据交换因此需要进行必要的数据通信。总之TaskManager负责具体计算任务的执行它会在启动时将Slot资源向ResourceManager注册。
Flink组件栈 部署层 Local模式Flink支持本地模式包括单节点SingleNode和单虚拟机SingleJVM模式。在SingleNode模式中JobManager和TaskManager运行在同一个节点上在SingleJVM模式中所有角色都在同一个JVM中运行。Cluster模式Flink可以部署在Standalone、YARN、Mesos和Kubernetes集群上。Standalone集群需要配置JobManager和TaskManager的节点然后通过Flink提供的脚本启动。YARN、Mesos和Kubernetes集群提供了更强大的资源管理和集群扩展能力。Cloud模式Flink还可以部署在各大云平台上如AWS、谷歌云和阿里云使用户能够在云环境中灵活地部署和运行作业。 运行时层 运行时层是Flink的核心组件支持分布式执行和处理。该层负责将用户提交的作业转化为任务并分发到相应的JobManager和TaskManager上执行。运行时层还涵盖了检查点和故障恢复机制确保作业的容错性和稳定性。 API层 Flink的API层提供了DataStream API和DataSet API分别用于流式处理和批处理。这两个API允许开发者使用各种操作符和转换来处理数据包括转换、连接、聚合、窗口等计算任务。 上层工具 在API层之上Flink提供了一些工具来扩展其功能 复杂事件处理CEP面向流处理的库用于检测和处理复杂的事件模式。图计算库Gelly面向批处理的图计算库用于执行图算法。Table API和SQL针对SQL用户和关系型数据处理场景的接口允许使用SQL语法和表操作处理流和批数据。PyFlink针对Python用户的接口使其能够使用Flink进行数据处理目前主要基于Table API。
综上所述Flink在不同层次上提供了丰富的组件和工具支持流式处理和批处理以及与不同环境本地、集群、云的无缝集成使开发者能够灵活地构建和部署大规模数据处理应用程序。
作业执行阶段 在Apache Flink中数据流作业的执行过程可以划分为多个阶段从逻辑视图到物理执行图的转换。这个过程包括从StreamGraph到JobGraph再到ExecutionGraph最终映射到实际的物理执行图。下面详细说明这个过程 StreamGraph逻辑视图StreamGraph是用户编写的流处理应用程序的逻辑表示。它包含了数据流的转换操作、算子之间的关系、事件时间处理策略、容错配置等。StreamGraph是用户定义的数据流拓扑是一种高级抽象用户可以通过DataStream API构建StreamGraph。 JobGraph作业图JobGraph是从StreamGraph派生而来的表示一个具体的作业执行计划。在JobGraph中StreamGraph中的逻辑算子被映射为具体的物理算子且有明确的执行顺序和任务间的依赖关系。JobGraph还包含了资源配置、任务并行度、优化选项等信息。JobGraph是从逻辑视图转向物理执行的关键步骤。 ExecutionGraph执行图ExecutionGraph是JobGraph的执行时表示它是实际执行计划的核心。在ExecutionGraph中JobGraph中的每个任务都会被映射到一个具体的执行任务每个任务可以包含一个或多个子任务这些子任务被映射到不同的TaskManager上。ExecutionGraph还负责维护作业的执行状态以及任务之间的调度和通信。 物理执行图ExecutionGraph被映射到实际的物理执行图即在TaskManager集群上真正执行的任务拓扑。物理执行图包括了任务的并行执行、数据交换、任务状态管理等细节它是作业在分布式环境中实际运行的体现。
总结起来StreamGraph到JobGraph到ExecutionGraph的转换是Flink作业执行计划的关键步骤。从逻辑视图到物理执行图的转换过程考虑了作业的拓扑结构、资源分配、任务调度等方面的问题确保了作业可以在分布式环境中高效执行。这一系列转换过程使得用户可以通过高层次的抽象来描述作业逻辑而Flink框架会负责将其转化为可执行的任务图实现数据流的处理和计算。
5. Flink 开发
Flink 应用程序结构主要包含三部分,Source/Transformation/Sink,如下图所示 Source: 数据源Flink 在流处理和批处理上的 source 大概有 4 类
基于本地集合的 source基于文件的 source基于网络套接字的 source自定义的 source。自定义的 source 常见的有 Apache kafka、Amazon Kinesis Streams、RabbitMQ、Twitter Streaming API、Apache NiFi 等当然你也可以定义自己的 source。
Transformation数据转换的各种操作有 Map / FlatMap / Filter / KeyBy / Reduce / Fold / Aggregations / Window / WindowAll / Union / Window join / Split / Select / Project 等操作很多可以将数据转换计算成你想要的数据。
Sink接收器Flink 将转换计算后的数据发送的地点 你可能需要存储下来Flink 常见的 Sink 大概有如下几类
写入文件打印输出写入 socket自定义的 sink 。自定义的 sink 常见的有 Apache kafka、RabbitMQ、MySQL、ElasticSearch、Apache Cassandra、Hadoop FileSystem 等同理你也可以定义自己的 Sink。
开发环境搭建
pom.xml
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdcom.lagou/groupIdartifactIdflinkdemo/artifactIdversion1.0-SNAPSHOT/versiondependencies!--flink核心包--dependencygroupIdorg.apache.flink/groupIdartifactIdflink-java/artifactIdversion1.7.2/version/dependency!--flink流处理包--dependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java_2.12/artifactIdversion1.7.2/version!--scopeprovided/scope--/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-redis_2.11/artifactIdversion1.1.5/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-hadoop-compatibility_2.12/artifactIdversion1.7.2/version/dependencydependencygroupIdorg.apache.hadoop/groupIdartifactIdhadoop-common/artifactIdversion2.7.2/version/dependencydependencygroupIdorg.apache.hadoop/groupIdartifactIdhadoop-hdfs/artifactIdversion2.7.2/version/dependencydependencygroupIdorg.apache.hadoop/groupIdartifactIdhadoop-client/artifactIdversion2.7.2/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-kafka_2.12/artifactIdversion1.7.2/version/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion1.2.73/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-runtime-web_2.12/artifactIdversion1.7.2/version/dependency/dependenciesbuildplugins!-- 打jar插件 --plugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-shade-plugin/artifactIdversion2.4.3/versionexecutionsexecutionphasepackage/phasegoalsgoalshade/goal/goalsconfigurationfiltersfilterartifact*:*/artifactexcludesexcludeMETA-INF/*.SF/excludeexcludeMETA-INF/*.DSA/excludeexcludeMETA-INF/*.RSA/exclude/excludes/filter/filters/configuration/execution/executions/plugin/plugins/build/projectFlink连接器
在实际生产环境中数据通常分布在各种不同的系统中包括文件系统、数据库、消息队列等。Flink作为一个大数据处理框架需要与这些外部系统进行数据交互以实现数据的输入、处理和输出。在Flink中Source和Sink是两个关键模块它们扮演着与外部系统进行数据连接和交互的重要角色被统称为外部连接器Connector。 Source数据源Source是Flink作业的输入模块用于从外部系统中读取数据并将其转化为Flink的数据流。Source负责实现与不同数据源的交互逻辑将外部数据源的数据逐条或批量读取到Flink的数据流中以便后续的数据处理。常见的Source包括从文件中读取数据、从消息队列如Kafka、RabbitMQ中消费数据、从数据库中读取数据等。 Sink数据接收器Sink是Flink作业的输出模块用于将Flink计算的结果输出到外部系统中。Sink负责实现将Flink数据流中的数据写入到外部数据源以便后续的持久化存储、展示或其他处理。Sink的实现需要考虑数据的可靠性、一致性以及可能的事务性要求。常见的Sink包括将数据写入文件、将数据写入数据库、将数据写入消息队列等。
外部连接器在Flink中的作用非常关键它们使得Flink作业可以与各种不同类型的数据源和数据目的地进行交互实现了数据的流入和流出。这种灵活的连接机制使得Flink在处理大数据时能够更好地集成已有的系统和数据实现复杂的数据流处理和分析任务。
Source
Flink在批处理中常见的source主要有两大类。
基于本地集合的sourceCollection-based-source基于文件的sourceFile-based-source
基于本地集合的Source
在Flink中最常见的创建本地集合的DataSet方式有三种。
使用env.fromElements()这种方式也支持Tuple自定义对象等复合形式。使用env.fromCollection(),这种方式支持多种Collection的具体类型。使用env.generateSequence(),这种方法创建基于Sequence的DataSet。
使用方式如下:
package com.demo.broad;import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.DataSet;import java.util.ArrayList;
import java.util.List;
import java.util.ArrayDeque;
import java.util.Stack;
import java.util.stream.Stream;public class BatchFromCollection {public static void main(String[] args) throws Exception {// 获取flink执行环境ExecutionEnvironment env ExecutionEnvironment.getExecutionEnvironment();// 0.用element创建DataSet(fromElements)DataSetString ds0 env.fromElements(spark, flink);ds0.print();// 1.用Tuple创建DataSet(fromElements)DataSetTuple2Integer, String ds1 env.fromElements(new Tuple2(1, spark),new Tuple2(2, flink));ds1.print();// 2.用Array创建DataSetDataSetString ds2 env.fromCollection(new ArrayListString() {{add(spark);add(flink);}});ds2.print();// 3.用ArrayDeque创建DataSetDataSetString ds3 env.fromCollection(new ArrayDequeString() {{add(spark);add(flink);}});ds3.print();// 4.用List创建DataSetDataSetString ds4 env.fromCollection(new ArrayListString() {{add(spark);add(flink);}});ds4.print();// 5.用ArrayList创建DataSetDataSetString ds5 env.fromCollection(new ArrayListString() {{add(spark);add(flink);}});ds5.print();// 6.用List创建DataSetDataSetString ds6 env.fromCollection(new ArrayListString() {{add(spark);add(flink);}});ds6.print();// 7.用List创建DataSetDataSetString ds7 env.fromCollection(new ArrayListString() {{add(spark);add(flink);}});ds7.print();// 8.用Stack创建DataSetDataSetString ds8 env.fromCollection(new StackString() {{add(spark);add(flink);}});ds8.print();// 9.用Stream创建DataSetStream相当于lazy List避免在中间过程中生成不必要的集合DataSetString ds9 env.fromCollection(Stream.of(spark, flink));ds9.print();// 10.用List创建DataSetDataSetString ds10 env.fromCollection(new ArrayListString() {{add(spark);add(flink);}});ds10.print();// 11.用HashSet创建DataSetDataSetString ds11 env.fromCollection(new HashSetString() {{add(spark);add(flink);}});ds11.print();// 12.用Iterable创建DataSetDataSetString ds12 env.fromCollection(new ArrayListString() {{add(spark);add(flink);}});ds12.print();// 13.用ArrayList创建DataSetDataSetString ds13 env.fromCollection(new ArrayListString() {{add(spark);add(flink);}});ds13.print();// 14.用Stack创建DataSetDataSetString ds14 env.fromCollection(new StackString() {{add(spark);add(flink);}});ds14.print();// 15.用HashMap创建DataSetDataSetTuple2Integer, String ds15 env.fromCollection(new HashMapInteger, String() {{put(1, spark);put(2, flink);}}.entrySet());ds15.print();// 16.用Range创建DataSetDataSetInteger ds16 env.fromCollection(IntStream.rangeClosed(1, 8).boxed().collect(Collectors.toList()));ds16.print();// 17.用generateSequence创建DataSetDataSetLong ds17 env.generateSequence(1, 9);ds17.print();}
}基于文件的Source Flink支持直接从外部文件存储系统中读取文件的方式来创建Source数据源,Flink支持的方式有以下几种:
读取本地文件数据读取HDFS文件数据读取CSV文件数据读取压缩文件遍历目录
下面分别介绍每个数据源的加载方式:
读取本地文件
package com.demo.batch;import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;public class BatchFromFile {public static void main(String[] args) throws Exception {// 使用readTextFile读取本地文件// 初始化环境ExecutionEnvironment environment ExecutionEnvironment.getExecutionEnvironment();// 加载数据DataSetString datas environment.readTextFile(data.txt);// 触发程序执行datas.print();}
}读取HDFS文件数据
package com.demo.batch;import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;public class BatchFromFile {public static void main(String[] args) throws Exception {// 使用readTextFile读取本地文件// 初始化环境ExecutionEnvironment environment ExecutionEnvironment.getExecutionEnvironment();// 加载数据DataSetString datas environment.readTextFile(hdfs://node01:8020/README.txt);// 触发程序执行datas.print();}
}读取CSV文件数据
package com.demo.batch;import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.common.functions.MapFunction;public class BatchFromCsvFile {public static void main(String[] args) throws Exception {// 初始化环境ExecutionEnvironment env ExecutionEnvironment.getExecutionEnvironment();// 用于映射CSV文件的POJO classpublic static class Student {public int id;public String name;public Student() {}public Student(int id, String name) {this.id id;this.name name;}Overridepublic String toString() {return Student( id , name );}}// 读取CSV文件DataSetStudent csvDataSet env.readCsvFile(./data/input/student.csv).ignoreFirstLine().pojoType(Student.class, id, name);csvDataSet.print();}
}读取压缩文件
对于以下压缩类型不需要指定任何额外的inputformat方法flink可以自动识别并且解压。但是压缩文件可能不会并行读取可能是顺序读取的这样可能会影响作业的可伸缩性。
压缩格式扩展名并行化DEFLATE.deflatenoGZIP.gz .gzipnoBzip2.bz2noXZ.xzno
package com.demo.batch;import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;public class BatchFromCompressFile {public static void main(String[] args) throws Exception {// 初始化环境ExecutionEnvironment env ExecutionEnvironment.getExecutionEnvironment();// 加载数据DataSetString result env.readTextFile(D:\\BaiduNetdiskDownload\\hbase-1.3.1-bin.tar.gz);// 触发程序执行result.print();}
}遍历目录
flink支持对一个文件目录内的所有文件包括所有子目录中的所有文件的遍历访问方式。
对于从文件中读取数据当读取数个文件夹的时候嵌套的文件默认是不会被读取的只会读取第一个文件其他的都会被忽略。所以我们需要使用recursive.file.enumeration进行递归读取
package com.demo.batch;import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;public class BatchFromCompressFile {public static void main(String[] args) throws Exception {// 初始化环境ExecutionEnvironment env ExecutionEnvironment.getExecutionEnvironment();// 加载数据DataSetString result env.readTextFile(D:\\BaiduNetdiskDownload\\hbase-1.3.1-bin.tar.gz);// 触发程序执行result.print();}
}读取kafka
public class StreamFromKafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();Properties properties new Properties();properties.setProperty(bootstrap.servers,teacher2:9092);FlinkKafkaConsumerString consumer new FlinkKafkaConsumerString(mytopic2, new SimpleStringSchema(), properties);DataStreamSourceString data env.addSource(consumer);SingleOutputStreamOperatorTuple2String, Integer wordAndOne data.flatMap(new FlatMapFunctionString, Tuple2String, Integer() {public void flatMap(String s, CollectorTuple2String, Integer collector) throws Exception {for (String word : s.split( )) {collector.collect(Tuple2.of(word, 1));}}});SingleOutputStreamOperatorTuple2String, Integer result wordAndOne.keyBy(0).sum(1);result.print();env.execute();}
}
自定义source
private static class SimpleSource
implements SourceFunctionTuple2String, Integer { private int offset 0; private boolean isRunning true; Override public void run(SourceContextTuple2String, Integer ctx) throws Exception { while (isRunning) { Thread.sleep(500); ctx.collect(new Tuple2( offset, offset)); offset; if (offset 1000) { isRunning false; } } } Override public void cancel() { isRunning false; }
}
自定义Source从0开始计数将数字发送到下游在主逻辑中调用这个Source。
DataStreamTuple2String, Integer countStream env.addSource(new SimpleSource()); Sink
flink在批处理中常见的sink
基于本地集合的sinkCollection-based-sink基于文件的sinkFile-based-sink
基于本地集合的sink
目标:
基于下列数据分别进行打印输出error输出collect()
(19, zhangsan, 178.8),
(17, lisi, 168.8),
(18, wangwu, 184.8),
(21, zhaoliu, 164.8)代码:
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.util.Collector;import java.util.ArrayList;
import java.util.List;public class BatchSinkCollection {public static void main(String[] args) throws Exception {ExecutionEnvironment env ExecutionEnvironment.getExecutionEnvironment();ListTuple3Integer, String, Double stuData new ArrayList();stuData.add(new Tuple3(19, zhangsan, 178.8));stuData.add(new Tuple3(17, lisi, 168.8));stuData.add(new Tuple3(18, wangwu, 184.8));stuData.add(new Tuple3(21, zhaoliu, 164.8));DataSetTuple3Integer, String, Double stu env.fromCollection(stuData);stu.print();stu.printToErr();stu.collect().forEach(System.out::println);env.execute();}
}基于文件的sink
flink支持多种存储设备上的文件包括本地文件hdfs文件等。flink支持多种文件的存储格式包括text文件CSV文件等。writeAsText()TextOuputFormat - 将元素作为字符串写入行。字符串是通过调用每个元素的toString()方法获得的。
将数据写入本地文件
目标:
基于下列数据,写入到文件中
Map(1 - spark, 2 - flink)代码:
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;import java.util.HashMap;
import java.util.Map;public class BatchSinkFile {public static void main(String[] args) throws Exception {ExecutionEnvironment env ExecutionEnvironment.getExecutionEnvironment();MapInteger, String data1 new HashMap();data1.put(1, spark);data1.put(2, flink);DataSetMapInteger, String ds1 env.fromElements(data1);ds1.setParallelism(1).writeAsText(test/data1/aa, FileSystem.WriteMode.OVERWRITE).setParallelism(1);env.execute();}
}将数据写入HDFS
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;import java.util.HashMap;
import java.util.Map;public class BatchSinkFile {public static void main(String[] args) throws Exception {ExecutionEnvironment env ExecutionEnvironment.getExecutionEnvironment();MapInteger, String data1 new HashMap();data1.put(1, spark);data1.put(2, flink);DataSetMapInteger, String ds1 env.fromElements(data1);ds1.setParallelism(1).writeAsText(hdfs://bigdata1:9000/a, FileSystem.WriteMode.OVERWRITE).setParallelism(1);env.execute();}
}Flink API
Flink的API层提供了DataStream API和DataSet API分别用于流式处理和批处理。这两个API允许开发者使用各种操作符和转换来处理数据包括转换、连接、聚合、窗口等计算任务。
在Flink中根据不同的场景流处理或批处理需要设置不同的执行环境。在批处理场景下需要使用DataSet API并设置批处理执行环境。在流处理场景下需要使用DataStream API并设置流处理执行环境。
以下是在不同场景下设置执行环境的示例代码分别展示了批处理和流处理的情况包括Scala和Java语言。 批处理场景 - 设置DataSet API的批处理执行环境Java
import org.apache.flink.api.java.ExecutionEnvironment;public class BatchJobExample {public static void main(String[] args) throws Exception {// 创建批处理执行环境ExecutionEnvironment env ExecutionEnvironment.getExecutionEnvironment();// 在这里添加批处理作业的代码逻辑// ...// 执行作业env.execute(Batch Job Example);}
}流处理场景 - 设置DataStream API的流处理执行环境Java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class StreamJobExample {public static void main(String[] args) throws Exception {// 创建流处理执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 在这里添加流处理作业的代码逻辑// ...// 执行作业env.execute(Stream Job Example);}
}批处理场景 - 设置DataSet API的批处理执行环境Scala
import org.apache.flink.api.scala._object BatchJobExample {def main(args: Array[String]): Unit {// 创建批处理执行环境val env ExecutionEnvironment.getExecutionEnvironment// 在这里添加批处理作业的代码逻辑// ...// 执行作业env.execute(Batch Job Example)}
}流处理场景 - 设置DataStream API的流处理执行环境Scala
import org.apache.flink.streaming.api.scala._object StreamJobExample {def main(args: Array[String]): Unit {// 创建流处理执行环境val env StreamExecutionEnvironment.getExecutionEnvironment// 在这里添加流处理作业的代码逻辑// ...// 执行作业env.execute(Stream Job Example)}
}根据以上示例代码可以在不同的场景下选择合适的执行环境和API来构建和执行Flink作业。注意在导入包时确保使用正确的包名和类名以适应批处理或流处理的环境。
以下是一些常用的API函数和操作以表格形式提供
API 类型常用函数和操作描述DataStream APImap, flatMap对数据流中的每个元素进行映射或扁平化操作。filter过滤出满足条件的元素。keyBy按指定的字段或键对数据流进行分区。window将数据流按照时间窗口或计数窗口划分。reduce, fold在窗口内对元素进行聚合操作。union合并多个数据流。connect, coMap, coFlatMap连接两个不同类型的数据流并应用相应的函数。timeWindow, countWindow定义时间窗口或计数窗口。process自定义处理函数实现更复杂的流处理逻辑。DataSet APImap, flatMap对数据集中的每个元素进行映射或扁平化操作。filter过滤出满足条件的元素。groupBy按指定的字段或键对数据集进行分组。reduce, fold对分组后的数据集进行聚合操作。join, coGroup对两个数据集进行内连接或外连接操作。cross, cartesian对两个数据集进行笛卡尔积操作。distinct去除数据集中的重复元素。groupBy, aggregate分组并对分组后的数据集进行聚合操作。first, min, max获取数据集中的第一个、最小或最大元素。sum, avg计算数据集中元素的和或平均值。collect将数据集中的元素收集到本地的集合中。
这些API函数和操作涵盖了Flink中流处理和批处理的常见操作可以帮助用户实现各种复杂的数据处理和分析任务。根据实际需求可以选择适合的API函数和操作来构建Flink作业。
下面是一些参见的API的说明
map 将DataSet中的每一个元素转换为另外一个元素
示例
使用map操作将以下数据
1,张三, 2,李四, 3,王五, 4,赵六转换为一个scala的样例类。
步骤
获取ExecutionEnvironment运行环境使用fromCollection构建数据源创建一个User样例类使用map操作执行转换打印测试
参考代码
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.common.functions.MapFunction;public class User {public String id;public String name;public User() {}public User(String id, String name) {this.id id;this.name name;}Overridepublic String toString() {return User( id , name );}public static void main(String[] args) throws Exception {ExecutionEnvironment env ExecutionEnvironment.getExecutionEnvironment();DataSetString textDataSet env.fromCollection(Arrays.asList(1,张三, 2,李四, 3,王五, 4,赵六));DataSetUser userDataSet textDataSet.map(new MapFunctionString, User() {Overridepublic User map(String text) throws Exception {String[] fieldArr text.split(,);return new User(fieldArr[0], fieldArr[1]);}});userDataSet.print();}
}flatMap 将DataSet中的每一个元素转换为0…n个元素
示例
分别将以下数据转换成国家、省份、城市三个维度的数据。
将以下数据
张三,中国,江西省,南昌市
李四,中国,河北省,石家庄市
Tom,America,NewYork,Manhattan转换为
(张三,中国)
(张三,中国,江西省)
(张三,中国,江西省,江西省)
(李四,中国)
(李四,中国,河北省)
(李四,中国,河北省,河北省)
(Tom,America)
(Tom,America,NewYork)
(Tom,America,NewYork,NewYork)思路 以上数据为一条转换为三条显然应当使用flatMap来实现 分别在flatMap函数中构建三个数据并放入到一个列表中 姓名, 国家 姓名, 国家省份 姓名, 国家省份城市
步骤
构建批处理运行环境构建本地集合数据源使用flatMap将一条数据转换为三条数据 使用逗号分隔字段分别构建国家、国家省份、国家省份城市三个元组 打印输出
参考代码
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;import java.util.ArrayList;
import java.util.List;public class UserProcessing {public static class User {public String name;public String country;public String province;public String city;public User() {}public User(String name, String country, String province, String city) {this.name name;this.country country;this.province province;this.city city;}Overridepublic String toString() {return User( name , country , province , city );}}public static void main(String[] args) throws Exception {ExecutionEnvironment env ExecutionEnvironment.getExecutionEnvironment();DataSetString userDataSet env.fromCollection(new ArrayListString() {{add(张三,中国,江西省,南昌市);add(李四,中国,河北省,石家庄市);add(Tom,America,NewYork,Manhattan);}});DataSetUser resultDataSet userDataSet.flatMap(new FlatMapFunctionString, User() {Overridepublic void flatMap(String text, CollectorUser collector) throws Exception {String[] fieldArr text.split(,);String name fieldArr[0];String country fieldArr[1];String province fieldArr[2];String city fieldArr[3];collector.collect(new User(name, country, province, city));collector.collect(new User(name, country, province city, ));collector.collect(new User(name, country, province city, city));}});resultDataSet.print();}
}mapPartition 将一个分区中的元素转换为另一个元素
示例
使用mapPartition操作将以下数据
1,张三, 2,李四, 3,王五, 4,赵六转换为一个scala的样例类。
步骤
获取ExecutionEnvironment运行环境使用fromCollection构建数据源创建一个User样例类使用mapPartition操作执行转换打印测试
参考代码
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.util.Collector;import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;public class MapPartitionExample {public static class User {public String id;public String name;public User() {}public User(String id, String name) {this.id id;this.name name;}Overridepublic String toString() {return User( id , name );}}public static void main(String[] args) throws Exception {ExecutionEnvironment env ExecutionEnvironment.getExecutionEnvironment();DataSetString userDataSet env.fromCollection(new ArrayListString() {{add(1,张三);add(2,李四);add(3,王五);add(4,赵六);}});DataSetUser resultDataSet userDataSet.mapPartition(new MapPartitionFunctionString, User() {Overridepublic void mapPartition(IterableString iterable, CollectorUser collector) throws Exception {// TODO: 打开连接IteratorString iterator iterable.iterator();while (iterator.hasNext()) {String ele iterator.next();String[] fieldArr ele.split(,);collector.collect(new User(fieldArr[0], fieldArr[1]));}// TODO: 关闭连接}});resultDataSet.print();}
}map和mapPartition的效果是一样的但如果在map的函数中需要访问一些外部存储。例如 访问mysql数据库需要打开连接, 此时效率较低。而使用mapPartition可以有效减少连接数提高效率 filter 过滤出来一些符合条件的元素
示例
过滤出来以下以h开头的单词。
hadoop, hive, spark, flink步骤
获取ExecutionEnvironment运行环境使用fromCollection构建数据源使用filter操作执行过滤打印测试
参考代码
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;import java.util.ArrayList;
import java.util.List;public class FilterExample {public static void main(String[] args) throws Exception {ExecutionEnvironment env ExecutionEnvironment.getExecutionEnvironment();DataSetString wordDataSet env.fromCollection(new ArrayListString() {{add(hadoop);add(hive);add(spark);add(flink);}});DataSetString resultDataSet wordDataSet.filter(word - word.startsWith(h));resultDataSet.print();}
}reduce 可以对一个dataset或者一个group来进行聚合计算最终聚合成一个元素
示例1
请将以下元组数据使用reduce操作聚合成一个最终结果
(java , 1) , (java, 1) ,(java , 1) 将上传元素数据转换为(java,3)
步骤
获取ExecutionEnvironment运行环境使用fromCollection构建数据源使用redice执行聚合操作打印测试
参考代码
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;import java.util.ArrayList;
import java.util.List;public class ReduceExample {public static void main(String[] args) throws Exception {ExecutionEnvironment env ExecutionEnvironment.getExecutionEnvironment();DataSetTuple2String, Integer wordCountDataSet env.fromCollection(new ArrayListTuple2String, Integer() {{add(new Tuple2(java, 1));add(new Tuple2(java, 1));add(new Tuple2(java, 1));}});DataSetTuple2String, Integer resultDataSet wordCountDataSet.reduce((wc1, wc2) -new Tuple2(wc2.f0, wc1.f1 wc2.f1));resultDataSet.print();}
}示例2
请将以下元组数据下按照单词使用groupBy进行分组再使用reduce操作聚合成一个最终结果
(java , 1) , (java, 1) ,(scala , 1) 转换为
(java, 2), (scala, 1)步骤
获取ExecutionEnvironment运行环境使用fromCollection构建数据源使用groupBy按照单词进行分组使用reduce对每个分组进行统计打印测试
参考代码
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple;import java.util.ArrayList;
import java.util.List;public class GroupByReduceExample {public static void main(String[] args) throws Exception {ExecutionEnvironment env ExecutionEnvironment.getExecutionEnvironment();DataSetTuple2String, Integer wordCountDataSet env.fromCollection(new ArrayListTuple2String, Integer() {{add(new Tuple2(java, 1));add(new Tuple2(java, 1));add(new Tuple2(scala, 1));}});DataSetTuple2String, Integer groupedDataSet wordCountDataSet.groupBy(0).reduce((wc1, wc2) -new Tuple2(wc1.f0, wc1.f1 wc2.f1));groupedDataSet.print();}
}reduceGroup 可以对一个dataset或者一个group来进行聚合计算最终聚合成一个元素
reduce和reduceGroup的区别 reduce是将数据一个个拉取到另外一个节点然后再执行计算reduceGroup是先在每个group所在的节点上执行计算然后再拉取
示例
请将以下元组数据下按照单词使用groupBy进行分组再使用reduceGroup操作进行单词计数
(java , 1) , (java, 1) ,(scala , 1) 步骤
获取ExecutionEnvironment运行环境使用fromCollection构建数据源使用groupBy按照单词进行分组使用reduceGroup对每个分组进行统计打印测试
参考代码
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;import java.util.ArrayList;
import java.util.List;public class GroupByReduceGroupExample {public static void main(String[] args) throws Exception {ExecutionEnvironment env ExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataSetTuple2String, Integer wordCountDataSet env.fromCollection(new ArrayListTuple2String, Integer() {{add(new Tuple2(java, 1));add(new Tuple2(java, 1));add(new Tuple2(scala, 1));}});DataSetTuple2String, Integer groupedDataSet wordCountDataSet.groupBy(0).reduceGroup((IterableTuple2String, Integer iter, CollectorTuple2String, Integer collector) - {Tuple2String, Integer result new Tuple2();for (Tuple2String, Integer wc : iter) {result.f0 wc.f0;result.f1 wc.f1;}collector.collect(result);});groupedDataSet.print();}
}aggregate 按照内置的方式来进行聚合, Aggregate只能作用于元组上。例如SUM/MIN/MAX…
示例
请将以下元组数据使用aggregate操作进行单词统计
(java , 1) , (java, 1) ,(scala , 1)步骤
获取ExecutionEnvironment运行环境使用fromCollection构建数据源使用groupBy按照单词进行分组使用aggregate对每个分组进行SUM统计打印测试
参考代码
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple2;import java.util.ArrayList;
import java.util.List;public class GroupByAggregateExample {public static void main(String[] args) throws Exception {ExecutionEnvironment env ExecutionEnvironment.getExecutionEnvironment();DataSetTuple2String, Integer wordCountDataSet env.fromCollection(new ArrayListTuple2String, Integer() {{add(new Tuple2(java, 1));add(new Tuple2(java, 1));add(new Tuple2(scala, 1));}});DataSetTuple2String, Integer groupedDataSet wordCountDataSet.groupBy(0);DataSetTuple2String, Integer resultDataSet groupedDataSet.aggregate(Aggregations.SUM, 1);resultDataSet.print();}
}注意 要使用aggregate只能使用字段索引名或索引名称来进行分组groupBy(0)否则会报一下错误: Exception in thread main java.lang.UnsupportedOperationException: Aggregate does not support grouping with KeySelector functions, yet.distinct 去除重复的数据
示例
请将以下元组数据使用distinct操作去除重复的单词
(java , 1) , (java, 2) ,(scala , 1)去重得到
(java, 1), (scala, 1)步骤
获取ExecutionEnvironment运行环境使用fromCollection构建数据源使用distinct指定按照哪个字段来进行去重打印测试
参考代码
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;import java.util.ArrayList;
import java.util.List;public class DistinctExample {public static void main(String[] args) throws Exception {ExecutionEnvironment env ExecutionEnvironment.getExecutionEnvironment();DataSetTuple2String, Integer wordCountDataSet env.fromCollection(new ArrayListTuple2String, Integer() {{add(new Tuple2(java, 1));add(new Tuple2(java, 1));add(new Tuple2(scala, 1));}});DataSetTuple2String, Integer resultDataSet wordCountDataSet.distinct(0);resultDataSet.print();}
}join 使用join可以将两个DataSet连接起来
示例
有两个csv文件有一个为score.csv一个为subject.csv分别保存了成绩数据以及学科数据。 需要将这两个数据连接到一起然后打印出来。 步骤 分别将两个文件复制到项目中的data/join/input中 构建批处理环境 创建两个样例类 * 学科Subject学科ID、学科名字
* 成绩Score唯一ID、学生姓名、学科ID、分数——Double类型分别使用readCsvFile加载csv数据源并制定泛型 使用join连接两个DataSet并使用where、equalTo方法设置关联条件 打印关联后的数据源
参考代码
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple4;public class JoinExample {public static class Score {public int id;public String name;public int subjectId;public double score;public Score() {}public Score(int id, String name, int subjectId, double score) {this.id id;this.name name;this.subjectId subjectId;this.score score;}Overridepublic String toString() {return Score( id , name , subjectId , score );}}public static class Subject {public int id;public String name;public Subject() {}public Subject(int id, String name) {this.id id;this.name name;}Overridepublic String toString() {return Subject( id , name );}}public static void main(String[] args) throws Exception {ExecutionEnvironment env ExecutionEnvironment.getExecutionEnvironment();DataSetScore scoreDataSet env.readCsvFile(./data/join/input/score.csv).ignoreFirstLine().pojoType(Score.class);DataSetSubject subjectDataSet env.readCsvFile(./data/join/input/subject.csv).ignoreFirstLine().pojoType(Subject.class);DataSetTuple4Integer, String, Integer, Double joinedDataSet scoreDataSet.join(subjectDataSet).where(subjectId).equalTo(id).projectFirst(0, 1, 2, 3).projectSecond(1);joinedDataSet.print();}
}union 将两个DataSet取并集不会去重。
示例
将以下数据进行取并集操作
数据集1
hadoop, hive, flume数据集2
hadoop, hive, spark步骤
构建批处理运行环境使用fromCollection创建两个数据源使用union将两个数据源关联在一起打印测试
参考代码
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;import java.util.ArrayList;
import java.util.List;public class UnionExample {public static void main(String[] args) throws Exception {ExecutionEnvironment env ExecutionEnvironment.getExecutionEnvironment();DataSetString wordDataSet1 env.fromCollection(new ArrayListString() {{add(hadoop);add(hive);add(flume);}});DataSetString wordDataSet2 env.fromCollection(new ArrayListString() {{add(hadoop);add(hive);add(spark);}});DataSetString resultDataSet wordDataSet1.union(wordDataSet2);resultDataSet.print();}
}connect
connect()提供了和union()类似的功能即连接两个数据流它与union()的区别如下。 connect()只能连接两个数据流union()可以连接多个数据流。 connect()所连接的两个数据流的数据类型可以不一致union()所连接的两个或多个数据流的数据类型必须一致。 两个DataStream经过connect()之后被转化为ConnectedStreams, ConnectedStreams会对两个流的数据应用不同的处理方法且两个流之间可以共享状态。 DataStreamInteger intStream senv.fromElements(2, 1, 5, 3, 4, 7);
DataStreamString stringStream senv.fromElements(A, B, C, D); ConnectedStreamsInteger, String connectedStream
intStream.connect(stringStream);
DataStreamString mapResult connectedStream.map(new MyCoMapFunction()); // CoMapFunction的3个泛型分别对应第一个流的输入类型、第二个流的输入类型输出类型
public static class MyCoMapFunction implements CoMapFunctionInteger, String, String
{ Override public String map1(Integer input1) { return input1.toString(); } Override public String map2(String input2) { return input2; }
} rebalance Flink也会产生数据倾斜的时候例如当前的数据量有10亿条在处理过程就有可能发生如下状况 rebalance会使用轮询的方式将数据均匀打散这是处理数据倾斜最好的选择。 步骤 构建批处理运行环境 使用env.generateSequence创建0-100的并行数据 使用fiter过滤出来大于8的数字 使用map操作传入RichMapFunction将当前子任务的ID和数字构建成一个元组 在RichMapFunction中可以使用getRuntimeContext.getIndexOfThisSubtask获取子任务序号打印测试
参考代码
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;public class MapWithSubtaskIndexExample {public static void main(String[] args) throws Exception {ExecutionEnvironment env ExecutionEnvironment.getExecutionEnvironment();DataSetLong numDataSet env.generateSequence(0, 100);DataSetLong filterDataSet numDataSet.filter(num - num 8);DataSetTuple2Long, Long resultDataSet filterDataSet.map(new RichMapFunctionLong, Tuple2Long, Long() {Overridepublic Tuple2Long, Long map(Long in) throws Exception {return new Tuple2(getRuntimeContext().getIndexOfThisSubtask(), in);}});resultDataSet.print();}
}上述代码没有加rebalance通过观察有可能会出现数据倾斜。
在filter计算完后调用rebalance这样就会均匀地将数据分布到每一个分区中。
hashPartition 按照指定的key进行hash分区
示例
基于以下列表数据来创建数据源并按照hashPartition进行分区然后输出到文件。
List(1,1,1,1,1,1,1,2,2,2,2,2)步骤
构建批处理运行环境设置并行度为2使用fromCollection构建测试数据集使用partitionByHash按照字符串的hash进行分区调用writeAsText写入文件到data/partition_output目录中打印测试
参考代码 import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.core.fs.FileSystem;import java.util.ArrayList;
import java.util.List;public class PartitionByHashExample {public static void main(String[] args) throws Exception {ExecutionEnvironment env ExecutionEnvironment.getExecutionEnvironment();// Set parallelism to 2env.setParallelism(2);DataSetInteger numDataSet env.fromCollection(new ArrayListInteger() {{add(1);add(1);add(1);add(1);add(1);add(1);add(1);add(2);add(2);add(2);add(2);add(2);}});DataSetInteger partitionDataSet numDataSet.partitionByHash(num - num.toString());partitionDataSet.writeAsText(./data/partition_output, FileSystem.WriteMode.OVERWRITE);partitionDataSet.print();env.execute();}
}sortPartition 指定字段对分区中的数据进行排序
示例
按照以下列表来创建数据集
List(hadoop, hadoop, hadoop, hive, hive, spark, spark, flink)对分区进行排序后输出到文件。
步骤
构建批处理运行环境使用fromCollection构建测试数据集设置数据集的并行度为2使用sortPartition按照字符串进行降序排序调用writeAsText写入文件到data/sort_output目录中启动执行
参考代码
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.core.fs.FileSystem;import java.util.ArrayList;
import java.util.List;public class SortPartitionExample {public static void main(String[] args) throws Exception {ExecutionEnvironment env ExecutionEnvironment.getExecutionEnvironment();DataSetString wordDataSet env.fromCollection(new ArrayListString() {{add(hadoop);add(hadoop);add(hadoop);add(hive);add(hive);add(spark);add(spark);add(flink);}});wordDataSet.setParallelism(2);DataSetString sortedDataSet wordDataSet.sortPartition(str - str, Order.DESCENDING);sortedDataSet.writeAsText(./data/sort_output/, FileSystem.WriteMode.OVERWRITE);env.execute(App);}
}窗口
在许多情况下我们需要解决这样的问题针对一个特定的时间段例如一个小时我们需要对数据进行统计和分析。但是要实现这种数据窗口操作首先需要确定哪些数据应该进入这个窗口。在深入了解窗口操作的定义之前我们必须先确定作业将使用哪种时间语义。
换句话说时间窗口是数据处理中的一个关键概念用于将数据划分为特定的时间段进行计算。然而在确定如何定义这些窗口之前我们必须选择适合的时间语义即事件时间、处理时间或摄入时间。不同的时间语义在数据处理中具有不同的含义和用途因此在选择时间窗口之前我们需要明确作业所需的时间语义以便正确地界定和处理数据窗口。
时间概念 … 由于字数限制此处省略 完整内容请参见《大数据Flink学习圣经》pdf 免费找尼恩获取 窗口程序 … 由于字数限制此处省略 完整内容请参见《大数据Flink学习圣经》pdf 免费找尼恩获取 滚动窗口 … 由于字数限制此处省略 完整内容请参见《大数据Flink学习圣经》pdf 免费找尼恩获取 滑动窗口 … 由于字数限制此处省略 完整内容请参见《大数据Flink学习圣经》pdf 免费找尼恩获取 会话窗口 … 由于字数限制此处省略 完整内容请参见《大数据Flink学习圣经》pdf 免费找尼恩获取 基于数量窗口 … 由于字数限制此处省略 完整内容请参见《大数据Flink学习圣经》pdf 免费找尼恩获取 触发器 … 由于字数限制此处省略 完整内容请参见《大数据Flink学习圣经》pdf 免费找尼恩获取 清除器 … 由于字数限制此处省略 完整内容请参见《大数据Flink学习圣经》pdf 免费找尼恩获取 6. Flink程序本地执行和集群执行
6.1. 本地执行 … 由于字数限制此处省略 完整内容请参见《大数据Flink学习圣经》pdf 免费找尼恩获取 6.2. 集群执行 … 由于字数限制此处省略 完整内容请参见《大数据Flink学习圣经》pdf 免费找尼恩获取 7. Flink的广播变量 … 由于字数限制此处省略 完整内容请参见《大数据Flink学习圣经》pdf 免费找尼恩获取 8. Flink的累加器 … 由于字数限制此处省略 完整内容请参见《大数据Flink学习圣经》pdf 免费找尼恩获取 9. Flink的分布式缓存 … 由于字数限制此处省略 完整内容请参见《大数据Flink学习圣经》pdf 免费找尼恩获取 关于TABLE API SQL , 以及状态和检查点等知识 会陆续补充敬请期待
说在后面
本文是《大数据Flink学习圣经》 V1版本 是 《尼恩 大数据 面试宝典》 姊妹篇。
这里特别说明一下《尼恩 大数据 面试宝典》5个专题 PDF 自首次发布以来 已经收集了 好几百题大量的大厂面试干货、正货 。 《尼恩 大数据 面试宝典》面试题集合 已经变成大数据学习和面试的必读书籍。
于是尼恩架构团队趁热打铁推出 《大数据Flink学习圣经》。
完整的pdf可以关注尼恩的 公众号【技术自由圈】领取。
并且《大数据Flink学习圣经》、 《尼恩 大数据 面试宝典》 都会持续迭代、不断更新以 吸纳最新的面试题最新版本具体请参见 公众号【技术自由圈】
作者介绍
一作Andy资深架构师 《Java 高并发核心编程 加强版》作者之1 。
二作尼恩41岁资深老架构师 IT领域资深作家、著名博主。《Java 高并发核心编程 加强版 卷1、卷2、卷3》创世作者。 《K8S学习圣经》《Docker学习圣经》《Go学习圣经》等11个PDF 圣经的作者。 也是一个 资深架构导师、架构转化 导师 成功指导了多个中级Java、高级Java转型架构师岗位 最高的学员年薪拿到近100W。
参考
Flink原理与实践Apache Flink Documentation | Apache FlinkApache Flink 学习网 (flink-learning.org.cn)
推荐阅读
《尼恩大数据面试宝典专题1史上最全Hadoop面试题》
《尼恩大数据面试宝典专题2绝密100个Spark面试题熟背100遍猛拿高薪》
《尼恩大数据面试宝典专题3史上最全Hive面试题不断迭代持续升级》
《尼恩大数据面试宝典专题4史上最全Flink面试题不断迭代持续升级》
《尼恩大数据面试宝典专题5史上最全HBase面试题不断迭代持续升级》 《尼恩 架构笔记》《尼恩高并发三部曲》《尼恩Java面试宝典》PDF请到下面公号【技术自由圈】取↓↓↓
文章转载自: http://www.morning.qwmsq.cn.gov.cn.qwmsq.cn http://www.morning.yhdqq.cn.gov.cn.yhdqq.cn http://www.morning.gfqj.cn.gov.cn.gfqj.cn http://www.morning.trtxt.cn.gov.cn.trtxt.cn http://www.morning.ctlzf.cn.gov.cn.ctlzf.cn http://www.morning.tfwg.cn.gov.cn.tfwg.cn http://www.morning.qcsbs.cn.gov.cn.qcsbs.cn http://www.morning.zlhcw.cn.gov.cn.zlhcw.cn http://www.morning.bbyqz.cn.gov.cn.bbyqz.cn http://www.morning.nqnqz.cn.gov.cn.nqnqz.cn http://www.morning.ktrh.cn.gov.cn.ktrh.cn http://www.morning.lbgfz.cn.gov.cn.lbgfz.cn http://www.morning.srkqs.cn.gov.cn.srkqs.cn http://www.morning.jgcyn.cn.gov.cn.jgcyn.cn http://www.morning.kwrzg.cn.gov.cn.kwrzg.cn http://www.morning.mcjp.cn.gov.cn.mcjp.cn http://www.morning.rmdwp.cn.gov.cn.rmdwp.cn http://www.morning.rhkgz.cn.gov.cn.rhkgz.cn http://www.morning.lbbrw.cn.gov.cn.lbbrw.cn http://www.morning.qcdtzk.cn.gov.cn.qcdtzk.cn http://www.morning.drbd.cn.gov.cn.drbd.cn http://www.morning.snxbf.cn.gov.cn.snxbf.cn http://www.morning.pmnn.cn.gov.cn.pmnn.cn http://www.morning.mkrjf.cn.gov.cn.mkrjf.cn http://www.morning.lpcct.cn.gov.cn.lpcct.cn http://www.morning.fglxh.cn.gov.cn.fglxh.cn http://www.morning.qggcc.cn.gov.cn.qggcc.cn http://www.morning.fwdln.cn.gov.cn.fwdln.cn http://www.morning.tnjkg.cn.gov.cn.tnjkg.cn http://www.morning.rlfr.cn.gov.cn.rlfr.cn http://www.morning.qyqmj.cn.gov.cn.qyqmj.cn http://www.morning.lwjlj.cn.gov.cn.lwjlj.cn http://www.morning.plkrl.cn.gov.cn.plkrl.cn http://www.morning.ftcrt.cn.gov.cn.ftcrt.cn http://www.morning.bfmq.cn.gov.cn.bfmq.cn http://www.morning.hffjj.cn.gov.cn.hffjj.cn http://www.morning.bangaw.cn.gov.cn.bangaw.cn http://www.morning.rkypb.cn.gov.cn.rkypb.cn http://www.morning.kcdts.cn.gov.cn.kcdts.cn http://www.morning.bsqth.cn.gov.cn.bsqth.cn http://www.morning.nlbhj.cn.gov.cn.nlbhj.cn http://www.morning.rnrwq.cn.gov.cn.rnrwq.cn http://www.morning.zxqxx.cn.gov.cn.zxqxx.cn http://www.morning.rgqnt.cn.gov.cn.rgqnt.cn http://www.morning.bmmyx.cn.gov.cn.bmmyx.cn http://www.morning.flxgx.cn.gov.cn.flxgx.cn http://www.morning.npgwb.cn.gov.cn.npgwb.cn http://www.morning.pghgq.cn.gov.cn.pghgq.cn http://www.morning.lmdfj.cn.gov.cn.lmdfj.cn http://www.morning.lmzpk.cn.gov.cn.lmzpk.cn http://www.morning.fslxc.cn.gov.cn.fslxc.cn http://www.morning.rrcrs.cn.gov.cn.rrcrs.cn http://www.morning.bzlgb.cn.gov.cn.bzlgb.cn http://www.morning.fnssm.cn.gov.cn.fnssm.cn http://www.morning.rmkyb.cn.gov.cn.rmkyb.cn http://www.morning.mwlxk.cn.gov.cn.mwlxk.cn http://www.morning.mbfj.cn.gov.cn.mbfj.cn http://www.morning.jfbpf.cn.gov.cn.jfbpf.cn http://www.morning.kgltb.cn.gov.cn.kgltb.cn http://www.morning.mbmtn.cn.gov.cn.mbmtn.cn http://www.morning.bwjws.cn.gov.cn.bwjws.cn http://www.morning.xzqzd.cn.gov.cn.xzqzd.cn http://www.morning.qxmys.cn.gov.cn.qxmys.cn http://www.morning.mkkcr.cn.gov.cn.mkkcr.cn http://www.morning.zckhn.cn.gov.cn.zckhn.cn http://www.morning.bpmtr.cn.gov.cn.bpmtr.cn http://www.morning.ndcjq.cn.gov.cn.ndcjq.cn http://www.morning.fqmcc.cn.gov.cn.fqmcc.cn http://www.morning.dmthy.cn.gov.cn.dmthy.cn http://www.morning.lxqkt.cn.gov.cn.lxqkt.cn http://www.morning.wqbfd.cn.gov.cn.wqbfd.cn http://www.morning.qggxt.cn.gov.cn.qggxt.cn http://www.morning.slqzb.cn.gov.cn.slqzb.cn http://www.morning.mlpmf.cn.gov.cn.mlpmf.cn http://www.morning.pmtky.cn.gov.cn.pmtky.cn http://www.morning.zxznh.cn.gov.cn.zxznh.cn http://www.morning.nxcgp.cn.gov.cn.nxcgp.cn http://www.morning.kqbjy.cn.gov.cn.kqbjy.cn http://www.morning.krhkn.cn.gov.cn.krhkn.cn http://www.morning.lphtm.cn.gov.cn.lphtm.cn