zk客户端Curator

/ 分布式

简介

"Guava is to Java what Curator is to ZooKeeper" --Patrick Hunt, ZooKeeper committer

这句话是ZooKeeper代码提交者Patrick Hunt对Curator形象而生动的评价:Curator之于ZooKeeper,就如同Guava之于Java。

众所周知,zk有原生的api包,那么我们为什么还要使用Curator呢?

原因在于原生api使用起来有不少"坑":例如Watcher是一次性的,触发后需要重新注册;连接断开、会话超时后的重连与重试需要自己处理;创建节点时不会自动创建父节点,删除节点时也不会递归删除子节点。因此尽量不要直接使用zk原生客户端。Curator都帮你把坑填好了,又何必费时间去挖坑呢。

样例

下面封装了Curator的常用方法,并给出了使用demo。

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * Curator操作zk客户端
 *
 * @author guangcai.ji
 * @create 2019-09-12
 */
public class ZkClient {

    private CuratorFramework client;
    private static final int sessionTimeout = 10000;

    private static final String zkServerPath = "172.16.0.2:2181,172.16.0.3:2181,172.16.0.4:2181";

    /**
     * 获取客户端
     *
     * @return
     * @throws Exception
     */
    public CuratorFramework obtainClient() throws Exception {
        synchronized (ZkClient.class) {
            if (client != null) {
                return client;
            }
            client = CuratorFrameworkFactory.builder()
                    .connectString(zkServerPath)
                    .sessionTimeoutMs(sessionTimeout)
                    .retryPolicy(new ExponentialBackoffRetry(100, 10, 5000))
                    .build();
            client.start();
            client.blockUntilConnected();
            return client;
        }
    }

    /**
     * 保存(创建或修改)临时数据
     *
     * @param path zk路径
     * @param data 临时数据,失联时自动删除数据
     * @throws Exception
     */
    public void saveTemporaryData(String path, byte[] data) throws Exception {
        CuratorFramework curatorFramework = obtainClient();
        Stat stat = curatorFramework.checkExists().forPath(path);
        if (stat != null) {
            curatorFramework.setData().forPath(path, data);
        } else {
            curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, data);
        }
    }

    /**
     * 保存(创建或修改)数据
     *
     * @param path zk路径
     * @param data 要保存的数据
     * @throws Exception
     */
    public void saveData(String path, byte[] data) throws Exception {
        CuratorFramework curatorFramework = obtainClient();
        Stat stat = curatorFramework.checkExists().forPath(path);
        if (stat != null) {
            curatorFramework.setData().forPath(path, data);
        } else {
            curatorFramework.create().creatingParentsIfNeeded().forPath(path, data);
        }
    }

    /**
     * 删除数据(包含其路径下的子节点)
     *
     * @param path zk路径
     * @throws Exception
     */
    public void deleteData(String path) throws Exception {
        obtainClient().delete().guaranteed().deletingChildrenIfNeeded().forPath(path);
    }

    /**
     * 查询数据
     *
     * @param path zk路径
     * @throws Exception
     */
    public String getData(String path) throws Exception {
        byte[] bytes = obtainClient().getData().forPath(path);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    /**
     * 监控某一节点数据变化
     * <p>
     * 如果节点被删除,则无法触发监听
     *
     * @param path zk路径
     * @throws Exception
     */
    public void nodeListener(String path) throws Exception {
        final NodeCache nodeCache = new NodeCache(obtainClient(), path, false);
        nodeCache.getListenable().addListener(
                () -> {
                    if (nodeCache.getCurrentData() != null) {
                        System.out.println("\n .nodeCache------节点数据发生了改变,发生的路径为:" + nodeCache.getCurrentData().getPath() + ",节点数据发生了改变 ,新的数据为:" + new String(nodeCache.getCurrentData().getData()) + "\n");
                    }
                }
        );
        nodeCache.start(true);
    }

    /**
     * 监控某一节点数据变化
     * <p>
     * 如果节点被删除,则无法触发监听
     *
     * @param path zk路径
     * @throws Exception
     */
    public void pathChildrenListener(String path) throws Exception {
        final PathChildrenCache pathChildrenCache = new PathChildrenCache(obtainClient(), path, true);
        pathChildrenCache.getListenable().addListener(
                (client, event) -> {

                    switch (event.getType()) {
                        case CHILD_ADDED:
                            System.out.println("\n 添加节点 path = " + event.getData().getPath() + " data = " + new String(event.getData().getData(), StandardCharsets.UTF_8) + "\n");
                            break;
                        case CHILD_UPDATED:
                            System.out.println("\n 更新节点 path = " + event.getData().getPath() + " data = " + new String(event.getData().getData(), StandardCharsets.UTF_8) + "\n");
                            break;
                        case CHILD_REMOVED:
                            System.out.println("\n 删除节点 path = " + event.getData().getPath() + " data = " + new String(event.getData().getData(), StandardCharsets.UTF_8) + "\n");
                            break;
                        default:
                            break;

                    }


                }
        );

        // * POST_INITIALIZED_EVENT:异步初始化,初始化之后会触发事件
        // * NORMAL:异步初始化
        // * BUILD_INITIAL_CACHE:同步初始化
        pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
    }


    public static void main(String[] args) throws Exception {

        ZkClient zkClient = new ZkClient();

        CountDownLatch countDownLatch = new CountDownLatch(1);

        zkClient.saveTemporaryData("/test/temporary", "temp".getBytes());

        zkClient.nodeListener("/test/temporary");

        zkClient.pathChildrenListener("/test/temporary");

        zkClient.saveData("/test/temporary", "temp1".getBytes());

        String data = zkClient.getData("/test/temporary");

        System.out.println("getData res = " + data);

        zkClient.saveTemporaryData("/test/temporary", "temp3".getBytes());

        //阻塞
        countDownLatch.await();

    }
}

经典使用场景