资讯详情

手把手带你撸zookeeper源码-zookeeper客户端如何和zk集群创建连接

上篇文章手拉手带你zookeeper源码-zookeeper集群如何接收客户端的连接?分析了zk通过监控2181端口,集群通过NIO等待客户端连接的方式,本文主要分析如何与客户端进行联系zk集群创建连接

客户端的入口类别是org.apache.zookeeper.ZooKeeper, 如果您使用原始代码zookeeper当客户端与服务段建立连接时,首先需要实例化Zookeeper对象,然后传递zk集群的节点ip和端口号,设置超时时间和监听器

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)         throws IOException     {         this(connectString, sessionTimeout, watcher, false);     }

第一个connectString参数就是zk如果多个集群地址需要用逗号分隔, 如 192.168.1.1:2181,192.168.1.2:2181

第二个sessionTimeout会话超时

第三个参数Watcher这是一个接口,当集群中的数据节点发生变化时,传输到这个接口的实现类回调您的接口方法

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,             boolean canBeReadOnly)         throws IOException     {         LOG.info("Initiating client connection, connectString="   connectString                   " sessionTimeout="   sessionTimeout   " watcher="   watcher);         //默认watcher         watchManager.defaultWatcher = watcher;         //连接在分析配置中zk的字符串         ConnectStringParser connectStringParser = new ConnectStringParser(                 connectString);         //Host         // 若是集群多台机器, 连接多个客户端, zk会均匀的把client连接到不同的机器         HostProvider hostProvider = new StaticHostProvider(                 connectStringParser.getServerAddresses());         cnxn = new ClientCnxn(connectStringParser.getChrootPath(),                 hostProvider, sessionTimeout, this, watchManager,                 getClientCnxnSocket(), canBeReadOnly);         cnxn.start();     }

ConnectStringParser对象是分析传输的连接字符串

public ConnectStringParser(String connectString) {         // parse out chroot, if any         int off = connectString.indexOf(');         if (off >= 0) {             String chrootPath = connectString.substring(off);             // ignore "/" chroot spec, same as null             if (chrootPath.length() == 1) {                 this.chrootPath = null;             } else {                 PathUtils.validatePath(chrootPath);                 this.chrootPath = chrootPath;             }             connectString = connectString.substring(0, off);         } else {             this.chrootPath = null;         }         //192.168.1.1:2181,192.168.1.2:2181         String hostsList[] = connectString.split(",");         for (String host : hostsList) {             int port = DEFAULT_PORT;             int pidx = host.lastIndexOf(」;             if (pidx >= 0) {                 // otherwise : is at the end of the string, ignore                 if (pidx < host.length() - 1) {                     port = Integer.parseInt(host.substring(pidx   1));                 }                 host = host.substring(0, pidx);             }             serverAddresses.add(InetSocketAddress.createUnresolved(host, port));         }     }

代码的前部分

        int off = connectString.indexOf(');         if (off >= 0) {             String chrootPath = connectString.substring(off);             // ignore "/" chroot spec, same as null             if (chrootPath.length() == 1) {                 this.chrootPath = null;             } else {                 PathUtils.validatePath(chrootPath);                 this.chrootPath = chrootPath;             }             connectString = connectString.substring(0, off);         } else {             this.chrootPath = null;         }

在配置zk当服务器字符串地址时,可以添加一个路径,因此该连接只能操作该路径下的直接点和数据节点

如 192.168.1.2:2181:/zk/ 就是连接到192.168.1.在这个服务器上,目前的客户端和zk建立连接后,只能对/zk操作下子节点或数据节点

         //192.168.1.1:2181,192.168.1.2:2181         String hostsList[] = connectString.split(",");         for (String host : hostsList) {             int port = DEFAULT_PORT;             int pidx = host.lastIndexOf(」;             if (pidx >= 0) {                 // otherwise : is at the end of the string, ignore                 if (pidx < host.length() - 1) {                     port = Integer.parseInt(host.substring(pidx   1));                 }                 host = host.substring(0, pidx);             }             serverAddresses.add(InetSocketAddress.createUnresolved(host, port));         }

解析192.168.1.1:2181,192.168.1.2:2181这样的字符串,把ip分析地址和端口号,然后包装成InetSocketAddress对象,最后加入serverAddresses集合当中

HostProvider hostProvider = new StaticHostProvider(                 connectStringParser.getServerAddresses());

实例化一个HostProvider当对象最终被调用时,它最终会被调用StaticHostProvider中的init方法

private void init(Collection<InetSocketAddress> serverAddresses) {         if (serverAddesses.isEmpty()) {
            throw new IllegalArgumentException(
                    "A HostProvider may not be empty!");
        }

        this.serverAddresses.addAll(serverAddresses);
        //把配置的server address随机打乱
        Collections.shuffle(this.serverAddresses);
    }

在上面对connectString进行解析的时候,最终把解析出来的ip和端口号信息封装伟InetSocketAddress对象加入到serverAddresses的list集合中,List是有序的,那么只要配置的connectString相同,则解析出来的zk服务器地址集合中的顺序是一样的,所以,此时会造成有的zk节点连接的客户端少,有的多,导致不均匀,所以此处通过shuffle对serverAddress进行打乱,这样可以保证任意一个客户端都可以随机的连接到某台服务器,连接比较均匀

接下来是关键代码

cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                hostProvider, sessionTimeout, this, watchManager,
                getClientCnxnSocket(), canBeReadOnly);
        cnxn.start();

创建一个客户端ClientCnxn对象,把上面实例化好的对象全部传递进去

public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
            ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
            long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
            this.zooKeeper = zooKeeper;
            this.watcher = watcher;
            this.sessionId = sessionId;
            this.sessionPasswd = sessionPasswd;
            this.sessionTimeout = sessionTimeout;
            this.hostProvider = hostProvider;
            this.chrootPath = chrootPath;

            connectTimeout = sessionTimeout / hostProvider.size();
            readTimeout = sessionTimeout * 2 / 3;
            readOnly = canBeReadOnly;
            //发送数据
            sendThread = new SendThread(clientCnxnSocket);
            //接受zk服务端反向通知过来的event事件
            eventThread = new EventThread();

    }

最终调用到这个实例化构造方法中,主要看最后的两个线程,一个是sendThread线程,用来发送数据,待会看看这个线程在哪启动的,还有一个eventThread线程,此线程是用来接受zk集群发送回来的事件通知,然后去调用watcher接口的方法

cnxn.start()
    public void start() {
        sendThread.start();
        eventThread.start();
    }

启动了sendThread线程和eventThread线程

我们先来看一下sendThread线程启动之后会干写什么事,进入到run方法中

            clientCnxnSocket.introduce(this,sessionId);
            clientCnxnSocket.updateNow();
            clientCnxnSocket.updateLastSendAndHeard();
            int to;
            long lastPingRwServer = Time.currentElapsedTime();
            final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
            InetSocketAddress serverAddress = null;

上面这几行代码就是维护一下时间,进入while循环中

                    // 还没有和zk集群创建链接
                    if (!clientCnxnSocket.isConnected()) {
                        // 不是第一次链接,则随机sleep一会
                        if(!isFirstConnect){
                            try {
                                Thread.sleep(r.nextInt(1000));
                            } catch (InterruptedException e) {
                                LOG.warn("Unexpected exception", e);
                            }
                        }
                        // don't re-establish connection if we are closing
                        // 正在关闭
                        if (closing || !state.isAlive()) {
                            break;
                        }
                        if (rwServerAddress != null) {
                            serverAddress = rwServerAddress;
                            rwServerAddress = null;
                        } else {
                            //随机选择一台zk服务器
                            serverAddress = hostProvider.next(1000);
                        }
                        //sendThread和服务端建立长连接
                        startConnect(serverAddress);
                        clientCnxnSocket.updateLastSendAndHeard();
                    }

刚开始的,客户端肯定没有和zk集群服务端建立连接

                        // 不是第一次链接,则随机sleep一会
                        if(!isFirstConnect){
                            try {
                                Thread.sleep(r.nextInt(1000));
                            } catch (InterruptedException e) {
                                LOG.warn("Unexpected exception", e);
                            }
                        }

首先判断是不是第一次连接,刚开始肯定是第一次连接啊

                        // 正在关闭
                        if (closing || !state.isAlive()) {
                            break;
                        }
                        if (rwServerAddress != null) {
                            serverAddress = rwServerAddress;
                            rwServerAddress = null;
                        } else {
                            //随机选择一台zk服务器
                            serverAddress = hostProvider.next(1000);
                        }
                        //sendThread和服务端建立长连接
                        startConnect(serverAddress);
                        clientCnxnSocket.updateLastSendAndHeard();

判断当前客户端和服务端是否正在关闭,如果正在关闭则直接跳出循环,不能重新连接

然后通过hostProvider.next(1000), 随机选择一个zk服务器

因为有的配置可能是通过主机名而非ip地址进行配置的, 如hostname:port,hostname:port

public InetSocketAddress next(long spinDelay) {
        currentIndex = ++currentIndex % serverAddresses.size();
        if (currentIndex == lastIndex && spinDelay > 0) {
            try {
                Thread.sleep(spinDelay);
            } catch (InterruptedException e) {
                LOG.warn("Unexpected exception", e);
            }
        } else if (lastIndex == -1) {
            // We don't want to sleep on the first ever connect attempt.
            lastIndex = 0;
        }

        InetSocketAddress curAddr = serverAddresses.get(currentIndex);
        try {
            String curHostString = getHostString(curAddr);
            List<InetAddress> resolvedAddresses = new ArrayList<InetAddress>(Arrays.asList(this.resolver.getAllByName(curHostString)));
            if (resolvedAddresses.isEmpty()) {
                return curAddr;
            }
            Collections.shuffle(resolvedAddresses);
            return new InetSocketAddress(resolvedAddresses.get(0), curAddr.getPort());
        } catch (UnknownHostException e) {
            return curAddr;
        }
    }

那么需要根据hostname去查找对应的ip地址

选择出来要连接的zk服务器之后,调用 startConnect(serverAddress);方法开始连接

private void startConnect(InetSocketAddress addr) throws IOException {
            // initializing it for new connection
            saslLoginFailed = false;
            state = States.CONNECTING;

            setName(getName().replaceAll("\\(.*\\)",
                    "(" + addr.getHostName() + ":" + addr.getPort() + ")"));
            // 权限、认证
            if (ZooKeeperSaslClient.isEnabled()) {
                try {
                    String principalUserName = System.getProperty(
                            ZK_SASL_CLIENT_USERNAME, "zookeeper");
                    zooKeeperSaslClient =
                        new ZooKeeperSaslClient(
                                principalUserName+"/"+addr.getHostName());
                } catch (LoginException e) {
                    LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server without "
                      + "SASL authentication, if Zookeeper server allows it.");
                    eventThread.queueEvent(new WatchedEvent(
                      Watcher.Event.EventType.None,
                      Watcher.Event.KeeperState.AuthFailed, null));
                    saslLoginFailed = true;
                }
            }
            logStartConnect(addr);

            clientCnxnSocket.connect(addr); // 关键点在这
        }

