一、简介
Curator为Netflix开源的对ZooKeeper进行操作的框架。与原生客户端相比,Curator进行了高度的抽象和封装,同时简化了开发步骤,屏蔽了底层细节,并在一定程度上进行了功能增强,提供包括:连接重连、反复注册Watcher 和 NodeExistsException异常等功能。
对于Curator来说,其包含如下几个包:
- curator-framework: 其属于对zookeeper底层api的封装。
- curator-client:其用于提供一些客户端操作,如重试策略等。
- curator-recipes:其封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等。
下面给出curator操作相关的Maven依赖:
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework 对 zookeeper 的底层 api 的一些封装-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.4.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-client 提供一些客户端的操作,例如重试策略等 -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>5.4.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes 封装了一些高级特性,如:Cache 事件监听、选举、分布式锁、分布式计数器等等-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.4.0</version>
</dependency>
下面介绍一下Curator的常用API。
二、常用操作
2.1 创建会话
创建会话的方式有两种,分别是:静态方法创建 和 Fluent API创建。
使用静态方法创建的示例如下:
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework curator= CuratorFrameworkFactory.newClient("192.168.56.103:2181", 5000, 3000, retryPolicy);
对于CuratorFrameworkFactory的newClient静态方法而言,其主要包含四个参数,其对应的含义分别为:
- connectionString:其表示服务器列表信息,其格式为:host1:port1, host2:port2, …
- retryPolicy:其表示为重试策略,内置的有四种,当然也可通过实现RetryPolicy来自定义。
- sessionTimeoutMs:其表示会话的超时时间,单位为毫秒,默认为6000ms。
- connectionTimeoutMs:其表示连接创建的超时时间,单位为毫秒,默认为6000ms。
下面给出对应的示例:
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class CreateSessionTest {
/*
借助zkclient完成会话的创建
*/
public static void main(String[] args) {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("192.168.217.128:2181", retryPolicy);
curatorFramework.start();
System.out.println("会话被建立");
}
}
执行结果如下:
当然,也可采用Fluent方式进行创建,对应的示例如下:
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class CreateSessionTest {
/*
借助zkclient完成会话的创建
*/
public static void main(String[] args) {
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.217.128:2181")
.sessionTimeoutMs(50000)
.connectionTimeoutMs(30000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.namespace("base")
.build();
client.start();
System.out.println("会话建立了......");
}
}
执行结果如下:
2.2 创建数据节点
对于ZooKeeper来说,其节点分为四种类型,分别为:持久化、持久带序列、临时和临时带序列。同样的,Curator也提供了对应的模式,分为CreateMode的四个枚举值:PERSISTENT(持久化)、PERSISTENT_SEQUENTIAL(持久带序号)、EPHEMERAL(临时的)和EPHEMERAL_SEQUENTIAL(临时带序号)。
关于Curator创建节点的API为:
public T forPath(String path)
//创建节点,并赋值内容
public T forPath(String path, byte[] data)
//判断节点是否存在,节点存在了,创建时仍然会报错
public ExistsBuilder checkExists()
下面给出对应的示例代码:
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import java.nio.charset.StandardCharsets;
public class CreateNodeTest {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.217.128:2181")
.sessionTimeoutMs(50000)
.connectionTimeoutMs(30000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.namespace("base")
.build();
client.start();
System.out.println("会话建立了......");
String nodePath = "/test-persistent";
Stat exists = client.checkExists().forPath(nodePath);
if (exists != null) {
System.out.println("节点【" + nodePath + "】已存在,无法新增");
return;
}
// 默认为创建永久节点
client.create().forPath(nodePath, "默认为持久节点".getBytes(StandardCharsets.UTF_8));
nodePath = "/test-persistent/child-1";
// 手动指定创建节点的类型
client.create()
.withMode(CreateMode.PERSISTENT)
.forPath(nodePath, "手动指定节点类型".getBytes(StandardCharsets.UTF_8));
nodePath = "/test-persistent/child-2/hello";
String node = client.create()
.creatingParentsIfNeeded()
.forPath(nodePath, "若父节点不存在,则自动创建".getBytes(StandardCharsets.UTF_8));
System.out.println(node);
}
}
执行结果如下:
使用PrettyZoo查看结果如下:
2.3 检查节点是否存在
关于检查节点是否存在的API为:
curator.checkExists().forPath("path")
下面给出对应的示例:
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;
public class NodeExistsTest {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.217.128:2181")
.sessionTimeoutMs(50000)
.connectionTimeoutMs(30000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.namespace("base")
.build();
client.start();
System.out.println("会话建立了......");
Stat stat = client.checkExists().forPath("/test-persistent");
System.out.println("/test-persistent节点是否存在:" + (stat != null));
}
}
执行结果如下:
2.4 获取节点内容
关于获取节点内容的相关API为:
//获取某个节点数据
client.getData().forPath(nodePath)
//读取zookeeper的数据,并放到Stat中
client.getData().storingStatIn(stat1).forPath(nodePath)
上述方法对应参数的含义为:
- path:指定待读取的节点
- stat:指定数据节点的节点状态信息
下面给出对应的示例代码:
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;
public class GetNodeDataTest {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.217.128:2181")
.sessionTimeoutMs(50000)
.connectionTimeoutMs(30000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.namespace("base")
.build();
client.start();
System.out.println("会话建立了......");
String path = "/test-persistent";
// 获取节点时同时获取状态信息
Stat stat = new Stat();
byte[] data = client.getData().storingStatIn(stat).forPath(path);
System.out.println("获取到节点的数据内容为:" + new String(data));
System.out.println("获取到节点状态信息为:" + stat);
}
}
执行结果如下:
2.5 获取子节点列表
关于获取子节点列表的API如下:
curator.getChildren().forPath("path")
下面给出对应的示例:
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.util.List;
public class GetChildNodesTest {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.217.128:2181")
.connectionTimeoutMs(50000)
.sessionTimeoutMs(30000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.namespace("base")
.build();
client.start();
System.out.println("会话被创建......");
String path = "/test-persistent";
List<String> children = client.getChildren().forPath(path);
System.out.println("【" + path + "】的子节点列表为:" + children);
}
}
执行结果如下:
2.6 设置或更新节点信息
设置或更新节点内容的API如下:
//更新节点
client.setData().forPath(nodePath, data.getBytes())
//指定版本号,更新节点
client.setData().withVersion(-1).forPath(nodePath, data.getBytes())
//异步设置某个节点数据
client.setData().inBackground().forPath(nodePath, data.getBytes())
下面给出对应的示例:
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;
import java.nio.charset.StandardCharsets;
public class UpdateNodeDataTest {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.217.128:2181")
.connectionTimeoutMs(5000)
.sessionTimeoutMs(3000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.namespace("base")
.build();
client.start();
System.out.println("会话被创建......");
// 状态信息
Stat stat = new Stat();
String path = "/test-persistent";
client.getData().storingStatIn(stat).forPath(path);
System.out.println("获取到的节点状态为:" + stat);
// 更新节点内容
int version = client.setData().withVersion(stat.getAversion()).forPath(path, "修改后的内容".getBytes(StandardCharsets.UTF_8)).getVersion();
System.out.println("当前的最新版本为:" + version);
byte[] data = client.getData().forPath(path);
System.out.println("修改后的节点数据为:" + new String(data));
// 此时会报错
client.setData().withVersion(stat.getVersion()).forPath(path, "再次修改内容".getBytes(StandardCharsets.UTF_8));
}
}
执行结果如下:
2.7 删除节点
删除节点对应的API为:
//删除节点
client.delete().forPath(nodePath);
//删除节点,即使出现网络故障,zookeeper也可以保证删除该节点
client.delete().guaranteed().forPath(nodePath);
//级联删除节点(如果当前节点有子节点,子节点也可以一同删除) client.delete().deletingChildrenIfNeeded().forPath(nodePath);
下面给出对应的示例:
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class DeleteNodeTest {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.217.128:2181")
.connectionTimeoutMs(50000)
.sessionTimeoutMs(30000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.namespace("base")
.build();
client.start();
System.out.println("会话2被创建......");
String path = "/test-persistent";
client.delete().deletingChildrenIfNeeded().withVersion(-1)
.forPath(path);
System.out.println("节点删除成功:" + path);
}
}
执行结果如下:
2.8 缓存
Zookeeper原生就支持通过注册Watcher来进行事件监听,但基于原生实现的事件监听只能使用一次,开发者需反复注册。为解决该问题,Curator将事件监听封装为Cache,Cache可看作为对事件监听的本地缓存视图,其能为开发者自动重复注册监听。对于Curator而言,其提供了三种Watcher(Cache)来监听结点的变化。
2.8.1 PathChildren Cache
PathChildren Cache 用于监控ZNode子节点的变更. 当子节点增加、 更新,删除时, Path Cache便会改变其状态, 并包含最新的子节点、以及子节点的数据和状态。此外,状态的变更将通过PathChildrenCacheListener来进行通知。
使用时会涉及到四个类,分别为:PathChildrenCache、PathChildrenCacheEvent、PathChildrenCacheListener、ChildData。创建PathCache可通过如下构造函数进行:
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)
当然,若想使用cache,则需调用其start方法,使用完毕后需调用close方法进行关闭。启动时,可通过StartMode来进行设置。对于StartMode而言,可选值如下:
- NORMAL:表示进行正常的初始化。
- BUILD_INITIAL_CACHE:表示在调用 start() 前会调用 rebuild()。
- POST_INITIALIZED_EVENT: 表示当 Cache 初始化后会发送PathChildrenCacheEvent.Type#INITIALIZED 事件此外,还可通过addListener方法添加监听缓存变化的监听器,通过getCurrentData方法获取List对象。
下面给出PathCache使用的示例:
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
public class PathCacheTest {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.217.128:2181")
.sessionTimeoutMs(3000)
.connectionTimeoutMs(5000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.namespace("base")
.build();
client.start();
System.out.println("会话已创建......");
PathChildrenCache cache = new PathChildrenCache(client, "/test-persistent", true);
cache.start();
PathChildrenCacheListener listener = (cli, event) -> {
System.out.println("事件类型为:" + event.getType());
ChildData data = event.getData();
if (data != null) {
System.out.println("节点数据为:" + data.getPath() + " = " + new String(data.getData()));
}
};
cache.getListenable().addListener(listener);
client.create().creatingParentsIfNeeded().forPath("/test-persistent/pathCache1", "01".getBytes(StandardCharsets.UTF_8));
TimeUnit.MILLISECONDS.sleep(10);
client.create().creatingParentsIfNeeded().forPath("/test-persistent/pathCache2", "02".getBytes(StandardCharsets.UTF_8));
TimeUnit.MILLISECONDS.sleep(10);
client.setData().forPath("/test-persistent/pathCache1", "01_V2".getBytes(StandardCharsets.UTF_8));
TimeUnit.MILLISECONDS.sleep(10);
for (ChildData childData : cache.getCurrentData()) {
System.out.println("getCurrentData: " + childData.getPath() + " = " + new String(childData.getData()));
}
client.delete().forPath("/test-persistent/pathCache1");
TimeUnit.MILLISECONDS.sleep(10);
client.delete().forPath("/test-persistent/pathCache2");
TimeUnit.MILLISECONDS.sleep(10);
cache.close();
client.close();
System.out.println("OK");
}
}
执行结果如下:
需要说明的是,若 new PathChildrenCache(client, PATH, true)中的参数cacheData设置为false时,则示例中的event.getData().getData()将返回null,也就是说,此时的cache不会缓存节点数据。
需要说明的是,PathCache无法触发太过频繁的事件。
2.8.2 Node Cache
Node Cache与Path Cache类似,Node Cache只会监听某个特定的节点,其会涉及如下三个类:
- NodeCache: 表示Node的缓存实现类
- NodeCacheListener:表示节点监听器
- ChildData:表示节点数据
需要说明的还是,Node Cache依然需使用start()方法启动,使用完后调用close()方法关闭。通过getCurrentData()方法可获得节点的当前状态,根据其状态可得节点当前的值。
下面给出对应的示例:
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
public class NodeCacheTest {
public static void main(String[] args) throws Exception{
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.217.128:2181")
.connectionTimeoutMs(5000)
.sessionTimeoutMs(3000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.namespace("base")
.build();
client.start();
System.out.println("会话已创建......");
String path = "/test-persistent";
client.create().creatingParentsIfNeeded().forPath(path);
NodeCache cache = new NodeCache(client, path);
NodeCacheListener listener = () -> {
ChildData data = cache.getCurrentData();
if (data != null) {
System.out.println("节点数据:" + new String(data.getData()));
} else {
System.out.println("节点不存在!");
}
};
cache.getListenable().addListener(listener);
cache.start();
client.setData().forPath(path, "01".getBytes(StandardCharsets.UTF_8));
TimeUnit.MILLISECONDS.sleep(100);
client.setData().forPath(path, "02".getBytes(StandardCharsets.UTF_8));
TimeUnit.MILLISECONDS.sleep(100);
client.delete().deletingChildrenIfNeeded().forPath(path);
TimeUnit.SECONDS.sleep(2);
cache.close();
}
}
执行结果如下:
2.8.3 Tree Cache
Tree Cache可监控整棵树的所有节点,包括当前节点和其子节点,可认为是PathCache 和 NodeCache 的组合,其会涉及如下四个类:
- TreeCache:表示Tree Cache实现类
- TreeCacheListener:表示监听器类
- TreeCacheEvent:表示触发的事件类
- ChildData:表示节点数据
下面给出对应的代码:
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
public class TreeCacheTest {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.217.128:2181")
.sessionTimeoutMs(3000)
.connectionTimeoutMs(5000)
.namespace("base")
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
client.start();
System.out.println("会话被创建......");
String path = "/test-persistent";
client.create().creatingParentsIfNeeded().forPath(path);
TreeCache cache = new TreeCache(client, path);
TreeCacheListener listener = (cli, event) -> {
String treePath = event.getData() != null ? event.getData().getPath() : null;
System.out.println("事件类型:" + event.getType() + ", 路径:" + treePath);
TreeCacheEvent.Type eventType = event.getType();
if (TreeCacheEvent.Type.NODE_ADDED == eventType || TreeCacheEvent.Type.NODE_UPDATED == eventType) {
byte[] data = cli.getData().forPath(treePath);
System.out.println("当前节点的数据为:" + new String(data));
}
};
cache.getListenable().addListener(listener);
cache.start();
client.setData().forPath(path, "01".getBytes(StandardCharsets.UTF_8));
TimeUnit.MILLISECONDS.sleep(100);
client.setData().forPath(path, "02".getBytes(StandardCharsets.UTF_8));
TimeUnit.MILLISECONDS.sleep(100);
client.create().forPath(path + "/SubTree", "SubTree".getBytes(StandardCharsets.UTF_8));
client.setData().forPath(path + "/SubTree", "SubTree2".getBytes(StandardCharsets.UTF_8));
TimeUnit.MILLISECONDS.sleep(100);
client.delete().deletingChildrenIfNeeded().forPath(path);
TimeUnit.SECONDS.sleep(2);
cache.close();
client.close();
System.out.println("OK!");
}
}
执行结果如下:
2.8.4 CuratorCache
需要说明的是,在Curator5.0之后,之前的PathCache、NodeCache以及TreeCache均被置为弃用状态。取而代之的则为CuratorCache。
CuratorCache会试图将节点的数据保存到本地缓存中。 其既可缓存指定的单个节点,也可缓存以指定节点为根的整个子树(默认缓存方案)。此外,还可为CuratorCache实例注册监听器,这样,当相关节点发生更改时会接收到通知, 从而响应更新、创建、删除等事件。
下面给出对应的示例:
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
public class CuratorCacheTest {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.217.128:2181")
.sessionTimeoutMs(3000)
.connectionTimeoutMs(5000)
.namespace("base")
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
client.start();
System.out.println("会话被创建......");
String path = "/test-persistent";
client.create().creatingParentsIfNeeded().forPath(path);
CuratorCache cache = CuratorCache.build(client, path);
CuratorCacheListener listener = CuratorCacheListener.builder()
.forCreates(node -> {
System.out.println(String.format("Node Created: [%s]", node));
}).forChanges(((oldNode, node) -> {
System.out.println(String.format("Node Changed, Old: [%s], New: [%s]", oldNode, node));
})).forDeletes(oldNode -> {
System.out.println(String.format("Node deleted, Old Value: [%s]", oldNode));
}).forInitialized(() -> System.out.println("Cache initialized!")).build();
cache.listenable().addListener(listener);
cache.start();
client.setData().forPath(path, "01".getBytes(StandardCharsets.UTF_8));
TimeUnit.MILLISECONDS.sleep(100);
client.setData().forPath(path, "02".getBytes(StandardCharsets.UTF_8));
TimeUnit.MILLISECONDS.sleep(100);
client.create().forPath(path + "/SubTree", "SubTree".getBytes(StandardCharsets.UTF_8));
client.setData().forPath(path + "/SubTree", "SubTree2".getBytes(StandardCharsets.UTF_8));
TimeUnit.MILLISECONDS.sleep(100);
client.delete().deletingChildrenIfNeeded().forPath(path);
TimeUnit.SECONDS.sleep(2);
cache.close();
client.close();
System.out.println("OK!");
}
}
执行结果如下: