Zookeeper使用详解之Curator操作篇(Curator是什么)

Zookeeper使用详解之Curator操作篇(Curator是什么)

一、简介

Curator为Netflix开源的对ZooKeeper进行操作的框架。与原生客户端相比,Curator进行了高度的抽象和封装,同时简化了开发步骤,屏蔽了底层细节,并在一定程度上进行了功能增强,提供包括:连接重连、反复注册Watcher 和 NodeExistsException异常等功能。

对于Curator来说,其包含如下几个包:

  • curator-framework: 其属于对zookeeper底层api的封装。
  • curator-client:其用于提供一些客户端操作,如重试策略等。
  • curator-recipes:其封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等。

下面给出curator操作相关的Maven依赖:

<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework 对 zookeeper 的底层 api 的一些封装-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.4.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-client 提供一些客户端的操作,例如重试策略等 -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>5.4.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes 封装了一些高级特性,如:Cache 事件监听、选举、分布式锁、分布式计数器等等-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.4.0</version>
</dependency>

下面介绍一下Curator的常用API。

二、常用操作

2.1 创建会话

创建会话的方式有两种,分别是:静态方法创建 和 Fluent API创建。

使用静态方法创建的示例如下:

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework curator= CuratorFrameworkFactory.newClient("192.168.56.103:2181", 5000, 3000, retryPolicy);

对于CuratorFrameworkFactory的newClient静态方法而言,其主要包含四个参数,其对应的含义分别为:

  • connectionString:其表示服务器列表信息,其格式为:host1:port1, host2:port2, …
  • retryPolicy:其表示为重试策略,内置的有四种,当然也可通过实现RetryPolicy来自定义。
  • sessionTimeoutMs:其表示会话的超时时间,单位为毫秒,默认为6000ms。
  • connectionTimeoutMs:其表示连接创建的超时时间,单位为毫秒,默认为6000ms。

下面给出对应的示例:

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class CreateSessionTest {
/*
借助zkclient完成会话的创建
*/
public static void main(String[] args) {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("192.168.217.128:2181", retryPolicy);
curatorFramework.start();
System.out.println("会话被建立");
}
}

执行结果如下:

