怎么对网站的数据库做管理,wordpress修改成中文字体,速卖通,住建部2017建设工程合同范本背景
在flink中可以通过使用事务性数据汇实现精准一次的保证#xff0c;本文基于Kakfa的事务处理来看一下在Flink 内部如何实现基于两阶段提交协议的事务性数据汇.
flink kafka事务性数据汇的实现
1。首先在开始进行快照的时候也就是收到checkpoint通知的时候#xff0c;在…背景
在flink中可以通过使用事务性数据汇实现精准一次的保证本文基于Kakfa的事务处理来看一下在Flink 内部如何实现基于两阶段提交协议的事务性数据汇.
flink kafka事务性数据汇的实现
1。首先在开始进行快照的时候也就是收到checkpoint通知的时候在snapshot方法中会开启一个新的事务代码如下 public void snapshotState(FunctionSnapshotContext context) throws Exception {// this is like the pre-commit of a 2-phase-commit transaction// we are ready to commit and remember the transactioncheckState(currentTransactionHolder ! null,bug: no transaction object when performing state snapshot);long checkpointId context.getCheckpointId();LOG.debug({} - checkpoint {} triggered, flushing transaction {},name(),context.getCheckpointId(),currentTransactionHolder);preCommit(currentTransactionHolder.handle);// 调用kafkaProducer.flush();清理上一个事务的状态(注意不是提交),只是确保前一个事务的所有资源清理完毕pendingCommitTransactions.put(checkpointId, currentTransactionHolder);LOG.debug({} - stored pending transactions {}, name(), pendingCommitTransactions);
// 调用producer.beginTransaction();方法开启一个新的kafka事务currentTransactionHolder beginTransactionInternal();LOG.debug({} - started new transaction {}, name(), currentTransactionHolder);state.clear();state.add(new State(this.currentTransactionHolder,new ArrayList(pendingCommitTransactions.values()),userContext));}2.其次在JobManager通知检查点完成的通知方法,也就是notifyCheckpointComplete方法中提交事务
IteratorMap.EntryLong, TransactionHolderTXN pendingTransactionIterator pendingCommitTransactions.entrySet().iterator();Throwable firstError null;while (pendingTransactionIterator.hasNext()) {Map.EntryLong, TransactionHolderTXN entry pendingTransactionIterator.next();Long pendingTransactionCheckpointId entry.getKey();TransactionHolderTXN pendingTransaction entry.getValue();if (pendingTransactionCheckpointId checkpointId) {continue;}LOG.info({} - checkpoint {} complete, committing transaction {} from checkpoint {},name(),checkpointId,pendingTransaction,pendingTransactionCheckpointId);logWarningIfTimeoutAlmostReached(pendingTransaction);try {//调用producer.commitTransaction()方法提交事务commit(pendingTransaction.handle);} catch (Throwable t) {if (firstError null) {firstError t;}}LOG.debug({} - committed checkpoint transaction {}, name(), pendingTransaction);pendingTransactionIterator.remove();}if (firstError ! null) {throw new FlinkRuntimeException(Committing one of transactions failed, logging first encountered failure,firstError);}至此一个两阶段提交的flink事务性数据汇完成了这个事务性数据汇可以构成端到端一致性的一部分