当前位置: 首页 > news >正文

厦门seo公司网站在线链接转换工具

厦门seo公司网站,在线链接转换工具,网站注册页面html,外贸公司网站建设哪家好Data Sources 是什么呢#xff1f;就字面意思其实就可以知道#xff1a;数据来源。 Flink 做为一款流式计算框架#xff0c;它可用来做批处理#xff0c;也可以用来做流处理#xff0c;这个 Data Sources 就是数据的来源地。 flink在批/流处理中常见的source主要有两大类…Data Sources 是什么呢就字面意思其实就可以知道数据来源。 Flink 做为一款流式计算框架它可用来做批处理也可以用来做流处理这个 Data Sources 就是数据的来源地。 flink在批/流处理中常见的source主要有两大类。 预定义Source 基于本地集合的sourceCollection-based-source 基于文件的sourceFile-based-source 基于网络套接字socketTextStream 自定义Source 预定义Source演示  Collection [测试]--本地集合Source 在flink最常见的创建DataStream方式有四种 l 使用env.fromElements()这种方式也支持Tuple自定义对象等复合形式。 注意类型要一致不一致可以用Object接收但是使用会报错比如env.fromElements(haha, 1); 源码注释中有写 |使用env.fromCollection(),这种方式支持多种Collection的具体类型如ListSetQueue l 使用env.generateSequence()方法创建基于Sequence的DataStream --已经废弃了 l 使用env.fromSequence()方法创建基于开始和结束的DataStream 一般用于学习测试时编造数据时使用 1.env.fromElements(可变参数); 2.env.fromColletion(各种集合); 3.env.fromSequence(开始,结束); package com.bigdata.source;import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.ArrayList; import java.util.Arrays; import java.util.List;public class _01YuDingYiSource {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 各种获取数据的SourceDataStreamSourceString dataStreamSource env.fromElements(hello world txt, hello nihao kongniqiwa);dataStreamSource.print();// 演示一个错误的//DataStreamSourceObject dataStreamSource2 env.fromElements(hello, 1,3.0f);//dataStreamSource2.print();DataStreamSourceTuple2String, Integer elements env.fromElements(Tuple2.of(张三, 18),Tuple2.of(lisi, 18),Tuple2.of(wangwu, 18));elements.print();// 有一个方法可以直接将数组变为集合 复习一下数组和集合以及一些非常常见的APIString[] arr {hello,world};System.out.println(arr.length);System.out.println(Arrays.toString(arr));ListString list Arrays.asList(arr);System.out.println(list);env.fromElements(Arrays.asList(arr),Arrays.asList(arr),Arrays.asList(arr)).print();// 第二种加载数据的方式// Collection 的子接口只有 Set 和 ListArrayListString list1 new ArrayList();list1.add(python);list1.add(scala);list1.add(java);DataStreamSourceString ds1 env.fromCollection(list1);DataStreamSourceString ds2 env.fromCollection(Arrays.asList(arr));// 第三种DataStreamSourceLong ds3 env.fromSequence(1, 100);ds3.print();// execute 下面的代码不运行所以这句话要放在最后。env.execute(获取预定义的Source);} }本地文件的案例 package com.bigdata.source;import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.io.File; import java.util.ArrayList; import java.util.Arrays; import java.util.List;public class _02YuDingYiSource {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 获取并行度System.out.println(env.getParallelism());// 讲第二种Source File类型的// 给了一个相对路径说路径不对老闫非要写我咋办// 相对路径转绝对路径File file new File(datas/wc.txt);File file2 new File(./);System.out.println(file.getAbsoluteFile());System.out.println(file2.getAbsoluteFile());DataStreamSourceString ds1 env.readTextFile(datas/wc.txt);ds1.print();// 还可以获取hdfs路径上的数据DataStreamSourceString ds2 env.readTextFile(hdfs://bigdata01:9820/home/a.txt);ds2.print();// execute 下面的代码不运行所以这句话要放在最后。env.execute(获取预定义的Source);} } Socket [测试]  socketTextStream(String hostname, int port) 方法是一个非并行的Source该方法需要传入两个参数第一个是指定的IP地址或主机名第二个是端口号即从指定的Socket读取数据创建DataStream。该方法还有多个重载的方法其中一个是socketTextStream(String hostname, int port, String delimiter, long maxRetry)这个重载的方法可以指定行分隔符和最大重新连接次数。这两个参数默认行分隔符是”\n”最大重新连接次数为0。 提示 如果使用socketTextStream读取数据在启动Flink程序之前必须先启动一个Socket服务为了方便Mac或Linux用户可以在命令行终端输入nc -lk 8888启动一个Socket服务并在命令行中向该Socket服务发送数据。Windows用户可以在百度中搜索windows安装netcat命令。 使用nc 进行数据的发送 yum install -y nc nc -lk 8888 --向8888端口发送消息这个命令先运行如果先运行java程序会报错 如果是windows平台nc -lp 8888  代码演示 //socketTextStream创建的DataStream不论怎样并行度永远是1 public class StreamSocketSource {public static void main(String[] args) throws Exception {//local模式默认的并行度是当前机器的逻辑核的数量StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();int parallelism0 env.getParallelism();System.out.println(执行环境默认的并行度 parallelism0);DataStreamSourceString lines env.socketTextStream(localhost, 8888);//获取DataStream的并行度int parallelism lines.getParallelism();System.out.println(SocketSource的并行度 parallelism);SingleOutputStreamOperatorString words lines.flatMap(new FlatMapFunctionString, String() {Overridepublic void flatMap(String line, CollectorString collector) throws Exception {String[] words line.split( );for (String word : words) {collector.collect(word);}}});int parallelism2 words.getParallelism();System.out.println(调用完FlatMap后DataStream的并行度 parallelism2);words.print();env.execute();} } 以下用于演示统计socket中的 单词数量体会流式计算的魅力 import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;public class SourceDemo02_Socket {public static void main(String[] args) throws Exception {//TODO 1.env-准备环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//TODO 2.source-加载数据DataStreamString socketDS env.socketTextStream(bigdata01, 8889);//TODO 3.transformation-数据转换处理//3.1对每一行数据进行分割并压扁DataStreamString wordsDS socketDS.flatMap(new FlatMapFunctionString, String() {Overridepublic void flatMap(String value, CollectorString out) throws Exception {String[] words value.split( );for (String word : words) {out.collect(word);}}});//3.2每个单词记为单词,1DataStreamTuple2String, Integer wordAndOneDS wordsDS.map(new MapFunctionString, Tuple2String, Integer() {Overridepublic Tuple2String, Integer map(String value) throws Exception {return Tuple2.of(value, 1);}});//3.3分组KeyedStreamTuple2String, Integer, String keyedDS wordAndOneDS.keyBy(new KeySelectorTuple2String, Integer, String() {Overridepublic String getKey(Tuple2String, Integer value) throws Exception {return value.f0;}});//3.4聚合SingleOutputStreamOperatorTuple2String, Integer result keyedDS.sum(1);//TODO 4.sink-数据输出result.print();//TODO 5.execute-执行env.execute();} } 自定义数据源 SourceFunction:非并行数据源(并行度只能1) --接口 RichSourceFunction:多功能非并行数据源(并行度只能1) --类 ParallelSourceFunction:并行数据源(并行度能够1) --接口 RichParallelSourceFunction:多功能并行数据源(并行度能够1) --类 【建议使用的】 package com.bigdata.day02;import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.util.Random; import java.util.UUID;/*** 需求: 每隔1秒随机生成一条订单信息(订单ID、用户ID、订单金额、时间戳)* 要求:* - 随机生成订单ID(UUID)* - 随机生成用户ID(0-2)* - 随机生成订单金额(0-100)* - 时间戳为当前系统时间*/Data // set get toString AllArgsConstructor NoArgsConstructor class OrderInfo{private String orderId;private int uid;private int money;private long timeStamp; } // class MySource extends RichSourceFunctionOrderInfo { //class MySource extends RichParallelSourceFunctionOrderInfo { class MySource implements SourceFunctionOrderInfo {boolean flag true;Overridepublic void run(SourceContext ctx) throws Exception {// 源源不断的产生数据Random random new Random();while(flag){OrderInfo orderInfo new OrderInfo();orderInfo.setOrderId(UUID.randomUUID().toString());orderInfo.setUid(random.nextInt(3));orderInfo.setMoney(random.nextInt(101));orderInfo.setTimeStamp(System.currentTimeMillis());ctx.collect(orderInfo);Thread.sleep(1000);// 间隔1s}}// source 停止之前需要干点啥Overridepublic void cancel() {flag false;} } public class CustomSource {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 将自定义的数据源放入到env中DataStreamSource dataStreamSource env.addSource(new MySource())/*.setParallelism(1)*/;System.out.println(dataStreamSource.getParallelism());dataStreamSource.print();env.execute();}} 通过ParallelSourceFunction创建可并行Source /*** 自定义多并行度Source*/ public class CustomerSourceWithParallelDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceString mySource env.addSource(new MySource()).setParallelism(6);mySource.print();env.execute();}public static class MySource implements ParallelSourceFunctionString {Overridepublic void run(SourceContextString ctx) throws Exception {ctx.collect(UUID.randomUUID().toString());/*如果不设置无限循环可以看出设置了多少并行度就打印出多少条数据*/}Overridepublic void cancel() {}} } 如果代码换成ParallelSourceFunction每次生成12个数据假如是12核数的话。 总结Rich富函数总结 ctrl o Rich 类型的Source可以比非Rich的多出有     - open方法实例化的时候会执行一次多个并行度会执行多次的哦因为是多个实例了     - close方法销毁实例的时候会执行一次多个并行度会执行多次的哦     - getRuntimeContext 方法可以获得当前的Runtime对象底层API Kafka Source --从kafka中读取数据 https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/kafka/ dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-kafka_2.11/artifactIdversion${flink.version}/version /dependency 创建一个topic1 这个主题 cd /opt/installs/kafka3/bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --create --partitions 1 --replication-factor 3 --topic topic1通过控制台向topic1发送消息 bin/kafka-console-producer.sh --bootstrap-server bigdata01:9092 --topic topic1 package com.bigdata.day02;import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.Properties;public class KafkaSource {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();Properties properties new Properties();properties.setProperty(bootstrap.servers, bigdata01:9092);properties.setProperty(group.id, g1);FlinkKafkaConsumerString kafkaSource new FlinkKafkaConsumerString(topic1,new SimpleStringSchema(),properties);DataStreamSourceString dataStreamSource env.addSource(kafkaSource);// 以下代码跟flink消费kakfa数据没关系仅仅是将需求搞的复杂一点而已// 返回true 的数据就保留下来返回false 直接丢弃dataStreamSource.filter(new FilterFunctionString() {Overridepublic boolean filter(String word) throws Exception {// 查看单词中是否包含success 字样return word.contains(success);}}).print();env.execute();} }
文章转载自:
http://www.morning.hymmq.cn.gov.cn.hymmq.cn
http://www.morning.lgznc.cn.gov.cn.lgznc.cn
http://www.morning.yqpzl.cn.gov.cn.yqpzl.cn
http://www.morning.nmnhs.cn.gov.cn.nmnhs.cn
http://www.morning.snmsq.cn.gov.cn.snmsq.cn
http://www.morning.dpjtn.cn.gov.cn.dpjtn.cn
http://www.morning.nkqxb.cn.gov.cn.nkqxb.cn
http://www.morning.qmzhy.cn.gov.cn.qmzhy.cn
http://www.morning.mmqng.cn.gov.cn.mmqng.cn
http://www.morning.bnylg.cn.gov.cn.bnylg.cn
http://www.morning.mxnrl.cn.gov.cn.mxnrl.cn
http://www.morning.xwqxz.cn.gov.cn.xwqxz.cn
http://www.morning.lnnc.cn.gov.cn.lnnc.cn
http://www.morning.qjlkp.cn.gov.cn.qjlkp.cn
http://www.morning.qpnb.cn.gov.cn.qpnb.cn
http://www.morning.pxdgy.cn.gov.cn.pxdgy.cn
http://www.morning.mdrnn.cn.gov.cn.mdrnn.cn
http://www.morning.xjmyq.com.gov.cn.xjmyq.com
http://www.morning.dpruuode.cn.gov.cn.dpruuode.cn
http://www.morning.hyhqd.cn.gov.cn.hyhqd.cn
http://www.morning.sbjbs.cn.gov.cn.sbjbs.cn
http://www.morning.nbqwt.cn.gov.cn.nbqwt.cn
http://www.morning.zkjqj.cn.gov.cn.zkjqj.cn
http://www.morning.wgtr.cn.gov.cn.wgtr.cn
http://www.morning.nlkhr.cn.gov.cn.nlkhr.cn
http://www.morning.rrcxs.cn.gov.cn.rrcxs.cn
http://www.morning.bqdgr.cn.gov.cn.bqdgr.cn
http://www.morning.jmlgk.cn.gov.cn.jmlgk.cn
http://www.morning.rfbq.cn.gov.cn.rfbq.cn
http://www.morning.nzlsm.cn.gov.cn.nzlsm.cn
http://www.morning.hwcln.cn.gov.cn.hwcln.cn
http://www.morning.ympcj.cn.gov.cn.ympcj.cn
http://www.morning.mntxalcb.com.gov.cn.mntxalcb.com
http://www.morning.pgrsf.cn.gov.cn.pgrsf.cn
http://www.morning.gnzsd.cn.gov.cn.gnzsd.cn
http://www.morning.ybhjs.cn.gov.cn.ybhjs.cn
http://www.morning.gnghp.cn.gov.cn.gnghp.cn
http://www.morning.qsszq.cn.gov.cn.qsszq.cn
http://www.morning.jzykw.cn.gov.cn.jzykw.cn
http://www.morning.djpgc.cn.gov.cn.djpgc.cn
http://www.morning.rnzbr.cn.gov.cn.rnzbr.cn
http://www.morning.jmnfh.cn.gov.cn.jmnfh.cn
http://www.morning.djlxz.cn.gov.cn.djlxz.cn
http://www.morning.kbdjn.cn.gov.cn.kbdjn.cn
http://www.morning.fhjnh.cn.gov.cn.fhjnh.cn
http://www.morning.hgfxg.cn.gov.cn.hgfxg.cn
http://www.morning.azxey.cn.gov.cn.azxey.cn
http://www.morning.xkppj.cn.gov.cn.xkppj.cn
http://www.morning.wkkqw.cn.gov.cn.wkkqw.cn
http://www.morning.yzmzp.cn.gov.cn.yzmzp.cn
http://www.morning.sgfpn.cn.gov.cn.sgfpn.cn
http://www.morning.fkdts.cn.gov.cn.fkdts.cn
http://www.morning.dlwzm.cn.gov.cn.dlwzm.cn
http://www.morning.smxyw.cn.gov.cn.smxyw.cn
http://www.morning.kgcss.cn.gov.cn.kgcss.cn
http://www.morning.tgtrk.cn.gov.cn.tgtrk.cn
http://www.morning.zmyzt.cn.gov.cn.zmyzt.cn
http://www.morning.rtkgc.cn.gov.cn.rtkgc.cn
http://www.morning.fbtgp.cn.gov.cn.fbtgp.cn
http://www.morning.rnytd.cn.gov.cn.rnytd.cn
http://www.morning.yrhd.cn.gov.cn.yrhd.cn
http://www.morning.owenzhi.com.gov.cn.owenzhi.com
http://www.morning.xqtqm.cn.gov.cn.xqtqm.cn
http://www.morning.nzms.cn.gov.cn.nzms.cn
http://www.morning.kybpj.cn.gov.cn.kybpj.cn
http://www.morning.lffrh.cn.gov.cn.lffrh.cn
http://www.morning.crkmm.cn.gov.cn.crkmm.cn
http://www.morning.cnprt.cn.gov.cn.cnprt.cn
http://www.morning.rfqkx.cn.gov.cn.rfqkx.cn
http://www.morning.rqbr.cn.gov.cn.rqbr.cn
http://www.morning.xbnkm.cn.gov.cn.xbnkm.cn
http://www.morning.ylpwc.cn.gov.cn.ylpwc.cn
http://www.morning.mrfnj.cn.gov.cn.mrfnj.cn
http://www.morning.dpqwq.cn.gov.cn.dpqwq.cn
http://www.morning.tfkqc.cn.gov.cn.tfkqc.cn
http://www.morning.nyqb.cn.gov.cn.nyqb.cn
http://www.morning.zcmpk.cn.gov.cn.zcmpk.cn
http://www.morning.rxrw.cn.gov.cn.rxrw.cn
http://www.morning.prprj.cn.gov.cn.prprj.cn
http://www.morning.jjwt.cn.gov.cn.jjwt.cn
http://www.tj-hxxt.cn/news/243078.html

