Zookeeper API

ZooKeeper 具有针对 Java 和 C 的官方 API 绑定。ZooKeeper 社区为大多数语言(.NET、python 等)提供了非官方 API。 使用 ZooKeeper API,应用程序可以连接、交互、操作数据、协调并最终断开与 ZooKeeper 整体的连接。

ZooKeeper API 具有丰富的功能集,可以以简单且安全的方式获得 ZooKeeper 集合体的所有功能。 ZooKeeper API 提供同步和异步方法。

ZooKeeper ensemble 和 ZooKeeper API 在各个方面都完全互补,并以极大的方式使开发人员受益。 让我们在本章中讨论 Java 绑定。


ZooKeeper API 基础知识

与 ZooKeeper ensemble 交互的应用程序称为 ZooKeeper Client 或简称为 Client

Znode 是 ZooKeeper ensemble 的核心组件,ZooKeeper API 提供了一小组方法来使用 ZooKeeper ensemble 操作 znode 的所有细节。

客户端应该按照下面给出的步骤与 ZooKeeper ensemble 进行清晰干净的交互。

  • 连接到 ZooKeeper 整体。 ZooKeeper ensemble 为客户端分配一个 Session ID。
  • 定期向服务器发送心跳。 否则,ZooKeeper ensemble 会使 Session ID 过期,客户端需要重新连接。
  • 只要会话 ID 处于活动状态,就获取/设置 znode。
  • 完成所有任务后,断开与 ZooKeeper 整体的连接。 如果客户端长时间处于非活动状态,则 ZooKeeper 集成将自动断开客户端连接。

Java绑定

让我们了解本章中最重要的一组 ZooKeeper API。 ZooKeeper API 的核心部分是 ZooKeeper 类。 它提供了在其构造函数中连接 ZooKeeper 集合的选项,并具有以下方法 -

  • connect - 连接到 ZooKeeper 集合
  • create − 创建一个 znode
  • exists - 检查 znode 是否存在及其信息
  • getData - 从特定的 znode 获取数据
  • setData - 在特定的 znode 中设置数据
  • getChildren - 获取特定 znode 中可用的所有子节点
  • delete − 获取特定的 znode 及其所有子节点
  • close - 关闭连接

连接到 ZooKeeper Ensemble

ZooKeeper 类通过其构造函数提供连接功能。 构造函数的签名如下

ZooKeeper(String connectionString, int sessionTimeout, Watcher watcher)
  • connectionString - ZooKeeper 集成主机。
  • sessionTimeout - 以毫秒为单位的会话超时。
  • watcher - 一个实现“Watcher”接口的对象。 ZooKeeper ensemble 通过 watcher 对象返回连接状态。

让我们创建一个新的辅助类 ZooKeeperConnection 并添加一个方法 connectconnect 方法创建一个 ZooKeeper 对象,连接到 ZooKeeper 集合体,然后返回该对象。

这里 CountDownLatch 用于停止(等待)主进程,直到客户端与 ZooKeeper 集合体连接。

ZooKeeper ensemble 通过 Watcher 回调回复连接状态。 一旦客户端连接到 ZooKeeper 集成,就会调用 Watcher 回调,并且 Watcher 回调会调用 CountDownLatchcountDown 方法来释放锁,在主进程中等待。

下面是连接 ZooKeeper 整体的完整代码。

ZooKeeperConnection.java

// import java classes
import java.io.IOException;
import java.util.concurrent.CountDownLatch;

// import zookeeper classes
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.data.Stat;

public class ZooKeeperConnection {

   // declare zookeeper instance to access ZooKeeper ensemble
   private ZooKeeper zoo;
   final CountDownLatch connectedSignal = new CountDownLatch(1);

   // Method to connect zookeeper ensemble.
   public ZooKeeper connect(String host) throws IOException,InterruptedException {
    
      zoo = new ZooKeeper(host,5000,new Watcher() {
        
         public void process(WatchedEvent we) {

            if (we.getState() == KeeperState.SyncConnected) {
               connectedSignal.countDown();
            }
         }
      });
        
      connectedSignal.await();
      return zoo;
   }

