Zookeeper 分布式锁

什么是分布式锁?

分布式锁是一种用于在分布式系统中协调多个节点并确保互斥访问共享资源的机制。在分布式系统中,多个节点需要协同工作,但需要避免多个节点同时对共享资源进行修改或访问,以防止数据不一致或冲突。

常见的分布式锁算法包括:

(1)数据库锁:通过在分布式系统中共享一个数据库表来实现同步。进程或节点获取锁时,向数据库中插入一条记录,并返回其唯一标识符。其他进程或节点需要获取锁时,会检查数据库中是否有对应的记录,如果有,则说明已经存在该锁,需要等待。

(2)Redis锁:基于 Redis 实现的分布式锁。通过 setNX 命令实现,即在 set 之前检查是否已经存在该键,如果不存在则设置该键。

(3)ZooKeeper锁:基于 ZooKeeper 实现的分布式锁。通过 ZooKeeper 的 znode 节点来实现,进程或节点向 znode 节点发送请求获取锁,如果 znode 节点不存在,则创建该节点并返回其路径;如果 znode 节点已经存在,则说明该节点已经被占用,需要等待。

(4)文件锁:通过在分布式系统中共享一个文件来实现同步。进程或节点获取锁时,向文件中写入一个标记,其他进程或节点需要获取锁时,会检查文件中是否有对应的标记,如果有,则说明已经存在该锁,需要等待。

分布式锁使用场景

在单体项目中,JVM 中的锁即可完成需要。但是,微服务、分布式环境下,同一个服务可能部署在多台服务器上,多个 JVM 之间无法通过常用的 JVM 锁来完成同步操作,需要借用分布式锁来完成上锁、释放锁。

(1)避免重复操作:例如在订单服务中,需要根据日期来生成订单号流水,使用分布式锁可以避免多个节点同时生成相同的订单号。

(2)避免并发冲突:在分布式系统中,多个节点可能同时对共享资源进行修改,使用分布式锁可以确保只有一个节点能够对资源进行操作,防止并发冲突。

ZooKeeper 分布式锁实现原理

ZooKeeper 实现分布式锁的原理如下:

(1)利用 ZooKeeper 的临时节点特性:在 ZooKeeper 中,同一时刻不能有多个客户端创建同一个节点。因此,可以利用 ZooKeeper 的临时节点来实现分布式锁,每个客户端在获取锁时创建一个临时节点,释放锁时删除该节点。

(2)Watcher 机制:当代表锁资源的节点被删除时,ZooKeeper 可以触发 Watcher 来通知其他客户端,从而实现解锁和重新获取锁的操作。这种机制使得 ZooKeeper 分布式锁相对于其他分布式锁方案具有较好的优势,可以更加灵活地处理锁的获取和释放。

ZooKeeper 分布式锁实现方案

基于临时节点方案

该方案实现较为简单,逻辑为谁创建成功该节点,谁就持有锁,创建失败的自己进行阻塞。A线程先持有锁,B线程获取失败就会阻塞,同时对 ZooKeeper 节点设置监听,当 A 线程执行完操作后删除节点,触发监听器,B 线程此时解除阻塞,重新去获取锁。流程图如下:

Zookeeper 分布式锁

简单的完整示例代码:

package com.hxstrive.zookeeper.distributed_lock;

import org.apache.zookeeper.*;
import java.util.HashMap;      
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

/**
* 使用 ZooKeeper 实现分布式锁
* @author hxstrive.com
*/
public class ZkLockSimple {
   private static ZooKeeper zooKeeper;
   /** 锁节点路径 */
   private static final String LOCK_PATH = "/lock";
   /** 阻塞队列,用于线程间通信,当节点删除时,触发事件,向队列写入消息,另一个线程收到消息后再次尝试获取锁 */
   private final static LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();

   public static void main(String[] args) throws Exception {
       ZkLockSimple lock = new ZkLockSimple();

       // 【测试】
       // 启动十个线程对一个变量累加求值
       // 每个线程对值添加 100,结果:10 * 100 = 1000
       final Map<String, Integer> data = new HashMap<>();
       ExecutorService pool = Executors.newFixedThreadPool(10);
       for(int i = 0; i < 10; i++) {
           pool.execute(new Runnable() {
               @Override
               public void run() {
                   try {
                       // 获取锁
                       lock.lock();
                       int newVal = data.getOrDefault("my", 0) + 100;
                       data.put("my", newVal);
                   } finally {
                       // 释放锁
                       lock.unlock();
                   }
               }
           });
       }

       // 关闭线程池,等待所有线程执行完成
       pool.shutdown();
       while(!pool.isTerminated()) {}
       System.out.println("my = " + data.get("my"));
   }

