一、概述
ZKClient由Datameer进行开发,其对ZooKeeper的原生API进行了封装,同时还进行了功能增强,新增了如:超时重连,Watcher反复注册等功能。
下面给出一个ZKClient的一个类图:
从上述类图可知,IZKConnection(常用的实现类是ZKConnection)为 ZKClient 与 ZooKeeper 间的适配器。尽管代码中用的为ZKClient,但本质上仍为使用 ZooKeeper进行处理。
相对于原生API而言,ZkClient主要有两方面的改进:
- 解决了watcher的一次性注册问题,其将znode的事件重新定义为三类,分别为:子节点变化、数据变化和连接状态变化,并将watcher的WatchedEvent转换这三类事件。此外,在watcher执行重新读取数据的同时,会再次注册相同的watcher。
- 将Zookeeper的watcher机制转化为更易理解的订阅形式,且该关系可被保持,而非一次性。也就是说,子节点的变化、数据的变化、状态的变化均可被订阅。当watcher使用完毕后,zkClient会自动新增相同的watcher。
下面开始介绍进行操作,其所对应的的依赖如下:
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.11</version>
</dependency>
下面会逐一介绍常用的API。
二、常见操作
2.1 创建会话
创建会话是通过ZKClient的构造方法实现,其构造方法如下:
public ZkClient(String serverstring)
public ZkClient(String zkServers, int connectionTimeout)
public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout)
public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout, ZkSerializer zkSerializer)
public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout, ZkSerializer zkSerializer, long operationRetryTimeout)
public ZkClient(IZkConnection connection)
public ZkClient(IZkConnection connection, int connectionTimeout)
public ZkClient(IZkConnection zkConnection, int connectionTimeout, ZkSerializer zkSerializer)
public ZkClient(IZkConnection zkConnection, int connectionTimeout, ZkSerializer zkSerializer, long operationRetryTimeout)
上述方法中的参数的含义为:
- serverString和zkServers: 表示连接服务的字符串,其格式为:ip:port,若存在多个,则以逗号间隔
- sessionTimeout:表示会话的超时时间,单位为毫秒。
- connectionTimeout: 表示连接的超时时间,单位为毫秒。若超出该时间还未与zookeeper建立连接,则抛出异常。
- zkSerializer:表示序列化器,其默认支持byte[]类型数据,也就是使用Java原生序列化方式进行序列化和反序列化。但默认的序列化的数据格式不大友好,建议使用自定义的序列化器。
- operationRetryTimeout:表示操作重试的超时时间
- zkConnection:为IZkConnection接口的实现类,是对Zookeeper原生接口最直接的包装。IZkConnection有两个实现类,分别为:ZkConnection和InMemoryConnection,最常用的为ZkConnection。
下面给出对应的示例:
import org.I0Itec.zkclient.ZkClient;
public class CreateSessionTest {
/*
借助zkclient完成会话的创建
*/
public static void main(String[] args) {
/*
创建一个zkclient实例就可以完成连接,完成会话的创建
serverString : 服务器连接地址
注意:zkClient通过对zookeeperAPI内部封装,将这个异步创建会话的过程同步化了..
*/
ZkClient zkClient = new ZkClient("192.168.217.128:2181");
System.out.println("会话被创建.....");
}
}
执行结果如下:
2.2 创建节点
对于创建节点而言,可通过如下方法实现:
public String create(final String path, Object data, final CreateMode mode) throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException
public void createEphemeral(final String path) throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException
public void createEphemeral(final String path, final Object data) throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException
public String createEphemeralSequential(final String path, final Object data) throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException
public void createPersistent(String path) throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException
public void createPersistent(String path, boolean createParents) throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException
public void createPersistent(String path, Object data) throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException
public String createPersistentSequential(String path, Object data) throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException
对于上述方法而言,create方法与原生方法的使用类似,其中createPersistent、createEphemeral、createPersistentSequential、createEphemeralSequential 方法是对原生api的封装,这简化了创建节点的方式。
上述方法中各参数的含义为:
- path:表示创建的节点路径。
- data:表示对创建节点所赋值的内容。
- createParents:表示当父节点不存在时,是否创建父节点。
下面给出对应的示例:
import org.I0Itec.zkclient.ZkClient;
import java.util.concurrent.TimeUnit;
public class CreateNodeTest {
public static void main(String[] args) throws Exception {
ZkClient zkClient = new ZkClient("192.168.217.128:2181");
System.out.println("会话被创建......");
// 创建持久节点
zkClient.createPersistent("/test-persistent", "持久节点内容");
// 创建临时节点
zkClient.createEphemeral("/test-ephemeral", "临时节点内容");
// 创建持久有序节点
zkClient.createPersistentSequential("/test-persistent-sequential", "持久有序节点内容");
// 创建临时有序节点
zkClient.createEphemeral("/test-ephemeral-sequential", "临时有序节点内容");
TimeUnit.MINUTES.sleep(1);
}
}
执行结果如下:
此时使用PrettyZoo进行查看,如下:
等系统阻塞解除后,再次使用PrettyZoo进行查看,结果如下:
2.3 检查节点是否存在
对于ZkClient而言,其检查节点是否存在的API如下:
public boolean exists(String path)
该方法与原生API使用类似。下面给出对应的示例:
import org.I0Itec.zkclient.ZkClient;
public class NodeExistsTest {
public static void main(String[] args) throws Exception {
ZkClient zkClient = new ZkClient("192.168.217.128:2181");
System.out.println("会话被创建.....");
boolean exists = zkClient.exists("/test-persistent");
System.out.println("/test-persistent节点是否存在:" + exists);
}
}
执行结果如下:
2.4 获取节点信息
ZkClient获取节点的API如下:
public <T extends Object> T readData(String path)
public <T extends Object> T readData(String path, boolean returnNullIfPathNotExists)
public <T extends Object> T readData(String path, Stat stat)
上述方法中各参数的含义为:
- path:用于指定要读取的节点路径。
- returnNullIfPathNotExists:表示若节点不存在时返回null,而非抛出异常。
- stat:用于指定数据节点的状态信息,与服务器交互时,会被服务端响应的新stat替换。
下面给出对应的示例代码:
import org.I0Itec.zkclient.ZkClient;
public class GetNodeDataTest {
public static void main(String[] args) throws Exception {
ZkClient zkClient = new ZkClient("192.168.217.128:2181");
System.out.println("会话被创建......");
Object data = zkClient.readData("/test-persistent");
System.out.println(data);
}
}
执行结果如下:
2.5 获取子节点列表
关于获取子节点列表信的API如下:
// 获取当前节点的子节点
public List<String> getChildren(String path)
// 获取当前节点的子节点的数量
public int countChildren(String path)
为进行测试,需使用PrettyZoo在/test-persistent节点下新增两个子节点,新增子节点后如下:
下面给出对应的示例:
import org.I0Itec.zkclient.ZkClient;
import java.util.List;
public class GetChildNodesTest {
public static void main(String[] args) throws Exception {
ZkClient zkClient = new ZkClient("192.168.217.128:2181");
System.out.println("会话被创建......");
int count = zkClient.countChildren("/test-persistent");
System.out.println("/test-persistent节点下的子节点数量为:" + count);
List<String> children = zkClient.getChildren("/test-persistent");
System.out.println("/test-persistent节点的子节点数量为:" + children);
}
}
执行结果如下:
2.6 设置或更新节点信息
关于设置或更新节点的方法如下:
// 为节点设置内容
public void writeData(String path, Object data)
// 为节点的某个版本,设置内容
public void writeData(final String path, Object data, final int expectedVersion)
//该方式设置内容更加的灵活,可以使用DataUpdater<T>设置内容
public <T> void updateDataSerialized(String path, DataUpdater<T> updater)
下面给出对应的示例:
import org.I0Itec.zkclient.ZkClient;
public class UpdateNodeDataTest {
public static void main(String[] args) throws Exception {
ZkClient zkClient = new ZkClient("192.168.217.128:2181");
System.out.println("会话被创建......");
String path = "/test-persistent";
Object data = zkClient.readData(path);
System.out.println("修改前的数据为:" + data);
zkClient.writeData(path, "修改后的持久节点内容");
Object data2 = zkClient.readData(path);
System.out.println("修改后的数据为:" + data2);
}
}
执行结果如下:
2.7 删除节点
关于删除节点的API如下:
public boolean delete(final String path)
//根据节点路径,版本号,删除节点
public boolean delete(final String path, final int version)
//递归删除当前节点,以及当前节点的子节点
public boolean deleteRecursive(String path)
deleteRecursive()方法支持删除当前节点的子节点,也即是递归删除
下面给出对应的示例:
import org.I0Itec.zkclient.ZkClient;
public class DeleteNodeTest {
public static void main(String[] args) throws Exception {
ZkClient zkClient = new ZkClient("192.168.217.128:2181");
System.out.println("会话被创建......");
// 删除节点
zkClient.delete("/test-persistent/child-1");
// 递归删除节点
zkClient.deleteRecursive("/test-persistent");
}
}
执行结果如下:
执行后使用PrettyZoo查看,结果如下:
2.8 完整案例
下面给出ZKClient完整的使用示例:
import org.I0Itec.zkclient.ZkClient;
import org.apache.zookeeper.CreateMode;
import java.util.List;
public class ZkClientTest {
private static final String ZK_ADDRESS = "192.168.217.128:2181";
public static void main(String[] args) {
// 建立连接
ZkClient zkClient = new ZkClient(ZK_ADDRESS, 5000, 15000);
// 创建持久化节点
zkClient.createPersistent("/root");
// 判断节点是否存在
if (!zkClient.exists("/root")) {
zkClient.createPersistent("/root", "hello");
// 创建持久化有序的节点
zkClient.createPersistentSequential("/root/sequential", "sequential");
// 原生方式创建
String node = zkClient.create("/root/home", "zkclient data", CreateMode.PERSISTENT);
System.out.println(node);
}
// 创建持久化节点
zkClient.createPersistent("/root/children/node", true);
// 查询节点数据
Object data = zkClient.readData("/root");
System.out.println(data);
// 获取当前节点的子节点
List<String> children = zkClient.getChildren("/root");
children.forEach(System.out::println);
// 获取当前节点的子节点数量
int count = zkClient.countChildren("/root");
System.out.println("子节点个数为:" + count);
// 修改节点
zkClient.writeData("/root", "update hello");
// 使用自定义的序列化方式
zkClient.updateDataSerialized("/root", s -> "update hello2");
// 删除节点
zkClient.delete("/root/home", -1);
// 递归删除节点
zkClient.deleteRecursive("/root");
}
}
执行结果如下: