wordpress原创保护,seo优化推广技巧,广东研发网站建设平台,二手房地产中介网站建设1、实时查询维表
实时查询维表是指用户在 Flink 算子中直接访问外部数据库#xff0c;比如用 MySQL 来进行关联#xff0c;这种方式是同步方式#xff0c;数据保证是最新的。但是#xff0c;当我们的流计算数据过大#xff0c;会对外 部系统带来巨大的访问压力#xff0…1、实时查询维表
实时查询维表是指用户在 Flink 算子中直接访问外部数据库比如用 MySQL 来进行关联这种方式是同步方式数据保证是最新的。但是当我们的流计算数据过大会对外 部系统带来巨大的访问压力一旦出现比如连接失败、线程池满等情况由于我们是同步调用所以一般会导致线程阻塞、Task 等待数据返回影响整体任务的吞吐量。而且这种方案对外部系统的 QPS 要求较高在大数据实时计算场景下QPS 远远高于普通的后台系统峰值高达十万到几十万整体作业瓶颈转移到外部系统 public class DimSync extends RichMapFunctionfplOverview, String {private static final Logger LOGGER LoggerFactory.getLogger(DimSync.class);private Connection conn null;public void open(Configuration parameters) throws Exception {super.open(parameters);conn DriverManager.getConnection(jdbc:test:3306/mysqldb?characterEncodingUTF-8, root, qyllt1314#);}Overridepublic String map(fplOverview fplOverview) throws Exception {JSONObject jsonObject JSONObject.parseObject(fplOverview.toJson());String dp_id jsonObject.getString(dp_id);//根据 dp_id 查询 上周的 fpl_amount,ywPreparedStatement pst conn.prepareStatement(select max(fpl_amount) as fpl_amount,max(yearweek(datatime)) as yw \n from fpl_overview \n where datatime date_sub(curdate(), interval weekday(curdate()) 7 Day) # 上周第一天\n and datatime date_sub(curdate(), interval weekday(curdate()) 0 Day) # 上周最后一天1 \n and dp_id ? \n group by dp_id);pst.setString(1,dp_id);ResultSet resultSet pst.executeQuery();String fpl_amount null;String yw null ;while (resultSet.next()){fpl_amount resultSet.getString(1);yw resultSet.getString(2);}pst.close();jsonObject.put(lastweek_fpl_amount,fpl_amount);jsonObject.put(lastweek_yw,yw)return jsonObject.toString();}public void close() throws Exception {super.close();conn.close();}2、LRU 缓存 (flink 异步Id)
利用 Flink 的 RichAsyncFunction 读取 mysql 的数据到缓存中我们在关联维度表时先去查询缓存如果缓存中不存在这条数据就利用客户端去查询 mysql然后插入到缓存中。 public class JDBCAsyncFunction extends RichAsyncFunctionfplOverview, JsonObject {private SQLClient client;Overridepublic void open(Configuration parameters) throws Exception {Vertx vertx Vertx.vertx(new VertxOptions().setWorkerPoolSize(10).setEventLoopPoolSize(10));JsonObject config new JsonObject().put(url, jdbc:mysql://rm-bp161be65d56kbt4nzo.mysql.rds.aliyuncs.com:3306/mysqldb?characterEncodingUTF-8;useSSLfalse).put(driver_class, com.mysql.cj.jdbc.Driver).put(max_pool_size, 10).put(user, root).put(password, );client JDBCClient.createShared(vertx, config);}Overridepublic void close() throws Exception {client.close();}Overridepublic void asyncInvoke(fplOverview fplOverview, ResultFutureJsonObject resultFuture) throws Exception {client.getConnection(conn - {if (conn.failed()) {return;}final SQLConnection connection conn.result();// 执行sqlconnection.query(select max(fpl_amount) as fpl_amount,max(yearweek(datatime)) as yw \n from fpl_overview \n where datatime date_sub(curdate(), interval weekday(curdate()) 7 Day) # 上周第一天\n and datatime date_sub(curdate(), interval weekday(curdate()) 0 Day) # 上周最后一天1 \n and dp_id fplOverview.getDp_id() group by dp_id , res2 - {ResultSet rs new ResultSet();if (res2.succeeded()) {rs res2.result();}else{System.out.println(查询数据库出错);}ListJsonObject stores new ArrayList();for (JsonObject json : rs.getRows()) {stores.add(json);}connection.close();resultFuture.complete(stores);});});}}
3、预加载全量mysql数据
预加载全量mysql数据 使用 ScheduledExecutorService 每隔 5 分钟拉取一次维表数据这种方式适用于那些实时场景不是很高维表数据较小的场景
public class WholeLoad extends RichMapFunctionfplOverview,String {private static final Logger LOGGER LoggerFactory.getLogger(WholeLoad.class);// 定义map的结果key为关联字段private static MapString,String cache ;Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);cache new HashMap();ScheduledExecutorService executor Executors.newScheduledThreadPool(2);executor.scheduleAtFixedRate(new Runnable() {Overridepublic void run() {try {load();} catch (Exception e) {e.printStackTrace();}}},0,5, TimeUnit.MINUTES); //从现在开始每隔5分钟查询数据}Overridepublic String map(fplOverview fplOverview) throws Exception {JSONObject jsonObject JSONObject.parseObject(fplOverview.toJson());String dp_id jsonObject.getString(dp_id);// 获取对应id的结果String rs cache.get(dp_id);JSONObject rsObject JSONObject.parseObject(rs);jsonObject.putAll(rsObject);return jsonObject.toString();}public void load() throws Exception {Class.forName(com.mysql.jdbc.Driver);Connection con DriverManager.getConnection(jdbc:mysql://test:3306/mysqldb?characterEncodingUTF-8, root, qyllt1314#);// 执行查询的SQLPreparedStatement statement con.prepareStatement(select dp_id,max(fpl_amount) as fpl_amount,max(yearweek(datatime)) as yw \n from fpl_overview \n where datatime date_sub(curdate(), interval weekday(curdate()) 7 Day) # 上周第一天\n and datatime date_sub(curdate(), interval weekday(curdate()) 0 Day) # 上周最后一天1 \n group by dp_id);ResultSet rs statement.executeQuery();while (rs.next()) {// 查询结果放入缓存String dp_id rs.getString(dp_id);String fpl_amount rs.getString(fpl_amount);String yw rs.getString(yw);JSONObject jsonObject JSONObject.parseObject({});jsonObject.put(lastweek_fpl_amount,fpl_amount);jsonObject.put(lastweek_yw,yw);cache.put(dp_id,jsonObject.toString());}System.out.println(数据输出测试:cache.toString());con.close();}
}