Zookeeper使用手册

Zookeeper使用手册

本文主要介绍:

1、zookeeper的概念和基本用法。

2、如何通过Curator这个Java API客户端库访问zookeeper客户端。

1. Zookeeper概念

Zookeeper 是 Apache Hadoop 项目下的一个子项目,是一个树形目录服务。

Zookeeper 是一个分布式的、开源的分布式应用程序的协调服务。

Zookeeper 提供的主要功能包括:

  • 配置管理

  • 分布式锁

  • 集群管理

配置中心

分布式锁

集群管理(注册中心)

2. 数据类型

ZooKeeper 是一个树形目录服务,其数据模型和Unix的文件系统目录树很类似,拥有一个层次化结构。

这里面的每一个节点都被称为: ZNode,每个节点上都会保存自己的数据和节点信息。

节点可以拥有子节点,同时也允许少量(1MB)数据存储在该节点之下。

节点可以分为四大类:

  • PERSISTENT 持久化节点

  • EPHEMERAL 临时节点 :-e

  • PERSISTENT_SEQUENTIAL 持久化顺序节点 :-s

  • EPHEMERAL_SEQUENTIAL 临时顺序节点 :-es

Zookeeper数据模型

zookeeper数据模型

3. 安装zookeeper

zookeeper的安装请参考文章《Zookeeper安装》

4. 服务端命令

  • 启动 ZooKeeper 服务:./zkServer.sh start

  • 查看 ZooKeeper 服务状态:./zkServer.sh status

  • 停止 ZooKeeper 服务:./zkServer.sh stop

  • 重启 ZooKeeper 服务:./zkServer.sh restart

5. 客户端命令

5.1. 启动客户端

1
2
3
4
# 进入zookeeper目录
cd /usr/zookeeper/apache-zookeeper-3.5.6-bin/bin
# 启动客户端,并连接到zookeeper服务端,./zkCli.sh –server ip:port 如:
./zkCli.sh -server localhost:2181

客户端连接成功

可以在客户端命令行通过使用 quit 命令退出,断开连接。

5.2. 查看命令帮助

1
2
# 查看命令帮助
help

5.3. 展示节点信息

1
2
# 显示指定目录下节点
ls /path

查看根目录下的节点

1
2
# 显示指定节点信息以及子节点
ls -s /path

查看指定目录下的节点信息

查看节点详细信息

节点信息说明

czxid:节点被创建的事务ID

ctime:创建时间

mzxid:最后一次被更新的事务ID

mtime:修改时间

pzxid:子节点列表最后一次被更新的事务ID

cversion:子节点的版本号

dataversion:数据版本号

aclversion:权限版本号

ephemeralOwner:用于临时节点,代表临时节点的事务ID,如果为持久节点则为0

dataLength:节点存储的数据的长度

numChildren:当前节点的子节点个数

5.4. 节点操作(CURD)

创建节点

1
2
# 创建节点并赋值value
create /节点path value

创建节点与赋值

创建临时节点

1
2
# 创建临时节点并赋值value
create -e /节点path value

临时节点在会话关闭(quit断开连接)之后就会被删除。

创建顺序节点

1
2
# 创建顺序节点并赋值value
create -s /节点path value

创建顺序节点

顺序节点的编号是所有节点都共用一个编号器(不断累加)。

创建临时顺序节点

image-20230315213937061

获取节点值

1
2
# 获取节点值
get /节点path

获取节点数据

设置节点值

1
2
# 设置节点值
set /节点path value

设置节点值

删除单个节点

1
2
# 删除单个节点
delete /节点path

删除单个节点

删除带有子节点的节点

1
2
# 删除带有子节点的节点
deleteall /节点path

不能用delete命令删除非空节点

6. Curator 客户端库

6.1. Curator 介绍

Curator 是 Apache ZooKeeper 的Java客户端库。

Curator 项目的目标是简化Java中 ZooKeeper 客户端的使用。

👉官网地址。使用Curator需要注意和ZooKeeper的版本兼容问题。

6.2. Curator API 常用操作

  • 建立连接、添加节点、删除节点、修改节点、查询节点

  • Watch事件监听

  • 分布式锁实现

6.2.1 Java中连接zk server

新建一个maven项目,pom.xml文件添加依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.learning</groupId>
<artifactId>curator-zk</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>

<dependencies>
<!--junit-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>

<!--Curator-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>

<!--日志-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
</dependency>

</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>

</project>

修改 src/main/resources/log4j.properties 文件:

1
2
3
4
5
6
7
# 日志级别
log4j.rootLogger=ERROR,stdout

