博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
利用ZooKeeper简单实现分布式锁
阅读量:2489 次
发布时间:2019-05-11

本文共 6736 字,大约阅读时间需要 22 分钟。

1.分布式锁的由来:

在程序开发过程中不得不考虑的就是并发问题。在java中对于同一个jvm而言,jdk已经提供了lock和同步等但是在分布式情况下,往往存在多个进程对一些资源产生竞争关系,而这些进程往往在不同的机器上,这个时候jdk中提供的已经不能满足分布式锁顾明思议就是可以满足分布式情况下的并发锁。 下面我们讲解怎么利用zk实现分布式锁。

2.实现思路:

利用zk实现:

当很多进程需要访问共享资源时,我们可以通过zk来实现分布式锁。主要步骤是:

  1. 建立一个节点,假如名为:lock 。节点类型为持久节点(PERSISTENT)
  2. 每当进程需要访问共享资源时,会调用分布式锁的lock()或tryLock()方法获得锁,这个时候会在第一步创建的lock节点下建立相应的顺序子节点节点类型临时顺序节点(EPHEMERAL_SEQUENTIAL),通过组成特定的名字name+lock+顺序号
  3. 在建立子节点后,对lock下面的所有以name开头的子节点进行排序,判断刚刚建立的子节点顺序号是否是最小的节点假如是最小节点则获得该锁对资源进行访问
  4. 假如该节点不是最小节点,就获得该节点的上一顺序节点,并给该节点是否存在注册监听事件同时在这里阻塞等待监听事件的发生获得锁控制权
  5. 调用完共享资源后调用unlock()方法关闭zk进而可以引发监听事件释放该锁

实现的分布式锁是严格的按照顺序访问的并发锁

3.代码实现:

下面将讲解使用java实现分布式锁:

1、 建立类DistributedLock,实现java.util.concurrent.locks.Lock;和org.apache.zookeeper.Watcher接口
2、实现lock下面的方法:主要包括lock,tryLock,unlock等
3、实现watcher接口下的process方法。
4、在构造器中对zk进行初始化。

5、详细见代码注释

package cn.wpeace.zktest;import java.io.IOException;import java.util.ArrayList;import java.util.Collections;import java.util.List;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.KeeperException;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooDefs;import org.apache.zookeeper.ZooKeeper;import org.apache.zookeeper.Watcher.Event.KeeperState;import org.apache.zookeeper.data.Stat;/** * @author peace * */public class DistributedLock implements Lock, Watcher{    private ZooKeeper zk;    private String root = "/locks";//根    private String lockName;//竞争资源的标志    private String waitNode;//等待前一个锁    private String myZnode;//当前锁    private CountDownLatch latch;//计数器    private CountDownLatch connectedSignal=new CountDownLatch(1);    private int sessionTimeout = 30000;     /**     * 创建分布式锁,使用前请确认config配置的zookeeper服务可用     * @param config 192.168.1.127:2181     * @param lockName 竞争资源标志,lockName中不能包含单词_lock_     */    public DistributedLock(String config, String lockName){        this.lockName = lockName;        // 创建一个与服务器的连接         try {            zk = new ZooKeeper(config, sessionTimeout, this);            connectedSignal.await();            Stat stat = zk.exists(root, false);//此去不执行 Watcher            if(stat == null){                // 创建根节点                zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);             }        } catch (IOException e) {            throw new LockException(e);        } catch (KeeperException e) {            throw new LockException(e);        } catch (InterruptedException e) {            throw new LockException(e);        }    }    /**     * zookeeper节点的监视器     */    public void process(WatchedEvent event) {        //建立连接用        if(event.getState()==KeeperState.SyncConnected){            connectedSignal.countDown();            return;        }        //其他线程放弃锁的标志        if(this.latch != null) {              this.latch.countDown();          }    }    public void lock() {           try {            if(this.tryLock()){                System.out.println("Thread " + Thread.currentThread().getId() + " " +myZnode + " get lock true");                return;            }            else{                waitForLock(waitNode, sessionTimeout);//等待锁            }        } catch (KeeperException e) {            throw new LockException(e);        } catch (InterruptedException e) {            throw new LockException(e);        }     }    public boolean tryLock() {        try {            String splitStr = "_lock_";            if(lockName.contains(splitStr))                throw new LockException("lockName can not contains \\u000B");            //创建临时子节点            myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);            System.out.println(myZnode + " is created ");            //取出所有子节点            List
subNodes = zk.getChildren(root, false); //取出所有lockName的锁 List
lockObjNodes = new ArrayList
(); for (String node : subNodes) { String _node = node.split(splitStr)[0]; if(_node.equals(lockName)){ lockObjNodes.add(node); } } Collections.sort(lockObjNodes); if(myZnode.equals(root+"/"+lockObjNodes.get(0))){ //如果是最小的节点,则表示取得锁 System.out.println(myZnode + "==" + lockObjNodes.get(0)); return true; } //如果不是最小的节点,找到比自己小1的节点 String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1); waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1);//找到前一个子节点 } catch (KeeperException e) { throw new LockException(e); } catch (InterruptedException e) { throw new LockException(e); } return false; } public boolean tryLock(long time, TimeUnit unit) { try { if(this.tryLock()){ return true; } return waitForLock(waitNode,time); } catch (Exception e) { e.printStackTrace(); } return false; } private boolean waitForLock(String lower, long waitTime) throws InterruptedException, KeeperException { Stat stat = zk.exists(root + "/" + lower,true);//同时注册监听。 //判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听 if(stat != null){ System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower); this.latch = new CountDownLatch(1); this.latch.await(waitTime, TimeUnit.MILLISECONDS);//等待,这里应该一直等待其他线程释放锁 this.latch = null; } return true; } public void unlock() { try { System.out.println("unlock " + myZnode); zk.delete(myZnode,-1); myZnode = null; zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } public void lockInterruptibly() throws InterruptedException { this.lock(); } public Condition newCondition() { return null; } public class LockException extends RuntimeException { private static final long serialVersionUID = 1L; public LockException(String e){ super(e); } public LockException(Exception e){ super(e); } }}

4.测试:

1、直接使用该类:

DistributedLock lock   = new DistributedLock("192.168.1.127:2181","lock");lock.lock();//共享资源 if(lock != null)     lock.unlock();

2、执行步骤:

  1. 启动zk:zkServer.sh start
  2. 启动测试代码
  3. 执行结果:
0506

转载地址:http://crbrb.baihongyu.com/

你可能感兴趣的文章
asp.net core结合Gitlab-CI实现自动化部署
查看>>
RDIFramework.NET ━ .NET快速信息化系统开发框架 V2.7 版本发布
查看>>
EasyNVR H5无插件摄像机直播解决方案前端解析之:关于直播页面和视频列表页面切换的问题...
查看>>
django搭建一个小型的服务器运维网站-拿来即用的bootstrap模板
查看>>
redis事务
查看>>
Java_基础语法之dowhile语句
查看>>
HDU 2175 汉诺塔IX
查看>>
PAT 甲级 1021 Deepest Root
查看>>
查找代码错误.java
查看>>
vc获取特殊路径(SpecialFolder)
查看>>
单例模式
查看>>
int(3)和int(11)区别
查看>>
201521123061 《Java程序设计》第十一周学习总结
查看>>
代码小思考
查看>>
Unity中的销毁方法
查看>>
ceph删除pool提示(you must first set the mon_allow_pool_delete config option to true)解决办法...
查看>>
2016-7-15(1)使用gulp构建一个项目
查看>>
CSS 设计指南(第3版) 初读笔记
查看>>
markdown学习/mou
查看>>
CentOS 搭建 LAMP服务器
查看>>