Table of Contents
Flink program structure
Obtaining the execution environment (environment)
Loading/initializing data (source)
File-based
Socket-based
Collection-based
Custom
Transformations (transformation)
Basic transformations
Physical partitioning
Task chains and resource groups
Names and descriptions
Specifying where results go (sink)
Triggering program execution (execution)

Flink program structure
A Flink program consists of the following basic parts:
1. Obtain an execution environment (environment)
2. Load/create the initial data (source)
3. Specify transformations on this data (transformation)
4. Specify where to put the results of the computation (sink)
5. Trigger the program execution (execution)
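As a minimal sketch tying the five steps together (the class name, sample data, and uppercase mapping are illustrative, not part of the original examples):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FiveStepSkeleton {
    public static void main(String[] args) throws Exception {
        // 1. environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. source
        env.fromElements("hello", "flink")
                // 3. transformation
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        return value.toUpperCase();
                    }
                })
                // 4. sink
                .print();
        // 5. trigger execution
        env.execute();
    }
}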
Obtaining the execution environment (environment)

There are three ways to obtain a stream execution environment:
1. The environment appropriate to the current context: StreamExecutionEnvironment.getExecutionEnvironment();
2. A local execution environment: StreamExecutionEnvironment.createLocalEnvironment();
3. A remote execution environment: createRemoteEnvironment(String host, int port, String... jarFiles);

For example:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
LocalStreamEnvironment localEnvironment = StreamExecutionEnvironment.createLocalEnvironment();
StreamExecutionEnvironment remoteEnvironment = StreamExecutionEnvironment.createRemoteEnvironment("node1", 8081, "path/to/jarfile");

In most cases you can simply use getExecutionEnvironment(): at runtime it picks the appropriate environment from the context. If the program runs inside an IDE, it returns a local execution environment; if the program is packaged as a jar and submitted to a Flink cluster, it returns the cluster environment.

Loading/initializing data (source)

File-based
readTextFile(path)
readFile(fileInputFormat, path)
readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)

For example:
DataStreamSource<String> lineDs = env.readTextFile("data/words.txt");
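The watched-path variant of readFile can be sketched as well. A best-effort example, assuming the same data/words.txt path as above; PROCESS_CONTINUOUSLY re-scans the path at the given interval (newer Flink versions steer users toward the FileSource connector instead):

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class ReadFileDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String path = "data/words.txt";
        // Re-scan the path every 1000 ms and pick up new or modified files
        DataStreamSource<String> lines = env.readFile(
                new TextInputFormat(new Path(path)), path,
                FileProcessingMode.PROCESS_CONTINUOUSLY, 1000L);
        lines.print();
        env.execute();
    }
}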
Socket-based

socketTextStream(hostname, port)
For example:
DataStreamSource<String> lineStream = env.socketTextStream("node2", 7777);

Collection-based
fromCollection(Collection)
fromCollection(Iterator, Class)
fromElements(T ...)
fromParallelCollection(SplittableIterator, Class)
generateSequence(from, to)

For example:
DataStreamSource<Integer> source = env.fromCollection(Arrays.asList(0, 1, 2));
DataStream<Integer> dataStream = env.fromElements(1, 0, 3, 0, 5);
DataStreamSource<Long> source1 = env.generateSequence(1, 10);

Custom
Legacy style: addSource(SourceFunction<OUT> function)
For example, reading data from Kafka:
env.addSource(new FlinkKafkaConsumer<>(...))

New style: fromSource(Source<OUT, ?, ?> source, WatermarkStrategy<OUT> timestampsAndWatermarks, String sourceName)
For example, reading data from Kafka:
env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkasource");
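For completeness, a sketch of how the kafkaSource variable above might be built with the flink-connector-kafka dependency; the broker address, topic, and group id are illustrative assumptions:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaSourceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("node1:9092")                  // illustrative broker
                .setTopics("input-topic")                           // illustrative topic
                .setGroupId("my-group")                             // illustrative group id
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkasource").print();
        env.execute();
    }
}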
Transformations (transformation)

A transformation operator turns one or more data streams into a new data stream.
A program can combine multiple transformations into a complex dataflow topology.
This section covers the basic transformations, the physical partitionings available after applying them, and a closer look at Flink's operator chaining.

Basic transformations
Map, FlatMap, Filter, KeyBy, Reduce, Window, WindowAll, Window Apply, WindowReduce, Union, Window Join, Interval Join, Window CoGroup, Connect, CoMap/CoFlatMap, Cache

The following examples build on the Flink WordCount project; place the sample code in the org.example.transformations package, or any package of your own.

Map
DataStream → DataStream
Transforms each element of the stream to produce a new stream.
DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 2 * value;
    }
});
Multiply every element of the stream by 2 to get a new stream and print it; the complete code follows:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/*
Takes one element and produces one element.
A map function that doubles the values of the input stream:
*/
public class OperatorMap {
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // source
        DataStream<Integer> dataStream = env.fromElements(1, 2, 3, 4, 5);
        // transformations
        SingleOutputStreamOperator<Integer> data = dataStream.map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer value) throws Exception {
                return 2 * value;
            }
        });
        // sink
        data.print();
        // execute
        env.execute();
    }
}

Running result:
2> 10
7> 4
6> 2
8> 6
1> 8

FlatMap
DataStream → DataStream
Transforms each element of the stream into zero, one, or more elements.
dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
});

Extract the words from each sentence; the complete code follows:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/*
Takes one element and produces zero, one, or more elements.
A flatmap function that splits sentences to words:
*/
public class OperatorFlatMap {
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // source
        DataStream<String> dataStream = env.fromElements("hello world", "hello flink", "hello hadoop");
        // transformations
        SingleOutputStreamOperator<String> data = dataStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                for (String word : value.split(" ")) {
                    out.collect(word);
                }
            }
        });
        // sink
        data.print();
        // execute
        env.execute();
    }
}

Running result:
5> hello
7> hello
6> hello
7> hadoop
5> world
6> flink

Filter
DataStream → DataStream
Evaluates a boolean function for each element and keeps those for which the function returns true.
dataStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 0;
    }
});

Emit only the non-zero elements; the complete code follows:
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/*
Evaluates a boolean function for each element and retains those for which the function returns true.
A filter that filters out zero values:
*/
public class OperatorFilter {
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // source
        DataStream<Integer> dataStream = env.fromElements(1, 0, 3, 0, 5);
        // transformations
        SingleOutputStreamOperator<Integer> data = dataStream.filter(new FilterFunction<Integer>() {
            @Override
            public boolean filter(Integer value) throws Exception {
                return value != 0;
            }
        });
        // sink
        data.print();
        // execute
        env.execute();
    }
}

Running result:
8> 5
6> 3
4> 1

KeyBy
DataStream → KeyedStream
Logically partitions the stream into disjoint partitions. All records with the same key are assigned to the same partition. Internally, keyBy() is implemented with hash partitioning.
dataStream.keyBy(value -> value.getSomeKey());
dataStream.keyBy(value -> value.f0);

Group by key and sum the values; the complete code follows:
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;
import java.util.List;

/*
Logically partitions a stream into disjoint partitions.
All records with the same key are assigned to the same partition.
Internally, keyBy() is implemented with hash partitioning.
There are different ways to specify keys.
*/
public class OperatorKeyBy {
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // source
        List<Tuple2<String, Integer>> dataSource = Arrays.asList(
                Tuple2.of("hello", 3),
                Tuple2.of("flink", 2),
                Tuple2.of("hadoop", 4),
                Tuple2.of("flink", 5));
        DataStreamSource<Tuple2<String, Integer>> dataStream = env.fromCollection(dataSource);
        // transformations
        SingleOutputStreamOperator<Tuple2<String, Integer>> data =
                dataStream.keyBy(value -> value.f0).sum(1);
        // sink
        data.print();
        // execute
        env.execute();
    }
}

Running result:
3> (hello,3)
7> (flink,2)
8> (hadoop,4)
7> (flink,7)

Reduce
KeyedStream → DataStream
A "rolling" reduce on a keyed data stream: combines the current element with the last reduced value and emits the new value.
keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2) throws Exception {
        return value1 + value2;
    }
});

Reduce the values that share the same key; here the reduction is a sum. The complete code follows:
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;
import java.util.List;

/*
KeyedStream → DataStream
A "rolling" reduce on a keyed data stream.
Combines the current element with the last reduced value and emits the new value.
A reduce function that creates a stream of partial sums:
*/
public class OperatorReduce {
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // source
        List<Tuple2<String, Integer>> dataSource = Arrays.asList(
                Tuple2.of("hello", 3),
                Tuple2.of("flink", 2),
                Tuple2.of("hadoop", 3),
                Tuple2.of("flink", 5),
                Tuple2.of("hello", 1),
                Tuple2.of("hadoop", 1));
        DataStreamSource<Tuple2<String, Integer>> dataStream = env.fromCollection(dataSource);
        // transformations
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = dataStream.keyBy(value -> value.f0);
        SingleOutputStreamOperator<Tuple2<String, Integer>> data = keyedStream.reduce(
                new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
                                                          Tuple2<String, Integer> value2) throws Exception {
                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                    }
                });
        // sink
        data.print();
        // execute
        env.execute();
    }
}

Running result:
7> (flink,2)
8> (hadoop,3)
3> (hello,3)
7> (flink,7)
8> (hadoop,4)
3> (hello,4)

Window
KeyedStream → WindowedStream
Windows can be defined on already-partitioned KeyedStreams. A window groups the data of each key according to some characteristic, for example the data that arrived within the last 10 seconds.
dataStream.keyBy(value -> value.f0)
        .window(TumblingEventTimeWindows.of(Time.seconds(10)));

You can then compute over the windowed data, for example counting the occurrences of each word in 10-second tumbling windows. The sample code follows:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCount {
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // source and transformations
        SingleOutputStreamOperator<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("node1", 7777)
                .flatMap(new Splitter())
                .keyBy(value -> value.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .sum(1);
        // sink
        dataStream.print();
        // execution
        env.execute("Window WordCount");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}

Start an nc listener:
[hadoop@node1 ~]$ nc -lk 7777

Run the Flink program, then send test data:
[hadoop@node1 ~]$ nc -lk 7777
hello world
hello flink
hello hadoop
hello java
hello
Running result:
5> (world,1)
8> (hadoop,1)
3> (hello,3)
7> (flink,1)
2> (java,1)
3> (hello,1)
3> (hello,1)
Note: typing the input at a different pace assigns the data to different windows, so the computed results will differ.

WindowAll
DataStream → AllWindowedStream
Windows can also be defined on regular DataStreams. The window groups all stream events according to some characteristic, for example the data that arrived within the last 10 seconds.
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(10)));

Note: in many cases windowAll is a non-parallel transformation. All records are gathered into a single task for computation, so a large data volume can cause OOM problems.

Reduce all the data in each window; here the words are concatenated with commas. The complete code follows:
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/*
Windows can be defined on regular DataStreams.
Windows group all the stream events according to some characteristic.
This is in many cases a non-parallel transformation.
All records will be gathered in one task for the windowAll operator.
*/
public class OperatorWindowAll {
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // source and transformations
        SingleOutputStreamOperator<String> result = env
                .socketTextStream("node1", 7777)
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .reduce(new ReduceFunction<String>() {
                    @Override
                    public String reduce(String value1, String value2) throws Exception {
                        return value1 + "," + value2;
                    }
                });
        // sink
        result.print();
        // execute
        env.execute();
    }
}

Test data:
[hadoop@node1 ~]$ nc -lk 7777
hello
world
hadoop
flink
hello
Running result:
4> hello,world
5> hadoop,flink,hello

Window Apply
WindowedStream → DataStream
AllWindowedStream → DataStream
Applies a general function to the window as a whole.
Note: if you use the windowAll transformation, you need an AllWindowFunction instead.
windowedStream.apply(new WindowFunction<Tuple2<String, Integer>, Integer, Tuple, Window>() {
    public void apply(Tuple tuple, Window window,
                      Iterable<Tuple2<String, Integer>> values,
                      Collector<Integer> out) throws Exception {
        int sum = 0;
        for (Tuple2<String, Integer> t : values) {
            sum += t.f1;
        }
        out.collect(new Integer(sum));
    }
});

// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply(new AllWindowFunction<Tuple2<String, Integer>, Integer, Window>() {
    public void apply(Window window,
                      Iterable<Tuple2<String, Integer>> values,
                      Collector<Integer> out) throws Exception {
        int sum = 0;
        for (Tuple2<String, Integer> t : values) {
            sum += t.f1;
        }
        out.collect(new Integer(sum));
    }
});
Sum the elements in the window that share the same key; the complete code follows:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class OperatorWindowApply {
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // source and transformations
        DataStreamSource<String> dataStream = env.socketTextStream("node1", 7777);
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowedStream = dataStream
                .flatMap(new Splitter())
                .keyBy(value -> value.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        SingleOutputStreamOperator<Integer> applyStream = windowedStream
                .apply(new WindowFunction<Tuple2<String, Integer>, Integer, String, TimeWindow>() {
                    @Override
                    public void apply(String s, TimeWindow window,
                                      Iterable<Tuple2<String, Integer>> values,
                                      Collector<Integer> out) throws Exception {
                        int sum = 0;
                        for (Tuple2<String, Integer> value : values) {
                            sum += value.f1;
                        }
                        out.collect(new Integer(sum));
                    }
                });
        // sink
        applyStream.print();
        // execute
        env.execute();
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}

Send test data:
[hadoop@node1 ~]$ nc -lk 7777
hello world
hello hadoop
hello flink
flink
Running result:
5> 1
3> 1
3> 2
7> 2
8> 1
Note: typing speed affects which window the data lands in, so the results can differ.

Analysis of the result:
The first line, hello world, falls into one window; each word occurs once, so 1 and 1 are emitted.
Lines two, three, and four fall into the same window: hello occurs twice, flink twice, and hadoop once, so 2, 2, and 1 are emitted.

WindowReduce
WindowedStream → DataStream
Applies a functional reduce function to the window and returns the reduced value.
windowedStream.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
                                          Tuple2<String, Integer> value2) throws Exception {
        return new Tuple2<String, Integer>(value1.f0, value1.f1 + value2.f1);
    }
});

Word count implemented with a window reduce; the complete code follows:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class OperatorWindowReduce {
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // source and transformations
        DataStreamSource<String> dataStream = env.socketTextStream("node1", 7777);
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowedStream = dataStream
                .flatMap(new Splitter())
                .keyBy(value -> value.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = windowedStream.reduce(
                new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
                                                          Tuple2<String, Integer> value2) throws Exception {
                        return new Tuple2<String, Integer>(value1.f0, value1.f1 + value2.f1);
                    }
                });
        // sink
        result.print();
        // execute
        env.execute();
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}

Test data:
[hadoop@node1 ~]$ nc -lk 7777
hello hello world
hello flink
flink flink
hadoop hadoop
hello
Running result:
5> (world,1)
3> (hello,2)
7> (flink,3)
3> (hello,1)
8> (hadoop,2)
3> (hello,1)

Union
DataStream* → DataStream
Unions two or more data streams of the same type, creating a new stream that contains all elements from all of the streams.
dataStream.union(otherStream1, otherStream2, ...);
Union of two data streams of the same type; the complete code follows:
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/*
Union of two or more data streams creating a new stream containing all the elements from all the streams.
Note: If you union a data stream with itself you will get each element twice in the resulting stream.
*/
public class OperatorUnion {
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // source
        DataStream<Integer> dataStream1 = env.fromElements(1, 2, 3);
        DataStream<Integer> dataStream2 = env.fromElements(4, 5, 6);
        // transformations
        DataStream<Integer> res = dataStream1.union(dataStream2);
        // sink
        res.print();
        // execute
        env.execute();
    }
}

Running result:
1
2
3
4
5
6

Window Join
DataStream,DataStream → DataStream
Joins two data streams on a given key over a common window:
dataStream.join(otherStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply(new JoinFunction() {...});

A window join of two data streams; the complete code follows:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class OperatorWindowJoin {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env
                .fromElements(
                        Tuple2.of("a", 1),
                        Tuple2.of("a", 2),
                        Tuple2.of("b", 3),
                        Tuple2.of("c", 4),
                        Tuple2.of("c", 12))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
                                .withTimestampAssigner((value, ts) -> value.f1 * 1000L));

        SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env
                .fromElements(
                        Tuple3.of("a", 1, 1),
                        Tuple3.of("a", 11, 1),
                        Tuple3.of("b", 2, 1),
                        Tuple3.of("b", 12, 1),
                        Tuple3.of("c", 14, 1),
                        Tuple3.of("d", 15, 1))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple3<String, Integer, Integer>>forMonotonousTimestamps()
                                .withTimestampAssigner((value, ts) -> value.f1 * 1000L));

        DataStream<String> join = ds1.join(ds2)
                .where(r1 -> r1.f0)
                .equalTo(r2 -> r2.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .apply(new JoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
                    @Override
                    public String join(Tuple2<String, Integer> first,
                                       Tuple3<String, Integer, Integer> second) throws Exception {
                        return first + "-----" + second;
                    }
                });

        join.print();
        env.execute();
    }
}

Running result:
(a,1)-----(a,1,1)
(a,2)-----(a,1,1)
(b,3)-----(b,2,1)
(c,12)-----(c,14,1)

Interval Join
KeyedStream,KeyedStream → DataStream
Joins two elements e1 and e2 of two KeyedStreams that share a common key over a given time interval, so that e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound.
keyedStream.intervalJoin(otherKeyedStream)
    .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower bound, upper bound
    .upperBoundExclusive(true) // optional
    .lowerBoundExclusive(true) // optional
    .process(new IntervalJoinFunction() {...});

An interval join; the complete code follows:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class OperatorIntervalJoin {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env
                .fromElements(
                        Tuple2.of("a", 1),
                        Tuple2.of("a", 2),
                        Tuple2.of("b", 3),
                        Tuple2.of("c", 4))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
                                .withTimestampAssigner((value, ts) -> value.f1 * 1000L));

        SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env
                .fromElements(
                        Tuple3.of("a", 1, 1),
                        Tuple3.of("a", 11, 1),
                        Tuple3.of("b", 2, 1),
                        Tuple3.of("b", 12, 1),
                        Tuple3.of("c", 14, 1),
                        Tuple3.of("d", 15, 1))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple3<String, Integer, Integer>>forMonotonousTimestamps()
                                .withTimestampAssigner((value, ts) -> value.f1 * 1000L));

        KeyedStream<Tuple2<String, Integer>, String> ks1 = ds1.keyBy(r1 -> r1.f0);
        KeyedStream<Tuple3<String, Integer, Integer>, String> ks2 = ds2.keyBy(r2 -> r2.f0);

        // call interval join
        ks1.intervalJoin(ks2)
                // join time interval
                .between(Time.seconds(-2), Time.seconds(2))
                .process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
                    @Override
                    public void processElement(Tuple2<String, Integer> left,
                                               Tuple3<String, Integer, Integer> right,
                                               Context ctx, Collector<String> out) throws Exception {
                        out.collect(left + "------" + right);
                    }
                })
                .print();

        env.execute();
    }
}

Running result:
(a,1)------(a,1,1)
(a,2)------(a,1,1)
(b,3)------(b,2,1)

Window CoGroup
DataStream,DataStream → DataStream
Co-groups two data streams on a given key over a common window:
dataStream.coGroup(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply(new CoGroupFunction() {...});
Co-grouping two data streams; the complete code follows:
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class OperatorWindowCoGroup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> socketSource1 = env.socketTextStream("node1", 7777);
        DataStream<String> socketSource2 = env.socketTextStream("node1", 8888);

        DataStream<Tuple2<String, Integer>> input1 = socketSource1.map(line -> {
            String[] arr = line.split(" ");
            String id = arr[0];
            int t = Integer.parseInt(arr[1]);
            return Tuple2.of(id, t);
        }).returns(Types.TUPLE(Types.STRING, Types.INT));

        DataStream<Tuple2<String, Integer>> input2 = socketSource2.map(line -> {
            String[] arr = line.split(" ");
            String id = arr[0];
            int t = Integer.parseInt(arr[1]);
            return Tuple2.of(id, t);
        }).returns(Types.TUPLE(Types.STRING, Types.INT));

        DataStream<String> coGroupResult = input1.coGroup(input2)
                .where(i1 -> i1.f0)
                .equalTo(i2 -> i2.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .apply(new MyCoGroupFunction());

        coGroupResult.print();
        env.execute("window cogroup function");
    }

    public static class MyCoGroupFunction
            implements CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String> {
        @Override
        public void coGroup(Iterable<Tuple2<String, Integer>> input1,
                            Iterable<Tuple2<String, Integer>> input2,
                            Collector<String> out) {
            input1.forEach(element -> System.out.println("input1 :" + element.f1));
            input2.forEach(element -> System.out.println("input2 :" + element.f1));
        }
    }
}

Test data:
[hadoop@node1 ~]$ nc -lk 7777
hello 2
hello 1
[hadoop@node1 ~]$ nc -lk 8888
hello 3
hello 4
Running result:
input1 :2
input1 :1
input2 :3
input2 :4

Connect
DataStream,DataStream → ConnectedStream
"Connects" two data streams, each keeping its own type. The connection allows state to be shared between the two streams, and the two streams' data types may differ.
DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...

ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
connect yields a ConnectedStreams stream. To transform a ConnectedStreams, implement the CoMapFunction or CoFlatMapFunction interface and override its two methods, one per stream: the first method processes elements of the first stream, and the second method processes elements of the second stream. The type parameters are as follows:
// IN1: the data type of the first input stream
// IN2: the data type of the second input stream
// OUT: the data type of the resulting output stream
public interface CoMapFunction<IN1, IN2, OUT> extends Function, Serializable {
Connecting two data streams of different types; the complete code follows:
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

public class OperatorConnect {
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // source
        DataStream<Integer> dataStream1 = env.fromElements(1, 2, 3);
        DataStream<String> dataStream2 = env.fromElements("hello", "flink", "spark");
        // transformations
        ConnectedStreams<Integer, String> connectedStreams = dataStream1.connect(dataStream2);
        SingleOutputStreamOperator<String> res = connectedStreams.map(new CoMapFunction<Integer, String, String>() {
            @Override
            public String map1(Integer input1) throws Exception {
                return input1.toString();
            }

            @Override
            public String map2(String input2) throws Exception {
                return input2;
            }
        });
        // sink
        res.print();
        // execute
        env.execute();
    }
}

Running result:
1
hello
2
flink
3
spark

CoMap, CoFlatMap
ConnectedStream → DataStream
Transforms a connected stream into a DataStream; the transformations are analogous to map and flatMap:
connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
    @Override
    public Boolean map1(Integer value) {
        return true;
    }

    @Override
    public Boolean map2(String value) {
        return false;
    }
});

connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {
    @Override
    public void flatMap1(Integer value, Collector<String> out) {
        out.collect(value.toString());
    }

    @Override
    public void flatMap2(String value, Collector<String> out) {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
});
Converting a connected stream into a data stream; the complete code follows:
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public class OperatorCoFlatMap {
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // source
        DataStream<Integer> dataStream1 = env.fromElements(1, 2, 3);
        DataStream<String> dataStream2 = env.fromElements("hello world", "hello flink");
        // transformations
        ConnectedStreams<Integer, String> connectedStreams = dataStream1.connect(dataStream2);
        SingleOutputStreamOperator<String> res = connectedStreams.flatMap(
                new CoFlatMapFunction<Integer, String, String>() {
                    @Override
                    public void flatMap1(Integer value, Collector<String> out) throws Exception {
                        out.collect(value.toString());
                    }

                    @Override
                    public void flatMap2(String value, Collector<String> out) throws Exception {
                        for (String word : value.split(" ")) {
                            out.collect(word);
                        }
                    }
                });
        // sink
        res.print();
        // execute
        env.execute();
    }
}

Running result:
8> hello
8> flink
4> 1
7> hello
7> world
5> 2
6> 3

Cache
Caches the intermediate result of a transformation. Currently this is only supported for jobs running in batch execution mode. The cache is produced lazily, the first time the intermediate result is computed, so that later jobs can reuse it. If the cache is lost, the result is recomputed from the original transformations.
DataStream<Integer> dataStream = //...
CachedDataStream<Integer> cachedDataStream = dataStream.cache(); // cache the data
cachedDataStream.print();
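A runnable sketch, assuming a Flink version that has DataStream#cache (introduced in Flink 1.16); note that cache() requires the job to run in batch execution mode:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.CachedDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class OperatorCache {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // cache() is only supported in batch execution mode
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        DataStream<Integer> dataStream = env.fromElements(1, 2, 3);
        // Materialized lazily on first computation; reusable by later jobs
        CachedDataStream<Integer> cachedDataStream = dataStream.cache();
        cachedDataStream.print();
        env.execute();
    }
}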
Physical partitioning

Custom partitioning, random partitioning, rescaling, broadcasting.

Custom partitioning
dataStream.partitionCustom(partitioner, "someKey");
dataStream.partitionCustom(partitioner, 0);

Random partitioning
dataStream.shuffle();

Rescaling
dataStream.rescale();

Broadcasting
dataStream.broadcast();
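A sketch of the custom-partitioning hook; the partitioner and key selector here are illustrative, not from the original examples:

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class OperatorPartitionCustom {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(Tuple2.of("hello", 1), Tuple2.of("flink", 2), Tuple2.of("hadoop", 3))
                .partitionCustom(new Partitioner<String>() {
                    @Override
                    public int partition(String key, int numPartitions) {
                        // floorMod keeps the partition index non-negative
                        return Math.floorMod(key.hashCode(), numPartitions);
                    }
                }, value -> value.f0)
                .print();
        env.execute();
    }
}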
Task chains and resource groups

Start a new chain; disable chaining; set a slot sharing group.

Start a new chain
Begin a new chain, starting with this operator. The two mappers will be chained, and filter will not be chained to the first mapper.
someStream.filter(...).map(...).startNewChain().map(...);

Disable chaining
Do not chain the map operator.
someStream.map(...).disableChaining();

Set a slot sharing group

someStream.filter(...).slotSharingGroup("name");

Names and descriptions

someStream.filter(...).name("filter").setDescription("some description");
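A sketch combining these hints in one pipeline; the operator name and slot sharing group name are illustrative:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainingDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3, 4, 5)
                .filter(value -> value > 1).name("drop-ones")           // named operator
                .map(value -> value * 2).startNewChain()                // a fresh chain starts at this map
                .map(value -> value + 1).disableChaining()              // this map is never chained
                .filter(value -> value > 3).slotSharingGroup("filters") // runs in a separate slot group
                .print();
        env.execute();
    }
}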
Specifying where results go (sink)

writeAsText(path): write to a text file
writeAsCsv(...): write to a CSV file
print(): print to the console
writeUsingOutputFormat(): write using a custom output format
writeToSocket: write to a socket
addSink: a custom sink, for example writing to Kafka

For example:
DataStreamSource<Integer> dataStream = env.fromElements(1, 2, 3);
dataStream.writeAsText("sinkout");
dataStream.print();
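For the Kafka case mentioned in the list above, newer connector versions use sinkTo with a KafkaSink builder rather than addSink. A hedged sketch, assuming the flink-connector-kafka dependency; the broker address and topic are illustrative:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaSinkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("node1:9092")   // illustrative broker
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("output-topic")    // illustrative topic
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .build();
        env.fromElements("hello", "flink").sinkTo(sink);
        env.execute();
    }
}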
Triggering program execution (execution)

execute: synchronous execution; it blocks until the job finishes, so subsequent jobs wait.
execute()
execute(String jobName)
execute(StreamGraph streamGraph)
For example:
env.execute();

executeAsync: asynchronous execution; it does not block subsequent jobs.
executeAsync()
executeAsync(String jobName)
executeAsync(StreamGraph streamGraph)
For example:
env.executeAsync();
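executeAsync() returns a JobClient handle that can be used to track or cancel the submitted job. A minimal sketch; the job name is illustrative:

import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AsyncSubmit {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3).print();
        // Submit without blocking; returns a handle to the running job
        JobClient jobClient = env.executeAsync("async-job");
        // JobClient exposes asynchronous accessors, e.g. the job status
        System.out.println(jobClient.getJobStatus().get());
    }
}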
Done. Enjoy it!