Zookeeper使用手册
本文主要介绍:
1、zookeeper的概念和基本用法。
2、如何通过Curator这个Java API客户端库访问zookeeper客户端。
1. Zookeeper概念
Zookeeper 是 Apache Hadoop 项目下的一个子项目,是一个树形目录服务。
Zookeeper 是一个分布式的、开源的分布式应用程序的协调服务。
Zookeeper 提供的主要功能包括:
2. 数据类型
ZooKeeper 是一个树形目录服务,其数据模型和Unix的文件系统目录树很类似,拥有一个层次化结构。
这里面的每一个节点都被称为: ZNode,每个节点上都会保存自己的数据和节点信息。
节点可以拥有子节点,同时也允许少量(1MB)数据存储在该节点之下。
节点可以分为四大类:
Zookeeper数据模型
3. 安装zookeeper
zookeeper的安装请参考文章《Zookeeper安装》。
4. 服务端命令
启动 ZooKeeper 服务:./zkServer.sh start
查看 ZooKeeper 服务状态:./zkServer.sh status
停止 ZooKeeper 服务:./zkServer.sh stop
重启 ZooKeeper 服务:./zkServer.sh restart
5. 客户端命令
5.1. 启动客户端
1 2 3 4
| cd /usr/zookeeper/apache-zookeeper-3.5.6-bin/bin
./zkCli.sh -server localhost:2181
|
可以在客户端命令行通过使用 quit
命令退出,断开连接。
5.2. 查看命令帮助
5.3. 展示节点信息
节点信息说明
czxid:节点被创建的事务ID
ctime:创建时间
mzxid:最后一次被更新的事务ID
mtime:修改时间
pzxid:子节点列表最后一次被更新的事务ID
cversion:子节点的版本号
dataversion:数据版本号
aclversion:权限版本号
ephemeralOwner:用于临时节点,代表临时节点的事务ID,如果为持久节点则为0
dataLength:节点存储的数据的长度
numChildren:当前节点的子节点个数
5.4. 节点操作(CURD)
创建节点
创建临时节点
1 2
| create -e /节点path value
|
临时节点在会话关闭(quit断开连接)之后就会被删除。
创建顺序节点
1 2
| create -s /节点path value
|
顺序节点的编号是所有节点都共用一个编号器(不断累加)。
创建临时顺序节点
获取节点值
设置节点值
删除单个节点
删除带有子节点的节点
6. Curator 客户端库
6.1. Curator 介绍
Curator 是 Apache ZooKeeper 的Java客户端库。
Curator 项目的目标是简化Java中 ZooKeeper 客户端的使用。
👉官网地址。使用Curator需要注意和ZooKeeper的版本兼容问题。
6.2. Curator API 常用操作
建立连接、添加节点、删除节点、修改节点、查询节点
Watch事件监听
分布式锁实现
6.2.1 Java中连接zk server
新建一个maven项目,pom.xml文件添加依赖:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion>
<groupId>com.learning</groupId> <artifactId>curator-zk</artifactId> <version>1.0-SNAPSHOT</version>
<properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties>
<dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.13.2</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.0.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.0</version> </dependency>
<dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.21</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.21</version> </dependency>
</dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build>
</project>
|
修改 src/main/resources/log4j.properties
文件:
1 2 3 4 5 6 7
| log4j.rootLogger=ERROR,stdout
log4j.appender.stdout = org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target = System.out log4j.appender.stdout.layout = org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern = [%d{yyyy-MM-dd HH/:mm/:ss}]%-5p %c(line/:%L) %x-%m%n
|
新建一个单元测试类,测试连接zk server:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
|
@Test public void testConnect() {
CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.18.123:2181", 60 * 1000, 15 * 1000, new ExponentialBackoffRetry(3000, 10)); client.start(); }
|
6.2.2 zookeeper的CURD
完整测试代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
| package com.learning;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; import org.junit.After; import org.junit.Before; import org.junit.Test;
import java.nio.charset.StandardCharsets; import java.util.List;
public class CuratorTest {
private CuratorFramework client;
@Before public void connect() { client = CuratorFrameworkFactory.builder() .connectString("192.168.18.123:2181") .sessionTimeoutMs(60 * 1000) .connectionTimeoutMs(15 * 1000) .retryPolicy(new ExponentialBackoffRetry(3000, 10))
.build(); client.start(); }
@After public void close(){ if (client != null){ client.close(); } }
@Test public void testCreate() throws Exception {
String path = client.create().forPath("/app5"); System.out.println(path);
path = client.create().forPath("/app6", "数据".getBytes(StandardCharsets.UTF_8)); System.out.println(path);
path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/tmpNode"); System.out.println(path);
path = client.create().creatingParentsIfNeeded().forPath("/parent/childNode"); System.out.println(path); }
@Test public void testGet() throws Exception { byte[] bytes = client.getData().forPath("/app6"); System.out.println(new String(bytes));
List<String> list = client.getChildren().forPath("/"); list.forEach(child -> System.out.println(child));
Stat stat = new Stat(); client.getData().storingStatIn(stat).forPath("/app6"); System.out.println(stat); }
@Test public void testSet() throws Exception { client.setData().forPath("/app6", "dataValue".getBytes(StandardCharsets.UTF_8));
Stat stat = new Stat(); client.getData().storingStatIn(stat).forPath("/app6"); int version = stat.getVersion(); client.setData().withVersion(version).forPath("/app6", "dataValue".getBytes(StandardCharsets.UTF_8)); }
@Test public void testDelete() throws Exception { client.delete().forPath("/app6");
client.delete().deletingChildrenIfNeeded().forPath("/app6");
client.delete().guaranteed().forPath("/app6");
client.delete().inBackground((client, event) -> { System.out.println("删除后回调函数被执行..."); }).forPath("/app6"); }
}
|
6.2.3 Watch监控
ZooKeeper 允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,ZooKeeper 服务端会将事件通知到感兴趣的客户端上去,该机制是 ZooKeeper 实现分布式协调服务的重要特性。
ZooKeeper 中引入了Watcher机制来实现了发布/订阅功能能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者。
ZooKeeper提供了三种Watcher:
新建一个单元测试类,使用上一步中的连接方式进行单元测试。
监控某一个特定的节点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
|
@Test public void testNodeCache() throws Exception { NodeCache nodeCache = new NodeCache(client, "/app1"); nodeCache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { System.out.println("节点发生变化..."); byte[] data = nodeCache.getCurrentData().getData(); System.out.println("节点新的值:" + new String(data)); } }); nodeCache.start(true);
while (true){
} }
|
监控一个ZNode的子节点(不包含自己)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
|
@Test public void testPathChildrenCache() throws Exception { PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/app2", true); pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception { System.out.println("子节点发生变化..."); System.out.println(pathChildrenCacheEvent); } }); pathChildrenCache.start(); while (true){
} }
|
监控整个树上的所有节点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
|
@Test public void testTreeCache() throws Exception { TreeCache treeCache = new TreeCache(client, "/app2"); treeCache.getListenable().addListener(new TreeCacheListener() { @Override public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception { System.out.println("节点发生变化..."); System.out.println(treeCacheEvent); } }); treeCache.start(); while (true){
} }
|
6.2.4 分布式锁
分布式锁概念
在我们进行单机应用开发,涉及并发同步的时候,我们往往采用synchronized或者Lock的方式来解决多线程间的代码同步问题,这时多线程的运行都是在同一个JVM之下,没有任何问题。
但当我们的应用是分布式集群工作的情况下,属于多JVM下的工作环境,跨JVM之间已经无法通过多线程的锁解决同步问题。
那么就需要一种更加高级的锁机制,来处理种跨机器的进程之间的数据同步问题——这就是分布式锁。
分布式锁的几种实现方案
- 基于缓存实现:Redis、Memcache。
- 基于zookeeper实现。
- 基于数据库层面实现(通过表数据判断是否获得锁)。
zookeeper分布式锁原理
核心思想:当客户端要获取锁,则创建节点,使用完锁,则删除该节点。
客户端获取锁时,在lock节点下创建临时顺序节点。
临时 是为了能够有效的删除节点,避免锁无法释放的问题。
顺序 是为了客户端能够判断自己是否获取到了锁。
然后获取lock下面的所有子节点,客户端获取到所有的子节点之后,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁。使用完锁后,将该节点删除。
如果发现自己创建的节点并非lock所有子节点中最小的,说明自己还没有获取到锁,此时客户端需要找到比自己小的那个节点,同时对其注册事件监听器,监听删除事件。
如果发现比自己小的那个节点被删除,则客户端的Watcher会收到相应通知,此时再次判断自己创建的节点是否是lock子节点中序号最小的,如果是则获取到了锁,如果不是则重复以上步骤继续获取到比自己小的一个节点,并注册监听。
Curator中的五种锁方案
InterProcessSemaphoreMutex:分布式排它锁(非可重入锁)
InterProcessMutex:分布式可重入排它锁
InterProcessReadWriteLock:分布式读写锁
InterProcessMultiLock:将多个锁作为单个实体管理的容器
InterProcessSemaphoreV2:共享信号量
7. zookeeper集群
7.1 Zookeeper 集群介绍
Zookeeper集群架构图
Zookeeper集群角色
在ZooKeeper集群服中务中有三个角色:
Leader 领导者
- 处理事务请求
- 集群内部各服务器的调度者
Follower 跟随者
- 处理客户端非事务请求,转发事务请求给Leader服务器
- 参与Leader选举投票
Observer 观察者
- 处理客户端非事务请求,转发事务请求给Leader服务器
Leader选举机制
前提:可运行的机器要超过集群总数量的半数。
Serverid:服务器ID
比如有三台服务器,编号分别是1、2、3。编号越大在选择算法中的权重越大。
Zxid:数据ID
服务器中存放的最大数据ID。值越大说明数据越新,在选举算法中数据越新权重越大。
在Leader选举的过程中,如果某台ZooKeeper获得了超过半数的选票,则此ZooKeeper就可以成为Leader了。
如果有这么一种情况,顺序启动zk-server:
1)1号zk-server启动,此时还是非集群状态;
2)接着2号zk-server启动,此时选举2号为leader;
3)如果接着3号zk-server启动,此时还是2号为leader,不会重新进行选举,只有在2号发生故障的情况下才会重新选举,此时3号机就会当选leader。
7.2 Zookeeper 集群搭建
参考文章《Zookeeper集群搭建》。