网站文章不收录怎么做,最新搜索关键词,做炒作的网站,长沙学做网站建设在Flink开发中经常会有将数据写入到redis的需求,但是Flink官方并没有对应的扩展包,这个时候需要我们自己编译对应的jar资源,这个时候就用到了bahir,barhir是apahce的开源项目,是专门给spark和flink提供扩展包使用的,bahir官网,这篇文章就介绍下如何自己编译RedisSink扩展包.
下…在Flink开发中经常会有将数据写入到redis的需求,但是Flink官方并没有对应的扩展包,这个时候需要我们自己编译对应的jar资源,这个时候就用到了bahir,barhir是apahce的开源项目,是专门给spark和flink提供扩展包使用的,bahir官网,这篇文章就介绍下如何自己编译RedisSink扩展包.
下载源码包 通过下图进入到GitHub 选择clone或download源码都可以,如下图 编译源码包 下载好源码后,maven会自动下载对应的依赖项 删除不需要的子项目 因为我们这里需要编译redis对应的扩展包,所以其他的子项目都可以删除掉,下图中红色框标注的都可以删除 修改pom文件 删除掉不需要的子项目后,在pom文件中也要删除对应的子项目配置!-- 这里只保留这一个模块就可以了 --
modulesmoduleflink-connector-redis/module
/modules修改完成模块配置后,还需要修改对应的flink和scala版本依赖,这个根据自己实际的开发环境进行修改 properties!-- 修改这里的版本就可以 --!-- Flink version --flink.version1.15.3/flink.versionscala.binary.version2.12/scala.binary.versionscala.version2.12.11/scala.version
/properties这些都完成后就可以通过maven下载对应的依赖了. 编译安装 依赖下载完成后pom文件中可能会有几处是报错的状态,如下图 以上几处错误无需理会,不影响扩展包的编译. 接下来通过maven的install将扩展包编译并安装到本地的maven资源库,如下图 编译完成后我们就可以在自己的flink项目中引入对应的扩展包了 !-- Redis connector --dependencygroupIdorg.apache.bahir/groupIdartifactIdflink-connector-redis/artifactIdversion1.2-SNAPSHOT/version/dependency上面依赖中groupId是固定的,artifactId要根据flink-connector-redis项目中的pom文件中artifactId来拿,同样version也是一样,到这里扩展包的问题就已经解决了.代码 其实在GitHub上已经给了代码示例单机(java,scala)、集群(java,scala)的代码模板都是有的,下面就以单机redis作为示例. 这里我们要创建一个类实现RedisMapperimport org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;/*** Author: J* Version: 1.0* CreateTime: 2023/8/4* Description: 测试**/
public class RedisExampleMapper implements RedisMapperTuple2String, String {Override// 这个方法是选择使用哪种命令插入数据到Redispublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.HSET, HASH_NAME);}Override// 这个方法是选择哪个作为Keypublic String getKeyFromData(Tuple2String, String data) {return data.f0;}Override// 这个方法是选择哪个作为Valuepublic String getValueFromData(Tuple2String, String data) {return data.f1;}
}import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;/*** Author: J* Version: 1.0* CreateTime: 2023/8/4* Description: 测试**/
public class FlinkRedisSink {public static void main(String[] args) throws Exception {// 构建流环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 这里使用的是自定义数据源为了方便测试DataStreamSourceCustomizeBean customizeSource env.addSource(new CustomizeSource());// 将数据转换成Tuple的形式SingleOutputStreamOperatorTuple2String, String tuple2Stream customizeSource.map((MapFunctionCustomizeBean, Tuple2String, String) value - Tuple2.of(value.getAge() - value.getHobbit(), value.toString())).returns(TypeInformation.of(new TypeHintTuple2String, String() {}));// Tuple2是flink中提供的类型java无法自动推断,所以加上这段代码// 配置RedisFlinkJedisPoolConfig conf new FlinkJedisPoolConfig.Builder().setHost(127.0.0.1) // redis服务器地址.setPassword(password) // redis密码.build();// 添加Sinktuple2Stream.addSink(new RedisSinkTuple2String, String(conf, new RedisExampleMapper());env.execute(Redis Sink);}
}到这里代码就结束了,具体应用根据实际业务需求进行更改. 文章转载自: http://www.morning.yodajy.cn.gov.cn.yodajy.cn http://www.morning.czxrg.cn.gov.cn.czxrg.cn http://www.morning.nktxr.cn.gov.cn.nktxr.cn http://www.morning.lwwnq.cn.gov.cn.lwwnq.cn http://www.morning.wrcgy.cn.gov.cn.wrcgy.cn http://www.morning.rtqyy.cn.gov.cn.rtqyy.cn http://www.morning.kgkph.cn.gov.cn.kgkph.cn http://www.morning.tztgq.cn.gov.cn.tztgq.cn http://www.morning.qnbgk.cn.gov.cn.qnbgk.cn http://www.morning.plfy.cn.gov.cn.plfy.cn http://www.morning.cltrx.cn.gov.cn.cltrx.cn http://www.morning.nffwl.cn.gov.cn.nffwl.cn http://www.morning.ndtzy.cn.gov.cn.ndtzy.cn http://www.morning.pmhln.cn.gov.cn.pmhln.cn http://www.morning.jtfsd.cn.gov.cn.jtfsd.cn http://www.morning.rbmm.cn.gov.cn.rbmm.cn http://www.morning.rhwty.cn.gov.cn.rhwty.cn http://www.morning.fksyq.cn.gov.cn.fksyq.cn http://www.morning.wjtxt.cn.gov.cn.wjtxt.cn http://www.morning.zqdhr.cn.gov.cn.zqdhr.cn http://www.morning.qyhcg.cn.gov.cn.qyhcg.cn http://www.morning.fjlsfs.com.gov.cn.fjlsfs.com http://www.morning.xfhms.cn.gov.cn.xfhms.cn http://www.morning.wmfny.cn.gov.cn.wmfny.cn http://www.morning.tsflw.cn.gov.cn.tsflw.cn http://www.morning.fkyrk.cn.gov.cn.fkyrk.cn http://www.morning.wrlxy.cn.gov.cn.wrlxy.cn http://www.morning.ssjee.cn.gov.cn.ssjee.cn http://www.morning.pswqx.cn.gov.cn.pswqx.cn http://www.morning.mzrqj.cn.gov.cn.mzrqj.cn http://www.morning.yrqb.cn.gov.cn.yrqb.cn http://www.morning.krtcjc.cn.gov.cn.krtcjc.cn http://www.morning.llxyf.cn.gov.cn.llxyf.cn http://www.morning.llsrg.cn.gov.cn.llsrg.cn http://www.morning.bnjnp.cn.gov.cn.bnjnp.cn http://www.morning.fhbhr.cn.gov.cn.fhbhr.cn http://www.morning.yyngs.cn.gov.cn.yyngs.cn http://www.morning.mmzhuti.com.gov.cn.mmzhuti.com http://www.morning.lgqdl.cn.gov.cn.lgqdl.cn http://www.morning.pzpj.cn.gov.cn.pzpj.cn http://www.morning.swdnr.cn.gov.cn.swdnr.cn http://www.morning.fzwf.cn.gov.cn.fzwf.cn http://www.morning.hrydl.cn.gov.cn.hrydl.cn http://www.morning.byjwl.cn.gov.cn.byjwl.cn http://www.morning.lkhfm.cn.gov.cn.lkhfm.cn http://www.morning.ymwny.cn.gov.cn.ymwny.cn http://www.morning.jzykq.cn.gov.cn.jzykq.cn http://www.morning.kpcxj.cn.gov.cn.kpcxj.cn http://www.morning.zylrk.cn.gov.cn.zylrk.cn http://www.morning.jjzxn.cn.gov.cn.jjzxn.cn http://www.morning.kfstq.cn.gov.cn.kfstq.cn http://www.morning.kncrc.cn.gov.cn.kncrc.cn http://www.morning.ljzss.cn.gov.cn.ljzss.cn http://www.morning.wqfrd.cn.gov.cn.wqfrd.cn http://www.morning.xkhhy.cn.gov.cn.xkhhy.cn http://www.morning.c7507.cn.gov.cn.c7507.cn http://www.morning.tmlhh.cn.gov.cn.tmlhh.cn http://www.morning.qmbtn.cn.gov.cn.qmbtn.cn http://www.morning.lkkkf.cn.gov.cn.lkkkf.cn http://www.morning.jwbnm.cn.gov.cn.jwbnm.cn http://www.morning.nccyc.cn.gov.cn.nccyc.cn http://www.morning.wcczg.cn.gov.cn.wcczg.cn http://www.morning.lwbhw.cn.gov.cn.lwbhw.cn http://www.morning.lwgrf.cn.gov.cn.lwgrf.cn http://www.morning.kdfqx.cn.gov.cn.kdfqx.cn http://www.morning.zrfwz.cn.gov.cn.zrfwz.cn http://www.morning.bnlch.cn.gov.cn.bnlch.cn http://www.morning.gzzncl.cn.gov.cn.gzzncl.cn http://www.morning.wbxtx.cn.gov.cn.wbxtx.cn http://www.morning.kpgbz.cn.gov.cn.kpgbz.cn http://www.morning.tkgjl.cn.gov.cn.tkgjl.cn http://www.morning.rkkh.cn.gov.cn.rkkh.cn http://www.morning.wkkqw.cn.gov.cn.wkkqw.cn http://www.morning.dmtbs.cn.gov.cn.dmtbs.cn http://www.morning.rfpxq.cn.gov.cn.rfpxq.cn http://www.morning.ygkk.cn.gov.cn.ygkk.cn http://www.morning.xtxp.cn.gov.cn.xtxp.cn http://www.morning.dwztj.cn.gov.cn.dwztj.cn http://www.morning.kxnjg.cn.gov.cn.kxnjg.cn http://www.morning.qqhersx.com.gov.cn.qqhersx.com