当然,也可采用Fluent方式进行创建,对应的示例如下:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class CreateSessionTest {
/*
借助zkclient完成会话的创建
*/
public static void main(String[] args) {
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.217.128:2181")
.sessionTimeoutMs(50000)
.connectionTimeoutMs(30000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.namespace("base")
.build();
client.start();
System.out.println("会话建立了......");
}
}

执行结果如下:

2.2 创建数据节点

对于ZooKeeper来说,其节点分为四种类型,分别为:持久化、持久带序列、临时和临时带序列。同样的,Curator也提供了对应的模式,分为CreateMode的四个枚举值:PERSISTENT(持久化)、PERSISTENT_SEQUENTIAL(持久带序号)、EPHEMERAL(临时的)和EPHEMERAL_SEQUENTIAL(临时带序号)。

关于Curator创建节点的API为:

public T forPath(String path) 
//创建节点,并赋值内容
public T forPath(String path, byte[] data)
//判断节点是否存在,节点存在了,创建时仍然会报错
public ExistsBuilder checkExists()

下面给出对应的示例代码:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import java.nio.charset.StandardCharsets;
public class CreateNodeTest {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.217.128:2181")
.sessionTimeoutMs(50000)
.connectionTimeoutMs(30000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.namespace("base")
.build();
client.start();
System.out.println("会话建立了......");
String nodePath = "/test-persistent";
Stat exists = client.checkExists().forPath(nodePath);
if (exists != null) {
System.out.println("节点【" + nodePath + "】已存在,无法新增");
return;
}
// 默认为创建永久节点
client.create().forPath(nodePath, "默认为持久节点".getBytes(StandardCharsets.UTF_8));
nodePath = "/test-persistent/child-1";
// 手动指定创建节点的类型
client.create()
.withMode(CreateMode.PERSISTENT)
.forPath(nodePath, "手动指定节点类型".getBytes(StandardCharsets.UTF_8));
nodePath = "/test-persistent/child-2/hello";
String node = client.create()
.creatingParentsIfNeeded()
.forPath(nodePath, "若父节点不存在,则自动创建".getBytes(StandardCharsets.UTF_8));
System.out.println(node);
}
}

执行结果如下:

使用PrettyZoo查看结果如下:

2.3 检查节点是否存在

关于检查节点是否存在的API为:

curator.checkExists().forPath("path")

下面给出对应的示例:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;
public class NodeExistsTest {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.217.128:2181")
.sessionTimeoutMs(50000)
.connectionTimeoutMs(30000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.namespace("base")
.build();
client.start();
System.out.println("会话建立了......");
Stat stat = client.checkExists().forPath("/test-persistent");
System.out.println("/test-persistent节点是否存在:" + (stat != null));
}
}

执行结果如下:

2.4 获取节点内容

关于获取节点内容的相关API为:

//获取某个节点数据
client.getData().forPath(nodePath)
//读取zookeeper的数据,并放到Stat中
client.getData().storingStatIn(stat1).forPath(nodePath)

上述方法对应参数的含义为:

  • path:指定待读取的节点
  • stat:指定数据节点的节点状态信息

下面给出对应的示例代码:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;
public class GetNodeDataTest {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.217.128:2181")
.sessionTimeoutMs(50000)
.connectionTimeoutMs(30000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.namespace("base")
.build();
client.start();
System.out.println("会话建立了......");
String path = "/test-persistent";
// 获取节点时同时获取状态信息
Stat stat = new Stat();
byte[] data = client.getData().storingStatIn(stat).forPath(path);
System.out.println("获取到节点的数据内容为:" + new String(data));
System.out.println("获取到节点状态信息为:" + stat);
}
}

执行结果如下:

2.5 获取子节点列表

关于获取子节点列表的API如下:

curator.getChildren().forPath("path")

下面给出对应的示例:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.util.List;
public class GetChildNodesTest {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.217.128:2181")
.connectionTimeoutMs(50000)
.sessionTimeoutMs(30000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.namespace("base")
.build();
client.start();
System.out.println("会话被创建......");
String path = "/test-persistent";
List<String> children = client.getChildren().forPath(path);
System.out.println("【" + path + "】的子节点列表为:" + children);
}
}

执行结果如下:

2.6 设置或更新节点信息

设置或更新节点内容的API如下:

//更新节点
client.setData().forPath(nodePath, data.getBytes())
//指定版本号,更新节点
client.setData().withVersion(-1).forPath(nodePath, data.getBytes())
//异步设置某个节点数据
client.setData().inBackground().forPath(nodePath, data.getBytes())

下面给出对应的示例:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;
import java.nio.charset.StandardCharsets;
public class UpdateNodeDataTest {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.217.128:2181")
.connectionTimeoutMs(5000)
.sessionTimeoutMs(3000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.namespace("base")
.build();
client.start();
System.out.println("会话被创建......");
// 状态信息
Stat stat = new Stat();
String path = "/test-persistent";
client.getData().storingStatIn(stat).forPath(path);
System.out.println("获取到的节点状态为:" + stat);
// 更新节点内容
int version = client.setData().withVersion(stat.getAversion()).forPath(path, "修改后的内容".getBytes(StandardCharsets.UTF_8)).getVersion();
System.out.println("当前的最新版本为:" + version);
byte[] data = client.getData().forPath(path);
System.out.println("修改后的节点数据为:" + new String(data));
// 此时会报错
client.setData().withVersion(stat.getVersion()).forPath(path, "再次修改内容".getBytes(StandardCharsets.UTF_8));
}
}

执行结果如下:

2.7 删除节点

删除节点对应的API为:

//删除节点
client.delete().forPath(nodePath);
//删除节点,即使出现网络故障,zookeeper也可以保证删除该节点
client.delete().guaranteed().forPath(nodePath);
//级联删除节点(如果当前节点有子节点,子节点也可以一同删除)   client.delete().deletingChildrenIfNeeded().forPath(nodePath);

下面给出对应的示例:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class DeleteNodeTest {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.217.128:2181")
.connectionTimeoutMs(50000)
.sessionTimeoutMs(30000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.namespace("base")
.build();
client.start();
System.out.println("会话2被创建......");
String path = "/test-persistent";
client.delete().deletingChildrenIfNeeded().withVersion(-1)
.forPath(path);
System.out.println("节点删除成功:" + path);
}
}

执行结果如下:

2.8 缓存

Zookeeper原生就支持通过注册Watcher来进行事件监听,但基于原生实现的事件监听只能使用一次,开发者需反复注册。为解决该问题,Curator将事件监听封装为Cache,Cache可看作为对事件监听的本地缓存视图,其能为开发者自动重复注册监听。对于Curator而言,其提供了三种Watcher(Cache)来监听结点的变化。

2.8.1 PathChildren Cache

PathChildren Cache 用于监控ZNode子节点的变更. 当子节点增加、 更新,删除时, Path Cache便会改变其状态, 并包含最新的子节点、以及子节点的数据和状态。此外,状态的变更将通过PathChildrenCacheListener来进行通知。

使用时会涉及到四个类,分别为:PathChildrenCache、PathChildrenCacheEvent、PathChildrenCacheListener、ChildData。创建PathCache可通过如下构造函数进行:

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)

