ZooKeeper 实现分布式锁
ZooKeeper 是一个典型的分布式数据一致性处理方案,分布式应用程序可以基于 ZooKeeper 实现诸如数据发布/订阅、负载均衡、分布式协调/通知、集群管理、Master 选举、分布式锁等功能。
节点
在详情 ZooKeeper 分布式锁前需要先理解一下 ZooKeeper 中节点(Znode),ZooKeeper 的数据存储数据模型是一棵树(Znode Tree),由斜杠(/)的进行分割的路径,就是一个 Znode(如 /locks/my_lock)。每个 Znode 上都会保存自己的数据内容,同时还会保存一系列属性信息。
Znode 又分为以下四种类型:
| 类型 | 形容 |
|---|---|
| 持久节点 | 节点创立后,会一直存在,不会因用户端会话失效而删除 |
| 持久顺序节点 | 基本特性与持久节点一致,创立节点的过程中,ZooKeeper 会在其名字后自动追加一个单调增长的数字后缀,作为新的节点名 |
| 临时节点 | 用户端会话失效或者连接关闭后,该节点会被自动删除 |
| 临时顺序节点 | 基本特性与临时节点一致,创立节点的过程中,ZooKeeper 会在其名字后自动追加一个单调增长的数字后缀,作为新的节点名 |
锁原理
ZooKeeper 分布式锁是基于 临时顺序节点 来实现的,锁可了解为 ZooKeeper 上的一个节点,当需要获取锁时,就在这个锁节点下创立一个临时顺序节点。当存在多个用户端同时来获取锁,就按顺序依次创立多个临时顺序节点,但只有排列序号是第一的那个节点能获取锁成功,其余节点则按顺序分别监听前一个节点的变化,当被监听者释放锁时,监听者即可以马上取得锁。
而且用临时顺序节点的另外一个意图是假如某个用户端创立临时顺序节点后,自己意外宕机了也没关系,ZooKeeper 感知到某个用户端宕机后会自动删除对应的临时顺序节点,相当于自动释放锁。
lock
如上图:ClientA 和 ClientB 同时想获取锁,所以都在 locks 节点下创立了一个临时节点 1 和 2,而 1 是当前 locks 节点下排列序号第一的节点,所以 ClientA 获取锁成功,而 ClientB 处于等待状态,这时 ZooKeeper 中的 2 节点会监听 1 节点,当 1节点锁释放(节点被删除)时,2 就变成了 locks 节点下排列序号第一的节点,这样 ClientB 就获取锁成功了。
代码测试
请确保 ZooKeeper 服务已启动,ZooKeeper 的搭建可参考 Kafka 集群 中的 ZooKeeper 集群部分
以下是基于 C# 的测试,Java 可使用 Curator 框架,实现原理和上面形容是一致的,有兴趣可以看看源码,应该也不难了解。
创立 .NET Core 控制台程序
NuGet 安装 ZooKeeperNetEx.Recipes
创立 ZooKeeper Client
private const int CONNECTION_TIMEOUT = 50000;private const string CONNECTION_STRING = "127.0.0.1:2181";private ZooKeeper CreateClient(){ var zooKeeper = new ZooKeeper(CONNECTION_STRING, CONNECTION_TIMEOUT, NullWatcher.Instance); Stopwatch sw = new Stopwatch(); sw.Start(); while (sw.ElapsedMilliseconds < CONNECTION_TIMEOUT) { var state = zooKeeper.getState(); if (state == ZooKeeper.States.CONNECTED || state == ZooKeeper.States.CONNECTING) { break; } } sw.Stop(); return zooKeeper;}class NullWatcher : Watcher{ public static readonly NullWatcher Instance = new NullWatcher(); private NullWatcher() { } public override Task process(WatchedEvent @event) { return Task.CompletedTask; }}增加 Lock 方法
/// <summary>/// 加锁/// </summary>/// <param name="key">加锁的节点名</param>/// <param name="lockAcquiredAction">加锁成功后需要执行的逻辑</param>/// <param name="lockReleasedAction">锁释放后需要执行的逻辑,可为空</param>/// <returns></returns>public async Task Lock(string key, Action lockAcquiredAction, Action lockReleasedAction = null){ // 获取 ZooKeeper Client ZooKeeper keeper = CreateClient(); // 指定锁节点 WriteLock writeLock = new WriteLock(keeper, $"/{key}", null); var lockCallback = new LockCallback(() => { lockAcquiredAction.Invoke(); writeLock.unlock(); }, lockReleasedAction); // 绑定锁获取和释放的监听对象 writeLock.setLockListener(lockCallback); // 获取锁(获取失败时会监听上一个临时节点) await writeLock.Lock();}class LockCallback : LockListener{ private readonly Action _lockAcquiredAction; private readonly Action _lockReleasedAction; public LockCallback(Action lockAcquiredAction, Action lockReleasedAction) { _lockAcquiredAction = lockAcquiredAction; _lockReleasedAction = lockReleasedAction; } /// <summary> /// 获取锁成功回调 /// </summary> /// <returns></returns> public Task lockAcquired() { _lockAcquiredAction?.Invoke(); return Task.FromResult(0); } /// <summary> /// 释放锁成功回调 /// </summary> /// <returns></returns> public Task lockReleased() { _lockReleasedAction?.Invoke(); return Task.FromResult(0); }}多线程模拟测试
static async Task RunAsync(){ Parallel.For(1, 10, async (i) => { await new ZooKeeprDistributedLock().Lock("locks", () => { Console.WriteLine($"第{i}个请求,获取锁成功:{DateTime.Now},线程Id:{Thread.CurrentThread.ManagedThreadId}"); Thread.Sleep(1000); // 业务逻辑... }, () => { Console.WriteLine($"第{i}个请求,释放锁成功:{DateTime.Now},线程Id:{Thread.CurrentThread.ManagedThreadId}"); Console.WriteLine("-------------------------------"); }); }); await Task.CompletedTask;}
result
尽管模拟的是多线程并行执行,但最终都会依赖锁的获取和释放而串行执行实际业务逻辑。
参考链接
- Apache ZooKeeper .NET async Client
- 七张图彻底讲清楚ZooKeeper分布式锁的实现原理
1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长!
2. 分享目的仅供大家学习和交流,您必须在下载后24小时内删除!
3. 不得使用于非法商业用途,不得违反国家法律。否则后果自负!
4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系管理员处理!
6. 本站资源售价只是摆设,本站源码仅提供给会员学习使用!
7. 如遇到加密压缩包,请使用360解压,如遇到无法解压的请联系管理员
开心源码网 » ZooKeeper 实现分布式锁