当前位置: 首页 > news >正文

网站建设步骤图片素材网页开发者选项在哪里

网站建设步骤图片素材,网页开发者选项在哪里,wordpress意见反馈功能,咸阳网站网站建设大纲 Souceschemadescriptor Sinkschemadescriptor Execute完整代码参考资料 《0基础学习PyFlink——使用PyFlink的Sink将结果输出到Mysql》一文中,我们讲到如何通过定义Souce、Sink和Execute三个SQL#xff0c;来实现数据读取、清洗、计算和入库。 如下图所示SQL是最高层级的… 大纲 Souceschemadescriptor Sinkschemadescriptor Execute完整代码参考资料 《0基础学习PyFlink——使用PyFlink的Sink将结果输出到Mysql》一文中,我们讲到如何通过定义Souce、Sink和Execute三个SQL来实现数据读取、清洗、计算和入库。 如下图所示SQL是最高层级的抽象在它之下是Table API。本文我们会将例子中的SQL翻译成Table API来实现等价的功能。 Souce # create table source (# word STRING# ) with (# connector filesystem,# format csv,# path {}# )# .format(input_path)下面的SQL分为两部分 Table结构该表只有一个名字为word类型为string的字段。连接器是“文件系统”filesystem类型格式是csv的文件。这样输入就会按csv格式进行解析。 SQL中的Table对应于Table API中的schema。它用于定义表的结构比如有哪些类型的字段和主键等。 上述整个SQL整体对应于descriptor。即我们可以认为descriptor是表结构连接器。 我们可以让不同的表和不同的连接器结合形成不同的descriptor。这是一个组合关系我们将在下面看到它们的组合方式。 schema # define the source schemasource_schema Schema.new_builder() \.column(word, DataTypes.STRING()) \.build()new_builder()会返回一个Schema.Builder对象 column(self, column_name: str, data_type: Union[str, DataType])方法用于声明该表存在哪些类型、哪些名字的字段同时返回之前的Builder对象 最后的build(self)方法返回Schema.Builder对象构造的Schema对象。 descriptor # Create a source descriptorsource_descriptor TableDescriptor.for_connector(filesystem) \.schema(source_schema) \.option(path, input_path) \.format(csv) \.build()for_connector(connector: str)方法返回一个TableDescriptor.Builder对象 schema(self, schema: Schema)将上面生成的source_schema 对象和descriptor关联 option(self, key: Union[str, ConfigOption], value)用于指定一些参数比如本例用于指定输入文件的路径 format(self, format: Union[str, ‘FormatDescriptor’], format_option: ConfigOption[str] None)用于指定内容的格式这将指导怎么解析和入库 build(self)方法返回TableDescriptor.Builder对象构造的TableDescriptor对象。 Sink # CREATE TABLE WordsCountTableSink (# word STRING,# count BIGINT,# PRIMARY KEY (word) NOT ENFORCED# ) WITH (# connector jdbc,# url jdbc:mysql://127.0.0.1:3306/words_count_db?useSSLfalse,# table-name WordsCountTable,# drivercom.mysql.jdbc.Driver,# usernameadmin,# passwordpwd123# );# schema sink_schema Schema.new_builder() \.column(word, DataTypes.STRING().not_null()) \.column(count, DataTypes.BIGINT()) \.primary_key(word) \.build()大部分代码在之前已经解释过了。我们主要关注于区别点 primary_key(self, *column_names: str) 用于指定表的主键。主键的类型需要使用调用not_null()以表明其非空。 descriptor # Create a sink descriptorsink_descriptor TableDescriptor.for_connector(jdbc) \.schema(sink_schema) \.option(url, jdbc:mysql://127.0.0.1:3306/words_count_db?useSSLfalse) \.option(table-name, WordsCountTable) \.option(driver, com.mysql.jdbc.Driver) \.option(username, admin) \.option(password, pwd123) \.build()这块代码主要是通过option来设置一些连接器相关的设置。可以看到这是用KV形式设计的这样就可以让option方法有很大的灵活性以应对不同连接器千奇百怪的设置。 Execute 使用下面的代码将表创建出来以供后续使用。 t_env.create_table(source, source_descriptor) tab t_env.from_path(source) t_env.create_temporary_table(WordsCountTableSink, sink_descriptor)# execute insert# insert into WordsCountTableSink# select word, count(1) as count# from source# group by word# tab.group_by(col(word)) \.select(col(word), lit(1).count) \.execute_insert(WordsCountTableSink) \.wait()这儿需要介绍的就是lit。它用于生成一个表达式诸如sum、max、avg和count等。 execute_insert(self, table_path_or_descriptor: Union[str, TableDescriptor], overwrite: bool False)用于将之前的计算结果插入到Sink表中 完整代码 import argparse import logging import sysfrom pyflink.common import Configuration from pyflink.table import (EnvironmentSettings, TableEnvironment, Schema) from pyflink.table.types import DataTypes from pyflink.table.table_descriptor import TableDescriptor from pyflink.table.expressions import lit, coldef word_count(input_path):config Configuration()# write all the data to one fileconfig.set_string(parallelism.default, 1)env_settings EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()t_env TableEnvironment.create(env_settings)# create table source (# word STRING# ) with (# connector filesystem,# format csv,# path {}# )# # define the source schemasource_schema Schema.new_builder() \.column(word, DataTypes.STRING()) \.build()# Create a source descriptorsource_descriptor TableDescriptor.for_connector(filesystem) \.schema(source_schema) \.option(path, input_path) \.format(csv) \.build()t_env.create_table(source, source_descriptor)# CREATE TABLE WordsCountTableSink (# word STRING,# count BIGINT,# PRIMARY KEY (word) NOT ENFORCED# ) WITH (# connector jdbc,# url jdbc:mysql://127.0.0.1:3306/words_count_db?useSSLfalse,# table-name WordsCountTable,# drivercom.mysql.jdbc.Driver,# usernameadmin,# passwordpwd123# );# # define the sink schemasink_schema Schema.new_builder() \.column(word, DataTypes.STRING().not_null()) \.column(count, DataTypes.BIGINT()) \.primary_key(word) \.build()# Create a sink descriptorsink_descriptor TableDescriptor.for_connector(jdbc) \.schema(sink_schema) \.option(url, jdbc:mysql://127.0.0.1:3306/words_count_db?useSSLfalse) \.option(table-name, WordsCountTable) \.option(driver, com.mysql.jdbc.Driver) \.option(username, admin) \.option(password, pwd123) \.build()t_env.create_temporary_table(WordsCountTableSink, sink_descriptor)# execute insert# insert into WordsCountTableSink# select word, count(1) as count# from source# group by word# tab t_env.from_path(source)tab.group_by(col(word)) \.select(col(word), lit(1).count) \.execute_insert(WordsCountTableSink) \.wait()if __name__ __main__:logging.basicConfig(streamsys.stdout, levellogging.INFO, format%(message)s)parser argparse.ArgumentParser()parser.add_argument(--input,destinput,requiredFalse,helpInput file to process.)argv sys.argv[1:]known_args, _ parser.parse_known_args(argv)word_count(known_args.input)参考资料 https://nightlies.apache.org/flink/flink-docs-master/zh/docs/concepts/overview/https://nightlies.apache.org/flink/flink-docs-release-1.17/api/python//reference/pyflink.table/descriptors.html
http://www.tj-hxxt.cn/news/137270.html