当然,若想使用cache,则需调用其start方法,使用完毕后需调用close方法进行关闭。启动时,可通过StartMode来进行设置。对于StartMode而言,可选值如下:

  • NORMAL:表示进行正常的初始化。
  • BUILD_INITIAL_CACHE:表示在调用 start() 前会调用 rebuild()。
  • POST_INITIALIZED_EVENT: 表示当 Cache 初始化后会发送PathChildrenCacheEvent.Type#INITIALIZED 事件此外,还可通过addListener方法添加监听缓存变化的监听器,通过getCurrentData方法获取List对象。

下面给出PathCache使用的示例:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
public class PathCacheTest {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.217.128:2181")
.sessionTimeoutMs(3000)
.connectionTimeoutMs(5000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.namespace("base")
.build();
client.start();
System.out.println("会话已创建......");
PathChildrenCache cache = new PathChildrenCache(client, "/test-persistent", true);
cache.start();
PathChildrenCacheListener listener = (cli, event) -> {
System.out.println("事件类型为:" + event.getType());
ChildData data = event.getData();
if (data != null) {
System.out.println("节点数据为:" + data.getPath() + " = " + new String(data.getData()));
}
};
cache.getListenable().addListener(listener);
client.create().creatingParentsIfNeeded().forPath("/test-persistent/pathCache1", "01".getBytes(StandardCharsets.UTF_8));
TimeUnit.MILLISECONDS.sleep(10);
client.create().creatingParentsIfNeeded().forPath("/test-persistent/pathCache2", "02".getBytes(StandardCharsets.UTF_8));
TimeUnit.MILLISECONDS.sleep(10);
client.setData().forPath("/test-persistent/pathCache1", "01_V2".getBytes(StandardCharsets.UTF_8));
TimeUnit.MILLISECONDS.sleep(10);
for (ChildData childData : cache.getCurrentData()) {
System.out.println("getCurrentData: " + childData.getPath() + " = " + new String(childData.getData()));
}
client.delete().forPath("/test-persistent/pathCache1");
TimeUnit.MILLISECONDS.sleep(10);
client.delete().forPath("/test-persistent/pathCache2");
TimeUnit.MILLISECONDS.sleep(10);
cache.close();
client.close();
System.out.println("OK");
}
}

执行结果如下:

需要说明的是,若 new PathChildrenCache(client, PATH, true)中的参数cacheData设置为false时,则示例中的event.getData().getData()将返回null,也就是说,此时的cache不会缓存节点数据。

需要说明的是,PathCache无法触发太过频繁的事件。

2.8.2 Node Cache

Node Cache与Path Cache类似,Node Cache只会监听某个特定的节点,其会涉及如下三个类:

  • NodeCache: 表示Node的缓存实现类
  • NodeCacheListener:表示节点监听器
  • ChildData:表示节点数据

需要说明的还是,Node Cache依然需使用start()方法启动,使用完后调用close()方法关闭。通过getCurrentData()方法可获得节点的当前状态,根据其状态可得节点当前的值。

下面给出对应的示例:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
public class NodeCacheTest {
public static void main(String[] args) throws Exception{
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.217.128:2181")
.connectionTimeoutMs(5000)
.sessionTimeoutMs(3000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.namespace("base")
.build();
client.start();
System.out.println("会话已创建......");
String path = "/test-persistent";
client.create().creatingParentsIfNeeded().forPath(path);
NodeCache cache = new NodeCache(client, path);
NodeCacheListener listener = () -> {
ChildData data = cache.getCurrentData();
if (data != null) {
System.out.println("节点数据:" + new String(data.getData()));
} else {
System.out.println("节点不存在!");
}
};
cache.getListenable().addListener(listener);
cache.start();
client.setData().forPath(path, "01".getBytes(StandardCharsets.UTF_8));
TimeUnit.MILLISECONDS.sleep(100);
client.setData().forPath(path, "02".getBytes(StandardCharsets.UTF_8));
TimeUnit.MILLISECONDS.sleep(100);
client.delete().deletingChildrenIfNeeded().forPath(path);
TimeUnit.SECONDS.sleep(2);
cache.close();
}
}

执行结果如下:

2.8.3 Tree Cache

Tree Cache可监控整棵树的所有节点,包括当前节点和其子节点,可认为是PathCache 和 NodeCache 的组合,其会涉及如下四个类:

