本文共 6736 字,大约阅读时间需要 22 分钟。
在程序开发过程中不得不考虑的就是并发问题。在java中对于同一个jvm而言,jdk已经提供了lock和同步等
。但是在分布式情况下
,往往存在
多个进程对一些资源产生竞争关系
,而这些进程往往在不同的机器上
,这个时候jdk中提供的已经不能满足
。分布式锁
顾明思议就是可以满足分布式情况下的并发锁
。 下面我们讲解怎么利用zk实现分布式锁。
利用zk实现:
当很多进程需要访问共享资源时,我们可以通过zk来实现分布式锁。主要步骤是:lock
。节点类型为持久节点
(PERSISTENT)调用分布式锁的lock()或tryLock()方法获得锁
,这个时候会在
第一步创建的lock节点下建立相应的顺序子节点
,节点类型
为临时顺序节点
(EPHEMERAL_SEQUENTIAL),通过组成特定的名字name+lock+顺序号
。以name开头的子节点进行排序
,判断刚刚建立
的子节点顺序号是否是最小的节点
,假如是最小节点
,则获得该锁
,对资源进行访问
。不是最小节点
,就获得该节点的上一顺序节点
,并给该节点是否存在注册监听事件
。同时在这里阻塞
。等待监听事件的发生
,获得锁控制权
。调用完共享资源后
,调用unlock()方法
,关闭zk
,进而可以引发监听事件
,释放该锁
。实现的分布式锁是严格的按照顺序访问的并发锁。
下面将讲解使用java实现分布式锁:
1、 建立类DistributedLock,实现java.util.concurrent.locks.Lock;和org.apache.zookeeper.Watcher接口 2、实现lock下面的方法:主要包括lock,tryLock,unlock等 3、实现watcher接口下的process方法。 4、在构造器中对zk进行初始化。5、详细见代码注释
package cn.wpeace.zktest;import java.io.IOException;import java.util.ArrayList;import java.util.Collections;import java.util.List;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.KeeperException;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooDefs;import org.apache.zookeeper.ZooKeeper;import org.apache.zookeeper.Watcher.Event.KeeperState;import org.apache.zookeeper.data.Stat;/** * @author peace * */public class DistributedLock implements Lock, Watcher{ private ZooKeeper zk; private String root = "/locks";//根 private String lockName;//竞争资源的标志 private String waitNode;//等待前一个锁 private String myZnode;//当前锁 private CountDownLatch latch;//计数器 private CountDownLatch connectedSignal=new CountDownLatch(1); private int sessionTimeout = 30000; /** * 创建分布式锁,使用前请确认config配置的zookeeper服务可用 * @param config 192.168.1.127:2181 * @param lockName 竞争资源标志,lockName中不能包含单词_lock_ */ public DistributedLock(String config, String lockName){ this.lockName = lockName; // 创建一个与服务器的连接 try { zk = new ZooKeeper(config, sessionTimeout, this); connectedSignal.await(); Stat stat = zk.exists(root, false);//此去不执行 Watcher if(stat == null){ // 创建根节点 zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); } } catch (IOException e) { throw new LockException(e); } catch (KeeperException e) { throw new LockException(e); } catch (InterruptedException e) { throw new LockException(e); } } /** * zookeeper节点的监视器 */ public void process(WatchedEvent event) { //建立连接用 if(event.getState()==KeeperState.SyncConnected){ connectedSignal.countDown(); return; } //其他线程放弃锁的标志 if(this.latch != null) { this.latch.countDown(); } } public void lock() { try { if(this.tryLock()){ System.out.println("Thread " + Thread.currentThread().getId() + " " +myZnode + " get lock true"); return; } else{ waitForLock(waitNode, sessionTimeout);//等待锁 } } catch (KeeperException e) { throw new LockException(e); } catch (InterruptedException e) { throw new LockException(e); } } public boolean tryLock() { try { String splitStr = "_lock_"; if(lockName.contains(splitStr)) throw new LockException("lockName can not contains \\u000B"); //创建临时子节点 myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(myZnode + " is created "); //取出所有子节点 ListsubNodes = zk.getChildren(root, false); //取出所有lockName的锁 List lockObjNodes = new ArrayList (); for (String node : subNodes) { String _node = node.split(splitStr)[0]; if(_node.equals(lockName)){ lockObjNodes.add(node); } } Collections.sort(lockObjNodes); if(myZnode.equals(root+"/"+lockObjNodes.get(0))){ //如果是最小的节点,则表示取得锁 System.out.println(myZnode + "==" + lockObjNodes.get(0)); return true; } //如果不是最小的节点,找到比自己小1的节点 String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1); waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1);//找到前一个子节点 } catch (KeeperException e) { throw new LockException(e); } catch (InterruptedException e) { throw new LockException(e); } return false; } public boolean tryLock(long time, TimeUnit unit) { try { if(this.tryLock()){ return true; } return waitForLock(waitNode,time); } catch (Exception e) { e.printStackTrace(); } return false; } private boolean waitForLock(String lower, long waitTime) throws InterruptedException, KeeperException { Stat stat = zk.exists(root + "/" + lower,true);//同时注册监听。 //判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听 if(stat != null){ System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower); this.latch = new CountDownLatch(1); this.latch.await(waitTime, TimeUnit.MILLISECONDS);//等待,这里应该一直等待其他线程释放锁 this.latch = null; } return true; } public void unlock() { try { System.out.println("unlock " + myZnode); zk.delete(myZnode,-1); myZnode = null; zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } public void lockInterruptibly() throws InterruptedException { this.lock(); } public Condition newCondition() { return null; } public class LockException extends RuntimeException { private static final long serialVersionUID = 1L; public LockException(String e){ super(e); } public LockException(Exception e){ super(e); } }}
1、直接使用该类:
DistributedLock lock = new DistributedLock("192.168.1.127:2181","lock");lock.lock();//共享资源 if(lock != null) lock.unlock();
2、执行步骤:
0506
转载地址:http://crbrb.baihongyu.com/