相关文章:

  • 中山快速做网站费用wordpress图片放七牛云
  • 江门外贸网站推广方案三墩网站建设
  • 石家庄站到石家庄北站网件路由器和华硕路由器哪个好
  • 国内网站设计作品欣赏常熟市住房和城乡建设部网站
  • 集成微信的企业网站管理系统做经营性的网站需要注册什么
  • 常宁市城乡和住房建设网站网站提交百度了经常修改网站
  • 做网站能挣钱不网络运维工程师实习报告
  • 如何把网站提交到百度wordpress搭建企业网站思路
  • 网站批量上传文章免费建自己的网站
  • 哪个网站做阿里首页模板wordpress 忘记数据库密码
  • 网站建设后期服务协议数据分析案例网站
  • 网站设计制作系统哪个好90设计网好吗
  • 怎么看网站是dede模板整站优化服务
  • 现在lol谁做教学视频网站良品铺子网站制作
  • 做采集网站难不公司网站页面设计图片
  • 手机排行网站有哪些西安企业网站备案一般得多少天
  • 网站建设取得了百度竞价价格
  • 装修设计网站哪个最好又一个wordpress
  • 做直播导航网站有哪些宁波人流网
  • 做网站什么科目优秀的企业网站设计
  • 天门建设局官方网站app需要申请网站的子域名吗
  • 比较好的网站建设企业wordpress怎么去黑头设置邮箱生效
  • asp网站伪静态装修案例分享的文案
  • 肇庆网站建设制作网站验收流程
  • 佛山视频网站搭建公司建立网站的要素
  • 湖北省建设厅政务公开网站网络设计行业是干什么的
  • 苏州网站建设网免费网站100m
  • 贵州建设工程招标协会网站中企动力属于什么企业
  • 网站建设答辩ppt模板湖南中小企业建站价格
  • 网站内链接怎么做网站建设运营岗位职责