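The previous article, "Walking through the ZooKeeper source hand in hand: how does a ZooKeeper cluster accept client connections?", analyzed how ZK listens on port 2181 and how the cluster waits for client connections over NIO. This article looks at the other side: how a client establishes a connection to the ZK cluster.
The client's entry class is org.apache.zookeeper.ZooKeeper. When you use the native ZooKeeper client to connect to the server, you first instantiate a ZooKeeper object, passing in the IPs and ports of the ZK cluster nodes, a session timeout and a watcher.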
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
        throws IOException {
    this(connectString, sessionTimeout, watcher, false);
}
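The first parameter, connectString, is the ZK cluster address list; multiple addresses are separated by commas, e.g. 192.168.1.1:2181,192.168.1.2:2181
The second parameter, sessionTimeout, is the session timeout in milliseconds
The third parameter, watcher, is an implementation of the Watcher interface; when a watched data node in the cluster changes, the corresponding method on your implementation is called back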
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
        boolean canBeReadOnly) throws IOException {
    LOG.info("Initiating client connection, connectString=" + connectString
            + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
    // default watcher
    watchManager.defaultWatcher = watcher;
    // parse the configured ZK connect string
    ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
    // HostProvider: if the cluster has several machines, ZK spreads
    // client connections evenly across them
    HostProvider hostProvider = new StaticHostProvider(
            connectStringParser.getServerAddresses());
    cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
            hostProvider, sessionTimeout, this, watchManager,
            getClientCnxnSocket(), canBeReadOnly);
    cnxn.start();
}
The ConnectStringParser object parses the connect string that was passed in:
public ConnectStringParser(String connectString) {
    // parse out chroot, if any
    int off = connectString.indexOf('/');
    if (off >= 0) {
        String chrootPath = connectString.substring(off);
        // ignore "/" chroot spec, same as null
        if (chrootPath.length() == 1) {
            this.chrootPath = null;
        } else {
            PathUtils.validatePath(chrootPath);
            this.chrootPath = chrootPath;
        }
        connectString = connectString.substring(0, off);
    } else {
        this.chrootPath = null;
    }
    // e.g. 192.168.1.1:2181,192.168.1.2:2181
    String hostsList[] = connectString.split(",");
    for (String host : hostsList) {
        int port = DEFAULT_PORT;
        int pidx = host.lastIndexOf(':');
        if (pidx >= 0) {
            // otherwise : is at the end of the string, ignore
            if (pidx < host.length() - 1) {
                port = Integer.parseInt(host.substring(pidx + 1));
            }
            host = host.substring(0, pidx);
        }
        serverAddresses.add(InetSocketAddress.createUnresolved(host, port));
    }
}
The first part of the code:
int off = connectString.indexOf('/');
if (off >= 0) {
    String chrootPath = connectString.substring(off);
    // ignore "/" chroot spec, same as null
    if (chrootPath.length() == 1) {
        this.chrootPath = null;
    } else {
        PathUtils.validatePath(chrootPath);
        this.chrootPath = chrootPath;
    }
    connectString = connectString.substring(0, off);
} else {
    this.chrootPath = null;
}
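When configuring the ZK server address string, you can append a path (a chroot); the connection can then only operate on the child nodes and data nodes under that path.
For example, with 192.168.1.2:2181/zk the client connects to the server 192.168.1.2, and once the connection is established it can only operate on child nodes or data nodes under /zk. With several servers, the path is written once, after the whole list: 192.168.1.1:2181,192.168.1.2:2181/zk.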
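A simplified sketch of both halves of chroot handling: splitting the path off the connect string (same rules as ConnectStringParser above) and prefixing the paths a client uses with it. ChrootDemo, splitChroot and prependChroot are hypothetical names, path validation is omitted, and the prefixing rule (client root "/" maps to the chroot itself, other paths are concatenated) is my reading of the client-side prependChroot, not a copy of it.

```java
// Simplified sketch of chroot handling. ChrootDemo, splitChroot and prependChroot
// are hypothetical names; path validation (PathUtils.validatePath) is omitted.
public class ChrootDemo {

    // Returns {hostList, chroot}; chroot is null when absent or just "/",
    // following the same rules as ConnectStringParser.
    static String[] splitChroot(String connectString) {
        int off = connectString.indexOf('/');
        if (off < 0) {
            return new String[] {connectString, null};
        }
        String chroot = connectString.substring(off);
        String hosts = connectString.substring(0, off);
        return new String[] {hosts, chroot.length() == 1 ? null : chroot};
    }

    // Maps a path as seen by the client onto the real path on the server.
    static String prependChroot(String chroot, String clientPath) {
        if (chroot == null) {
            return clientPath;
        }
        // the client-side root "/" maps to the chroot itself
        return clientPath.length() == 1 ? chroot : chroot + clientPath;
    }

    public static void main(String[] args) {
        String[] parts = splitChroot("192.168.1.1:2181,192.168.1.2:2181/zk");
        System.out.println(parts[0]);                        // 192.168.1.1:2181,192.168.1.2:2181
        System.out.println(parts[1]);                        // /zk
        System.out.println(prependChroot(parts[1], "/app")); // /zk/app
        System.out.println(prependChroot(parts[1], "/"));    // /zk
    }
}
```

So a client connected with the /zk chroot that creates /app is really creating /zk/app on the server.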
// e.g. 192.168.1.1:2181,192.168.1.2:2181
String hostsList[] = connectString.split(",");
for (String host : hostsList) {
    int port = DEFAULT_PORT;
    int pidx = host.lastIndexOf(':');
    if (pidx >= 0) {
        // otherwise : is at the end of the string, ignore
        if (pidx < host.length() - 1) {
            port = Integer.parseInt(host.substring(pidx + 1));
        }
        host = host.substring(0, pidx);
    }
    serverAddresses.add(InetSocketAddress.createUnresolved(host, port));
}
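This parses a string like 192.168.1.1:2181,192.168.1.2:2181 into IP addresses and port numbers, wraps each pair in an InetSocketAddress object, and adds it to the serverAddresses list. A host without a port gets DEFAULT_PORT.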
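A self-contained sketch of that loop using only the JDK. HostListParser is a hypothetical name; the default port 2181 matches ConnectStringParser's DEFAULT_PORT. Note that createUnresolved does no DNS lookup: names are only resolved later, when a server is actually picked.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Simplified re-implementation of the host-list loop in ConnectStringParser.
// HostListParser is hypothetical; 2181 is ZooKeeper's default client port.
public class HostListParser {
    static final int DEFAULT_PORT = 2181;

    static List<InetSocketAddress> parse(String hostsPart) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String host : hostsPart.split(",")) {
            int port = DEFAULT_PORT;
            int pidx = host.lastIndexOf(':');
            if (pidx >= 0) {
                // a trailing ':' with nothing after it keeps the default port
                if (pidx < host.length() - 1) {
                    port = Integer.parseInt(host.substring(pidx + 1));
                }
                host = host.substring(0, pidx);
            }
            // createUnresolved: no DNS lookup happens at parse time
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    public static void main(String[] args) {
        for (InetSocketAddress a : parse("192.168.1.1:2181,192.168.1.2:2182,zk3")) {
            // prints 192.168.1.1 -> 2181, 192.168.1.2 -> 2182, zk3 -> 2181
            System.out.println(a.getHostString() + " -> " + a.getPort());
        }
    }
}
```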
HostProvider hostProvider = new StaticHostProvider( connectStringParser.getServerAddresses());
This instantiates a HostProvider; the StaticHostProvider constructor ultimately calls its init method:
private void init(Collection<InetSocketAddress> serverAddresses) {
    if (serverAddresses.isEmpty()) {
        throw new IllegalArgumentException(
                "A HostProvider may not be empty!");
    }
    this.serverAddresses.addAll(serverAddresses);
    // shuffle the configured server addresses randomly
    Collections.shuffle(this.serverAddresses);
}
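When connectString was parsed above, each IP/port pair was wrapped in an InetSocketAddress object and added to the serverAddresses list. A List keeps its order, so every client configured with the same connectString would see the ZK servers in the same order, and some ZK nodes would end up with many clients while others got few. Shuffling serverAddresses here gives each client a random starting server, which spreads connections fairly evenly across the cluster.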
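To see the effect, here is a small simulation. It assumes, as a simplification, that each client just connects to the first address in its own copy of the list; ShuffleDemo and simulate are hypothetical names, and a seeded Random keeps the run reproducible.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;

// Simulates many clients sharing one connect string (hypothetical helper).
public class ShuffleDemo {

    static Map<String, Integer> simulate(List<String> servers, int clients, boolean shuffle, long seed) {
        Random random = new Random(seed);
        Map<String, Integer> counts = new TreeMap<>();
        for (String s : servers) {
            counts.put(s, 0);
        }
        for (int i = 0; i < clients; i++) {
            List<String> copy = new ArrayList<>(servers); // each client parses its own list
            if (shuffle) {
                Collections.shuffle(copy, random);
            }
            counts.merge(copy.get(0), 1, Integer::sum); // simplification: connect to the first entry
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> servers = Arrays.asList("zk1:2181", "zk2:2181", "zk3:2181");
        System.out.println("no shuffle: " + simulate(servers, 3000, false, 42));
        System.out.println("shuffle:    " + simulate(servers, 3000, true, 42));
    }
}
```

Without shuffling, all 3000 simulated clients land on zk1:2181; with shuffling, each server gets roughly a third of them.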
Next comes the key code:
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
getClientCnxnSocket(), canBeReadOnly);
cnxn.start();
This creates the client's ClientCnxn object, passing in all the objects instantiated above:
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
this.zooKeeper = zooKeeper;
this.watcher = watcher;
this.sessionId = sessionId;
this.sessionPasswd = sessionPasswd;
this.sessionTimeout = sessionTimeout;
this.hostProvider = hostProvider;
this.chrootPath = chrootPath;
connectTimeout = sessionTimeout / hostProvider.size();
readTimeout = sessionTimeout * 2 / 3;
readOnly = canBeReadOnly;
// sends requests to the server (and reads its responses)
sendThread = new SendThread(clientCnxnSocket);
// delivers events pushed back by the ZK server to the watchers
eventThread = new EventThread();
}
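The call ends up in this constructor. The interesting part is the two threads created at the end: sendThread, which handles the socket I/O (we'll see shortly where it is started), and eventThread, which takes the event notifications that sendThread reads from the ZK cluster and invokes the Watcher callbacks.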
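Besides the two threads, the constructor derives two timeouts from sessionTimeout. A quick worked example, assuming sessionTimeout = 30000 ms and three servers; TimeoutDemo is a hypothetical helper that simply copies the two formulas. connectTimeout is the session timeout split across the servers, presumably so that trying each server once fits inside one session timeout, and readTimeout is two thirds of the session timeout (integer division).

```java
// Worked example of the two timeouts computed in the ClientCnxn constructor.
// TimeoutDemo is hypothetical; the formulas are copied from the constructor.
public class TimeoutDemo {
    static int connectTimeout(int sessionTimeout, int hostCount) {
        return sessionTimeout / hostCount;
    }

    static int readTimeout(int sessionTimeout) {
        return sessionTimeout * 2 / 3;
    }

    public static void main(String[] args) {
        int sessionTimeout = 30000; // ms
        int hosts = 3;
        System.out.println("connectTimeout = " + connectTimeout(sessionTimeout, hosts)); // 10000
        System.out.println("readTimeout    = " + readTimeout(sessionTimeout));           // 20000
    }
}
```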
cnxn.start()
public void start() {
sendThread.start();
eventThread.start();
}
This starts both sendThread and eventThread.
Let's first look at what sendThread does once it starts, in its run method:
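The split between the two threads is a producer/consumer handoff: the I/O thread queues events and the event thread runs the user's callbacks, which keeps socket I/O independent of how long a Watcher callback takes. A minimal sketch of that pattern, assuming a plain LinkedBlockingQueue and an identity "event of death" sentinel to stop the thread; EventLoopDemo, queueEvent and stopAndJoin are illustrative names only loosely modeled on ClientCnxn, not its actual API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical sketch of the SendThread -> EventThread handoff.
public class EventLoopDemo {
    // identity sentinel that tells the event thread to exit
    private static final Object EVENT_OF_DEATH = new Object();
    private final BlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<>();
    private final Thread eventThread;

    EventLoopDemo(Consumer<String> watcher) {
        eventThread = new Thread(() -> {
            try {
                while (true) {
                    Object event = waitingEvents.take(); // blocks until something is queued
                    if (event == EVENT_OF_DEATH) {
                        return;
                    }
                    watcher.accept((String) event); // user callback runs on this thread only
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "EventThread");
    }

    void start() {
        eventThread.start();
    }

    // Called from the I/O thread when a notification has been read from the socket.
    void queueEvent(String event) {
        waitingEvents.add(event);
    }

    // Queues the sentinel behind any pending events, then waits for the thread to finish.
    void stopAndJoin() throws InterruptedException {
        waitingEvents.add(EVENT_OF_DEATH);
        eventThread.join();
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        EventLoopDemo loop = new EventLoopDemo(seen::add);
        loop.start();
        loop.queueEvent("NodeDataChanged /zk/a");
        loop.queueEvent("NodeChildrenChanged /zk");
        loop.stopAndJoin();
        System.out.println(seen); // [NodeDataChanged /zk/a, NodeChildrenChanged /zk]
    }
}
```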
clientCnxnSocket.introduce(this,sessionId);
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
int to;
long lastPingRwServer = Time.currentElapsedTime();
final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
InetSocketAddress serverAddress = null;
These lines just initialize some timestamps. Then the thread enters its while loop:
// not yet connected to the ZK cluster
if (!clientCnxnSocket.isConnected()) {
// not the first connection attempt: sleep for a random interval first
if(!isFirstConnect){
try {
Thread.sleep(r.nextInt(1000));
} catch (InterruptedException e) {
LOG.warn("Unexpected exception", e);
}
}
// don't re-establish connection if we are closing
// the connection is closing
if (closing || !state.isAlive()) {
break;
}
if (rwServerAddress != null) {
serverAddress = rwServerAddress;
rwServerAddress = null;
} else {
// pick the next ZK server from the shuffled list
serverAddress = hostProvider.next(1000);
}
// sendThread establishes a long-lived connection to the server
startConnect(serverAddress);
clientCnxnSocket.updateLastSendAndHeard();
}
At the start, the client is of course not yet connected to any ZK server.
// not the first connection attempt: sleep for a random interval first
if(!isFirstConnect){
try {
Thread.sleep(r.nextInt(1000));
} catch (InterruptedException e) {
LOG.warn("Unexpected exception", e);
}
}
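First it checks whether this is the first connection attempt. At startup it is, so there is no sleep; on later reconnect attempts the thread first sleeps for a random 0 to 999 ms.
// the connection is closing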
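The random sleep only applies to reconnects. A minimal sketch of that rule, with hypothetical names (ReconnectJitter, delayBeforeAttempt); the 0 to 999 ms range comes from r.nextInt(1000). The jitter presumably keeps many clients that lost the same server from all reconnecting at the same instant.

```java
import java.util.Random;

// Sketch of the reconnect jitter in SendThread.run(); names are hypothetical.
public class ReconnectJitter {
    static int delayBeforeAttempt(boolean isFirstConnect, Random r) {
        // first attempt: connect immediately; later attempts: random pause in [0, 1000) ms
        return isFirstConnect ? 0 : r.nextInt(1000);
    }

    public static void main(String[] args) throws InterruptedException {
        Random r = new Random();
        boolean isFirstConnect = true;
        for (int attempt = 1; attempt <= 3; attempt++) {
            int delay = delayBeforeAttempt(isFirstConnect, r);
            Thread.sleep(delay);
            System.out.println("attempt " + attempt + " after " + delay + " ms");
            // in ClientCnxn the flag is cleared in primeConnection(); here simply after each attempt
            isFirstConnect = false;
        }
    }
}
```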
if (closing || !state.isAlive()) {
break;
}
if (rwServerAddress != null) {
serverAddress = rwServerAddress;
rwServerAddress = null;
} else {
// pick the next ZK server from the shuffled list
serverAddress = hostProvider.next(1000);
}
// sendThread establishes a long-lived connection to the server
startConnect(serverAddress);
clientCnxnSocket.updateLastSendAndHeard();
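Next it checks whether the client is closing (or its state is no longer alive); if so, it breaks out of the loop instead of reconnecting.
Otherwise, unless a read-write server address (rwServerAddress) is pending, it calls hostProvider.next(1000) to pick the next ZK server from the shuffled list.
Because servers may be configured by hostname rather than by IP address, e.g. hostname:port,hostname:port, next() also has to resolve names: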
public InetSocketAddress next(long spinDelay) {
currentIndex = ++currentIndex % serverAddresses.size();
if (currentIndex == lastIndex && spinDelay > 0) {
try {
Thread.sleep(spinDelay);
} catch (InterruptedException e) {
LOG.warn("Unexpected exception", e);
}
} else if (lastIndex == -1) {
// We don't want to sleep on the first ever connect attempt.
lastIndex = 0;
}
InetSocketAddress curAddr = serverAddresses.get(currentIndex);
try {
String curHostString = getHostString(curAddr);
List<InetAddress> resolvedAddresses = new ArrayList<InetAddress>(Arrays.asList(this.resolver.getAllByName(curHostString)));
if (resolvedAddresses.isEmpty()) {
return curAddr;
}
Collections.shuffle(resolvedAddresses);
return new InetSocketAddress(resolvedAddresses.get(0), curAddr.getPort());
} catch (UnknownHostException e) {
return curAddr;
}
}
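next() therefore looks up the IP addresses for the hostname, shuffles them and returns one, falling back to the unresolved address if the lookup fails or returns nothing. It also walks the server list round-robin: when currentIndex wraps back to lastIndex (the index recorded at the last successful connection), every server has been tried, so it sleeps spinDelay ms first; the very first call never sleeps.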
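A simplified model of the round-robin part of next(), without DNS resolution. RoundRobinHosts is a hypothetical class; currentIndex, lastIndex and onConnected() mirror StaticHostProvider, assuming onConnected() records the index of the server that just accepted the connection.

```java
import java.util.Arrays;
import java.util.List;

// Simplified model of StaticHostProvider.next(): round-robin over the shuffled
// list, sleeping spinDelay ms when it wraps back to lastIndex. No DNS lookup.
public class RoundRobinHosts {
    private final List<String> servers;
    private int currentIndex = -1;
    private int lastIndex = -1;

    RoundRobinHosts(List<String> servers) {
        this.servers = servers;
    }

    String next(long spinDelay) throws InterruptedException {
        currentIndex = ++currentIndex % servers.size();
        if (currentIndex == lastIndex && spinDelay > 0) {
            Thread.sleep(spinDelay); // every server tried since the last success
        } else if (lastIndex == -1) {
            lastIndex = 0; // never sleep on the very first attempt
        }
        return servers.get(currentIndex);
    }

    // Remember which server the last successful connection went to.
    void onConnected() {
        lastIndex = currentIndex;
    }

    public static void main(String[] args) throws InterruptedException {
        RoundRobinHosts hosts = new RoundRobinHosts(Arrays.asList("zk2", "zk3", "zk1"));
        for (int i = 0; i < 4; i++) {
            long start = System.currentTimeMillis();
            String s = hosts.next(100);
            System.out.println(s + " (waited " + (System.currentTimeMillis() - start) + " ms)");
        }
    }
}
```

With the list [zk2, zk3, zk1] the calls return zk2, zk3, zk1 and then zk2 again; only the fourth call sleeps, because it wrapped back to lastIndex with no successful connection in between.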
Once a ZK server has been chosen, the thread calls startConnect(serverAddress) to begin connecting:
private void startConnect(InetSocketAddress addr) throws IOException {
// initializing it for new connection
saslLoginFailed = false;
state = States.CONNECTING;
setName(getName().replaceAll("\\(.*\\)",
"(" + addr.getHostName() + ":" + addr.getPort() + ")"));
// authentication (SASL)
if (ZooKeeperSaslClient.isEnabled()) {
try {
String principalUserName = System.getProperty(
ZK_SASL_CLIENT_USERNAME, "zookeeper");
zooKeeperSaslClient =
new ZooKeeperSaslClient(
principalUserName+"/"+addr.getHostName());
} catch (LoginException e) {
LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server without "
+ "SASL authentication, if Zookeeper server allows it.");
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
Watcher.Event.KeeperState.AuthFailed, null));
saslLoginFailed = true;
}
}
logStartConnect(addr);
clientCnxnSocket.connect(addr); // the key step
}
Finally it calls clientCnxnSocket.connect(addr):
void connect(InetSocketAddress addr) throws IOException {
SocketChannel sock = createSock();
try {
registerAndConnect(sock, addr);
} catch (IOException e) {
LOG.error("Unable to open socket to " + addr);
sock.close();
throw e;
}
initialized = false;
/*
* Reset incomingBuffer
*/
lenBuffer.clear();
incomingBuffer = lenBuffer;
}
This method connects to one server in the ZK cluster. It first creates a SocketChannel via createSock():
SocketChannel createSock() throws IOException {
SocketChannel sock;
sock = SocketChannel.open();
sock.configureBlocking(false); // non-blocking
sock.socket().setSoLinger(false, -1);
sock.socket().setTcpNoDelay(true); // disable Nagle's algorithm: send small packets immediately
return sock;
}
This is standard Java NIO; if it's unfamiliar, any basic NIO demo covers it.
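For readers new to NIO, here is a stand-alone sketch of the same non-blocking connect pattern: configure the channel like createSock(), register for OP_CONNECT, call connect(), and if the connect is still pending, wait on the selector and call finishConnect(). A local ServerSocketChannel on an ephemeral port stands in for a ZooKeeper server; NioConnectDemo is a hypothetical class, and the println calls only mark where ZooKeeper would call primeConnection().

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical stand-alone sketch of a non-blocking connect driven by a Selector.
public class NioConnectDemo {

    static SocketChannel connect(Selector selector, InetSocketAddress addr) throws IOException {
        SocketChannel sock = SocketChannel.open();
        sock.configureBlocking(false);     // non-blocking, as in createSock()
        sock.socket().setSoLinger(false, -1);
        sock.socket().setTcpNoDelay(true); // disable Nagle's algorithm
        SelectionKey key = sock.register(selector, SelectionKey.OP_CONNECT);
        if (sock.connect(addr)) {
            System.out.println("connected immediately -> primeConnection()");
        } else {
            // connection in progress: wait until the selector reports OP_CONNECT
            while (!sock.isConnected()) {
                selector.select(1000);
                if (key.isConnectable() && sock.finishConnect()) {
                    System.out.println("finishConnect() -> primeConnection()");
                }
                selector.selectedKeys().clear();
            }
        }
        // like enableReadWriteOnly(): from now on watch for reads and writes
        key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
        return sock;
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // ephemeral port
            InetSocketAddress addr = (InetSocketAddress) server.getLocalAddress();
            try (SocketChannel sock = connect(selector, addr)) {
                System.out.println("connected: " + sock.isConnected());
            }
        }
    }
}
```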
void registerAndConnect(SocketChannel sock, InetSocketAddress addr)
throws IOException {
sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
boolean immediateConnect = sock.connect(addr);
if (immediateConnect) {
sendThread.primeConnection();
}
}
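registerAndConnect first registers the channel with the selector for OP_CONNECT, then calls sock.connect to start connecting to the server.
It then checks whether the connection completed immediately. On a non-blocking channel connect() usually returns false and the connection completes later: when the selector reports OP_CONNECT, doTransport calls finishConnect() and then primeConnection(). Either way the client ends up in sendThread.primeConnection(); here we assume the immediate case: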
void primeConnection() throws IOException {
isFirstConnect = false;
long sessId = (seenRwServerBefore) ? sessionId : 0;
// build the connect request
ConnectRequest conReq = new ConnectRequest(0, lastZxid,
sessionTimeout, sessId, sessionPasswd);
synchronized (outgoingQueue) {
if (!disableAutoWatchReset) {
// watches on changes to a znode's data
List<String> dataWatches = zooKeeper.getDataWatches();
// watches on whether a znode exists
List<String> existWatches = zooKeeper.getExistWatches();
// watches on changes to a znode's children
List<String> childWatches = zooKeeper.getChildWatches();
if (!dataWatches.isEmpty()
|| !existWatches.isEmpty() || !childWatches.isEmpty()) {
Iterator<String> dataWatchesIter = prependChroot(dataWatches).iterator();
Iterator<String> existWatchesIter = prependChroot(existWatches).iterator();
Iterator<String> childWatchesIter = prependChroot(childWatches).iterator();
long setWatchesLastZxid = lastZxid;
while (dataWatchesIter.hasNext()
|| existWatchesIter.hasNext() || childWatchesIter.hasNext()) {
List<String> dataWatchesBatch = new ArrayList<String>();
List<String> existWatchesBatch = new ArrayList<String>();
List<String> childWatchesBatch = new ArrayList<String>();
int batchLength = 0;
while (batchLength < SET_WATCHES_MAX_LENGTH) {
final String watch;
if (dataWatchesIter.hasNext()) {
watch = dataWatchesIter.next();
dataWatchesBatch.add(watch);
} else if (existWatchesIter.hasNext()) {
watch = existWatchesIter.next();
existWatchesBatch.add(watch);
} else if (childWatchesIter.hasNext()) {
watch = childWatchesIter.next();
childWatchesBatch.add(watch);
} else {
break;
}
batchLength += watch.length();
}
SetWatches sw = new SetWatches(setWatchesLastZxid,
dataWatchesBatch,
existWatchesBatch,
childWatchesBatch);
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.setWatches);
h.setXid(-8);
Packet packet = new Packet(h, new ReplyHeader(), sw, null, null);
// wrap this batch of watches in one packet and add it to the outgoing queue
outgoingQueue.addFirst(packet);
}
}
}
for (AuthData id : authInfo) { // add the auth data to the outgoing queue
outgoingQueue.addFirst(new Packet(new RequestHeader(-4,
OpCode.auth), null, new AuthPacket(0, id.scheme,
id.data), null, null));
}
// wrap the connect request in a packet and add it to the outgoing queue
outgoingQueue.addFirst(new Packet(null, null, conReq,
null, null, readOnly));
}
// register interest in read and write events
clientCnxnSocket.enableReadWriteOnly();
}
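As the comments in the code show, once the connection is set up the client first puts the data to send (the connect request, auth packets and SetWatches packets) into the outgoingQueue; elsewhere, packets are taken from this queue and sent to the server one at a time.
In the next article we'll look at how the client takes data from outgoingQueue and sends it to the server, what the server does with it, and what it responds with.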
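Because every packet goes in with addFirst, the final order is the reverse of the insertion order. A tiny model with strings in place of Packets (OutgoingQueueOrder, prime and the labels are hypothetical, and an ArrayDeque stands in for the real queue): SetWatches batches are pushed first, then auth packets, then the ConnectRequest, so the ConnectRequest ends up at the head and would be sent first, ahead of any request that was already waiting.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Models the addFirst() calls in primeConnection() with strings instead of Packets.
public class OutgoingQueueOrder {
    static Deque<String> prime(Deque<String> outgoingQueue, int watchBatches, int authEntries) {
        for (int i = 1; i <= watchBatches; i++) {
            outgoingQueue.addFirst("SetWatches#" + i);
        }
        for (int i = 1; i <= authEntries; i++) {
            outgoingQueue.addFirst("Auth#" + i);
        }
        outgoingQueue.addFirst("ConnectRequest"); // pushed last, so it sits at the head
        return outgoingQueue;
    }

    public static void main(String[] args) {
        Deque<String> q = new ArrayDeque<>();
        q.add("getData /zk/a"); // hypothetical request queued before primeConnection()
        System.out.println(prime(q, 2, 1));
        // [ConnectRequest, Auth#1, SetWatches#2, SetWatches#1, getData /zk/a]
    }
}
```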