网站外链分析工具,同城信息小程序源码,1920的做网站做多大,阿里巴巴国际站运营教程

背景：最近公司做了一个新需求，需求内容是加工一个营销时机，但是加工营销时机的同时，需要把数据内容里的一个 idmapping 存入 redis，用于后续的读写。

准备：

<!-- 依赖 -->
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>3.2.0-1.19</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>1.19.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-redis_2.11</artifactId>
        <version>1.1.5</version>
    </dependency>
</dependencies>
代码
package com.iterge.flink;import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;/*** Hello world!**/
Slf4j
public class FlinkDemo {//创建连接池static final JedisPool pool new JedisPool(127.0.0.0,8423);//创建redis客户端static final Jedis jedis pool.getResource();public static void main( String[] args ) throws Exception {final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//DataStreamSourceString stringDataStreamSource env.fromData(Arrays.asList(1, 2, 3));KafkaSourceString source KafkaSource.Stringbuilder().setBootstrapServers(localhost:9092).setTopics(it.erge.test.topic).setGroupId(it.erge.test.topic.1).setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSourceString stringDataStreamSource env.fromSource(source, WatermarkStrategy.noWatermarks(), Kafka Source);stringDataStreamSource.map(new RichMapFunctionString, String() {Overridepublic String map(String s) throws Exception {//读redisSystem.out.println(testjedis.get(test));//写redisjedis.setex(test,60,s);return s;}Overridepublic void close() throws Exception {super.close();jedis.close();}});stringDataStreamSource.print();env.execute(test);}
}