   // Method to disconnect from zookeeper server
   public void close() throws InterruptedException {
      zoo.close();
   }
}

保存上面的代码,它将在下一节中用于连接 ZooKeeper 集合体。


创建一个 Znode

ZooKeeper 类提供了创建方法来在 ZooKeeper 集合中创建一个新的 znode。 创建方法的签名如下

create(String path, byte[] data, List<ACL> acl, CreateMode createMode)
  • path - Znode 路径。 例如,/myapp1、/myapp2、/myapp1/mydata1、myapp2/mydata1/myanothersubdata
  • data - 要存储在指定 znode 路径中的数据
  • acl - 要创建的节点的访问控制列表。 ZooKeeper API 提供了一个静态接口 ZooDefs.Ids 来获取一些基本的 acl 列表。 例如,ZooDefs.Ids.OPEN_ACL_UNSAFE 返回打开的 znode 的 acl 列表。
  • createMode - 节点的类型,可以是临时的,顺序的,或者两者兼而有之。 这是一个枚举。

让我们创建一个新的 Java 应用程序来检查 ZooKeeper API 的创建功能。 创建文件 ZKCreate.java。 在 main 方法中,创建一个 ZooKeeperConnection 类型的对象并调用 connect 方法连接到 ZooKeeper 集合体。

connect 方法将返回 ZooKeeper 对象 zk。 现在,使用自定义路径和数据调用 zk 对象的创建方法。

创建 znode 的完整程序代码如下

ZKCreate.java

import java.io.IOException;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;

public class ZKCreate {
   // create static instance for zookeeper class.
   private static ZooKeeper zk;

   // create static instance for ZooKeeperConnection class.
   private static ZooKeeperConnection conn;

   // Method to create znode in zookeeper ensemble
   public static void create(String path, byte[] data) throws 
      KeeperException,InterruptedException {
      zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE,
      CreateMode.PERSISTENT);
   }

   public static void main(String[] args) {

      // znode path
      String path = "/MyFirstZnode"; // Assign path to znode

      // data in byte array
      byte[] data = "My first zookeeper app”.getBytes(); // Declare data
        
      try {
         conn = new ZooKeeperConnection();
         zk = conn.connect("localhost");
         create(path, data); // Create the data to the specified path
         conn.close();
      } catch (Exception e) {
         System.out.println(e.getMessage()); //Catch error message
      }
   }
}

编译和执行应用程序后,将在 ZooKeeper 集合中创建具有指定数据的 znode。 我们可以使用 ZooKeeper CLI zkCli.sh 检查它。

$ cd /path/to/zookeeper
$ bin/zkCli.sh
>>> get /MyFirstZnode

Exists——检查 Znode 是否存在

ZooKeeper 类提供了 exists 方法来检查 znode 是否存在。 如果指定的 znode 存在,它返回 znode 的元数据。 存在方法的签名如下

exists(String path, boolean watcher)
  • path - Znode 路径
  • watcher - 指定是否监视指定 znode 的布尔值

让我们创建一个新的 Java 应用程序来检查 ZooKeeper API 的“exists”功能。 创建一个文件“ZKExists.java”。 在 main 方法中,使用 ZooKeeperConnection 对象创建 ZooKeeper 对象“zk”。 然后,用自定义的 path 调用“zk”对象的 exists 方法。 完整代码如下:

ZKExists.java

import java.io.IOException;

import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.data.Stat;

public class ZKExists {
   private static ZooKeeper zk;
   private static ZooKeeperConnection conn;

   // Method to check existence of znode and its status, if znode is available.
   public static Stat znode_exists(String path) throws
      KeeperException,InterruptedException {
      return zk.exists(path, true);
   }

   public static void main(String[] args) throws InterruptedException,KeeperException {
      String path = "/MyFirstZnode"; // Assign znode to the specified path
            
      try {
         conn = new ZooKeeperConnection();
         zk = conn.connect("localhost");
         Stat stat = znode_exists(path); // Stat checks the path of the znode
                
         if(stat != null) {
            System.out.println("Node exists and the node version is " +
            stat.getVersion());
         } else {
            System.out.println("Node does not exists");
         }
                
      } catch(Exception e) {
         System.out.println(e.getMessage()); // Catches error messages
      }
   }
}