log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%d{yyyy-MM-dd HH/:mm/:ss}]%-5p %c(line/:%L) %x-%m%n

新建一个单元测试类,测试连接zk server:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 连接zk server
*/
@Test
public void testConnect() {
/*
* 参数说明
* @param connectString zk的server的地址和端口(多个用,隔开)
* @param sessionTimeoutMs 会话超时时间(ms)
* @param connectionTimeoutMs 连接超时时间(ms)
* @param retryPolicy 重试策略
* @return client
*/
CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.18.123:2181", 60 * 1000, 15 * 1000, new ExponentialBackoffRetry(3000, 10));
client.start();
}

6.2.2 zookeeper的CURD

完整测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package com.learning;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.nio.charset.StandardCharsets;
import java.util.List;

public class CuratorTest {

private CuratorFramework client;

/**
* 连接zk server(链式)
*/
@Before
public void connect() {
client = CuratorFrameworkFactory.builder()
.connectString("192.168.18.123:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(new ExponentialBackoffRetry(3000, 10))
// .namespace("app1") //指定默认的起始节点,就不用每次都去写根节点
.build();
client.start();
}

/**
* 释放资源
*/
@After
public void close(){
if (client != null){
client.close();
}
}

/**
* 创建节点 测试
* @throws Exception
*/
@Test
public void testCreate() throws Exception {
// 注意,这边如果一直转圈圈无法创建,请检查是否已经关闭linux防火墙或者是否开放端口

// 创建默认节点,如果数据为空,那么默认数据是当前客户端的ip地址
String path = client.create().forPath("/app5");
System.out.println(path);

// 创建带数据的节点
path = client.create().forPath("/app6", "数据".getBytes(StandardCharsets.UTF_8));
System.out.println(path);

// 创建临时节点
path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/tmpNode");
System.out.println(path);

// 创建多级节点,如果父节点不存在则创建父节点
path = client.create().creatingParentsIfNeeded().forPath("/parent/childNode");
System.out.println(path);
}

/**
* 查询节点 测试
* @throws Exception
*/
@Test
public void testGet() throws Exception {
// 节点数据:get
byte[] bytes = client.getData().forPath("/app6");
System.out.println(new String(bytes));

// 获取子节点:ls
List<String> list = client.getChildren().forPath("/");
list.forEach(child -> System.out.println(child));

// 获取状态信息:ls -s
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("/app6");
// 状态信息会封装进 Stat 对象
System.out.println(stat);
}

/**
* 修改节点数据 测试
* @throws Exception
*/
@Test
public void testSet() throws Exception {
// 修改节点数据。
client.setData().forPath("/app6", "dataValue".getBytes(StandardCharsets.UTF_8));

// 通过版本号修改数据,CAS原理
// 先查询版本号信息
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("/app6");
// 如果修改失败直接返回异常
int version = stat.getVersion();
client.setData().withVersion(version).forPath("/app6", "dataValue".getBytes(StandardCharsets.UTF_8));
}

/**
* 删除节点数据 测试
* @throws Exception
*/
@Test
public void testDelete() throws Exception {
// 删除单个节点
client.delete().forPath("/app6");

// 删除含子节点的节点
client.delete().deletingChildrenIfNeeded().forPath("/app6");

// 必须成功的删除
client.delete().guaranteed().forPath("/app6");

// 带回调函数的删除
client.delete().inBackground((client, event) -> {
System.out.println("删除后回调函数被执行...");
}).forPath("/app6");
}

}

6.2.3 Watch监控

ZooKeeper 允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,ZooKeeper 服务端会将事件通知到感兴趣的客户端上去,该机制是 ZooKeeper 实现分布式协调服务的重要特性。

ZooKeeper 中引入了Watcher机制来实现了发布/订阅功能能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者。

ZooKeeper提供了三种Watcher:

  • NodeCache:只是监控某一个特定的节点。

  • PathChildrenCache:监控一个ZNode的子节点(不包含自己)。

  • TreeCache:可以监控整个树上的所有节点,类似于PathChildrenCacheNodeCache的组合。

新建一个单元测试类,使用上一步中的连接方式进行单元测试。

监控某一个特定的节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* 监控某一个特定的节点
*/
@Test
public void testNodeCache() throws Exception {
NodeCache nodeCache = new NodeCache(client, "/app1");
// 注册监听器
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("节点发生变化...");
byte[] data = nodeCache.getCurrentData().getData();
System.out.println("节点新的值:" + new String(data));
}
});
// 开始监听
nodeCache.start(true);

// 单元测试,让程序一直运行,测试监听
while (true){

}
}