最后调用了clientCnxnSocket.connect(addr)方法

void connect(InetSocketAddress addr) throws IOException {
        SocketChannel sock = createSock();
        try {
           registerAndConnect(sock, addr);
        } catch (IOException e) {
            LOG.error("Unable to open socket to " + addr);
            sock.close();
            throw e;
        }
        initialized = false;

        /*
         * Reset incomingBuffer
         */
        lenBuffer.clear();
        incomingBuffer = lenBuffer;
    }

这个方法就是和zk集群中的某一台服务器进行链接的,首先通过createSock()创建一个SocketChannel对象

 SocketChannel createSock() throws IOException {
        SocketChannel sock;
        sock = SocketChannel.open();
        sock.configureBlocking(false);//异步
        sock.socket().setSoLinger(false, -1);
        sock.socket().setTcpNoDelay(true);// 不延迟,立即链接/发送数据
        return sock;
    }

这些是有关nio的相关知识,大家可以去百度nio demo看一下

void registerAndConnect(SocketChannel sock, InetSocketAddress addr) 
    throws IOException {
        sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
        boolean immediateConnect = sock.connect(addr);
        if (immediateConnect) {
            sendThread.primeConnection();
        }
    }

接着就去注册链接,首先先对OP_CONNECT进行关注,然后通过sock.connect和服务端进行链接

