当前位置: 首页 > news >正文

哈尔滨 做网站企业培训有哪些方面

哈尔滨 做网站,企业培训有哪些方面,恋爱网站建设,相册管理网站模板下载失败背景: kafka的文件系统数据源可以支持精准一次的一致性,本文就从源码看下如何TextInputFormat如何支持状态的精准一致性 TextInputFormat源码解析 首先flink会把输入的文件进行切分,分成多个数据块的形式,每个数据源算子任务会被分配以读取…

背景:

kafka的文件系统数据源可以支持精准一次的一致性,本文就从源码看下如何TextInputFormat如何支持状态的精准一致性

TextInputFormat源码解析

首先flink会把输入的文件进行切分,分成多个数据块的形式,每个数据源算子任务会被分配以读取其中的数据块,但是不是所有的文件都能进行分块,判断文件是否可以进行分块的代码如下:

protected boolean testForUnsplittable(FileStatus pathFile) {if (getInflaterInputStreamFactory(pathFile.getPath()) != null) {unsplittable = true;return true;}return false;
}private InflaterInputStreamFactory<?> getInflaterInputStreamFactory(Path path) {String fileExtension = extractFileExtension(path.getName());if (fileExtension != null) {return getInflaterInputStreamFactory(fileExtension);} else {return null;}
}

在这里插入图片描述

后缀名称是.gz,.bzip2等的文件都没法切分,如果可以切分,切分的具体代码如下所示:

while (samplesTaken < numSamples && fileNum < allFiles.size()) {// make a split for the sample and use it to read a recordFileStatus file = allFiles.get(fileNum);
// 根据偏移量进行切分FileInputSplit split = new FileInputSplit(0, file.getPath(), offset, file.getLen() - offset, null);// we open the split, read one line, and take its lengthtry {open(split);if (readLine()) {totalNumBytes += this.currLen + this.delimiter.length;samplesTaken++;}} finally {// close the file stream, do not release the bufferssuper.close();}
// 偏移量迁移offset += stepSize;// skip to the next file, if necessarywhile (fileNum < allFiles.size()&& offset >= (file = allFiles.get(fileNum)).getLen()) {offset -= file.getLen();fileNum++;}
}

再来看一下TextInputFormat如何支持checkpoint操作,保存文件的偏移量的代码:

@Override
public void snapshotState(StateSnapshotContext context) throws Exception {super.snapshotState(context);checkState(checkpointedState != null, "The operator state has not been properly initialized.");int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();// 算子列表状态checkpointedState.clear();// 获取文件的当前读取的偏移List<T> readerState = getReaderState();try {for (T split : readerState) {//保存到检查点路径中checkpointedState.add(split);}} catch (Exception e) {checkpointedState.clear();throw new Exception("Could not add timestamped file input splits to to operator "+ "state backend of operator "+ getOperatorName()+ '.',e);}if (LOG.isDebugEnabled()) {LOG.debug("{} (taskIdx={}) checkpointed {} splits: {}.",getClass().getSimpleName(),subtaskIdx,readerState.size(),readerState);}
}

从检查点中恢复状态的代码如下:

public void initializeState(StateInitializationContext context) throws Exception {super.initializeState(context);checkState(checkpointedState == null, "The reader state has already been initialized.");// 初始化算子操作状态checkpointedState =context.getOperatorStateStore().getListState(new ListStateDescriptor<>("splits", new JavaSerializer<>()));int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();LOG.info("Restoring state for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIdx);splits = splits == null ? new PriorityQueue<>() : splits;for (T split : checkpointedState.get()) {//从检查点状态中恢复各个切分的分块splits.add(split);}
}
http://www.tj-hxxt.cn/news/118455.html

相关文章:

  • wordpress自豪的采用常州seo关键词排名
  • 互利互通网站建设外贸网站建站和推广
  • 网站架构图用什么做盐城seo营销
  • go 做视频网站网站优化策略分析
  • 《美食天下》网站的建设北京建站工作室
  • 深圳做网上商城网站广州seo排名收费
  • 万户 网站建设免费网站建设
  • 湖北潜江资讯网宁波seo在线优化方案
  • 宝塔面板做织梦网站想要网站推广版
  • dw做网站背景音乐百度权重怎么看
  • 建网站素材seo关键词排名技巧
  • wordpress短链接插件seo服务商
  • 找个美工做淘宝网站需要多少钱今日最新闻
  • 网站建设费用表广州网络推广平台
  • 梅林多丽工业区做网站湖南网站设计外包费用
  • 网页与网站设计说明百度seo排名帝搜软件
  • 免费行情app福州短视频seo平台
  • 网站备案ip查询网站查询小说风云榜
  • 郑州公司网站设计广州seo工资
  • 建设邮费自己的网站 要不要购买服务器的深圳华强北新闻最新消息今天
  • 青岛个人建站模板中国站长站
  • 兰州 网站建设百度搜索排名服务
  • 京住房和城乡建设委员会网站搜狗站长平台打不开
  • 网站开发技术可行性分析防疫优化措施
  • c2c wordpressseo技术公司
  • PHP 网站开发 重点知识免费推广的平台
  • 唐山正规做网站的公司b站推广网站2024年
  • 17网站一起做网店后台淘宝seo是什么意思啊
  • 做网站如何使用数据库苏州网站优化公司
  • 企业咨询图片aso搜索排名优化