监控一个ZNode的子节点(不包含自己)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* 监控一个ZNode的子节点(不包含自己)
*/
@Test
public void testPathChildrenCache() throws Exception {
PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/app2", true);
// 注册监听器
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
System.out.println("子节点发生变化...");
System.out.println(pathChildrenCacheEvent);
}
});
// 开始监听
pathChildrenCache.start();
// 单元测试,让程序一直运行,测试监听
while (true){

}
}

子节点变化类型

监控整个树上的所有节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* 可以监控整个树上的所有节点
*/
@Test
public void testTreeCache() throws Exception {
TreeCache treeCache = new TreeCache(client, "/app2");
// 注册监听器
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
System.out.println("节点发生变化...");
System.out.println(treeCacheEvent);
}
});
// 开始监听
treeCache.start();
// 单元测试,让程序一直运行,测试监听
while (true){

}
}

6.2.4 分布式锁

分布式锁概念

在我们进行单机应用开发,涉及并发同步的时候,我们往往采用synchronized或者Lock的方式来解决多线程间的代码同步问题,这时多线程的运行都是在同一个JVM之下,没有任何问题。

但当我们的应用是分布式集群工作的情况下,属于多JVM下的工作环境,跨JVM之间已经无法通过多线程的锁解决同步问题。

那么就需要一种更加高级的锁机制,来处理种跨机器的进程之间的数据同步问题——这就是分布式锁。

分布式锁

分布式锁的几种实现方案

  • 基于缓存实现:Redis、Memcache。
  • 基于zookeeper实现。
  • 基于数据库层面实现(通过表数据判断是否获得锁)。

分布式锁的几种实现方案

zookeeper分布式锁原理

核心思想:当客户端要获取锁,则创建节点,使用完锁,则删除该节点。

  1. 客户端获取锁时,在lock节点下创建临时顺序节点。

    临时 是为了能够有效的删除节点,避免锁无法释放的问题。

    顺序 是为了客户端能够判断自己是否获取到了锁。

  2. 然后获取lock下面的所有子节点,客户端获取到所有的子节点之后,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁。使用完锁后,将该节点删除。

  3. 如果发现自己创建的节点并非lock所有子节点中最小的,说明自己还没有获取到锁,此时客户端需要找到比自己小的那个节点,同时对其注册事件监听器,监听删除事件。

  4. 如果发现比自己小的那个节点被删除,则客户端的Watcher会收到相应通知,此时再次判断自己创建的节点是否是lock子节点中序号最小的,如果是则获取到了锁,如果不是则重复以上步骤继续获取到比自己小的一个节点,并注册监听。

zookeeper分布式锁原理

Curator中的五种锁方案

  • InterProcessSemaphoreMutex:分布式排它锁(非可重入锁)

  • InterProcessMutex:分布式可重入排它锁

  • InterProcessReadWriteLock:分布式读写锁

  • InterProcessMultiLock:将多个锁作为单个实体管理的容器

  • InterProcessSemaphoreV2:共享信号量

7. zookeeper集群

7.1 Zookeeper 集群介绍

Zookeeper集群架构图

Zookeeper集群

Zookeeper集群角色

在ZooKeeper集群服中务中有三个角色:

  • Leader 领导者

    1. 处理事务请求
    2. 集群内部各服务器的调度者
  • Follower 跟随者

    1. 处理客户端非事务请求,转发事务请求给Leader服务器
    2. 参与Leader选举投票
  • Observer 观察者

    1. 处理客户端非事务请求,转发事务请求给Leader服务器

Leader选举机制

前提:可运行的机器要超过集群总数量的半数。

  • Serverid:服务器ID

    比如有三台服务器,编号分别是1、2、3。编号越大在选择算法中的权重越大。

  • Zxid:数据ID

    服务器中存放的最大数据ID。值越大说明数据越新,在选举算法中数据越新权重越大。

  • 在Leader选举的过程中,如果某台ZooKeeper获得了超过半数的选票,则此ZooKeeper就可以成为Leader了。

如果有这么一种情况,顺序启动zk-server:

1)1号zk-server启动,此时还是非集群状态;

2)接着2号zk-server启动,此时选举2号为leader;

3)如果接着3号zk-server启动,此时还是2号为leader,不会重新进行选举,只有在2号发生故障的情况下才会重新选举,此时3号机就会当选leader。

7.2 Zookeeper 集群搭建

参考文章《Zookeeper集群搭建》


Zookeeper使用手册
https://binbiubiu.github.io/20230315231727/
作者
Binbiubiu
发布于
2023年3月15日