漯河网站建设xknt,深圳做网站平台维护的公司,外包做网站怎么拿源代码,萧山做网站公司Omid Timestamp Oracle 组件实现原理
作用
生成全局单调递增的时间戳#xff0c;支持获取操作和崩溃恢复。
功能
1.生成全局单调递增的时间戳(支持崩溃恢复)apinext返回下一个时间戳getLast返回最后一个分配的时间戳(当前时间戳)实现方式TimestampOracleImpl单调递增的时间…Omid Timestamp Oracle 组件实现原理
作用
生成全局单调递增的时间戳支持获取操作和崩溃恢复。
功能
1.生成全局单调递增的时间戳(支持崩溃恢复)apinext返回下一个时间戳getLast返回最后一个分配的时间戳(当前时间戳)实现方式TimestampOracleImpl单调递增的时间戳在分配时间戳时如果当前的最大时间戳已经用完它会触发一个后台任务来更新最大时间戳next分配下一个时间戳。如果当前的最大时间戳已经用完它会等待后台任务分配新的时间戳WorldClockOracleImpl基于世界时间(System.timestamp)生成全局单调递增的时间戳
2.存储时间戳存储方式in memoryzkhbaseapiupdateMaxTimestamp更新最大时间戳getMaxTimestamp获取最大时间戳类
TimestampStorage
存储时间戳
方法: updateMaxTimestamp 更新最大时间戳 getMaxTimestamp 获取最大时间戳
实现类:
TimestampStorageInMemoryTimestampStorage in TimestampOracleImpl (org.apache.omid.tso) 使用内存存储ZKTimestampStorage (org.apache.omid.timestamp.storage) 使用 zk 存储HBaseTimestampStorage (org.apache.omid.timestamp.storage) 使用 hbase 存储InMemoryTimestampStorage in WorldClockOracleImpl (org.apache.omid.tso) 使用内存存储ZKTimestampStorage
通过 zk curator DistributedAtomicLong 实现 DistributedAtomicLong timestamp
方法 updateMaxTimestamp(): 执行 timestamp.compareAndSet(previousMaxTimestamp, newMaxTimestamp) getMaxTimestamp(): 返回 timestamp.get()
TimestampOracle
TimestampOracle 提供了一个服务用于生成单调递增的时间戳。
方法: initialize() 用于初始化时间戳服务。
next() 返回下一个时间戳。
getLast() 返回最后一个分配的时间戳(当前时间戳)
实现类
TimestampOracle (org.apache.omid.tso)TimestampOracleImpl (org.apache.omid.tso)PausableTimestampOracle (org.apache.omid.tso)WorldClockOracleImpl (org.apache.omid.tso)TimestampOracleImpl
initialize()
1.从存储中获取 maxTimestamp; 2.初始化 AllocateTimestampBatchTask 用于分配时间戳,并执行一次.
next()
生成下一个时间戳。
Override
public long next() {lastTimestamp CommitTable.MAX_CHECKPOINTS_PER_TXN;// 当前时间戳超过分配阈值时触发执行时间戳批次分配任务if (lastTimestamp nextAllocationThreshold) {// 设置下一次分配阈值为Long.MAX_VALUE确保只有一个线程会执行分配任务nextAllocationThreshold Long.MAX_VALUE; executor.execute(allocateTimestampsBatchTask);}// 当lastTimestamp超过当前已分配的最大时间戳时等待新一批时间戳被分配if (lastTimestamp maxTimestamp) {assert (maxTimestamp maxAllocatedTimestamp);// 自旋等待直到已分配的最大时间戳被更新while (maxAllocatedTimestamp maxTimestamp) {// 自旋}assert (maxAllocatedTimestamp maxTimestamp);// 更新当前最大时间戳并计算下次分配的阈值maxTimestamp maxAllocatedTimestamp;nextAllocationThreshold maxTimestamp - TIMESTAMP_REMAINING_THRESHOLD;assert (nextAllocationThreshold lastTimestamp nextAllocationThreshold maxTimestamp);assert (lastTimestamp maxTimestamp);}// 返回新生成的时间戳return lastTimestamp;
}AllocateTimestampBatchTask
定时任务批量更新存储系统中最大的时间戳每次预分配 TIMESTAMP_BATCH 数量的时间戳。
static final long TIMESTAMP_BATCH 10_000_000 * CommitTable.MAX_CHECKPOINTS_PER_TXN; // 10 million
long newMaxTimestamp previousMaxTimestamp TIMESTAMP_BATCH;WorldClockOracleImpl
基于世界时间的单调递增时间戳。
initialize 从 TimestampStorage 中获取最大的时间戳并启动定时任务来定期更新最大时间戳。
next 用于获取下一个时间戳。 如果当前时间戳可用则直接返回 否则等待定时任务分配新的时间戳。
getLast 方法返回最后一个时间戳。
AllocateTimestampBatchTask#run
定时任务预分配时间戳。
// 预测一个未来的时间窗口内允许的最大事务时间戳.
// 当前时间System.currentTimeMillis()加上一个预设的时间间隔TIMESTAMP_INTERVAL_MS然后乘以每毫秒允许的最大事务数MAX_TX_PER_MS
long newMaxTime (System.currentTimeMillis() TIMESTAMP_INTERVAL_MS) * MAX_TX_PER_MS;next()
public long next() {long currentMsFirstTimestamp System.currentTimeMillis() * MAX_TX_PER_MS; // 通过当前时间乘以每毫秒的最大事务数来计算当前毫秒的第一个时间戳lastTimestamp CommitTable.MAX_CHECKPOINTS_PER_TXN;if (lastTimestamp currentMsFirstTimestamp) { // 如果lastTimestamp大于等于当前毫秒的第一个时间戳直接返回lastTimestampreturn lastTimestamp;}// 当前 timestamp 的第一个时间戳 最大时间戳.这里sleep等待 allocate 线程进行分配if (currentMsFirstTimestamp maxTimestamp) { // Intentional race to reduce synchronization overhead in every access to maxTimestamp while (maxAllocatedTime currentMsFirstTimestamp) { // Waiting for the interval allocationtry {Thread.sleep(1000);} catch (InterruptedException e) {continue;}}assert (maxAllocatedTime maxTimestamp);maxTimestamp maxAllocatedTime;}lastTimestamp currentMsFirstTimestamp;return lastTimestamp;
}