1.Zookeeper概念

  • Zookeeper是ApacheHadoop 项目下的一个子项目,是一个树形目录服务。
  • Zookeeper翻译过来就是动物园管理员,他是用来管 Hadoop(大象)、Hive(蜜蜂)、Pig(小 猪)的管理员。简称zk
  • Zookeeper是一个分布式的、开源的分布式应用程序的协调服务。
  • Zookeeper提供的主要功能包括:
    • 配置管理
    • 分布式锁
    • 集群管理

2.png

分布式锁:在一个数据被访问的时候会对此数据进行上锁以免在查询的时候进行修改操作导致数据错乱

1.png!

2.Zookeeper命令操作

1.Zookeeper数据模型

  • ZooKeeper 是一个树形目录服务,其数据模型和Unix的文件系统目录树很类似,拥有一个层次化结构。
  • 这里面的每一个节点都被称为:ZNode,每个节点上都会保存自己的数据和节点信息。
  • 节点可以拥有子节点,同时也允许少量(1MB)数据存储在该节点之下。
  • 节点可以分为四大类:
    • PERSISTENT持久化节点
    • EPHEMERAL临时节点:-e
    • PERSISTENT_SEQUENTIAL持久化顺序节点:-s
    • EPHEMERAL_SEQUENTIAL临时顺序节点 :-es

4.png3.png

2.Zookeeper服务端常用命令

5.png

  • 启动 ZooKeeper服务: ./zkServer.sh start
  • 查看 ZooKeeper服务状态: ./zkServer.sh status
  • 停止 ZooKeeper服务: ./zkServer.sh stop
  • 重启 ZooKeeper服务: ./zkServer.sh restart

3.Zookeeper客户端常用命令

6.png7.png

  • czxid:节点被创建的事务ID
  • ctime: 创建时间
  • mzxid:最后一次被更新的事务ID
  • mtime: 修改时间
  • pzxid:子节点列表最后一次被更新的事务ID
  • cversion:子节点的版本号
  • dataversion:数据版本号
  • aclversion:权限版本号
  • ephemeralOwner:用于临时节点,代表临时节点的事务ID,如果为持久节点则为0
  • dataLength:节点存储的数据的长度
  • numChildren:当前节点的子节点个数

3.Zookeeper JavaAPI操作

1.Curator介绍

  • Curator是ApacheZooKeeper 的Java客户端库。
  • 常见的ZooKeeperJava API :
  • 原生JavaAPI
  • kClient
  • Curator
  • Curator项目的目标是简化 ZooKeeper 客户端的使用。
  • Curator最初是Netfix 研发的,后来捐献了Apache基金会,目前是Apache的顶级项目。
  • 官网:http://curator.apache.org/

2.Curator API 常用操作

①建立连接

public class CuratorTest {

   private CuratorFramework client;
    /**
     * 建立连接
     */
    @Before
    public void testConnect(){
        /*
         * @param connectString 连接字符串 zk 服务器的地址和端口 192.168.31.99:2181
         * @param sessionTimeoutMs  会话超时时间 单位ms
         * @param connectionTimeoutMs   连接超时时间 单位ms
         * @param retryPolicy   重试策略
         * */
        //重试策略
        RetryPolicy retry = new ExponentialBackoffRetry(3000,10);

/*        //1.第一种方式
        CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.149.135:2181",60*1000,15*1000,retry);
        */


        //2.第二种方式
        client = CuratorFrameworkFactory.builder().connectString("192.168.31.99:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retry)
                .namespace("itheima")
                .build();

        client.start();

    }
//================================创建===================================

    /**
     * 创建节点:create 持久 临时 顺序 数据
     * 1.基本创建:create().forpath("")
     * 2.创建结点 带有数据:create().forPath("",data)
     * 3.设置节点的类型:create()。withMode().forpath("",data)
     * 4.创建多级节点     /app1/p1    create().creatingParentsIfNeeded().forPath("",data)
     */
    @Test
    public void testCreate() throws Exception {
        //1.基本创建
        //如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
        String path = client.create().forPath("/app1");
        System.out.println(path);
    }

    @Test
    public void testCreate2() throws Exception {
        //1.基本创建
        String path = client.create().forPath("/app2","hehe".getBytes());
        System.out.println(path);
    }
    @Test
    public void testCreate3() throws Exception {
        //1.基本创建
        //默认类型:持久化
        String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");
        System.out.println(path);
    }
    @Test
    public void testCreate4() throws Exception {
        //1.基本创建
        //默认类型:持久化
        //creatingParentsIfNeeded():如果父节点不存在,则创建父节点
        String path = client.create().creatingParentsIfNeeded().forPath("/app4/p1");
        System.out.println(path);
    }
    @After
    public void close(){
        if (client != null){
            client.close();
        }
    }

②添加节点

public class CuratorTest {

   private CuratorFramework client;
    /**
     * 建立连接
     */
    @Before
    public void testConnect(){
        /*
         * @param connectString 连接字符串 zk 服务器的地址和端口 192.168.31.99:2181
         * @param sessionTimeoutMs  会话超时时间 单位ms
         * @param connectionTimeoutMs   连接超时时间 单位ms
         * @param retryPolicy   重试策略
         * */
        //重试策略
        RetryPolicy retry = new ExponentialBackoffRetry(3000,10);

/*        //1.第一种方式
        CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.149.135:2181",60*1000,15*1000,retry);
        */


        //2.第二种方式
        client = CuratorFrameworkFactory.builder().connectString("192.168.31.99:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retry)
                .namespace("itheima")
                .build();

        client.start();

    }
//================================查询===================================
    /**
     * 查询节点
     * 1.查询数据:get : getData().forPath()
     * 2.查询子节点:ls : getChildren().forPath()
     * 3.查询节点状态信息:ls -s :getData().storingStatIn(status).forPath()
     * @throws Exception
     */
    @Test
    public void testGet() throws Exception {
        //1.查询数据:get
        byte[] data = client.getData().forPath("/app1");
        System.out.println(new String(data));
    }

    @Test
    public void testGet2() throws Exception {
        //2.查询子节点
        List<String> path = client.getChildren().forPath("/app4");
        System.out.println(path);
    }

    @Test
    public void testGet3() throws Exception {
        Stat status = new Stat();
        System.out.println(status);
        //3.查询节点的状态信息
        client.getData().storingStatIn(status).forPath("/app1");
        System.out.println(status);
    }
    @After
    public void close(){
        if (client != null){
            client.close();
        }
    }

③删除节点

public class CuratorTest {

   private CuratorFramework client;
    /**
     * 建立连接
     */
    @Before
    public void testConnect(){
        /*
         * @param connectString 连接字符串 zk 服务器的地址和端口 192.168.31.99:2181
         * @param sessionTimeoutMs  会话超时时间 单位ms
         * @param connectionTimeoutMs   连接超时时间 单位ms
         * @param retryPolicy   重试策略
         * */
        //重试策略
        RetryPolicy retry = new ExponentialBackoffRetry(3000,10);

/*        //1.第一种方式
        CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.149.135:2181",60*1000,15*1000,retry);
        */


        //2.第二种方式
        client = CuratorFrameworkFactory.builder().connectString("192.168.31.99:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retry)
                .namespace("itheima")
                .build();

        client.start();

    }
//================================查询===================================
    /**
     * 查询节点
     * 1.查询数据:get : getData().forPath()
     * 2.查询子节点:ls : getChildren().forPath()
     * 3.查询节点状态信息:ls -s :getData().storingStatIn(status).forPath()
     * @throws Exception
     */
    @Test
    public void testGet() throws Exception {
        //1.查询数据:get
        byte[] data = client.getData().forPath("/app1");
        System.out.println(new String(data));
    }

    @Test
    public void testGet2() throws Exception {
        //2.查询子节点
        List<String> path = client.getChildren().forPath("/app4");
        System.out.println(path);
    }

    @Test
    public void testGet3() throws Exception {
        Stat status = new Stat();
        System.out.println(status);
        //3.查询节点的状态信息
        client.getData().storingStatIn(status).forPath("/app1");
        System.out.println(status);
    }
    @After
    public void close(){
        if (client != null){
            client.close();
        }
    }

④修改节点

public class CuratorTest {

   private CuratorFramework client;
    /**
     * 建立连接
     */
    @Before
    public void testConnect(){
        /*
         * @param connectString 连接字符串 zk 服务器的地址和端口 192.168.31.99:2181
         * @param sessionTimeoutMs  会话超时时间 单位ms
         * @param connectionTimeoutMs   连接超时时间 单位ms
         * @param retryPolicy   重试策略
         * */
        //重试策略
        RetryPolicy retry = new ExponentialBackoffRetry(3000,10);

/*        //1.第一种方式
        CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.149.135:2181",60*1000,15*1000,retry);
        */


        //2.第二种方式
        client = CuratorFrameworkFactory.builder().connectString("192.168.31.99:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retry)
                .namespace("itheima")
                .build();

        client.start();

    }
//================================修改===================================

    /**
     * 修改数据
     * 1.修改数据 : setData().forPath()
     * 2.根据版本修改 : setData().withVersion().forPath()
     *      * version: 是通过查询出来的。目的就是为了让其他客户端或者线程不干扰我
     * @throws Exception
     */
    @Test
    public void testSet() throws Exception{
        //修改数据
        client.setData().forPath("/app1", "itcast".getBytes());
    }
    @Test
    public void testSetForVersion() throws Exception{
        Stat stat = new Stat();
        System.out.println(stat);
        int version = stat.getVersion();
        System.out.println(version);
        //修改数据
        client.setData().withVersion(version).forPath("/app4", "gaga".getBytes());
    }
    @After
    public void close(){
        if (client != null){
            client.close();
        }
    }

⑤查询节点

public class CuratorTest {

   private CuratorFramework client;
    /**
     * 建立连接
     */
    @Before
    public void testConnect(){
        /*
         * @param connectString 连接字符串 zk 服务器的地址和端口 192.168.31.99:2181
         * @param sessionTimeoutMs  会话超时时间 单位ms
         * @param connectionTimeoutMs   连接超时时间 单位ms
         * @param retryPolicy   重试策略
         * */
        //重试策略
        RetryPolicy retry = new ExponentialBackoffRetry(3000,10);

/*        //1.第一种方式
        CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.149.135:2181",60*1000,15*1000,retry);
        */


        //2.第二种方式
        client = CuratorFrameworkFactory.builder().connectString("192.168.31.99:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retry)
                .namespace("itheima")
                .build();

        client.start();

    }
//================================删除===================================

    /**
     * 删除节点:delete deleteall
     * 1.删除单个节点 :delete().forPath("/app1")
     * 2.删除带有子节点的节点 : delete().deletingChildrenIfNeeded().forPath("/app1");
     * 3.删除必须成功 : 为了防止网络抖动。本质就是重试  client.delete().guaranteed().forPath("/app2");
     * 4.回调 : client.delete().guaranteed().inBackground(new BackgroundCallback(new BackgroundCallback(){})
     * @throws Exception
     */
    @Test
    public void testDelete() throws Exception{
        //1.删除单个节点
        client.delete().forPath("/app1");
    }
    @Test
    public void testDelete2() throws Exception{
        //2.删除带有子节点的节点
        client.delete().deletingChildrenIfNeeded().forPath("/app4");
    }
    @Test
    public void testDelete3() throws Exception{
        //3.必须删除成功
        client.delete().guaranteed().forPath("/app2");
    }
    @Test
    public void testDelete4() throws Exception{
        //4.回调
        client.delete().guaranteed().inBackground(new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework curatorFramework, CuratorEvent event) throws Exception {
                System.out.println("我被删除了~");
                System.out.println(event);
            }
        }).forPath("/app2");
    }
    @After
    public void close(){
        if (client != null){
            client.close();
        }
    }

⑥Watch事件监听

⑦分布式锁实现

4.Zookeeper 集群搭建

5.Zookeeper核心理论

Q.E.D.


窝似嫩叠