例如: 主节点创建了一个临时性节点来标记主节点锁,而备份主节点注册一个监视点来监视这个主节点锁是否存在,如果主节点崩溃,主节点锁自动被删除,并通知所有的备份主节点。 一旦备份主节点收到了通知,它们就开始选举,尝试通过争夺创建临时的znode节点来标记主节点锁。
这样一来,监视(watch)和通知形成了一个机制,使得客户端可以观察变化情况,不需要去轮询整个ZooKeeper集合的状态。
事件(event) : 表示一个 znode 节点执行了更新操作。 监视点(watche) : 表示一个与之关联的 znode 节点和事件类型组成的单次触发器(例如,znode 被赋值或者 znode 被删除)。
当一个监视点被一个事件触发时,就会产生一个通知(notification)。通知是注册了监视点的应用客户端收到的事件的报告的消息。
如果应用程序注册了一个监视点来接收通知,匹配该监视点条件的第一个事件会触发监视点的通知,并且最多触发一次。
例如: 当 znode 节点 /master 被删除时,客户端需要知道该变化,客户端在 /master 执行 exists
操作并设置监视点标志位,等待通知,客户端会以回调函数的形式收到通知。
注意:
客户端设置的监视点与会话关联,如果会话过期,则等待中的监视点也将被删除。
不过,监视点可以跨越不同服务端的连接而保持。例如:一个 Zk 客户端与一个 Zk 服务端断开连接后,会话转移到集群中的另一个 Zk 服务端时,客户端会发送未触发的监视点列表,在注册监视点时,服务端将检查已经监视的znode节点在之前注册监视点之后是否发生了变化,如果znode节点已经发生了变化,一个监视点的事件就会被发送给客户端,否则在新的服务端注册监视点。
ZooKeeper 中的 API 的所有操作: getData
、getChildren
和 exists
,均可以选择在读取的 znode 节点上设置监视点。
使用监视机制时,需要实现 Watcher 接口:
public void process(WatchedEvent event);
WatchedEvent
数据结构包括以下信息:
final private KeeperState keeperState;
final private EventType eventType;
private String path;
-
org.apache.zookeeper.Watcher.Event.KeeperState : ZooKeeper 会话状态, 值包含:
- Disconnected (0) : 客户端与Zk服务端失去连接
- SyncConnected (3) : 客户端与Zk服务端处于连接有效状态
- AuthFailed (4) : 身份验证失败的状态
- ConnectedReadOnly (5) : 客户端处于只读连接的状态;客户端连接到一个只读的服务器(即该服务器目前没有连接到大都数)
- SaslAuthenticated (6) : SASL 验证通过状态;由于客户端是通过SASL授权验证连接到服务端的,所以可以通过sasl授权的权限执行Zookeeper操作。
- Expired (-112) : 会话失效
-
org.apache.zookeeper.Watcher.Event.EventType : 事件类型,值包括:
- None (-1) : 无
- NodeCreated (1) : 节点创建
- NodeDeleted (2) : 节点删除
- NodeDataChanged (3) : 节点数据变更
- NodeChildrenChanged (4) :子节点变更
-
如果事件类型不是 None 时, 则还会返回一个 znode 路径
监视点有2种类型: 数据监视点、子节点监视点。
创建、删除 或者 设置一个 znode 节点的数据都会触发数据监视点,
exists
、getData
这2个操作可以设置数据监视点。只有
getChildren
操作可以设置子节点监视点,这种监视点只有在 znode 子节点创建或删除时才被触发。
对于这些事件,可以通过以下调用设置监视点:
-
NodeCreated
通过 exists 调用设置一个监视点
-
NodeDeleted
通过 exists、getData 设置监视点
-
NodeDataChanged
通过 exists、getData 设置监视点
-
NodeChildrenChanged
通过 getChildren 设置监视点
当创建一个 ZooKeeper 对象时(前面几节的内容我们可以看到),需要传递一个默认的Watcher对象,Zk 会使用这个监视点来通知应用Zk状态的变更情况。 而对于Zk节点的事件的通知,你可以使用默认的监视点,也可以单独实现一个。
public byte[] getData(final String path, Watcher watcher, Stat stat) // 传入自定义的监视点watcher
public byte[] getData(String path, boolean watch, Stat stat) // 该方法watch参数传true,则表示使用默认的监视点
stat
入参为Stat 类型的实例化参数, Zk 使用该对象返回指定的 path 参数的 znode 节点信息。
Stat 结构包括 znode 节点的属性信息,如该 znode 节点的属性信息, 如该 znode 节点的上次更新(zxid) 的时间戳,以及该znode节点的子节点数。
Stat 的结构字段信息如下:https://zookeeper.apache.org/doc/r3.4.13/zookeeperProgrammers.html
zxid
表示事务id。
private long czxid; // 导致这个 znode 创建的变更的 zxid
private long mzxid; // 上次变更这个 znode 的变更的 zxid
private long ctime; // 创建这个 znode 的时间,截至目前的毫秒数
private long mtime; // 这个 znode 上次变更至今的毫秒数
private int version; // 这个 znode 的变更的次数
private int cversion; // 这个 znode 的子节点的变更次数
private int aversion; // 这个 znode 的 ACL 变更的次数
private long ephemeralOwner; // 这个 znode 不是临时节点则为0; 这个 znode 是临时节点则表示持有该节点的会话id
private int dataLength; // 这个 znode 的数据长度
private int numChildren; // 这个 znode 的子节点数
private long pzxid; // 上次变更这个 znode 的子节点的变更的 znode
注意事项:
在 Zk 的会话状态和 znode 节点的变化事件中,使用了相同的监视机制来处理应用程序的相关事件的通知。虽然会话状态、znode状态变更 是两个独立的事件集合,为了简便,使用相同的机制传送这些事件。
通过前面的案例,我们总结出在 ZooKeeper 的应用中使用的通用代码的模型:
-
- 进行异步方法调用
-
- 实现回调对象, 并传入异步调用函数中
-
- 如果操作需要设置监视点,实现一个 Watcher 对象, 并传入异步调用函数中
这是一个简单是示例(仅作展示用):
zk.exists("/myZnode", myWatcher, existsCallback, null);
Watcher myWatcher = new Watcher(){
public void process(WatchedEvent e){
// ...
}
}
StatCallback existsCallback = new StateCallback(){
public void processResult(int rc, String path, Object ctx, Stat stat){
// ...
}
};
现在具体通过主-从模式的例子来查看如何处理状态的变化。假设任务处理过程中遇到:
- 管理权变化
- 主节点等待从节点列表的变化 :主节点需要监视从节点
- 主节点等待新任务进行分配 : 主节点需要监视任务节点、任务分配节点
- 从节点等待分配新任务 : 从节点需要监视任务节点、任务分配节点
- 客户端等待任务的执行结果 : 客户端需要监视任务节点、任务分配节点
前面的示例程序中,我们知道,客户端通过创建 /master 节点来推选自己成为主节点(这个过程称为主节点竞选),如果该 /master 节点已经存在,则客户端确认自己不是主节点并返回。但是如果主节点崩溃,我们并不知道,因此,需要在 /master 节点上设置监视点,在节点删除时,ZooKeeper 会通知客户端。
...
static AsyncCallback.StatCallback masterExistsCallback = new AsyncCallback.StatCallback() {
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
switch (KeeperException.Code.get(rc)){
case CONNECTIONLOSS:
checkMaster(); // 连接丢失,则检查 /master 是否存在
break;
case OK:
states = MasterStates.ELECTED; // 宣示主权
log.info("Find the master znode.");
break;
case NODEEXISTS:
states = MasterStates.NOTELECTED;
existsMaster(); // 节点已经存在,则监视主节点
break;
default:
states = MasterStates.NOTELECTED;
log.error("Something went wrong when running for master.", KeeperException.create(KeeperException.Code.get(rc), path));
}
}
};
...
static void existsMaster(){
zk.exists("/master", masterExistsWatcher, masterExistsCallback, null); // 通过 exists 调用,在 /master 设置监视点
}
...
static Watcher masterExistsWatcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
if( event.getType() == Event.EventType.NodeDeleted ){
// 节点已删除
log.info("/master is equals to path: " + "/master".equals(event.getPath()));
runForMaster(); // 节点已经删除,则再次竞选主节点
}
}
};
程序代码见 Master5。
- 在连接丢失事件发生的情况下,客户端检查 /master 节点是否存在,因为客户端并不知道是否能够创建这个节点
- 如果返回 OK, 则输出日志,生产中这时候行使领导权处理某些逻辑
- 如果其他进程已经创建了这个znode节点,客户端需要监视该节点
- 通过 exists 调用,在 /master 设置监视点
- 节点已经删除,则再次竞选主节点
// /master 是否存在的异步调用函数
static AsyncCallback.StatCallback masterExistsCallback = new AsyncCallback.StatCallback() {
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
switch (KeeperException.Code.get(rc)){
case CONNECTIONLOSS:
existsMaster(); // 连接丢失则需要重新设置监视点
break;
case OK:
if( stat == null ){ // 在 create 回调函数执行和 exists 操作执行期间发生了 /master 节点被删除的情况,则如果返回了OK,还需要检查stat对象是否为空,为空则表示已经被删除了
states = MasterStates.RUNNING;
runForMaster();
}
break;
default:
checkMaster();
break;
}
}
};
// 创建 getData 回调方法对象;
static AsyncCallback.DataCallback masterCheckCallback = new AsyncCallback.DataCallback() {
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
switch (KeeperException.Code.get(rc)){
case CONNECTIONLOSS:
checkMaster(); // 连接丢失则继续检查
return;
case NONODE:
runForMaster(); // 节点存在,则尝试创建节点
return;
}
}
};
- 连接丢失的情况下重试
- 如果返回 OK, 则检查 znode 是否存在, 不存在则重新竞选
- 其它情况, 则通过获取节点数据来检查 /master 是否存在(检查是否存在 /master 的回调对象 masterCheckCallback)
为了确认某个时间可用的从节点信息,通过在 Zk 中的 /workers 下添加子节点来注册新的从节点。当一个节点崩溃或者从系统中被移除,如会话过期等情况,需要优雅的自动将对应的 znode 节点删除。优雅实现的从节点会显示地关闭其会话,而不需要 Zk 等待会话过期。
ZooKeeper 异步调用的类继承体系:
+--org.apache.zookeeper.AsyncCallback
+--org.apache.zookeeper.AsyncCallback.StatCallback
+--org.apache.zookeeper.AsyncCallback.DataCallback
+--org.apache.zookeeper.AsyncCallback.ACLCallback
+--org.apache.zookeeper.AsyncCallback.ChildrenCallback
+--org.apache.zookeeper.AsyncCallback.Children2Callback
+--org.apache.zookeeper.AsyncCallback.StringCallback
+--org.apache.zookeeper.AsyncCallback.VoidCallback
+--org.apache.zookeeper.AsyncCallback.MultiCallback
主节点使用 getChildren
来获取有效的从节点列表,同时还会监视这个列表的变化。示例代码:
/***********************************主节点等待从节点列表的变化START****************************************/
static Watcher workerChangeWatcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
if( event.getType() == Event.EventType.NodeChildrenChanged ){
// 子节点变更事件
log.info( "Is /workers's children znode changed: " + "/workers".equalsIgnoreCase(event.getPath()));
getWorkers();
}
}
};
/**
* 获取子节点并设置监视点
*/
static void getWorkers(){
zk.getChildren("/workers", workerChangeWatcher, workersGetChildrenCallback, null);
}
// 子节点变更回调对象
AsyncCallback.ChildrenCallback workersGetChildrenCallback = new AsyncCallback.ChildrenCallback() {
@Override
public void processResult(int rc, String path, Object ctx, List<String> children) {
switch (KeeperException.Code.get(rc)){
case CONNECTIONLOSS:
// 连接丢失,重新获取子节点并设置监视点
getWorkers();
break;
case OK:
// OK
// TODO 重新分配崩溃的节点,并重新设置监视点
default:
log.error("getChildren failed", KeeperException.create(KeeperException.Code.get(rc), path));
}
}
};
/*********************************主节点等待从节点列表的变化END****************************************/
workersGetChildrenCallback
为从节点列表的监视对象- 当发生 CONNECTIONLOSS 事件时,需要重新获取子节点并设置监视点
- 返回 OK,则重新分配崩溃从节点的任务,并重新设置新的从节点的监视点
/***********************************主节点等待从节点列表的变化START****************************************/
static Watcher workerChangeWatcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
if( event.getType() == Event.EventType.NodeChildrenChanged ){
// 子节点变更事件
log.info( "Is /workers's children znode changed: " + "/workers".equalsIgnoreCase(event.getPath()));
getWorkers();
}
}
};
/**
* 获取子节点并设置监视点
*/
static void getWorkers(){
zk.getChildren("/workers", workerChangeWatcher, workersGetChildrenCallback, null);
}
// 子节点变更回调对象
static AsyncCallback.ChildrenCallback workersGetChildrenCallback = new AsyncCallback.ChildrenCallback() {
@Override
public void processResult(int rc, String path, Object ctx, List<String> children) {
switch (KeeperException.Code.get(rc)){
case CONNECTIONLOSS:
// 连接丢失,重新获取子节点并设置监视点
getWorkers();
break;
case OK:
// OK
// TODO 重新分配崩溃的节点,并重新设置监视点
reassignAndSet(children);
default:
log.error("getChildren failed", KeeperException.create(KeeperException.Code.get(rc), path));
}
}
};
// 用于保存上次获得的节点列表的本地缓存
static ChildrenCache childrenCache;
/**
* 重新分配崩溃的节点,并重新设置监视点
*/
static void reassignAndSet(List<String> children){
List<String> process = null;
if( childrenCache == null ){
childrenCache = ChildrenCache.builder().data(children).build();
}else{
log.info("Removing and setting");
process = childrenCache.removeAndSet(children);
}
if( process != null ){
for( String worker : process ){
//TODO getAbsentWorkerTasks(worker);
}
}
}
@Setter
@Getter
@AllArgsConstructor
@NoArgsConstructor
@Builder
class ChildrenCache<E>{
List<E> data;
List<E> removeAndSet(List<E> children){
Iterator<E> it = data.iterator();
while(it.hasNext()){
E ele = it.next();
if( !children.contains(ele) ){
it.remove();
}
}
return data;
}
}
/*********************************主节点等待从节点列表的变化END****************************************/
程序代码见 Master6.java
。
childrenCache
变量用于保存上次获得的从节点列表的本地缓存- 如果第一次使用这个本地缓存,则初始化该变量
- 第一次获得所有节点时,不需要做其他事情
- 如果不是第一次,则检查是否有节点已经被移除了
- 如果有节点被移除了,就需要重新分配任务(待续...)
和主节点等待从节点列表变化一样,主节点等待添加到 /tasks 节点中的任务。 主节点先获得当前的任务集,并设置对应的监视点。 /tasks 的子节点表示任务集,每个子节点对应一个任务,一旦主节点获得还未分配的任务信息,主节点会随机选择一个从节点,将这个任务分配给从节点。
/*********************************主节点等待新任务分配START****************************************/
/**
* 监视任务集的变化
*/
static Watcher tasksChangeWatcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
if( event.getType() == Event.EventType.NodeChildrenChanged ){
// 子节点变更事件
log.info( "Is /tasks's children znode changed: " + "/tasks".equalsIgnoreCase(event.getPath()));
getTasks();
}
}
};
static void getTasks(){
zk.getChildren("/tasks", tasksChangeWatcher, tasksGetChildrenCallback, null);
}
static AsyncCallback.ChildrenCallback tasksGetChildrenCallback = new AsyncCallback.ChildrenCallback() {
@Override
public void processResult(int rc, String path, Object ctx, List<String> children) {
switch (KeeperException.Code.get(rc)){
case CONNECTIONLOSS:
// 连接丢失,重新获取子节点并设置监视点
getTasks();
break;
case OK:
// OK
// TODO 重新分配崩溃的节点,并重新设置监视点
if( children != null ){
assignTasks(children);
}
default:
log.error("getChildren failed", KeeperException.create(KeeperException.Code.get(rc), path));
}
}
};
static void assignTasks(List<String> tasks){
for( String task : tasks ){
getTaskData(task);
}
}
static void getTaskData(String task){
zk.getData("/tasks" + task, false, taskDataCallback, task);
}
static AsyncCallback.DataCallback taskDataCallback = new AsyncCallback.DataCallback() {
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
switch (KeeperException.Code.get(rc)){
case CONNECTIONLOSS:
// 连接丢失则重试
getTaskData((String)ctx);
break;
case OK:
// 随机选择一个从节点分配任务
int worker = new Random().nextInt(workerList.size());
String targetWorker = workerList.get(worker);
// 分配任务给选中的从节点 targetWorker
String assignPath = "/assign/" + targetWorker + "/" + (String)ctx;
createAssignment(assignPath, data);
}
}
};
static void createAssignment( String path, byte[] data ){
zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, createAssignCallback, data);
}
static AsyncCallback.StringCallback createAssignCallback = new AsyncCallback.StringCallback() {
@Override
public void processResult(int rc, String path, Object ctx, String name) {
switch (KeeperException.Code.get(rc)){
case CONNECTIONLOSS:
// 失去连接重试
createAssignment(path, (byte[])ctx);
break;
case OK:
log.info("assigned a task:" + path);
// 需要删除节点,因为主节点不需要记忆哪些任务被分配了,它只需关心哪些任务没有被分配
deleteAssignedTask(path); // TODO
break;
default:
log.error("assigned a task failed", KeeperException.create(KeeperException.Code.get(rc), path));
}
}
};
/*********************************主节点等待新任务分配END****************************************/
- 获得任务信息
- 或的任务信息后,异步随机分配到从节点
- 分配任务后,删除任务节点
从节点需要向 ZooKeeper 注册自己,在 /workers 节点下创建一个子节点。
/*********************************从节点注册到Zk START*************************************/
/**
* 通过创建一个 Znode 节点来注册从节点
* 重试时,如果节点已经存在,则我们会收到一个 NODEEXISTS 事件
*/
void register(){
zk.create("/workers/worker-" + serverId,
new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL,
createWorkerCallback,
null);
}
AsyncCallback.StringCallback createWorkerCallback = new AsyncCallback.StringCallback() {
@Override
public void processResult(int rc, String path, Object ctx, String name) {
switch (KeeperException.Code.get(rc)){
case OK:
log.info("Registerd successfully:" + serverId);
break;
case CONNECTIONLOSS:
register();
break;
default:
log.error("Registered failed, something went wrong:" + KeeperException.create(KeeperException.Code.get(rc), path));
}
}
};
/*********************************从节点注册到Zk END*************************************/
- 通过创建一个 Znode 节点来注册从节点
- 重试时,如果节点已经存在,则我们会收到一个 NODEEXISTS 事件
- 添加该 znode 节点时会通知主节点(因为在前面主节点已经对 /workers 下面的子节点设置了监视点)
有个问题: 假如我们先创建的 /workers/worker-id,之后才创建的 /assign/worker-id ,则在这两个动作之间,如果有新的任务,主节点检测到从节点可用,就会尝试分配任务,但是因为 /assign/worker-id 此时可能还没有创建,所以直接创建其子节点(即分配任务 /assign/worker-id/task-id)就会失败。为了解决这个问题,我们必须先创建 /assign/worker-id,并且从节点(/workers/worker-id)需要在 /assign/worker-id 节点上设置监视点来接收新任务的分配通知。
我们理一下:
- Step1: 创建 /master(主节点)、/workers、/tasks、/assign
- Step2: 创建 /workers/worker-id 从节点
- Step3: 创建 /tasks/task-id 任务
- Step4: 创建 /assign/worker-id 从节点接受任务的父节点
- Step5: 创建 /assign/worker-id/tasks-id 分配任务给从节点
- 当收到子节点变化的通知后,获得子节点的列表
- 单独的线程中执行
- 循环子节点列表
- 获得任务信息并执行任务
- 将正在执行的任务添加到执行中列表,防止多次执行
/*********************************从节点监听任务节点START*************************************/
/**执行中的任务列表*/
List<String> runingTasks = Collections.emptyList();
@Override
public void process(WatchedEvent event) {
if( event.getType() == Event.EventType.NodeChildrenChanged ){
// 子节点变化事件
assert new String("/assign/worker-" + serverId).equals(event.getPath());
getTasks();
}
}
void getTasks(){
zk.getChildren("assign/worker-" + serverId,
this,
tasksGetChildrenCallback,
null);
}
AsyncCallback.ChildrenCallback tasksGetChildrenCallback = new AsyncCallback.ChildrenCallback(){
@Override
public void processResult(int rc, String path, Object ctx, List<String> children) {
switch (KeeperException.Code.get(rc)){
case CONNECTIONLOSS:
getTasks();
break;
case OK:
if( children != null ){
// 存在子节点
executorService.execute(new Runnable() {
List<String> children;
DataCallback cb;
public Runnable init(List<String> children, DataCallback cb){
this.children = children;
this.cb = cb;
return this;
}
@Override
public void run() {
log.info("遍历任务");
synchronized (runingTasks){
for( String task: children){
if( runingTasks.contains(task) ){
log.trace("New task:{}", task);
getTaskData(task, cb);
runingTasks.add(task);
}
}
}
}
}.init(children, taskDataCallback));
}
}
}
};
AsyncCallback.DataCallback taskDataCallback = new AsyncCallback.DataCallback() {
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
switch (KeeperException.Code.get(rc)){
case OK:
break;
case CONNECTIONLOSS:
getTaskData(new String(data), (DataCallback)ctx);
}
}
};
void getTaskData(String task, AsyncCallback.DataCallback cb){
zk.getData("assign/worker-" + serverId + "/" + task,
false,
cb,
task);
}
/*********************************从节点监听任务节点END*************************************/