   public ZkLockSimple() {
       try {
           CountDownLatch countDownLatch = new CountDownLatch(1);
           zooKeeper = new ZooKeeper("127.0.0.1:2181", 2000, new Watcher() {
               public void process(WatchedEvent watchedEvent) {
                   if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                       countDownLatch.countDown(); // 连接成功
                   }

                   // 监听节点是否被删除
                   if (watchedEvent.getType() == Event.EventType.NodeDeleted && LOCK_PATH.equals(watchedEvent.getPath())) {
                       // 发送消息,告知等待者,/lock 节点已经被删除了,可以重新获取锁
                       queue.add(watchedEvent.getPath());
                   }
               }
           });
           System.out.println(zooKeeper);
           countDownLatch.await(); // 等待连接创建成功
           System.out.println("成功建立 ZooKeeper 连接...");
       } catch (Exception e) {
           e.printStackTrace();
       }
   }

   /**
    * 获取锁
    */
   public boolean lock() {
       try {
           // 创建临时节点,如果创建成功,则认为获取锁成功
           zooKeeper.create(LOCK_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
           // 对新添加的节点添加监听
           zooKeeper.addWatch(LOCK_PATH, AddWatchMode.PERSISTENT);
           System.out.println("成功获取到锁-" + Thread.currentThread().getName());
           return true;
       } catch (Exception e) {
           try {
               // 如果失败,则等待 /lock 节点被删除
               // 等待有消息放入队列
               queue.take();
           } catch (Exception e2) {}
           // 再次,尝试去获取锁
           return lock();
       }
   }

   /**
    * 释放锁
    */
   public void unlock() {
       try {
           zooKeeper.delete(LOCK_PATH, -1);
           System.out.println("释放锁-" + Thread.currentThread().getName());
       } catch (Exception e) {
           e.printStackTrace();
           System.out.println("释放锁失败");
       }
   }

}

运行示例,输出如下:

成功建立 ZooKeeper 连接...
成功获取到锁-pool-2-thread-2
释放锁-pool-2-thread-2
成功获取到锁-pool-2-thread-3
释放锁-pool-2-thread-3
成功获取到锁-pool-2-thread-8
释放锁-pool-2-thread-8
成功获取到锁-pool-2-thread-10
释放锁-pool-2-thread-10
成功获取到锁-pool-2-thread-7
释放锁-pool-2-thread-7
成功获取到锁-pool-2-thread-6
释放锁-pool-2-thread-6
成功获取到锁-pool-2-thread-4
释放锁-pool-2-thread-4
成功获取到锁-pool-2-thread-9
释放锁-pool-2-thread-9
成功获取到锁-pool-2-thread-1
释放锁-pool-2-thread-1
成功获取到锁-pool-2-thread-5
释放锁-pool-2-thread-5
my = 1000

注意,上面虽然实现了分布式锁,但是缺点是每次去竞争锁,都只会有一个线程拿到锁,当线程数庞大时会发生 “惊群” 现象,Zookeeper 节点可能会运行缓慢甚至宕机。这是因为其他线程没获取到锁时都会监听 /lockPath 节点,当 A 线程释放完毕,海量的线程都同时停止阻塞,去争抢锁,这种操作十分耗费资源,且性能大打折扣。

什么是 “惊群” 现象?

在计算机科学中,多线程中的“惊群”现象指的是当多个线程竞争某个共享资源时,其中一个线程的行为可能会导致其他线程被唤醒并竞争同一资源,从而导致性能下降或资源浪费的情况。

举个例子,假设有多个线程在等待同一个锁。当某个线程释放锁时,操作系统会选择一个等待线程来获得锁。然而,其他等待线程也会被唤醒并竞争锁,这可能会导致大量线程被唤醒,但最终只有一个线程能够获得锁,其他线程会重新进入等待状态,这种现象就称为“惊群”现象。

为了避免“惊群”现象,可以使用更精细的同步机制,如信号量、条件变量等,以减少不必要的唤醒和竞争。此外,也可以通过调整线程调度策略或使用更高效的同步机制来减少“惊群”现象的发生。

说说我的看法
全部评论(
没有评论
关于
本网站属于个人的非赢利性网站,转载的文章遵循原作者的版权声明,如果原文没有版权声明,请来信告知:hxstrive@outlook.com
公众号