接着判断是否立即链接,我们假设立即链接成功了,此时进入到sendThread.primeConnection();中去

void primeConnection() throws IOException {
            isFirstConnect = false;
            long sessId = (seenRwServerBefore) ? sessionId : 0;
            // 创建一个链接请求
            ConnectRequest conReq = new ConnectRequest(0, lastZxid,
                    sessionTimeout, sessId, sessionPasswd);
            synchronized (outgoingQueue) {
                if (!disableAutoWatchReset) {
                    //监听一个znode下数据的变化
                    List<String> dataWatches = zooKeeper.getDataWatches();
                    //监听一个znode是否存在
                    List<String> existWatches = zooKeeper.getExistWatches();
                    //监听一个znode下子节点的变化
                    List<String> childWatches = zooKeeper.getChildWatches();
                    if (!dataWatches.isEmpty()
                                || !existWatches.isEmpty() || !childWatches.isEmpty()) {

                        Iterator<String> dataWatchesIter = prependChroot(dataWatches).iterator();
                        Iterator<String> existWatchesIter = prependChroot(existWatches).iterator();
                        Iterator<String> childWatchesIter = prependChroot(childWatches).iterator();
                        long setWatchesLastZxid = lastZxid;

                        while (dataWatchesIter.hasNext()
                                       || existWatchesIter.hasNext() || childWatchesIter.hasNext()) {
                            List<String> dataWatchesBatch = new ArrayList<String>();
                            List<String> existWatchesBatch = new ArrayList<String>();
                            List<String> childWatchesBatch = new ArrayList<String>();
                            int batchLength = 0;

                            while (batchLength < SET_WATCHES_MAX_LENGTH) {
                                final String watch;
                                if (dataWatchesIter.hasNext()) {
                                    watch = dataWatchesIter.next();
                                    dataWatchesBatch.add(watch);
                                } else if (existWatchesIter.hasNext()) {
                                    watch = existWatchesIter.next();
                                    existWatchesBatch.add(watch);
                                } else if (childWatchesIter.hasNext()) {
                                    watch = childWatchesIter.next();
                                    childWatchesBatch.add(watch);
                                } else {
                                    break;
                                }
                                batchLength += watch.length();
                            }

                            SetWatches sw = new SetWatches(setWatchesLastZxid,
                                    dataWatchesBatch,
                                    existWatchesBatch,
                                    childWatchesBatch);
                            RequestHeader h = new RequestHeader();
                            h.setType(ZooDefs.OpCode.setWatches);
                            h.setXid(-8);
                            Packet packet = new Packet(h, new ReplyHeader(), sw, null, null);
                            // 把所有的watcher封装成一个packet,加入到待发送的队列中
                            outgoingQueue.addFirst(packet);
                        }
                    }
                }

                for (AuthData id : authInfo) { // 添加权限相关数据到待发送队列中
                    outgoingQueue.addFirst(new Packet(new RequestHeader(-4,
                            OpCode.auth), null, new AuthPacket(0, id.scheme,
                            id.data), null, null));
                }
                // 把链接请求对象封装为一个packet加入到待发送的队列中
                outgoingQueue.addFirst(new Packet(null, null, conReq,
                            null, null, readOnly));
            }
            //关注读写请求
            clientCnxnSocket.enableReadWriteOnly();
        }

可以看上面的代码中的注释,就是在链接完毕之后,客户端会把要发送的数据先加入到一个outgoingQueue待发送的队列中,然后某个地方会去从此队列中拿数据,然后一条一条的发送给服务端

 

我们下篇文章继续分析客户端如何从outgoingQueue里面获取数据,并发送给服务端的,同时服务端会做些什么事情,以及会响应什么样的信息

 

标签: 164zk10连接器

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台