网站没有收录了,怎么通过做网站来赚钱,怎样维护网站建设,在网站建设中什么用于搭建页面结构文章目录 gitee码云地址简介概述01 配置值来自.properties文件1.通过路径读取2.通过文件流读取3.通过IO流读取 02 配置值来自命令行03 配置来自系统属性04 注册以及使用全局变量05 Flink获取参数值Demo1.项目结构2.pom.xml文件如下3.配置文件4.项目主类5.运行查看相关日志 gite… 文章目录 gitee码云地址简介概述01 配置值来自.properties文件1.通过路径读取2.通过文件流读取3.通过IO流读取 02 配置值来自命令行03 配置来自系统属性04 注册以及使用全局变量05 Flink获取参数值Demo1.项目结构2.pom.xml文件如下3.配置文件4.项目主类5.运行查看相关日志 gitee码云地址
直接下载解压可用 https://gitee.com/shawsongyue/aurora.git 模块aurora_flink 主类GetParamsStreamingJob
简介概述
1.几乎所有的批和流的 Flink 应用程序都依赖于外部配置参数。这些配置参数可以用于指定输入和输出源如路径或地址、系统参数并行度运行时配置和特定的应用程序参数通常使用在用户自定义函数。
2.为解决以上问题Flink 提供一个名为 Parametertool 的简单公共类其中包含了一些基本的工具。请注意这里说的 Parametertool 并不是必须使用的。Commons CLI 和 argparse4j 等其他框架也可以非常好地兼容 Flink。
3.**ParameterTool**定义了一组静态方法用于读取配置信息。该工具类内部使用了 Map 类型这样使得它可以很容易地与你的配置集成在一起。
01 配置值来自.properties文件
1.通过路径读取
//定义文件路径
String propertiesFilePath E:\\project\\aurora_dev\\aurora_flink\\src\\main\\resources\\application.properties;//方式一:直接使用内置工具类
ParameterTool parameter_01 ParameterTool.fromPropertiesFile(propertiesFilePath);
String jobName_01 parameter_01.get(jobName);
logger.info(方式一:读取配置文件中指定的key值{},jobName_01);2.通过文件流读取
//定义文件路径
String propertiesFilePath E:\\project\\aurora_dev\\aurora_flink\\src\\main\\resources\\application.properties;//方式二使用文件
File propertiesFile new File(propertiesFilePath);
ParameterTool parameter_02 ParameterTool.fromPropertiesFile(propertiesFile);
String jobName_02 parameter_02.get(jobName);
logger.info(方式二:读取配置文件中指定的key值{},jobName_02);3.通过IO流读取
//定义文件路径
String propertiesFilePath E:\\project\\aurora_dev\\aurora_flink\\src\\main\\resources\\application.properties;//方式三使用IO流
InputStream propertiesFileInputStream new FileInputStream(new File(propertiesFilePath));
ParameterTool parameter_03 ParameterTool.fromPropertiesFile(propertiesFileInputStream);
String jobName_03 parameter_03.get(jobName);
logger.info(方式三:读取配置文件中指定的key值{},jobName_03);02 配置值来自命令行
tips在idea的命令行传参格式–jobName program_job_aurora ParameterTool parameter_04 ParameterTool.fromArgs(args);
String jobName_04 parameter_04.get(jobName);
logger.info(方式四:命令行传参key值{},jobName_04);03 配置来自系统属性
tips在idea的的jvm系统参数设置格式-Dinputhdfs:///mydata //方式五获取jvm参数值
ParameterTool parameter_05 ParameterTool.fromSystemProperties();
String jobName_05 parameter_05.get(input);
logger.info(方式五:获取jvm参数key值{},jobName_05);04 注册以及使用全局变量
注意Flink全局变量仅支持在富函数中使用即Rich开头的类使用
//定义文件路径
String propertiesFilePath E:\\project\\aurora_dev\\aurora_flink\\src\\main\\resources\\application.properties;//直接使用内置工具类获取参数
ParameterTool parameter_01 ParameterTool.fromPropertiesFile(propertiesFilePath);//方式六注册全局参数final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.getConfig().setGlobalJobParameters(parameter_01);//在任意富函数中均可以获取,注意注意注意只有富文本函数才可以使用//1.创建富函数RichFlatMapFunctionString, String richFlatMap new RichFlatMapFunction() {Overridepublic void flatMap(String s, CollectorString collector) throws Exception {//获取运行环境ParameterTool parameters (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();//获取对应的值String jobName parameters.getRequired(jobName);logger.info(方式六:获取全局注册参数key值{},jobName_05);}};//2.创建数据集ArrayListString list new ArrayList();list.add(001);list.add(002);list.add(003);//3.把有限数据集转换为数据源DataStreamSourceString dataStreamSource env.fromCollection(list).setParallelism(1);//4.执行富文本处理dataStreamSource.flatMap(richFlatMap);//5.启动程序env.execute();05 Flink获取参数值Demo
1.项目结构 2.pom.xml文件如下
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdcom.xsy/groupIdartifactIdaurora_flink/artifactIdversion1.0-SNAPSHOT/version!--属性设置--properties!--java_JDK版本--java.version11/java.version!--maven打包插件--maven.plugin.version3.8.1/maven.plugin.version!--编译编码UTF-8--project.build.sourceEncodingUTF-8/project.build.sourceEncoding!--输出报告编码UTF-8--project.reporting.outputEncodingUTF-8/project.reporting.outputEncoding!--json数据格式处理工具--fastjson.version1.2.75/fastjson.version!--log4j版本--log4j.version2.17.1/log4j.version!--flink版本--flink.version1.18.0/flink.version!--scala版本--scala.binary.version2.11/scala.binary.version!--log4j依赖--log4j.version2.17.1/log4j.version/properties!--通用依赖--dependencies!-- json --dependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion${fastjson.version}/version/dependency!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-java/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-scala_2.12/artifactIdversion${flink.version}/version/dependency!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients/artifactIdversion${flink.version}/version/dependency!--集成外部依赖--!--集成日志框架 start--dependencygroupIdorg.apache.logging.log4j/groupIdartifactIdlog4j-slf4j-impl/artifactIdversion${log4j.version}/version/dependencydependencygroupIdorg.apache.logging.log4j/groupIdartifactIdlog4j-api/artifactIdversion${log4j.version}/version/dependencydependencygroupIdorg.apache.logging.log4j/groupIdartifactIdlog4j-core/artifactIdversion${log4j.version}/version/dependency!--集成日志框架 end--/dependencies!--编译打包--buildfinalName${project.name}/finalName!--资源文件打包--resourcesresourcedirectorysrc/main/resources/directory/resourceresourcedirectorysrc/main/java/directoryincludesinclude**/*.xml/include/includes/resource/resourcespluginsplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-shade-plugin/artifactIdversion3.1.1/versionexecutionsexecutionphasepackage/phasegoalsgoalshade/goal/goalsconfigurationartifactSetexcludesexcludeorg.apache.flink:force-shading/excludeexcludeorg.google.code.flindbugs:jar305/excludeexcludeorg.slf4j:*/excludeexcluderorg.apache.logging.log4j:*/excluder/excludes/artifactSetfiltersfilterartifact*:*/artifactexcludesexcludeMETA-INF/*.SF/excludeexcludeMETA-INF/*.DSA/excludeexcludeMETA-INF/*.RSA/exclude/excludes/filter/filterstransformerstransformer implementationorg.apache.maven.plugins.shade.resource.ManifestResourceTransformermainClassorg.xsy.sevenhee.flink.TestStreamJob/mainClass/transformer/transformers/configuration/execution/executions/plugin/plugins!--插件统一管理--pluginManagementplugins!--maven打包插件--plugingroupIdorg.springframework.boot/groupIdartifactIdspring-boot-maven-plugin/artifactIdversion${spring.boot.version}/versionconfigurationforktrue/forkfinalName${project.build.finalName}/finalName/configurationexecutionsexecutiongoalsgoalrepackage/goal/goals/execution/executions/plugin!--编译打包插件--pluginartifactIdmaven-compiler-plugin/artifactIdversion${maven.plugin.version}/versionconfigurationsource${java.version}/sourcetarget${java.version}/targetencodingUTF-8/encodingcompilerArgsarg-parameters/arg/compilerArgs/configuration/plugin/plugins/pluginManagement/build!--配置Maven项目中需要使用的远程仓库--repositoriesrepositoryidaliyun-repos/idurlhttps://maven.aliyun.com/nexus/content/groups/public//urlsnapshotsenabledfalse/enabled/snapshots/repository/repositories!--用来配置maven插件的远程仓库--pluginRepositoriespluginRepositoryidaliyun-plugin/idurlhttps://maven.aliyun.com/nexus/content/groups/public//urlsnapshotsenabledfalse/enabled/snapshots/pluginRepository/pluginRepositories/project3.配置文件
1application.properties
jobNamejob_aurora
jobMemory1024
taskNametask_aurora2log4j2.properties
rootLogger.levelINFO
rootLogger.appenderRef.console.refConsoleAppender
appender.console.nameConsoleAppender
appender.console.typeCONSOLE
appender.console.layout.typePatternLayout
appender.console.layout.pattern%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log.fileD:\\tmprootLogger.levelINFO
rootLogger.appenderRef.console.refConsoleAppender
appender.console.nameConsoleAppender
appender.console.typeCONSOLE
appender.console.layout.typePatternLayout
appender.console.layout.pattern%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log.fileD:\\tmp4.项目主类
package com.aurora;import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;/*** description flink获取外部参数作业** author 浅夏的猫* datetime 15:54 2024/1/28
*/
public class GetParamsStreamingJob {private static final Logger logger LoggerFactory.getLogger(GetParamsStreamingJob.class);public static void main(String[] args) throws Exception {//定义文件路径String propertiesFilePath E:\\project\\aurora_dev\\aurora_flink\\src\\main\\resources\\application.properties;//方式一:直接使用内置工具类ParameterTool parameter_01 ParameterTool.fromPropertiesFile(propertiesFilePath);String jobName_01 parameter_01.get(jobName);logger.info(方式一:读取配置文件中指定的key值{},jobName_01);//方式二使用文件File propertiesFile new File(propertiesFilePath);ParameterTool parameter_02 ParameterTool.fromPropertiesFile(propertiesFile);String jobName_02 parameter_02.get(jobName);logger.info(方式二:读取配置文件中指定的key值{},jobName_02);//方式三使用IO流InputStream propertiesFileInputStream new FileInputStream(new File(propertiesFilePath));ParameterTool parameter_03 ParameterTool.fromPropertiesFile(propertiesFileInputStream);String jobName_03 parameter_03.get(jobName);logger.info(方式三:读取配置文件中指定的key值{},jobName_03);//方式四命令行传参格式--jobName program_job_auroraParameterTool parameter_04 ParameterTool.fromArgs(args);String jobName_04 parameter_04.get(jobName);logger.info(方式四:命令行传参key值{},jobName_04);//方式五获取jvm参数值ParameterTool parameter_05 ParameterTool.fromSystemProperties();String jobName_05 parameter_05.get(input);logger.info(方式五:获取jvm参数key值{},jobName_05);//方式六注册全局参数final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.getConfig().setGlobalJobParameters(parameter_01);//在任意富函数中均可以获取,注意注意注意只有富文本函数才可以使用//1.创建富函数RichFlatMapFunctionString, String richFlatMap new RichFlatMapFunction() {Overridepublic void flatMap(String s, CollectorString collector) throws Exception {//获取运行环境ParameterTool parameters (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();//获取对应的值String jobName parameters.getRequired(jobName);logger.info(方式六:获取全局注册参数key值{},jobName_05);}};//2.创建数据集ArrayListString list new ArrayList();list.add(001);list.add(002);list.add(003);//3.把有限数据集转换为数据源DataStreamSourceString dataStreamSource env.fromCollection(list).setParallelism(1);//4.执行富文本处理dataStreamSource.flatMap(richFlatMap);//5.启动程序env.execute();}}5.运行查看相关日志
文章转载自: http://www.morning.wdnkp.cn.gov.cn.wdnkp.cn http://www.morning.zjrnq.cn.gov.cn.zjrnq.cn http://www.morning.wknj.cn.gov.cn.wknj.cn http://www.morning.zwmjq.cn.gov.cn.zwmjq.cn http://www.morning.mxlwl.cn.gov.cn.mxlwl.cn http://www.morning.smdiaosu.com.gov.cn.smdiaosu.com http://www.morning.skql.cn.gov.cn.skql.cn http://www.morning.trtxt.cn.gov.cn.trtxt.cn http://www.morning.qlwfz.cn.gov.cn.qlwfz.cn http://www.morning.zqdhr.cn.gov.cn.zqdhr.cn http://www.morning.brscd.cn.gov.cn.brscd.cn http://www.morning.gslz.com.cn.gov.cn.gslz.com.cn http://www.morning.gltmz.cn.gov.cn.gltmz.cn http://www.morning.yhywr.cn.gov.cn.yhywr.cn http://www.morning.hwxxh.cn.gov.cn.hwxxh.cn http://www.morning.pxrfm.cn.gov.cn.pxrfm.cn http://www.morning.jwqqd.cn.gov.cn.jwqqd.cn http://www.morning.tlbdy.cn.gov.cn.tlbdy.cn http://www.morning.wdxr.cn.gov.cn.wdxr.cn http://www.morning.wgxtz.cn.gov.cn.wgxtz.cn http://www.morning.skrcn.cn.gov.cn.skrcn.cn http://www.morning.cwfkm.cn.gov.cn.cwfkm.cn http://www.morning.kggxj.cn.gov.cn.kggxj.cn http://www.morning.qdbcd.cn.gov.cn.qdbcd.cn http://www.morning.xqnzn.cn.gov.cn.xqnzn.cn http://www.morning.bnrnb.cn.gov.cn.bnrnb.cn http://www.morning.qfbzj.cn.gov.cn.qfbzj.cn http://www.morning.xkyfq.cn.gov.cn.xkyfq.cn http://www.morning.rwlns.cn.gov.cn.rwlns.cn http://www.morning.qpxrr.cn.gov.cn.qpxrr.cn http://www.morning.mjbkp.cn.gov.cn.mjbkp.cn http://www.morning.kpyyf.cn.gov.cn.kpyyf.cn http://www.morning.nmfml.cn.gov.cn.nmfml.cn http://www.morning.qttft.cn.gov.cn.qttft.cn http://www.morning.buyid.com.cn.gov.cn.buyid.com.cn http://www.morning.xlmgq.cn.gov.cn.xlmgq.cn http://www.morning.zcfmb.cn.gov.cn.zcfmb.cn http://www.morning.bgygx.cn.gov.cn.bgygx.cn http://www.morning.nnttr.cn.gov.cn.nnttr.cn http://www.morning.yfrbn.cn.gov.cn.yfrbn.cn http://www.morning.rnhh.cn.gov.cn.rnhh.cn http://www.morning.lbbgf.cn.gov.cn.lbbgf.cn http://www.morning.cljmx.cn.gov.cn.cljmx.cn http://www.morning.hcgbm.cn.gov.cn.hcgbm.cn http://www.morning.zzaxr.cn.gov.cn.zzaxr.cn http://www.morning.duqianw.com.gov.cn.duqianw.com http://www.morning.flqkp.cn.gov.cn.flqkp.cn http://www.morning.rrrrsr.com.gov.cn.rrrrsr.com http://www.morning.fosfox.com.gov.cn.fosfox.com http://www.morning.tgwfn.cn.gov.cn.tgwfn.cn http://www.morning.ttdxn.cn.gov.cn.ttdxn.cn http://www.morning.rbsmm.cn.gov.cn.rbsmm.cn http://www.morning.mhnrx.cn.gov.cn.mhnrx.cn http://www.morning.rfxw.cn.gov.cn.rfxw.cn http://www.morning.qwgct.cn.gov.cn.qwgct.cn http://www.morning.ntgjm.cn.gov.cn.ntgjm.cn http://www.morning.xqkcs.cn.gov.cn.xqkcs.cn http://www.morning.wlqbr.cn.gov.cn.wlqbr.cn http://www.morning.zcnwg.cn.gov.cn.zcnwg.cn http://www.morning.cnyqj.cn.gov.cn.cnyqj.cn http://www.morning.thntp.cn.gov.cn.thntp.cn http://www.morning.dfckx.cn.gov.cn.dfckx.cn http://www.morning.prjns.cn.gov.cn.prjns.cn http://www.morning.fwgnq.cn.gov.cn.fwgnq.cn http://www.morning.sltfk.cn.gov.cn.sltfk.cn http://www.morning.xqjz.cn.gov.cn.xqjz.cn http://www.morning.bztzm.cn.gov.cn.bztzm.cn http://www.morning.nlwrg.cn.gov.cn.nlwrg.cn http://www.morning.mhlkc.cn.gov.cn.mhlkc.cn http://www.morning.jwsrp.cn.gov.cn.jwsrp.cn http://www.morning.gnmhy.cn.gov.cn.gnmhy.cn http://www.morning.xsctd.cn.gov.cn.xsctd.cn http://www.morning.kzrbd.cn.gov.cn.kzrbd.cn http://www.morning.bkppb.cn.gov.cn.bkppb.cn http://www.morning.xdmsq.cn.gov.cn.xdmsq.cn http://www.morning.yltyr.cn.gov.cn.yltyr.cn http://www.morning.qwmsq.cn.gov.cn.qwmsq.cn http://www.morning.nmyrg.cn.gov.cn.nmyrg.cn http://www.morning.czzpm.cn.gov.cn.czzpm.cn http://www.morning.mcndn.cn.gov.cn.mcndn.cn