网站推广郑州,绵阳做公司网站,微信网页版官网手机版,精湛的企业网站建设大纲
1.Curator的可重入锁的源码
2.Curator的非可重入锁的源码
3.Curator的可重入读写锁的源码
4.Curator的MultiLock源码
5.Curator的Semaphore源码 1.Curator的可重入锁的源码
(1)InterProcessMutex获取分布式锁
(2)InterProcessMutex的初始化
(3)InterProcessMutex.…大纲
1.Curator的可重入锁的源码
2.Curator的非可重入锁的源码
3.Curator的可重入读写锁的源码
4.Curator的MultiLock源码
5.Curator的Semaphore源码 1.Curator的可重入锁的源码
(1)InterProcessMutex获取分布式锁
(2)InterProcessMutex的初始化
(3)InterProcessMutex.acquire()尝试获取锁
(4)LockInternals.attemptLock()尝试获取锁
(5)不同客户端线程获取锁时的互斥实现
(6)同一客户端线程可重入加锁的实现
(7)客户端线程释放锁的实现
(8)客户端线程释放锁后其他线程获取锁的实现
(9)InterProcessMutex就是一个公平锁 (1)InterProcessMutex获取分布式锁
public class Demo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy new ExponentialBackoffRetry(1000, 3);CuratorFramework client CuratorFrameworkFactory.newClient(127.0.0.1:2181, 5000, 3000, retryPolicy);client.start();System.out.println(已经启动Curator客户端);//获取分布式锁InterProcessMutex lock new InterProcessMutex(client, /locks/myLock);lock.acquire();Thread.sleep(1000);lock.release();}
}
(2)InterProcessMutex的初始化
设置锁的节点路径basePath 初始化一个LockInternals对象实例。
public class InterProcessMutex implements InterProcessLock, RevocableInterProcessMutex {private final LockInternals internals;private final String basePath;private static final String LOCK_NAME lock-;...public InterProcessMutex(CuratorFramework client, String path) {this(client, path, new StandardLockInternalsDriver());}public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver) {this(client, path, LOCK_NAME, 1, driver);}//初始化InterProcessMutexInterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver) {//1.设置锁的节点路径basePath PathUtils.validatePath(path);//2.初始化一个LockInternals对象实例internals new LockInternals(client, driver, path, lockName, maxLeases);}
}public class LockInternals {private final LockInternalsDriver driver;private final String lockName;private volatile int maxLeases;private final WatcherRemoveCuratorFramework client;private final String basePath;private final String path;...LockInternals(CuratorFramework client, LockInternalsDriver driver, String path, String lockName, int maxLeases) {this.driver driver;this.lockName lockName;this.maxLeases maxLeases;this.client client.newWatcherRemoveCuratorFramework();this.basePath PathUtils.validatePath(path);this.path ZKPaths.makePath(path, lockName);}...
}
(3)InterProcessMutex.acquire()尝试获取锁
LockData是InterProcessMutex的一个静态内部类。一个线程对应一个LockData实例对象用来描述线程持有的锁的具体情况。多个线程对应的LockData存放在一个叫threadData的ConcurrentMap中。LockData中有一个原子变量lockCount用于锁的重入次数计数。 在执行InterProcessMutex的acquire()方法尝试获取锁时首先会尝试取出当前线程对应的LockData数据判断是否存在。如果存在则说明锁正在被当前线程重入重入次数自增后直接返回。如果不存在则调用LockInternals的attemptLock()方法尝试获取锁。默认情况下attemptLock()方法传入的等待获取锁的时间time -1。
public class InterProcessMutex implements InterProcessLock, RevocableInterProcessMutex {private final LockInternals internals;private final String basePath;private static final String LOCK_NAME lock-;//一个线程对应一个LockData数据对象private final ConcurrentMapThread, LockData threadData Maps.newConcurrentMap();...//初始化InterProcessMutexInterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver) {//设置锁的路径basePath PathUtils.validatePath(path);//初始化LockInternalsinternals new LockInternals(client, driver, path, lockName, maxLeases);}Overridepublic void acquire() throws Exception {//获取分布式锁会一直阻塞等待直到获取成功//相同的线程可以重入锁每一次调用acquire()方法都要匹配一个release()方法的调用if (!internalLock(-1, null)) {throw new IOException(Lost connection while trying to acquire lock: basePath);}}private boolean internalLock(long time, TimeUnit unit) throws Exception {//获取当前线程Thread currentThread Thread.currentThread();//获取当前线程对应的LockData数据LockData lockData threadData.get(currentThread);if (lockData ! null) {//可重入计算lockData.lockCount.incrementAndGet();return true;}//调用LockInternals.attemptLock()方法尝试获取锁默认情况下传入的time-1表示等待获取锁的时间String lockPath internals.attemptLock(time, unit, getLockNodeBytes());if (lockPath ! null) {//获取锁成功将当前线程 其创建的临时顺序节点路径封装成一个LockData对象LockData newLockData new LockData(currentThread, lockPath);//然后把该LockData对象存放到InterProcessMutex.threadData这个Map中threadData.put(currentThread, newLockData);return true;}return false;}//LockData是InterProcessMutex的一个静态内部类private static class LockData {final Thread owningThread;final String lockPath;final AtomicInteger lockCount new AtomicInteger(1);//用于锁的重入次数计数private LockData(Thread owningThread, String lockPath) {this.owningThread owningThread;this.lockPath lockPath;}}protected byte[] getLockNodeBytes() {return null;}...
}
(4)LockInternals.attemptLock()尝试获取锁
先创建临时节点再判断是否满足获取锁的条件。 步骤一首先调用LockInternalsDriver的createsTheLock()方法创建一个临时顺序节点。其中creatingParentContainersIfNeeded()表示级联创建forPath(path)表示创建的节点路径名称withMode(CreateMode.EPHEMERAL_SEQUENTIAL)表示临时顺序节点。 步骤二然后调用LockInternals的internalLockLoop()方法检查是否获取到了锁。在LockInternals的internalLockLoop()方法的while循环中会先获取排好序的客户端线程尝试获取锁时创建的临时顺序节点名称列表。然后获取当前客户端线程尝试获取锁时创建的临时顺序节点的名称再根据名称获取在节点列表中的位置 是否可以获取锁 前一个节点的路径也就是获取一个封装好这些信息的PredicateResults对象。 具体会根据节点名称获取当前线程创建的临时顺序节点在节点列表的位置然后会比较当前线程创建的节点的位置和maxLeases的大小。其中maxLeases代表了同时允许多少个客户端可以获取到锁默认是1。如果当前线程创建的节点的位置小则表示可以获取锁。如果当前线程创建的节点的位置大则表示获取锁失败。 获取锁成功则会中断LockInternals的internalLockLoop()方法的while循环然后向外返回当前客户端线程创建的临时顺序节点路径。接着在InterProcessMutex的internalLock()方法中会将当前线程 其创建的临时顺序节点路径封装成一个LockData对象然后把该LockData对象存放到InterProcessMutex.threadData这个Map中。 获取锁失败则通过PredicateResults对象先获取前一个节点路径名称。然后通过getData()方法获取前一个节点路径在zk的信息并添加Watcher监听。该Watcher监听主要是用来唤醒在LockInternals中被wait()阻塞的线程。添加完Watcher监听后便会调用wait()方法将当前线程挂起。 所以前一个节点发生变化时便会通知添加的Watcher监听。然后便会唤醒阻塞的线程继续执行internalLockLoop()方法的while循环。while循环又会继续获取排序的节点列表 判断当前线程是否已获取锁。
public class LockInternals {private final LockInternalsDriver driver;LockInternals(CuratorFramework client, LockInternalsDriver driver, String path, String lockName, int maxLeases) {this.driver driver;this.path ZKPaths.makePath(path, lockName);//生成要创建的临时节点路径名称...}...String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {//获取当前时间final long startMillis System.currentTimeMillis();//默认情况下millisToWaitnullfinal Long millisToWait (unit ! null) ? unit.toMillis(time) : null;//默认情况下localLockNodeBytes也是nullfinal byte[] localLockNodeBytes (revocable.get() ! null) ? new byte[0] : lockNodeBytes;int retryCount 0;String ourPath null;boolean hasTheLock false;//是否已经获取到锁boolean isDone false;//是否正在获取锁while (!isDone) {isDone true;//1.这里是关键性的加锁代码会去级联创建一个临时顺序节点ourPath driver.createsTheLock(client, path, localLockNodeBytes);//2.检查是否获取到了锁hasTheLock internalLockLoop(startMillis, millisToWait, ourPath);}if (hasTheLock) {return ourPath;}return null;}private final Watcher watcher new Watcher() {Overridepublic void process(WatchedEvent event) {//唤醒LockInternals中被wait()阻塞的线程client.postSafeNotify(LockInternals.this);}};//检查是否获取到了锁private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {boolean haveTheLock false;boolean doDelete false;...while ((client.getState() CuratorFrameworkState.STARTED) !haveTheLock) {//3.获取排好序的各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表ListString children getSortedChildren();//4.获取当前客户端线程尝试获取分布式锁时创建的临时顺序节点的名称String sequenceNodeName ourPath.substring(basePath.length() 1); // 1 to include the slash//5.获取当前线程创建的节点在节点列表中的位置 是否可以获取锁 前一个节点的路径名称PredicateResults predicateResults driver.getsTheLock(client, children, sequenceNodeName, maxLeases);if (predicateResults.getsTheLock()) {//获取锁成功//返回truehaveTheLock true;} else {//获取锁失败//获取前一个节点路径名称String previousSequencePath basePath / predicateResults.getPathToWatch();synchronized(this) {//use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak//通过getData()获取前一个节点路径在zk的信息并添加watch监听client.getData().usingWatcher(watcher).forPath(previousSequencePath);//默认情况下millisToWait nullif (millisToWait ! null) {millisToWait - (System.currentTimeMillis() - startMillis);startMillis System.currentTimeMillis();if (millisToWait 0) {doDelete true;//timed out - delete our nodebreak;}wait(millisToWait);//阻塞} else {wait();//阻塞}}}}...return haveTheLock;}ListString getSortedChildren() throws Exception {//获取排好序的各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表return getSortedChildren(client, basePath, lockName, driver);}public static ListString getSortedChildren(CuratorFramework client, String basePath, final String lockName, final LockInternalsSorter sorter) throws Exception {//获取各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表ListString children client.getChildren().forPath(basePath);//对节点名称进行排序ListString sortedList Lists.newArrayList(children);Collections.sort(sortedList,new ComparatorString() {Overridepublic int compare(String lhs, String rhs) {return sorter.fixForSorting(lhs, lockName).compareTo(sorter.fixForSorting(rhs, lockName));}});return sortedList;}...
}public class StandardLockInternalsDriver implements LockInternalsDriver {...//级联创建一个临时顺序节点Overridepublic String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception {String ourPath;//默认情况下传入的lockNodeBytesnullif (lockNodeBytes ! null) {ourPath client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);} else {//创建临时顺序节点ourPath client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);}return ourPath;}//获取当前线程创建的节点在节点列表中的位置以及是否可以获取锁Overridepublic PredicateResults getsTheLock(CuratorFramework client, ListString children, String sequenceNodeName, int maxLeases) throws Exception {//根据节点名称获取当前线程创建的临时顺序节点在节点列表中的位置int ourIndex children.indexOf(sequenceNodeName);validateOurIndex(sequenceNodeName, ourIndex);//maxLeases代表的是同时允许多少个客户端可以获取到锁//getsTheLock为true表示可以获取锁getsTheLock为false表示获取锁失败boolean getsTheLock ourIndex maxLeases;//获取当前节点需要watch的前一个节点路径String pathToWatch getsTheLock ? null : children.get(ourIndex - maxLeases);return new PredicateResults(pathToWatch, getsTheLock);}...
}
(5)不同客户端线程获取锁时的互斥实现
maxLeases代表了同时允许多少个客户端可以获取到锁默认值是1。能否获取锁的判断就是线程创建的节点的位置outIndex maxLeases。当线程1创建的节点在节点列表中排第一时满足outIndex 0 maxLeases 1可以获取锁。当线程2创建的节点再节点列表中排第二时不满足outIndex 1 maxLeases 1所以不能获取锁。从而实现线程1和线程2获取锁时的互斥。 (6)同一客户端线程可重入加锁的实现
客户端线程重复获取锁时会重复调用InterProcessMutex的internalLock()方法。在InterProcessMutex的internalLock()方法中线程第一次获取锁成功会创建一个LockData对象并存放在一个Map中。线程第二次获取锁时便会从这个Map中取出这个LockData对象并对LockData对象中的重入计数器lockCount进行递增接着就返回true。以此实现可重入加锁。 (7)客户端线程释放锁的实现
客户端线程释放锁时会调用InterProcessMutex的release()方法。 首先对LockData里的重入计数器进行递减。当重入计数器大于0时直接返回。当重入计数器为0时才执行下一步删除节点的操作。 然后删除客户端线程创建的临时顺序节点client.delete().guaranteed().forPath(ourPath)。
public class InterProcessMutex implements InterProcessLock, RevocableInterProcessMutex {private final LockInternals internals;private final ConcurrentMapThread, LockData threadData Maps.newConcurrentMap();...Overridepublic void release() throws Exception {//获取当前线程Thread currentThread Thread.currentThread();//获取当前线程对应的LockData对象LockData lockData threadData.get(currentThread);if (lockData null) {throw new IllegalMonitorStateException(You do not own the lock: basePath);}//1.首先对LockData里的重入计数器lockCount进行递减int newLockCount lockData.lockCount.decrementAndGet();if (newLockCount 0) {//当重入计数器大于0时直接返回return;}if (newLockCount 0) {throw new IllegalMonitorStateException(Lock count has gone negative for lock: basePath);}try {//2.当重入计数器为0时执行删除节点的操作internals.releaseLock(lockData.lockPath);} finally {threadData.remove(currentThread);}}...
}public class LockInternals {...final void releaseLock(String lockPath) throws Exception {client.removeWatchers();revocable.set(null);deleteOurPath(lockPath);}private void deleteOurPath(String ourPath) throws Exception {//删除节点client.delete().guaranteed().forPath(ourPath);}...
}
(8)客户端线程释放锁后其他线程获取锁的实现
由于在节点列表里排第二的节点对应的线程会监听排第一的节点而当持有锁的客户端线程释放锁后排第一的节点会被删除掉。所以在节点列表里排第二的节点对应的客户端便会收到zk的通知。于是会回调执行该线程添加的Watcher的process()方法也就是唤醒该线程让其继续执行while循环获取锁。
public class LockInternals {...private final Watcher watcher new Watcher() {Overridepublic void process(WatchedEvent event) {//唤醒LockInternals中被wait()阻塞的线程client.postSafeNotify(LockInternals.this);}};//检查是否获取到了锁private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {boolean haveTheLock false;boolean doDelete false;...while ((client.getState() CuratorFrameworkState.STARTED) !haveTheLock) {//3.获取排好序的各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表ListString children getSortedChildren();//4.获取当前客户端线程尝试获取分布式锁时创建的临时顺序节点的名称String sequenceNodeName ourPath.substring(basePath.length() 1); // 1 to include the slash//5.获取当前线程创建的节点在节点列表中的位置是否可以获取锁前一个节点的路径名称PredicateResults predicateResults driver.getsTheLock(client, children, sequenceNodeName, maxLeases);if (predicateResults.getsTheLock()) {//获取锁成功//返回truehaveTheLock true;} else {//获取锁失败//获取前一个节点路径名称String previousSequencePath basePath / predicateResults.getPathToWatch();synchronized(this) {//use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak//通过getData()获取前一个节点路径在zk的信息并添加watch监听client.getData().usingWatcher(watcher).forPath(previousSequencePath);//默认情况下millisToWait nullif (millisToWait ! null) {millisToWait - (System.currentTimeMillis() - startMillis);startMillis System.currentTimeMillis();if (millisToWait 0) {doDelete true;//timed out - delete our nodebreak;}wait(millisToWait);//阻塞} else {wait();//阻塞}}}}...return haveTheLock;}...
}
(9)InterProcessMutex就是一个公平锁
因为所有客户端线程都会创建一个顺序节点然后按申请锁的顺序进行排序。最后会依次按自己所在的排序来尝试获取锁实现了所有客户端排队获取锁。 2.Curator的非可重入锁的源码
(1)Curator的非可重入锁InterProcessSemaphoreMutex的使用
(2)Curator的非可重入锁InterProcessSemaphoreMutex的源码 (1)Curator的非可重入锁InterProcessSemaphoreMutex的使用
非可重入锁同一个时间只能有一个客户端线程获取到锁其他线程都要排队而且同一个客户端线程是不可重入加锁的。
public class Demo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy new ExponentialBackoffRetry(1000, 3);final CuratorFramework client CuratorFrameworkFactory.newClient(127.0.0.1:2181,//zk的地址5000,//客户端和zk的心跳超时时间超过该时间没心跳Session就会被断开3000,//连接zk时的超时时间retryPolicy);client.start();System.out.println(已经启动Curator客户端完成zk的连接);//非可重入锁InterProcessSemaphoreMutex lock new InterProcessSemaphoreMutex(client, /locks);lock.acquire();Thread.sleep(3000);lock.release();}
}
(2)Curator的非可重入锁InterProcessSemaphoreMutex的源码
Curator的非可重入锁是基于Semaphore来实现的也就是将Semaphore允许获取Lease的客户端线程数设置为1从而实现同一时间只能有一个客户端线程获取到Lease。
public class InterProcessSemaphoreMutex implements InterProcessLock {private final InterProcessSemaphoreV2 semaphore;private final WatcherRemoveCuratorFramework watcherRemoveClient;private volatile Lease lease;public InterProcessSemaphoreMutex(CuratorFramework client, String path) {watcherRemoveClient client.newWatcherRemoveCuratorFramework();this.semaphore new InterProcessSemaphoreV2(watcherRemoveClient, path, 1);}Overridepublic void acquire() throws Exception {//获取非可重入锁就是获取Semaphore的Leaselease semaphore.acquire();}Overridepublic boolean acquire(long time, TimeUnit unit) throws Exception {Lease acquiredLease semaphore.acquire(time, unit);if (acquiredLease null) {return false;}lease acquiredLease;return true;}Overridepublic void release() throws Exception {//释放非可重入锁就是释放Semaphore的LeaseLease lease this.lease;Preconditions.checkState(lease ! null, Not acquired);this.lease null;lease.close();watcherRemoveClient.removeWatchers();}
} 3.Curator的可重入读写锁的源码
(1)Curator的可重入读写锁InterProcessReadWriteLock的使用
(2)Curator的可重入读写锁InterProcessReadWriteLock的初始化
(3)InterProcessMutex获取锁的源码
(4)先获取读锁 后获取读锁的情形分析
(5)先获取读锁 后获取写锁的情形分析
(6)先获取写锁 后获取读锁的情形分析
(7)先获取写锁 再获取写锁的情形分析 (1)Curator的可重入读写锁InterProcessReadWriteLock的使用
public class Demo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy new ExponentialBackoffRetry(1000, 3);final CuratorFramework client CuratorFrameworkFactory.newClient(127.0.0.1:2181,//zk的地址5000,//客户端和zk的心跳超时时间超过该时间没心跳Session就会被断开3000,//连接zk时的超时时间retryPolicy);client.start();System.out.println(已经启动Curator客户端完成zk的连接);//读写锁InterProcessReadWriteLock lock new InterProcessReadWriteLock(client, /locks);lock.readLock().acquire();lock.readLock().release();lock.writeLock().acquire();lock.writeLock().release();}
}
(2)Curator的可重入读写锁InterProcessReadWriteLock的初始化
读锁和写锁都是基于可重入锁InterProcessMutex的子类来实现的。读锁和写锁的获取锁和释放锁逻辑就是使用InterProcessMutex的逻辑。
public class InterProcessReadWriteLock {private final InterProcessMutex readMutex;//读锁private final InterProcessMutex writeMutex;//写锁//must be the same length. LockInternals depends on itprivate static final String READ_LOCK_NAME __READ__;private static final String WRITE_LOCK_NAME __WRIT__;...//InterProcessReadWriteLock的初始化public InterProcessReadWriteLock(CuratorFramework client, String basePath, byte[] lockData) {lockData (lockData null) ? null : Arrays.copyOf(lockData, lockData.length);//写锁的初始化writeMutex new InternalInterProcessMutex(client,basePath,WRITE_LOCK_NAME,//写锁的lockName__WRIT__lockData,1,//写锁的maxLeasesnew SortingLockInternalsDriver() {Overridepublic PredicateResults getsTheLock(CuratorFramework client, ListString children, String sequenceNodeName, int maxLeases) throws Exception {return super.getsTheLock(client, children, sequenceNodeName, maxLeases);}});//读锁的初始化readMutex new InternalInterProcessMutex(client,basePath,READ_LOCK_NAME,//读锁的lockName__READ__lockData,Integer.MAX_VALUE,//读锁的maxLeasesnew SortingLockInternalsDriver() {Overridepublic PredicateResults getsTheLock(CuratorFramework client, ListString children, String sequenceNodeName, int maxLeases) throws Exception {return readLockPredicate(children, sequenceNodeName);}});}private static class InternalInterProcessMutex extends InterProcessMutex {private final String lockName;private final byte[] lockData;InternalInterProcessMutex(CuratorFramework client, String path, String lockName, byte[] lockData, int maxLeases, LockInternalsDriver driver) {super(client, path, lockName, maxLeases, driver);this.lockName lockName;this.lockData lockData;}...}public InterProcessMutex readLock() {return readMutex;}public InterProcessMutex writeLock() {return writeMutex;}...
}
(3)InterProcessMutex获取锁的源码
public class InterProcessMutex implements InterProcessLock, RevocableInterProcessMutex {private final LockInternals internals;private final String basePath;private static final String LOCK_NAME lock-;//一个线程对应一个LockData数据对象private final ConcurrentMapThread, LockData threadData Maps.newConcurrentMap();...//初始化InterProcessMutexInterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver) {//设置锁的路径basePath PathUtils.validatePath(path);//初始化LockInternalsinternals new LockInternals(client, driver, path, lockName, maxLeases);}Overridepublic void acquire() throws Exception {//获取分布式锁会一直阻塞等待直到获取成功//相同的线程可以重入锁每一次调用acquire()方法都要匹配一个release()方法的调用if (!internalLock(-1, null)) {throw new IOException(Lost connection while trying to acquire lock: basePath);}}private boolean internalLock(long time, TimeUnit unit) throws Exception {//获取当前线程Thread currentThread Thread.currentThread();//获取当前线程对应的LockData数据LockData lockData threadData.get(currentThread);if (lockData ! null) {//可重入计算lockData.lockCount.incrementAndGet();return true;}//调用LockInternals.attemptLock()方法尝试获取锁默认情况下传入的time-1表示等待获取锁的时间String lockPath internals.attemptLock(time, unit, getLockNodeBytes());if (lockPath ! null) {//获取锁成功将当前线程 其创建的临时顺序节点路径封装成一个LockData对象LockData newLockData new LockData(currentThread, lockPath);//然后把该LockData对象存放到InterProcessMutex.threadData这个Map中threadData.put(currentThread, newLockData);return true;}return false;}//LockData是InterProcessMutex的一个静态内部类private static class LockData {final Thread owningThread;final String lockPath;final AtomicInteger lockCount new AtomicInteger(1);//用于锁的重入次数计数private LockData(Thread owningThread, String lockPath) {this.owningThread owningThread;this.lockPath lockPath;}}protected byte[] getLockNodeBytes() {return null;}...
}public class LockInternals {private final LockInternalsDriver driver;LockInternals(CuratorFramework client, LockInternalsDriver driver, String path, String lockName, int maxLeases) {this.driver driver;this.path ZKPaths.makePath(path, lockName);//生成要创建的临时节点路径名称...}...String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {//获取当前时间final long startMillis System.currentTimeMillis();//默认情况下millisToWaitnullfinal Long millisToWait (unit ! null) ? unit.toMillis(time) : null;//默认情况下localLockNodeBytes也是nullfinal byte[] localLockNodeBytes (revocable.get() ! null) ? new byte[0] : lockNodeBytes;int retryCount 0;String ourPath null;boolean hasTheLock false;//是否已经获取到锁boolean isDone false;//是否正在获取锁while (!isDone) {isDone true;//1.这里是关键性的加锁代码会去级联创建一个临时顺序节点ourPath driver.createsTheLock(client, path, localLockNodeBytes);//2.检查是否获取到了锁hasTheLock internalLockLoop(startMillis, millisToWait, ourPath);}if (hasTheLock) {return ourPath;}return null;}private final Watcher watcher new Watcher() {Overridepublic void process(WatchedEvent event) {//唤醒LockInternals中被wait()阻塞的线程client.postSafeNotify(LockInternals.this);}};//检查是否获取到了锁private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {boolean haveTheLock false;boolean doDelete false;...while ((client.getState() CuratorFrameworkState.STARTED) !haveTheLock) {//3.获取排好序的各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表ListString children getSortedChildren();//4.获取当前客户端线程尝试获取分布式锁时创建的临时顺序节点的名称String sequenceNodeName ourPath.substring(basePath.length() 1); // 1 to include the slash//5.获取当前线程创建的节点在节点列表中的位置 是否可以获取锁 前一个节点的路径名称PredicateResults predicateResults driver.getsTheLock(client, children, sequenceNodeName, maxLeases);if (predicateResults.getsTheLock()) {//获取锁成功//返回truehaveTheLock true;} else {//获取锁失败//获取前一个节点路径名称String previousSequencePath basePath / predicateResults.getPathToWatch();synchronized(this) {//use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak//通过getData()获取前一个节点路径在zk的信息并添加watch监听client.getData().usingWatcher(watcher).forPath(previousSequencePath);//默认情况下millisToWait nullif (millisToWait ! null) {millisToWait - (System.currentTimeMillis() - startMillis);startMillis System.currentTimeMillis();if (millisToWait 0) {doDelete true;//timed out - delete our nodebreak;}wait(millisToWait);//阻塞} else {wait();//阻塞}}}}...return haveTheLock;}ListString getSortedChildren() throws Exception {//获取排好序的各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表return getSortedChildren(client, basePath, lockName, driver);}public static ListString getSortedChildren(CuratorFramework client, String basePath, final String lockName, final LockInternalsSorter sorter) throws Exception {//获取各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表ListString children client.getChildren().forPath(basePath);//对节点名称进行排序ListString sortedList Lists.newArrayList(children);Collections.sort(sortedList,new ComparatorString() {Overridepublic int compare(String lhs, String rhs) {return sorter.fixForSorting(lhs, lockName).compareTo(sorter.fixForSorting(rhs, lockName));}});return sortedList;}...
}public class StandardLockInternalsDriver implements LockInternalsDriver {...//级联创建一个临时顺序节点Overridepublic String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception {String ourPath;//默认情况下传入的lockNodeBytesnullif (lockNodeBytes ! null) {ourPath client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);} else {//创建临时顺序节点ourPath client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);}return ourPath;}//获取当前线程创建的节点在节点列表中的位置以及是否可以获取锁Overridepublic PredicateResults getsTheLock(CuratorFramework client, ListString children, String sequenceNodeName, int maxLeases) throws Exception {//根据节点名称获取当前线程创建的临时顺序节点在节点列表中的位置int ourIndex children.indexOf(sequenceNodeName);validateOurIndex(sequenceNodeName, ourIndex);//maxLeases代表的是同时允许多少个客户端可以获取到锁//getsTheLock为true表示可以获取锁getsTheLock为false表示获取锁失败boolean getsTheLock ourIndex maxLeases;//获取当前节点需要watch的前一个节点路径String pathToWatch getsTheLock ? null : children.get(ourIndex - maxLeases);return new PredicateResults(pathToWatch, getsTheLock);}...
}
(4)先获取读锁 后获取读锁的情形分析
当线程创建完临时顺序节点并获取到排好序的节点列表children后执行LockInternalsDriver的getsTheLock()方法获取能否成功加锁的信息时会执行到InterProcessReadWriteLock的readLockPredicate()方法。 由于此时firstWriteIndex Integer.MAX_VALUE所以无论多少线程尝试获取读锁都能满足ourIndex firstWriteIndex也就是getsTheLock的值会为true即表示可以获取读锁。 所以读读不互斥。
public class InterProcessReadWriteLock {...//sequenceNodeName是当前线程创建的临时顺序节点的路径名称private PredicateResults readLockPredicate(ListString children, String sequenceNodeName) throws Exception {if (writeMutex.isOwnedByCurrentThread()) {return new PredicateResults(null, true);}int index 0;int firstWriteIndex Integer.MAX_VALUE;int ourIndex -1;for (String node : children) {if (node.contains(WRITE_LOCK_NAME)) {firstWriteIndex Math.min(index, firstWriteIndex);} else if (node.startsWith(sequenceNodeName)) {//找出当前线程创建的临时顺序节点在节点列表中的位置用ourIndex表示ourIndex index;break;}index;}StandardLockInternalsDriver.validateOurIndex(sequenceNodeName, ourIndex);boolean getsTheLock (ourIndex firstWriteIndex);String pathToWatch getsTheLock ? null : children.get(firstWriteIndex);return new PredicateResults(pathToWatch, getsTheLock);}...
}
(5)先获取读锁 后获取写锁的情形分析
一.假设客户端线程1首先成功获取了读锁
那么在/locks目录下此时已经有了如下这个读锁的临时顺序节点。
/locks/43f3-4c2f-ba98-07a641d351f2-__READ__0000000004
二.然后另一个客户端线程2过来尝试获取写锁
于是该线程2会也会先在/locks目录下创建出如下写锁的临时顺序节点
/locks/9361-4fb7-8420-a8d4911d2c99-__WRIT__0000000005
接着该线程会获取/locks目录的当前子节点列表并进行排序结果如下
[43f3-4c2f-ba98-07a641d351f2-__READ__0000000004,
9361-4fb7-8420-a8d4911d2c99-__WRIT__0000000005]
然后会执行StandardLockInternalsDriver的getsTheLock()方法。由于初始化写锁时设置了其maxLeases是1而在StandardLockInternalsDriver的getsTheLock()方法中判断线程能成功获取写锁的依据是ourIndex maxLeases。即如果要成功获取写锁那么线程创建的节点在子节点列表里必须排第一。 而此时由于之前已有线程获取过一个读锁而后来又有其他线程往里面创建一个写锁的临时顺序节点。所以写锁的临时顺序节点在子节点列表children里排第二ourIndex是1。所以index 1 maxLeases 1条件不成立。 因此此时客户端线程2获取写锁失败。于是该线程便会给前一个节点添加一个监听器并调用wait()方法把自己挂起。如果前面一个节点被删除释放了锁那么该线程就会被唤醒从而再次尝试判断自己创建的节点是否在当前子节点列表中排第一。如果是那么就表示获取写锁成功。
public class StandardLockInternalsDriver implements LockInternalsDriver {...//获取当前线程创建的节点在节点列表中的位置以及是否可以获取锁Overridepublic PredicateResults getsTheLock(CuratorFramework client, ListString children, String sequenceNodeName, int maxLeases) throws Exception {//根据节点名称获取当前线程创建的临时顺序节点在节点列表中的位置int ourIndex children.indexOf(sequenceNodeName);validateOurIndex(sequenceNodeName, ourIndex);//maxLeases代表的是同时允许多少个客户端可以获取到锁//getsTheLock为true表示可以获取锁getsTheLock为false表示获取锁失败boolean getsTheLock ourIndex maxLeases;//获取当前节点需要watch的前一个节点路径String pathToWatch getsTheLock ? null : children.get(ourIndex - maxLeases);return new PredicateResults(pathToWatch, getsTheLock);}...
}
(6)先获取写锁 后获取读锁的情形分析
一.假设客户端线程1先获取了写锁
那么在/locks目录下此时已经有了如下这个写锁的临时顺序节点。
/locks/4383-466e-9b86-fda522ea061a-__WRIT__0000000006
二.然后另一个客户端线程2过来尝试获取读锁
于是该线程2会也会先在/locks目录下创建出如下读锁的临时顺序节点
/locks/5ba2-488f-93a4-f85fafd5cc32-__READ__0000000007
接着该线程会获取/locks目录的当前子节点列表并进行排序结果如下
[4383-466e-9b86-fda522ea061a-__WRIT__0000000006,
5ba2-488f-93a4-f85fafd5cc32-__READ__0000000007]
然后会执行LockInternalsDriver的getsTheLock()方法获取能否加锁的信息也就是会执行InterProcessReadWriteLock的readLockPredicate()方法。
public class InterProcessReadWriteLock {...//sequenceNodeName是当前线程创建的临时顺序节点的路径名称private PredicateResults readLockPredicate(ListString children, String sequenceNodeName) throws Exception {//如果是同一个客户端线程先加写锁再加读锁是可以成功的不会互斥if (writeMutex.isOwnedByCurrentThread()) {return new PredicateResults(null, true);}int index 0;int firstWriteIndex Integer.MAX_VALUE;int ourIndex -1;for (String node : children) {if (node.contains(WRITE_LOCK_NAME)) {firstWriteIndex Math.min(index, firstWriteIndex);} else if (node.startsWith(sequenceNodeName)) {//找出当前线程创建的临时顺序节点在节点列表中的位置用ourIndex表示ourIndex index;break;}index;}StandardLockInternalsDriver.validateOurIndex(sequenceNodeName, ourIndex);boolean getsTheLock (ourIndex firstWriteIndex);String pathToWatch getsTheLock ? null : children.get(firstWriteIndex);return new PredicateResults(pathToWatch, getsTheLock);}...
}
在InterProcessReadWriteLock的readLockPredicate()方法中如果是同一个客户端线程先获取写锁再获取读锁是不会互斥的。如果是不同的客户端线程线程1先获取写锁线程2再获取读锁则互斥。因为线程2执行readLockPredicate()方法在遍历子节点列表(children)时如果在子节点列表(children)中发现了一个写锁会设置firstWriteIndex0。而此时线程2创建的临时顺序节点的ourIndex1所以不满足ourIndex(1) firstWriteIndex(0)于是线程2获取读锁失败。 总结获取读锁时在当前线程创建的节点前面如果还有写锁对应的节点那么firstWriteIndex就会被重置为具体位置。如果没有写锁对应的节点那么firstWriteIndex就是MAX_VALUE。而只要firstWriteIndex为MAX_VALUE那么就可以不断允许获取读锁。 (7)先获取写锁 再获取写锁的情形分析
如果客户端线程1先获取了写锁然后后面客户端线程2来获取这个写锁。此时线程2会发现自己创建的节点排在节点列表中的第二不是第一。于是获取写锁失败进行阻塞挂起。等线程1释放了写锁后才会唤醒线程2继续尝试获取写锁。 4.Curator的MultiLock源码
(1)Curator的MultiLock的使用
(2)Curator的MultiLock的源码 (1)Curator的MultiLock的使用
public class Demo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy new ExponentialBackoffRetry(1000, 3);final CuratorFramework client CuratorFrameworkFactory.newClient(127.0.0.1:2181,//zk的地址5000,//客户端和zk的心跳超时时间超过该时间没心跳Session就会被断开3000,//连接zk时的超时时间retryPolicy);client.start();System.out.println(已经启动Curator客户端完成zk的连接);//MultiLockInterProcessLock lock1 new InterProcessMutex(client, /locks/lock_01);InterProcessLock lock2 new InterProcessMutex(client, /locks/lock_02);InterProcessLock lock3 new InterProcessMutex(client, /locks/lock_03);ListInterProcessLock locks new ArrayListInterProcessLock();locks.add(lock1);locks.add(lock2);locks.add(lock3);InterProcessMultiLock multiLock new InterProcessMultiLock(locks);}
}
(2)Curator的MultiLock的源码
MultiLock原理依次遍历获取每个锁阻塞直到获取每个锁为止然后返回true。如果过程中有报错依次释放已经获取到的锁然后返回false。
public class InterProcessMultiLock implements InterProcessLock {private final ListInterProcessLock locks;public InterProcessMultiLock(ListInterProcessLock locks) {this.locks ImmutableList.copyOf(locks);}//获取锁Overridepublic void acquire() throws Exception {acquire(-1, null);}Overridepublic boolean acquire(long time, TimeUnit unit) throws Exception {Exception exception null;ListInterProcessLock acquired Lists.newArrayList();boolean success true;//依次遍历获取每个锁阻塞直到获取每个锁为止for (InterProcessLock lock : locks) {try {if (unit null) {lock.acquire();acquired.add(lock);} else {if (lock.acquire(time, unit)) {acquired.add(lock);} else {success false;break;}}} catch (Exception e) {ThreadUtils.checkInterrupted(e);success false;exception e;}}if (!success) {for (InterProcessLock lock : reverse(acquired)) {try {lock.release();} catch (Exception e) {ThreadUtils.checkInterrupted(e);// ignore}}}if (exception ! null) {throw exception;}return success;}Overridepublic synchronized void release() throws Exception {Exception baseException null;for (InterProcessLock lock : reverse(locks)) {try {lock.release();} catch (Exception e) {ThreadUtils.checkInterrupted(e);if (baseException null) {baseException e;} else {baseException new Exception(baseException);}}}if (baseException ! null) {throw baseException;}}...
} 5.Curator的Semaphore源码
(1)基于InterProcessSemaphoreV2使用Semaphore
(2)InterProcessSemaphoreV2的初始化
(3)InterProcessSemaphoreV2.acquire()方法获取Semaphore的Lease
(4)InterProcessSemaphoreV2.returnLease()方法释放Semaphore的Lease Semaphore信号量就是指定同时可以有多个线程获取到锁。 (1)基于InterProcessSemaphoreV2使用Semaphore
public class Demo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy new ExponentialBackoffRetry(1000, 3);final CuratorFramework client CuratorFrameworkFactory.newClient(127.0.0.1:2181,//zk的地址5000,//客户端和zk的心跳超时时间超过该时间没心跳Session就会被断开3000,//连接zk时的超时时间retryPolicy);client.start();System.out.println(已经启动Curator客户端完成zk的连接);//获取SemaphoreInterProcessSemaphoreV2 semaphore new InterProcessSemaphoreV2(client, /semaphore, 3);Lease lease semaphore.acquire();//获取Semaphore的一个锁Thread.sleep(3000);semaphore.returnLease(lease);//向Semaphore返还一个锁}
}
(2)InterProcessSemaphoreV2的初始化
public class InterProcessSemaphoreV2 {private final WatcherRemoveCuratorFramework client;private final InterProcessMutex lock;private final String leasesPath;private volatile int maxLeases;...//maxLeases表示该实例可以允许获取的lease数量public InterProcessSemaphoreV2(CuratorFramework client, String path, int maxLeases) {this(client, path, maxLeases, null);}//初始化InterProcessSemaphoreV2时传入的参数path /semaphore参数maxLeases 3private InterProcessSemaphoreV2(CuratorFramework client, String path, int maxLeases, SharedCountReader count) {this.client client.newWatcherRemoveCuratorFramework();path PathUtils.validatePath(path);//锁的path是ZKPaths.makePath(path, LOCK_PARENT) /semaphore/locks//初始化一个InterProcessMutex分布式锁this.lock new InterProcessMutex(client, ZKPaths.makePath(path, LOCK_PARENT));this.maxLeases (count ! null) ? count.getCount() : maxLeases;//lease的path是/semaphore/leasesthis.leasesPath ZKPaths.makePath(path, LEASE_PARENT);...}...
}
(3)InterProcessSemaphoreV2.acquire()方法获取Semaphore的Lease
客户端线程尝试获取Semaphore的一个Lease。 步骤一首先会获取初始化时创建的锁InterProcessMutex
锁的路径是/semaphore/locks。当多个客户端线程同时执行acquire()获取Lease时只会有一个线程成功而其他线程会基于锁路径下的临时顺序节点来排队获取锁。 步骤二获取锁成功后才会尝试获取Semaphore的Lease
Lease的路径是/semaphore/leases。此时会先到/semaphore/leases目录下创建一个临时顺序节点然后会调用InterProcessSemaphoreV2的makeLease()方法创建一个Lease。这个Lease对象就是客户端线程成功获取Semaphore的一个Lease。 创建完Lease对象后接着会进入一个for循环会先获取/semaphore/leases目录下的所有临时顺序节点并添加监听。然后判断/semaphore/leases目录下节点的数量是否大于maxLeases。如果临时顺序节点的数量小于maxLeases那么说明当前客户端线程成功获取Semaphore的Lease于是退出循环。如果临时顺序节点的数量大于maxLeases那么当前客户端线程就要调用wait()进行阻塞等待。
public class InterProcessSemaphoreV2 {private final InterProcessMutex lock;private final Watcher watcher new Watcher() {Overridepublic void process(WatchedEvent event) {//唤醒在InterProcessSemaphoreV2对象中执行wait()而被阻塞的线程client.postSafeNotify(InterProcessSemaphoreV2.this);}};...public Lease acquire() throws Exception {CollectionLease leases acquire(1, 0, null);return leases.iterator().next();}public CollectionLease acquire(int qty, long time, TimeUnit unit) throws Exception {long startMs System.currentTimeMillis();boolean hasWait (unit ! null);long waitMs hasWait ? TimeUnit.MILLISECONDS.convert(time, unit) : 0;Preconditions.checkArgument(qty 0, qty cannot be 0);ImmutableList.BuilderLease builder ImmutableList.builder();boolean success false;try {while (qty-- 0) {int retryCount 0;long startMillis System.currentTimeMillis();boolean isDone false;while (!isDone) {switch (internalAcquire1Lease(builder, startMs, hasWait, waitMs)) {case CONTINUE: {isDone true;break;}case RETURN_NULL: {return null;}case RETRY_DUE_TO_MISSING_NODE: {if (!client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) {throw new KeeperException.NoNodeException(Sequential path not found - possible session loss);}//try againbreak;}}}}success true;} finally {if (!success) {returnAll(builder.build());}}return builder.build();}private InternalAcquireResult internalAcquire1Lease(ImmutableList.BuilderLease builder, long startMs, boolean hasWait, long waitMs) throws Exception {if (client.getState() ! CuratorFrameworkState.STARTED) {return InternalAcquireResult.RETURN_NULL;}if (hasWait) {long thisWaitMs getThisWaitMs(startMs, waitMs);if (!lock.acquire(thisWaitMs, TimeUnit.MILLISECONDS)) {return InternalAcquireResult.RETURN_NULL;}} else {//1.首先获取一个分布式锁lock.acquire();}Lease lease null;boolean success false;try {//2.尝试获取Semaphore的Lease创建一个临时顺序节点PathAndBytesableString createBuilder client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL);String path (nodeData ! null) ? createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME), nodeData) : createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME));String nodeName ZKPaths.getNodeFromPath(path);lease makeLease(path);...try {synchronized(this) {for(;;) {ListString children;//3.获取./lease目录下的所有临时顺序节点并添加watcher监听children client.getChildren().usingWatcher(watcher).forPath(leasesPath);...//4.判断临时顺序节点的数量是否大于maxLeases//maxLeases表示最多允许多少个客户端线程获取Semaphore的Leaseif (children.size() maxLeases) {//如果临时顺序节点的数量小于maxLeases//那么说明当前客户端线程成功获取Semaphore的Lease于是退出循环break;}//如果临时顺序节点的数量大于maxLeases//那么当前客户端线程就要调用wait()进行阻塞等待if (hasWait) {long thisWaitMs getThisWaitMs(startMs, waitMs);if (thisWaitMs 0) {return InternalAcquireResult.RETURN_NULL;}...wait(thisWaitMs);} else {...wait();}}success true;}} finally {if (!success) {returnLease(lease);}client.removeWatchers();}} finally {//释放掉之前获取的锁lock.release();}builder.add(Preconditions.checkNotNull(lease));return InternalAcquireResult.CONTINUE;}...
}
(4)InterProcessSemaphoreV2.returnLease()方法释放Semaphore的Lease
执行InterProcessSemaphoreV2的returnLease()方法时最终会执行makeLease()生成的Lease对象的close()方法而close()方法会删除在/semaphore/leases目录下创建的临时顺序节点。 当/semaphore/leases目录下的节点发生变化时那些对该目录进行Watcher监听的客户端就会收到通知于是就会执行Watcher里的process()方法唤醒执行wait()时被阻塞的线程从而让这些没有成功获取Semaphore的Lease的线程继续尝试获取Lease。
public class InterProcessSemaphoreV2 {...public void returnLease(Lease lease) {//执行Lease的close()方法CloseableUtils.closeQuietly(lease);}private Lease makeLease(final String path) {return new Lease() {Overridepublic void close() throws IOException {try {client.delete().guaranteed().forPath(path);} catch (KeeperException.NoNodeException e) {log.warn(Lease already released, e);} catch (Exception e) {ThreadUtils.checkInterrupted(e);throw new IOException(e);}}Overridepublic byte[] getData() throws Exception {return client.getData().forPath(path);}Overridepublic String getNodeName() {return ZKPaths.getNodeFromPath(path);}};}...
}
文章转载自: http://www.morning.eshixi.com.gov.cn.eshixi.com http://www.morning.dwzwm.cn.gov.cn.dwzwm.cn http://www.morning.phnbd.cn.gov.cn.phnbd.cn http://www.morning.qddtd.cn.gov.cn.qddtd.cn http://www.morning.ddjp.cn.gov.cn.ddjp.cn http://www.morning.nkcfh.cn.gov.cn.nkcfh.cn http://www.morning.c7510.cn.gov.cn.c7510.cn http://www.morning.grbp.cn.gov.cn.grbp.cn http://www.morning.xq3nk42mvv.cn.gov.cn.xq3nk42mvv.cn http://www.morning.czgfn.cn.gov.cn.czgfn.cn http://www.morning.crqpl.cn.gov.cn.crqpl.cn http://www.morning.mmhyx.cn.gov.cn.mmhyx.cn http://www.morning.lmrjn.cn.gov.cn.lmrjn.cn http://www.morning.sqmbb.cn.gov.cn.sqmbb.cn http://www.morning.tsqrc.cn.gov.cn.tsqrc.cn http://www.morning.tkfnp.cn.gov.cn.tkfnp.cn http://www.morning.wrbf.cn.gov.cn.wrbf.cn http://www.morning.amonr.com.gov.cn.amonr.com http://www.morning.dwrbn.cn.gov.cn.dwrbn.cn http://www.morning.drywd.cn.gov.cn.drywd.cn http://www.morning.zyrp.cn.gov.cn.zyrp.cn http://www.morning.tbqdm.cn.gov.cn.tbqdm.cn http://www.morning.redhoma.com.gov.cn.redhoma.com http://www.morning.jtmql.cn.gov.cn.jtmql.cn http://www.morning.gtnyq.cn.gov.cn.gtnyq.cn http://www.morning.ngmjn.cn.gov.cn.ngmjn.cn http://www.morning.cfnsn.cn.gov.cn.cfnsn.cn http://www.morning.sogou66.cn.gov.cn.sogou66.cn http://www.morning.pbzgj.cn.gov.cn.pbzgj.cn http://www.morning.clkjn.cn.gov.cn.clkjn.cn http://www.morning.bpmtl.cn.gov.cn.bpmtl.cn http://www.morning.hqrkq.cn.gov.cn.hqrkq.cn http://www.morning.sltfk.cn.gov.cn.sltfk.cn http://www.morning.wqgr.cn.gov.cn.wqgr.cn http://www.morning.lhsdf.cn.gov.cn.lhsdf.cn http://www.morning.yhjrc.cn.gov.cn.yhjrc.cn http://www.morning.sjbty.cn.gov.cn.sjbty.cn http://www.morning.srwny.cn.gov.cn.srwny.cn http://www.morning.ypbp.cn.gov.cn.ypbp.cn http://www.morning.yxbdl.cn.gov.cn.yxbdl.cn http://www.morning.trzmb.cn.gov.cn.trzmb.cn http://www.morning.ggtkk.cn.gov.cn.ggtkk.cn http://www.morning.xfncq.cn.gov.cn.xfncq.cn http://www.morning.jfjqs.cn.gov.cn.jfjqs.cn http://www.morning.mpsnb.cn.gov.cn.mpsnb.cn http://www.morning.rfyff.cn.gov.cn.rfyff.cn http://www.morning.ghfrb.cn.gov.cn.ghfrb.cn http://www.morning.mnlk.cn.gov.cn.mnlk.cn http://www.morning.wktbz.cn.gov.cn.wktbz.cn http://www.morning.hqgkx.cn.gov.cn.hqgkx.cn http://www.morning.zgqysw.cn.gov.cn.zgqysw.cn http://www.morning.rnnwd.cn.gov.cn.rnnwd.cn http://www.morning.gybnk.cn.gov.cn.gybnk.cn http://www.morning.ptmgq.cn.gov.cn.ptmgq.cn http://www.morning.gkxyy.cn.gov.cn.gkxyy.cn http://www.morning.gfprf.cn.gov.cn.gfprf.cn http://www.morning.rpfpx.cn.gov.cn.rpfpx.cn http://www.morning.rycbz.cn.gov.cn.rycbz.cn http://www.morning.ttxnj.cn.gov.cn.ttxnj.cn http://www.morning.tgxrm.cn.gov.cn.tgxrm.cn http://www.morning.kjsft.cn.gov.cn.kjsft.cn http://www.morning.yxplz.cn.gov.cn.yxplz.cn http://www.morning.fnmtc.cn.gov.cn.fnmtc.cn http://www.morning.ttcmdsg.cn.gov.cn.ttcmdsg.cn http://www.morning.hlzpb.cn.gov.cn.hlzpb.cn http://www.morning.phxns.cn.gov.cn.phxns.cn http://www.morning.cpnlq.cn.gov.cn.cpnlq.cn http://www.morning.qstkk.cn.gov.cn.qstkk.cn http://www.morning.lcxzg.cn.gov.cn.lcxzg.cn http://www.morning.hxbjt.cn.gov.cn.hxbjt.cn http://www.morning.slkqd.cn.gov.cn.slkqd.cn http://www.morning.yrhpg.cn.gov.cn.yrhpg.cn http://www.morning.rzmlc.cn.gov.cn.rzmlc.cn http://www.morning.rjqtq.cn.gov.cn.rjqtq.cn http://www.morning.wmhlz.cn.gov.cn.wmhlz.cn http://www.morning.sqlh.cn.gov.cn.sqlh.cn http://www.morning.plchy.cn.gov.cn.plchy.cn http://www.morning.qmwzz.cn.gov.cn.qmwzz.cn http://www.morning.xdwcg.cn.gov.cn.xdwcg.cn http://www.morning.qbpqw.cn.gov.cn.qbpqw.cn