Contents: Overview, Prerequisites, Maven dependencies, Sample code

Overview
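This article shows how to synchronize PostgreSQL table data with Flink SQL on Flink 1.14.4 and Flink CDC 2.2.1. The job is packaged as a jar and uploaded to Flink, and the SQL is then executed through the Flink REST API.

Prerequisites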
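1. Enable logical replication
Make sure the settings in postgresql.conf allow logical replication and the use of output plugins. Two settings matter in particular:
wal_level should be set to logical.
max_replication_slots must be greater than 0.
Restart the PostgreSQL service after editing the configuration file, then run SHOW wal_level; to confirm that the WAL level has changed.

2. Create a logical replication slot
SELECT * FROM pg_create_logical_replication_slot('flink_slot', 'pgoutput');
flink_slot is the slot name. pgoutput is the built-in output plugin for logical decoding, available since PostgreSQL 10.
Verify the slot: SELECT * FROM pg_replication_slots;
Check the replication status: SELECT * FROM pg_stat_replication;

3. Change the replica identity
Set the replica identity to FULL so that update and delete events carry the previous row values. This lets the sync job capture and apply updates and deletes on table xxxx (your table name) correctly. Without this statement, the old row data for updated and deleted rows may be missing from the change stream, which makes the synchronized copy inaccurate.
ALTER TABLE xxxx REPLICA IDENTITY FULL;

4. Change the class-loading order
In Flink's flink-conf.yaml, change classloader.resolve-order from child-first to parent-first:
classloader.resolve-order: parent-first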
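The two settings from step 1 can also be checked from code before a job is submitted. The following is only a minimal sketch: the class and method names are hypothetical and not part of the original project, and the JDBC part assumes the PostgreSQL driver is on the classpath and that the connecting user may run SHOW.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not from the original project) that checks the settings from step 1. */
final class PgCdcPrerequisites {

    /** Pure check of the two settings; returns one message per problem found. */
    static List<String> findProblems(String walLevel, int maxReplicationSlots) {
        List<String> problems = new ArrayList<>();
        if (!"logical".equalsIgnoreCase(walLevel)) {
            problems.add("wal_level is '" + walLevel + "', expected 'logical'");
        }
        if (maxReplicationSlots <= 0) {
            problems.add("max_replication_slots must be greater than 0");
        }
        return problems;
    }

    /** Runs SHOW <setting> and returns its single value. */
    static String show(Connection conn, String setting) throws SQLException {
        // 'setting' is a fixed name chosen by the caller, never user input.
        try (Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery("SHOW " + setting)) {
            rs.next();
            return rs.getString(1);
        }
    }

    /** Reads both settings over JDBC and evaluates them. */
    static List<String> check(String jdbcUrl, String user, String password) throws SQLException {
        try (Connection conn = DriverManager.getConnection(jdbcUrl, user, password)) {
            String walLevel = show(conn, "wal_level");
            int slots = Integer.parseInt(show(conn, "max_replication_slots"));
            return findProblems(walLevel, slots);
        }
    }
}
```

An empty list from check(...) means both settings look fine. The replication slot and the replica identity from steps 2 and 3 still need to be verified separately.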
Maven dependencies
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.14.4</flink.version>
    <flink-cdc.version>2.2.1</flink-cdc.version>
    <scala.binary.version>2.12</scala.binary.version>
</properties>

<dependencies>
    <!-- flink -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>1.14.4</version>
        <!--<scope>provided</scope>-->
    </dependency>

    <!-- flink cdc -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-sql-connector-mysql-cdc</artifactId>
        <version>${flink-cdc.version}</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-sql-connector-oracle-cdc</artifactId>
        <version>${flink-cdc.version}</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-sql-connector-postgres-cdc</artifactId>
        <version>${flink-cdc.version}</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-sql-connector-sqlserver-cdc</artifactId>
        <version>${flink-cdc.version}</version>
    </dependency>

    <!-- database driver -->
    <!-- postgresql -->
    <dependency>
        <groupId>org.postgresql</groupId>
        <artifactId>postgresql</artifactId>
        <version>42.2.5</version>
    </dependency>

    <!-- json -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.9.9.3</version>
    </dependency>

    <!-- lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.24</version>
    </dependency>

    <!-- log -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>

    <!-- junit -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
</dependencies>

Sample code
sql:
CREATE TABLE new_table1_37877 (
id INT,
name STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'debezium.database.tablename.case.insensitive' = 'false',
'debezium.log.mining.continuous.mine' = 'true',
'password' = '*****',
'hostname' = '***.**.**.***',
'debezium.log.mining.strategy' = 'online_catalog',
'connector' = 'postgres-cdc',
'port' = '5432',
'schema-name' = 'public',
'database-name' = 'test',
'table-name' = 'new_table1',
'username' = '******',
'slot.name' = 'flink_slot',
'decoding.plugin.name' = 'pgoutput'
);
CREATE TABLE new_table1_bak_37877 (
id INT,
name STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'password' = '*****',
'connector' = 'jdbc',
'table-name' = 'public.new_table1_bak',
'url' = 'jdbc:postgresql://<host>:5432/test',
'username' = '<user>'
);
insert into new_table1_bak_37877 select * from new_table1_37877;
Parameter class
import lombok.Data;

@Data
public class InputOutputParams {
    /** Job name. */
    private String jobName;

    /** Code text: Flink SQL statements separated by semicolons. */
    private String codeText;
}
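main method
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class FlinkMain {
    /**
     * Entry point of the Flink job.
     *
     * @param args runtime arguments
     */
    public static void main(String[] args) throws IOException {
        if (args == null || args.length == 0) {
            throw new RuntimeException("runtime arguments are empty");
        }
        // The first argument (which must be a JSON string) carries the job parameters
        String json = args[0];
        ObjectMapper objectMapper = new ObjectMapper()
                .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        InputOutputParams params = objectMapper.readValue(json, InputOutputParams.class);

        // Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Enable checkpointing: take a checkpoint every 3 * 60 seconds
        env.enableCheckpointing(3 * 60 * 1000L);
        // Number of checkpoint failures to tolerate
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);
        // Checkpoint timeout
        env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000);
        // Allow only one checkpoint in progress at a time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // Keep externalized checkpoints after the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Restart strategy: at most 3 restart attempts, 20 seconds apart
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(20L, TimeUnit.SECONDS)));
        env.setParallelism(1);

        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        // Get the table environment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
        tEnv.getConfig().getConfiguration().setString("pipeline.name", params.getJobName());

        // Execute the SQL statements one by one
        String codeText = params.getCodeText();
        if (codeText == null || codeText.trim().isEmpty()) {
            throw new RuntimeException("flink sql is empty");
        }
        String[] flinkSqlArr = codeText.split(";");
        for (String flinkSql : flinkSqlArr) {
            if (flinkSql != null && !flinkSql.trim().isEmpty()) {
                tEnv.executeSql(flinkSql);
            }
        }
    }
}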
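The main method splits the script on every semicolon, which works for the sample SQL but would cut a statement in two if a semicolon ever appeared inside a quoted value (for example in a password or a JDBC URL). Below is a possible alternative, not the approach used in the original code: a small splitter that ignores semicolons inside single-quoted literals. The class name is hypothetical, and the sketch does not handle SQL comments or backtick-quoted identifiers.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: splits a script on semicolons that are outside single-quoted literals. */
final class SqlStatementSplitter {

    static List<String> split(String script) {
        List<String> statements = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inString = false;
        for (int i = 0; i < script.length(); i++) {
            char c = script.charAt(i);
            if (c == '\'') {
                // An escaped quote ('') toggles twice, so the state stays correct.
                inString = !inString;
            }
            if (c == ';' && !inString) {
                addIfNotBlank(statements, current);
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        // Keep a trailing statement that has no terminating semicolon.
        addIfNotBlank(statements, current);
        return statements;
    }

    private static void addIfNotBlank(List<String> out, StringBuilder sb) {
        String statement = sb.toString().trim();
        if (!statement.isEmpty()) {
            out.add(statement);
        }
    }
}
```

In FlinkMain, the loop over codeText.split(";") could then iterate over SqlStatementSplitter.split(codeText) instead.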
Package the project as a jar without its dependencies
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-dependency-plugin</artifactId>
            <version>2.10</version>
            <executions>
                <execution>
                    <id>copy-dependencies</id>
                    <phase>package</phase>
                    <goals>
                        <!-- copy the dependency jars -->
                        <goal>copy-dependencies</goal>
                    </goals>
                    <configuration>
                        <!-- output directory for the dependency jars -->
                        <outputDirectory>${project.build.directory}/lib</outputDirectory>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-jar-plugin</artifactId>
            <version>2.4</version>
            <configuration>
                <archive>
                    <manifest>
                        <!-- class that contains the main method -->
                        <mainClass>com.test.FlinkMain</mainClass>
                    </manifest>
                </archive>
            </configuration>
        </plugin>
    </plugins>
</build>

After the build, copy all the dependency jars from the lib output directory into Flink's lib directory, and upload the jar you just built through the Flink web UI.
Then call the Flink REST API from Postman to submit the SQL. API documentation: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/rest_api/
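The article submits the SQL from Postman; the same call can be sketched in Java. This is only an illustration under several assumptions: it needs Java 11+ for java.net.http, the class and method names are hypothetical, the jar ID is the one listed by GET /jars after the upload, and the request uses the POST /jars/:jarid/run endpoint with the entryClass and programArgsList fields described in the REST API documentation linked above. Check the field names against your Flink version before relying on it.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical client-side helper for running the uploaded jar through the REST API. */
final class FlinkJarRunner {

    /** Escapes a string so it can be placed inside a JSON string literal. */
    static String jsonEscape(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    /** The single program argument FlinkMain expects: a JSON object with jobName and codeText. */
    static String jobArgument(String jobName, String codeText) {
        return "{\"jobName\":\"" + jsonEscape(jobName)
                + "\",\"codeText\":\"" + jsonEscape(codeText) + "\"}";
    }

    /** Request body for POST /jars/:jarid/run; the job argument is passed as one list element. */
    static String runRequestBody(String entryClass, String jobArgument) {
        return "{\"entryClass\":\"" + jsonEscape(entryClass)
                + "\",\"programArgsList\":[\"" + jsonEscape(jobArgument) + "\"]}";
    }

    /** Sends the run request and returns the raw response body. */
    static String run(String baseUrl, String jarId, String body) throws Exception {
        HttpRequest request = HttpRequest.newBuilder(URI.create(baseUrl + "/jars/" + jarId + "/run"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        return response.body();
    }
}
```

A call might look like run("http://localhost:8081", jarId, runRequestBody("com.test.FlinkMain", jobArgument("pg-sync", sqlText))), where the host, port, job name and sqlText are placeholders. Passing the JSON as a single programArgsList element keeps it intact as args[0], which is what FlinkMain reads.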