虚拟主机上的网站上传方式,网站设计方案应该怎么做,企业管理咨询,微商城模板包含哪些DataStream 学习
1.DataStream编程模型总结 文章目录 DataStream 学习介绍一、DataSet编程模型二、数据源1.文件类数据源2.集合类数据源3.通用类数据源4第三方文件系统 介绍
Flink把批处理看成是一个流处理的特例#xff0c;因此可以在底层统一的流处理引擎上#xff0c;同…DataStream 学习
1.DataStream编程模型总结 文章目录 DataStream 学习介绍一、DataSet编程模型二、数据源1.文件类数据源2.集合类数据源3.通用类数据源4第三方文件系统 介绍
Flink把批处理看成是一个流处理的特例因此可以在底层统一的流处理引擎上同时提供了STREAM API和SET API,经典的有限数据流处理方式有 由于批处理的对象是有界数据集因此批处理不需要时间和窗口机制
一、DataSet编程模型
link批处理程序的基本运行流程包括以下4个步骤
创建执行环境创建数据源指定对数据进行的转换操作指定数据计算的输出结果方式。 上面第1步中创建批处理执行环境的方式如下
val env ExecutionEnvironment.getExecutionEnvironment此外还需要在pom.xml文件中引入flink-scala_2.12依赖库具体如下
dependencygroupIdorg.apache.flink/groupIdartifactIdflink-scala_2.12/artifactIdversion1.11.2/version
/dependency编程模型如图 数据的处理过程 读取数据源-进行转换操作-获取结果数据。
批处理数据的基本流程
二、数据源
1.文件类数据源
Flink提供了从文件中读取数据生成DataSet的多种方法具体如下
readTextFile(path):逐行读取文件并将文件内容转换成DataSet类型数据集
readTextFileWithValue(path)读取文本文件内容并将文件内容转换成DataSet[StringValue]类型数据集。 该方法与readTextFile(String)不同的是其泛型是StringValue是一种可变的String类型通过StringValue存储文本数据可以有效降低String对象创建数量减小垃圾回收的压力
readCsvFile(path)解析以逗号(或其他字符)分隔字段的文件返回元组或POJO对象
readSequenceFile(Key, Value, path)读取SequenceFile以Tuple2Key, Value类型返回。
以readTextFile(path)为例可以使用如下语句读取文本文件内容
val dataSet : DataSet[String] env.readTextFile(file:///home/hadoop/word.txt)假设有一个CSV格式文件sales.csv内容如下
transactionId,customerId,itemId,amountPaid
111,1,1,100.0
112,2,2,505.0
113,1,3,510.0
114,2,4,600.0
115,3,2,500.0则可以使用如下程序读取该CSV文件
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._object ReadCSVFile{def main(args: Array[String]): Unit {val bEnv ExecutionEnvironment.getExecutionEnvironmentval filePathfile:///home/hadoop/sales.csvval csv bEnv.readCsvFile[SalesLog](filePath,ignoreFirstLine true)//这里csv.print()}case class SalesLog(transactionId:String,customerId:String,itemId:String,amountPaid:Double)//这里定义的类型
}结果如下
SalesLog(111,1,1,100.0)
SalesLog(112,2,2,505.0)
SalesLog(113,1,3,510.0)
SalesLog(114,2,4,600.0)
SalesLog(115,3,2,500.0)2.集合类数据源
Flink提供了fromCollection()、fromElements()和generateSequence()等方法来构建集合类数据源具体如下
fromCollection()从集合中创建DataSet数据集集合中的元素数据类型相同
fromElements()从给定数据元素序列中创建DataSet数据集且所有的数据对象类型必须一致
generateSequence()指定一个范围区间然后在区间内部生成数字序列数据集,由于是并行处理的所以最终的顺序不能保证一致。
val myArray Array(hello world,hadoop spark flink)
val collectionSet env.fromCollection(myArray)//从集合中获取val dataSet env.fromElements(hadoop,spark,flink)//一个个元素获取val numSet env.generateSequence(1,10)//生成的数据 1 2 3 4 ... 10 包含103.通用类数据源
以Flink内置的JDBCInputFormat类为实例介绍通用类数据源的用法。 假设已经在Linux系统中安装了MySQL数据库在Linux终端中执行如下命令启动MySQL 输入数据库登录密码以后就可以启动MySQL了然后执行如下命令创建数据库并添加数据
$ create database flink
$ use flink
$ create table student(sno char(8),cno char(2),grade int);
$ insert into student values(95001,1,96);
$ insert into student values(95002,1,94);新建代码文件InputFromMySQL.scala内容如下 i
mport org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.scala._object InputFromMySQL{def main(args: Array[String]): Unit {//创建执行环境val env ExecutionEnvironment.getExecutionEnvironment//使用JDBC输入格式从关系数据库读取数据val inputMySQL env.createInput(JDBCInputFormat.buildJDBCInputFormat()//数据库连接驱动名称.setDrivername(com.mysql.jdbc.Driver)//数据库连接驱动名称.setDBUrl(jdbc:mysql://localhost:3306/flink)//数据库连接用户名.setUsername(root)//数据库连接密码.setPassword(123456)//数据库连接查询SQL.setQuery(select sno,cno,grade from student)//字段类型、顺序和个数必须与SQL保持一致.setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.INT_TYPE_INFO)).finish())inputMySQL.print()}
}新建pom.xml文件在里面添加与访问MySQL相关的依赖包内容如下
projectgroupIdcn.edu.xmu.dblab/groupIdartifactIdsimple-project/artifactIdmodelVersion4.0.0/modelVersionnameSimple Project/namepackagingjar/packagingversion1.0/versionrepositoriesrepositoryidalimaven/idnamealiyun maven/nameurlhttp://maven.aliyun.com/nexus/content/groups/public//url/repository/repositories
dependenciesdependencygroupIdorg.apache.flink/groupIdartifactIdflink-scala_2.12/artifactIdversion1.11.2/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-scala_2.12/artifactIdversion1.11.2/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients_2.12/artifactIdversion1.11.2/version/dependency
dependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion5.1.40/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-jdbc_2.12/artifactIdversion1.11.2/version/dependency/dependenciesbuildpluginsplugingroupIdnet.alchim31.maven/groupIdartifactIdscala-maven-plugin/artifactIdversion3.4.6/versionexecutionsexecutiongoalsgoalcompile/goal/goals/execution/executions/pluginplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-assembly-plugin/artifactIdversion3.0.0/versionconfigurationdescriptorRefsdescriptorRefjar-with-dependencies/descriptorRef/descriptorRefs/configurationexecutionsexecutionidmake-assembly/idphasepackage/phasegoalsgoalsingle/goal/goals/execution/executions/plugin/plugins/build
/project使用Maven工具对程序进行编译打包然后提交到Flink中运行请确认Flink已经启动。运行结束以后可以在屏幕上看到如下的输出结果
95001,1,96
95002,1,944第三方文件系统
Flink通过FileSystem类来抽象自己的文件系统这个抽象提供了各类文件系统实现的通用操作和最低保证。
每种数据源比如HDFS、S3、Alluxio、XtreemFS、FTP等可以继承和实现FileSystem类将数据从各个系统读取到Flink中。
DataSet API中内置了HDFS数据源这里给出一个读取HDFS文件系统的一个实例代码如下
import org.apache.flink.api.scala.ExecutionEnvironmentobject ReadHDFS{def main(args: Array[String]): Unit {//获取执行环境val env ExecutionEnvironment.getExecutionEnvironment//创建数据源
val inputHDFS env.readTextFile(hdfs://localhost:9000/word.txt)//打印输出
inputHDFS.print()}
}获取数据源就1行代码 但是在pom中需要添加依赖。 在pom.xml文件中需要添加与访问HDFS相关的依赖包内容如下
projectgroupIdcn.edu.xmu.dblab/groupIdartifactIdsimple-project/artifactIdmodelVersion4.0.0/modelVersionnameSimple Project/namepackagingjar/packagingversion1.0/versionrepositoriesrepositoryidalimaven/idnamealiyun maven/nameurlhttp://maven.aliyun.com/nexus/content/groups/public//url/repository/repositories
dependenciesdependencygroupIdorg.apache.flink/groupIdartifactIdflink-scala_2.12/artifactIdversion1.11.2/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-scala_2.12/artifactIdversion1.11.2/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients_2.12/artifactIdversion1.11.2/version/dependencydependencygroupIdorg.apache.hadoop/groupIdartifactIdhadoop-common/artifactIdversion3.1.3/version/dependencydependencygroupIdorg.apache.hadoop/groupIdartifactIdhadoop-client/artifactIdversion3.1.3/version/dependency/dependenciesbuildpluginsplugingroupIdnet.alchim31.maven/groupIdartifactIdscala-maven-plugin/artifactIdversion3.4.6/versionexecutionsexecutiongoalsgoalcompile/goal/goals/execution/executions/plugin
plugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-assembly-plugin/artifactIdversion3.0.0/versionconfigurationdescriptorRefsdescriptorRefjar-with-dependencies/descriptorRef/descriptorRefs/configurationexecutionsexecutionidmake-assembly/idphasepackage/phasegoalsgoalsingle/goal/goals/execution/executions/plugin/plugins/build
/project使用Maven工具对程序进行编译打包。 为了让Flink能够顺利访问HDFS需要修改环境变量 如果环境变量已经完成了修改这里就不需要重复操作如果还没有则修改添加hadoop环境变量 修改如下。
$ vim ~/.bashrc
export HADOOP_HOME/usr/local/hadoop
export HADOOP_CONF_DIR${HADOOP_HOME}/etc/hadoop
export HADOOP_CLASSPATH$(/usr/local/hadoop/bin/hadoop classpath)
$ source ~/.bashrc使用flink run命令把ReadHDFS程序提交到Flink中运行请确认Flink和Hadoop已经启动如果运行成功就可以在屏幕上看到hdfs://localhost:9000/word.txt文件里面的内容了。