  • TreeCache:表示Tree Cache实现类
  • TreeCacheListener:表示监听器类
  • TreeCacheEvent:表示触发的事件类
  • ChildData:表示节点数据

下面给出对应的代码:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
public class TreeCacheTest {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.217.128:2181")
.sessionTimeoutMs(3000)
.connectionTimeoutMs(5000)
.namespace("base")
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
client.start();
System.out.println("会话被创建......");
String path = "/test-persistent";
client.create().creatingParentsIfNeeded().forPath(path);
TreeCache cache = new TreeCache(client, path);
TreeCacheListener listener = (cli, event) -> {
String treePath = event.getData() != null ? event.getData().getPath() : null;
System.out.println("事件类型:" + event.getType() + ", 路径:" + treePath);
TreeCacheEvent.Type eventType = event.getType();
if (TreeCacheEvent.Type.NODE_ADDED == eventType || TreeCacheEvent.Type.NODE_UPDATED == eventType) {
byte[] data = cli.getData().forPath(treePath);
System.out.println("当前节点的数据为:" + new String(data));
}
};
cache.getListenable().addListener(listener);
cache.start();
client.setData().forPath(path, "01".getBytes(StandardCharsets.UTF_8));
TimeUnit.MILLISECONDS.sleep(100);
client.setData().forPath(path, "02".getBytes(StandardCharsets.UTF_8));
TimeUnit.MILLISECONDS.sleep(100);
client.create().forPath(path + "/SubTree", "SubTree".getBytes(StandardCharsets.UTF_8));
client.setData().forPath(path + "/SubTree", "SubTree2".getBytes(StandardCharsets.UTF_8));
TimeUnit.MILLISECONDS.sleep(100);
client.delete().deletingChildrenIfNeeded().forPath(path);
TimeUnit.SECONDS.sleep(2);
cache.close();
client.close();
System.out.println("OK!");
}
}

执行结果如下:

2.8.4 CuratorCache

需要说明的是,在Curator5.0之后,之前的PathCache、NodeCache以及TreeCache均被置为弃用状态。取而代之的则为CuratorCache。

CuratorCache​​会试图将节点的数据保存到本地缓存中。 其既可缓存指定的单个节点,也可缓存以指定节点为根的整个子树(默认缓存方案)。此外,还可为​​CuratorCache​​实例注册监听器,这样,当相关节点发生更改时会接收到通知, 从而响应更新、创建、删除等事件。

下面给出对应的示例:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
public class CuratorCacheTest {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.217.128:2181")
.sessionTimeoutMs(3000)
.connectionTimeoutMs(5000)
.namespace("base")
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
client.start();
System.out.println("会话被创建......");
String path = "/test-persistent";
client.create().creatingParentsIfNeeded().forPath(path);
CuratorCache cache = CuratorCache.build(client, path);
CuratorCacheListener listener = CuratorCacheListener.builder()
.forCreates(node -> {
System.out.println(String.format("Node Created: [%s]", node));
}).forChanges(((oldNode, node) -> {
System.out.println(String.format("Node Changed, Old: [%s], New: [%s]", oldNode, node));
})).forDeletes(oldNode -> {
System.out.println(String.format("Node deleted, Old Value: [%s]", oldNode));
}).forInitialized(() -> System.out.println("Cache initialized!")).build();
cache.listenable().addListener(listener);
cache.start();
client.setData().forPath(path, "01".getBytes(StandardCharsets.UTF_8));
TimeUnit.MILLISECONDS.sleep(100);
client.setData().forPath(path, "02".getBytes(StandardCharsets.UTF_8));
TimeUnit.MILLISECONDS.sleep(100);
client.create().forPath(path + "/SubTree", "SubTree".getBytes(StandardCharsets.UTF_8));
client.setData().forPath(path + "/SubTree", "SubTree2".getBytes(StandardCharsets.UTF_8));
TimeUnit.MILLISECONDS.sleep(100);
client.delete().deletingChildrenIfNeeded().forPath(path);
TimeUnit.SECONDS.sleep(2);
cache.close();
client.close();
System.out.println("OK!");
}
}

执行结果如下:

给TA打赏
共{{data.count}}人
人已打赏
干货分享

Zookeeper使用详解:Zookeeper配置管理和服务管理

2023-8-23 9:13:31

干货分享

Zookeeper使用详解之ZKClient操作篇(为什么要使用ZkClient)

2023-8-23 9:21:00

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索
打开微信,扫描左侧二维码,关注【旅游人lvyouren】,发送【101】获取验证码,输入获取到的验证码即可解锁复制功能,解锁之后可复制网站任意一篇文章,验证码每月更新一次。
提交