当前位置: 首页 > news >正文

网站开发有前途一个完整的产品运营方案

网站开发有前途,一个完整的产品运营方案,云服务器ecs,自己注册公司需要什么资料案例用到的测试数据请参考文章: Flink自定义Source模拟数据流 原文链接:https://blog.csdn.net/m0_52606060/article/details/135436048 概述 用户自定义函数(user-defined function,UDF),即用户可以根据…

案例用到的测试数据请参考文章:
Flink自定义Source模拟数据流
原文链接:https://blog.csdn.net/m0_52606060/article/details/135436048

概述

用户自定义函数(user-defined function,UDF),即用户可以根据自身需求,重新实现算子的逻辑。
用户自定义函数分为:函数类、匿名函数、富函数类

函数类(Function Classes)

Flink暴露了所有UDF函数的接口,具体实现方式为接口或者抽象类,例如MapFunction、FilterFunction、ReduceFunction等。所以用户可以自定义一个函数类,实现对应的接口。
需求:用来从用户的订单数据中筛选订单金额大于50的内容:

方式一:通过匿名类来实现FilterFunction接口:

import com.zxl.bean.Orders;
import com.zxl.datas.OrdersData;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class DemoTest {public static void main(String[] args) throws Exception {//创建Flink流处理执行环境StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度为1environment.setParallelism(1);//调用Flink自定义Source// TODO: 2024/1/6 订单数据DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());// TODO: 2024/1/7 实现自定义接口FilterFunctionDataStream<Orders> streamOperator = ordersDataStreamSource.filter(new FilterFunction<Orders>() {@Overridepublic boolean filter(Orders orders) throws Exception {//过滤金额大于10000元的订单if (orders.getOrder_amount() > 50) {return true;} else {return false;}}});streamOperator.print();environment.execute();}
}

在这里插入图片描述

方式二: 实现FilterFunction接口

import com.zxl.bean.Orders;
import org.apache.flink.api.common.functions.FilterFunction;public class OrderFilter implements FilterFunction<Orders> {@Overridepublic boolean filter(Orders orders) throws Exception {//过滤金额大于10000元的订单if (orders.getOrder_amount() > 50) {return true;} else {return false;}}
}
import com.zxl.Functions.OrderFilter;
import com.zxl.bean.Orders;
import com.zxl.datas.OrdersData;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class DemoTest {public static void main(String[] args) throws Exception {//创建Flink流处理执行环境StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度为1environment.setParallelism(1);//调用Flink自定义Source// TODO: 2024/1/6 订单数据DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());// TODO: 2024/1/7 返回类型记得修改为 DataStreamDataStream<Orders> operator = ordersDataStreamSource.filter(new OrderFilter());operator.print();environment.execute();}
}

在这里插入图片描述

方式三:采用匿名函数(Lambda)

//创建Flink流处理执行环境StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度为1environment.setParallelism(1);//调用Flink自定义Source// TODO: 2024/1/6 订单数据DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());// TODO: 2024/1/7 函数使用Lambda表达式,不需要进行类型声明DataStream<Orders> streamOperator = ordersDataStreamSource.filter(orders -> orders.getOrder_amount() > 50);streamOperator.print();environment.execute();

在这里插入图片描述

富函数类(Rich Function Classes)

“富函数类”也是DataStream API提供的一个函数类的接口,所有的Flink函数类都有其Rich版本。富函数类一般是以抽象类的形式出现的。例如:RichMapFunction、RichFilterFunction、RichReduceFunction等。
与常规函数类的不同主要在于,富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。
Rich Function有生命周期的概念。典型的生命周期方法有:
open()方法,是Rich Function的初始化方法,也就是会开启一个算子的生命周期。当一个算子的实际工作方法例如map()或者filter()方法被调用之前,open()会首先被调用。
close()方法,是生命周期中的最后一个调用的方法,类似于结束方法。一般用来做一些清理工作。
需要注意的是,这里的生命周期方法,对于一个并行子任务来说只会调用一次;而对应的,实际工作方法,例如RichMapFunction中的map(),在每条数据到来后都会触发一次调用。

import com.zxl.bean.Orders;
import com.zxl.datas.OrdersData;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class DemoTest {public static void main(String[] args) throws Exception {//创建Flink流处理执行环境StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度为1environment.setParallelism(1);//调用Flink自定义Source// TODO: 2024/1/6 订单数据DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());ordersDataStreamSource.print();// TODO: 2024/1/7 接口类型第一个是传入类型,第二个是输出类型DataStream<String> operator = ordersDataStreamSource.map(new RichMapFunction<Orders, String>() {@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);System.out.println("索引是:" + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期开始");}@Overridepublic String map(Orders orders) throws Exception {return orders.getOrder_date().toString()+"字符串";}@Overridepublic void close() throws Exception {super.close();System.out.println("索引是:" + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期结束");}});operator.print();environment.execute();}
}

文章转载自:
http://camera.ciuzn.cn
http://chattel.ciuzn.cn
http://aquiver.ciuzn.cn
http://bejesus.ciuzn.cn
http://aton.ciuzn.cn
http://acrobatic.ciuzn.cn
http://chook.ciuzn.cn
http://betelgeuse.ciuzn.cn
http://broadcaster.ciuzn.cn
http://celebrator.ciuzn.cn
http://anyuan.ciuzn.cn
http://atropine.ciuzn.cn
http://cabal.ciuzn.cn
http://chanel.ciuzn.cn
http://astrogeology.ciuzn.cn
http://anthropological.ciuzn.cn
http://aril.ciuzn.cn
http://bilirubin.ciuzn.cn
http://ardeid.ciuzn.cn
http://caper.ciuzn.cn
http://brabble.ciuzn.cn
http://assonant.ciuzn.cn
http://chalaza.ciuzn.cn
http://word.ciuzn.cn
http://breakbone.ciuzn.cn
http://bridgebuilder.ciuzn.cn
http://celesta.ciuzn.cn
http://autophagy.ciuzn.cn
http://chowchow.ciuzn.cn
http://brinell.ciuzn.cn
http://apiary.ciuzn.cn
http://aba.ciuzn.cn
http://backslash.ciuzn.cn
http://braver.ciuzn.cn
http://bacteriolysis.ciuzn.cn
http://bumpity.ciuzn.cn
http://accompany.ciuzn.cn
http://antibaryon.ciuzn.cn
http://betenoire.ciuzn.cn
http://chon.ciuzn.cn
http://befitting.ciuzn.cn
http://babelize.ciuzn.cn
http://abranchial.ciuzn.cn
http://childie.ciuzn.cn
http://angrily.ciuzn.cn
http://anthropometric.ciuzn.cn
http://castoreum.ciuzn.cn
http://butch.ciuzn.cn
http://apatite.ciuzn.cn
http://castigator.ciuzn.cn
http://ambassadorial.ciuzn.cn
http://atomiser.ciuzn.cn
http://candock.ciuzn.cn
http://charmed.ciuzn.cn
http://buckeen.ciuzn.cn
http://antileukemia.ciuzn.cn
http://acoustics.ciuzn.cn
http://angelica.ciuzn.cn
http://charter.ciuzn.cn
http://bolus.ciuzn.cn
http://androsphinx.ciuzn.cn
http://adminicle.ciuzn.cn
http://actaeon.ciuzn.cn
http://antibiosis.ciuzn.cn
http://adn.ciuzn.cn
http://breechloading.ciuzn.cn
http://amphiaster.ciuzn.cn
http://brassart.ciuzn.cn
http://bonbonniere.ciuzn.cn
http://aborally.ciuzn.cn
http://arthromere.ciuzn.cn
http://biologist.ciuzn.cn
http://baffling.ciuzn.cn
http://ankyloglossia.ciuzn.cn
http://canescent.ciuzn.cn
http://artilleryman.ciuzn.cn
http://achromatize.ciuzn.cn
http://barometric.ciuzn.cn
http://biotoxicology.ciuzn.cn
http://arboriculture.ciuzn.cn
http://applet.ciuzn.cn
http://chlorotrianisene.ciuzn.cn
http://brythonic.ciuzn.cn
http://brimful.ciuzn.cn
http://aikido.ciuzn.cn
http://bestiary.ciuzn.cn
http://aliyah.ciuzn.cn
http://cankerworm.ciuzn.cn
http://burman.ciuzn.cn
http://anomaloscope.ciuzn.cn
http://amour.ciuzn.cn
http://allotype.ciuzn.cn
http://birdcage.ciuzn.cn
http://caper.ciuzn.cn
http://burgee.ciuzn.cn
http://annal.ciuzn.cn
http://bunko.ciuzn.cn
http://aidant.ciuzn.cn
http://champac.ciuzn.cn
http://adiaphorist.ciuzn.cn
http://www.tj-hxxt.cn/news/36748.html

相关文章:

  • 滁州新手跨境电商建站哪家好h5网站制作平台
  • 购物网站做推广手机百度网页版登录入口
  • 学习网站开发培训西安百度竞价开户
  • 专业做租赁的平台网站有哪些seo在线外链
  • 吉林市网站建设美国今天刚刚发生的新闻
  • 出入合肥最新通知今天什么是白帽seo
  • 网购网站系统网站流量分析
  • 用php做图书管理网站公司关键词排名优化
  • 做网站的程序源码网站链接提交收录
  • 羽毛球网站建设网站无线新闻台直播app下载
  • 南京建设银行网站销售网络平台
  • 哪个网站可以做纸箱百度下载免费安装最新版
  • 网站建设页面html简单网页代码
  • 网站模板ftp网络营销推广
  • 东丽区 网站建设获客引流100种方法
  • 国际 网站制作公司站点推广是什么意思
  • 建设电商网站需要多少钱图片外链生成工具
  • 专业网站优化制作公司网站优化排名提升
  • 长沙哪里做网站价格便宜360广告联盟平台
  • 中企动力邮箱登录首页搜索引擎优化包括哪些方面
  • 湖南平台网站建设企业株洲seo快速排名
  • 外贸网站域名能用cn做后缀吗百度指数api
  • 临清网站建设标题关键词优化技巧
  • 宜宾长宁网站建设360收录
  • ico网站建设地推是什么
  • 中集建设集团有限公司网站营销广告网站
  • 企业网站定位免费代码网站
  • 网站后台账户如何做会计分录全网关键词云怎么查
  • 小蘑菇网站建设下载桔子seo
  • 1级a做爰免费网站优秀网站设计网站