编译并执行应用程序后,我们将获得以下输出。

Node exists and the node version is 1.

getData 方法

ZooKeeper 类提供了 getData 方法来获取附加在指定 znode 中的数据及其状态。 getData 方法的签名如下

getData(String path, Watcher watcher, Stat stat)
  • path - Znode 路径。
  • watcher - Watcher 类型的回调函数。 当指定 znode 的数据发生变化时,ZooKeeper ensemble 将通过 Watcher 回调通知。 这是一次性通知。
  • stat - 返回 znode 的元数据。

让我们创建一个新的 Java 应用程序来了解 ZooKeeper API 的 getData 功能。 创建文件 ZKGetData.java。 在 main 方法中,使用 ZooKeeperConnection 对象创建 ZooKeeper 对象 zk。 然后,使用自定义路径调用 zk 对象的 getData 方法。

这是从指定节点获取数据的完整程序代码

ZKGetData.java

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.data.Stat;

public class ZKGetData {

   private static ZooKeeper zk;
   private static ZooKeeperConnection conn;
   public static Stat znode_exists(String path) throws 
      KeeperException,InterruptedException {
      return zk.exists(path,true);
   }

   public static void main(String[] args) throws InterruptedException, KeeperException {
      String path = "/MyFirstZnode";
      final CountDownLatch connectedSignal = new CountDownLatch(1);
        
      try {
         conn = new ZooKeeperConnection();
         zk = conn.connect("localhost");
         Stat stat = znode_exists(path);
            
         if(stat != null) {
            byte[] b = zk.getData(path, new Watcher() {
                
               public void process(WatchedEvent we) {
                    
                  if (we.getType() == Event.EventType.None) {
                     switch(we.getState()) {
                        case Expired:
                        connectedSignal.countDown();
                        break;
                     }
                            
                  } else {
                     String path = "/MyFirstZnode";
                            
                     try {
                        byte[] bn = zk.getData(path,
                        false, null);
                        String data = new String(bn,
                        "UTF-8");
                        System.out.println(data);
                        connectedSignal.countDown();
                            
                     } catch(Exception ex) {
                        System.out.println(ex.getMessage());
                     }
                  }
               }
            }, null);
                
            String data = new String(b, "UTF-8");
            System.out.println(data);
            connectedSignal.await();
                
         } else {
            System.out.println("Node does not exists");
         }
      } catch(Exception e) {
        System.out.println(e.getMessage());
      }
   }
}

编译并执行应用程序后,我们将获得以下输出。

My first zookeeper app

应用程序将等待来自 ZooKeeper ensemble 的进一步通知。 使用 ZooKeeper CLI zkCli.sh 更改指定 znode 的数据。

$ cd /path/to/zookeeper
$ bin/zkCli.sh
>>> set /MyFirstZnode Hello

现在,应用程序将打印以下输出并退出。

Hello

setData 方法

ZooKeeper 类提供了 setData 方法来修改附加在指定 znode 中的数据。 setData 方法的签名如下

setData(String path, byte[] data, int version)
  • path - Znode 路径
  • data - 要存储在指定 znode 路径中的数据。
  • version - znode 的当前版本。 只要数据发生变化,ZooKeeper 就会更新 znode 的版本号。

现在让我们创建一个新的 Java 应用程序来了解 ZooKeeper API 的 setData 功能。 创建文件 ZKSetData.java。 在 main 方法中,使用 ZooKeeperConnection 对象创建 ZooKeeper 对象 zk。 然后,使用指定的路径、新数据和节点版本调用 zk 对象的 setData 方法。

这是修改附加在指定 znode 中的数据的完整程序代码。

ZKSetData.java

import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;

import java.io.IOException;

public class ZKSetData {
   private static ZooKeeper zk;
   private static ZooKeeperConnection conn;