相关文章:

  • 做淘宝客如何建自己的网站国外市场网站推广公司
  • html5效果网站有没有便宜的注册代理
  • 织梦网站突然打开很慢职业生涯规划网站开发背景
  • 建设网站cms网上推广兼职
  • 网站建设愿景品牌网站建设平台
  • 展会网站制作企业装修展厅公司
  • 那个网站开发三味wordpress post 模板
  • 哪里做公司网站图文广告开店培训班
  • 网站整站开发视频教程wordpress文章重复
  • 网站开发培训程序员南浔哪有做网站的
  • 宣威市住房与城乡建设局网站wordpress cron
  • 百度网站免费电话汽车网站网页设计
  • 功能型网站开发廊坊学校网站建设
  • 网站改版方案原则餐饮品牌策划设计公司
  • 网站开发相关期刊北京建设教育网站
  • 响应式网站建设模板文化馆 网站 设计
  • 企业网站开发意义人人建站网
  • 校园网站psd会所网站模板
  • 网站前端建设报价单电子商务网站建设有哪些流程图
  • 网站开发从零到上线天眼查网站建设公司
  • 山东建设局网站 王局为什么做电影网站没有流量吗
  • thinkphp做直播网站办公室电脑局域网组建
  • 永州商城网站建设wordpress 排除分类
  • 温州企业网站建设要多少钱wordpress 更换中文
  • 长春专业做网站公司排名网站建设方案协议书
  • 南京网站优化哪家好培训机构的网站建设
  • html网页设计网站开发报告做视频的背景音乐哪里下载网站
  • 黑色风格网站主页面新公司需要做网站
  • 网站未备案 打不开建设网站哪个模板网站
  • 做简单的网站链接前端网页模板