   // Method to update the data in a znode. Similar to getData but without watcher.
   public static void update(String path, byte[] data) throws
      KeeperException,InterruptedException {
      zk.setData(path, data, zk.exists(path,true).getVersion());
   }

   public static void main(String[] args) throws InterruptedException,KeeperException {
      String path= "/MyFirstZnode";
      byte[] data = "Success".getBytes(); //Assign data which is to be updated.
        
      try {
         conn = new ZooKeeperConnection();
         zk = conn.connect("localhost");
         update(path, data); // Update znode data to the specified path
      } catch(Exception e) {
         System.out.println(e.getMessage());
      }
   }
}

一旦应用程序被编译和执行,指定 znode 的数据将被更改,可以使用 ZooKeeper CLI zkCli.sh 来检查它。

$ cd /path/to/zookeeper
$ bin/zkCli.sh
>>> get /MyFirstZnode

getChildren 方法

ZooKeeper 类提供了 getChildren 方法来获取特定 znode 的所有子节点。 getChildren 方法的签名如下

getChildren(String path, Watcher watcher)
  • path - Znode 路径。
  • watcher - “Watcher”类型的回调函数。 当指定的 znode 被删除或 znode 下的子节点被创建/删除时,ZooKeeper ensemble 将发出通知。 这是一次性通知。

ZKGetChildren.java

import java.io.IOException;
import java.util.*;

import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.data.Stat;

public class ZKGetChildren {
   private static ZooKeeper zk;
   private static ZooKeeperConnection conn;

   // Method to check existence of znode and its status, if znode is available.
   public static Stat znode_exists(String path) throws 
      KeeperException,InterruptedException {
      return zk.exists(path,true);
   }

   public static void main(String[] args) throws InterruptedException,KeeperException {
      String path = "/MyFirstZnode"; // Assign path to the znode
        
      try {
         conn = new ZooKeeperConnection();
         zk = conn.connect("localhost");
         Stat stat = znode_exists(path); // Stat checks the path

         if(stat!= null) {

            //“getChildren” method- get all the children of znode.It has two
            args, path and watch
            List <String> children = zk.getChildren(path, false);
            for(int i = 0; i < children.size(); i++)
            System.out.println(children.get(i)); //Print children's
         } else {
            System.out.println("Node does not exists");
         }

      } catch(Exception e) {
         System.out.println(e.getMessage());
      }

   }

}

在运行该程序之前,让我们使用 ZooKeeper CLI zkCli.sh/MyFirstZnode 创建两个子节点。

$ cd /path/to/zookeeper
$ bin/zkCli.sh
>>> create /MyFirstZnode/myfirstsubnode Hi
>>> create /MyFirstZnode/mysecondsubmode Hi

现在,编译运行程序会输出上面创建的znodes。

myfirstsubnode
mysecondsubnode

删除 Znode

ZooKeeper 类提供了 delete 方法来删除指定的 znode。 delete 方法的签名如下

delete(String path, int version)
  • path - Znode 路径。
  • version - znode 的当前版本。

让我们创建一个新的 Java 应用程序来了解 ZooKeeper API 的删除功能。 创建文件 ZKDelete.java。 在 main 方法中,使用 ZooKeeperConnection 对象创建 ZooKeeper 对象 zk。 然后调用指定节点路径和版本的 zk 对象的 delete 方法。

删除 znode 的完整程序代码如下

ZKDelete.java

import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.KeeperException;

public class ZKDelete {
   private static ZooKeeper zk;
   private static ZooKeeperConnection conn;

   // Method to check existence of znode and its status, if znode is available.
   public static void delete(String path) throws KeeperException,InterruptedException {
      zk.delete(path,zk.exists(path,true).getVersion());
   }

   public static void main(String[] args) throws InterruptedException,KeeperException {
      String path = "/MyFirstZnode"; //Assign path to the znode
        
      try {
         conn = new ZooKeeperConnection();
         zk = conn.connect("localhost");
         delete(path); //delete the node with the specified path
      } catch(Exception e) {
         System.out.println(e.getMessage()); // catches error messages
      }
   }
}

查看笔记

扫码一下
查看教程更方便