1. ZK选举算法代码实现分析
ZK内部有三种选举算法,即LeaderElection,FastLeaderElection和AuthLeaderElection,FastLeaderElection和AuthLeaderElection这是一种类似的选举算法,唯一的区别是后者添加了认证信息,但AuthLeaderElection已被ZK弃用, FastLeaderElection比LeaderElection更高效,LeaderElection在3.4.0后版本不再推荐使用,后续版本只保留FastLeaderElection选举算法。
以前解释过基础FastLeaderElection选择算法的主流程,FastLeaderElection如何实现?
-
Election接口源码:
public interface Election { public Vote lookForLeader() throws InterruptedException; public void shutdown(); }
-
FastLeaderElection实现了Election接口,它是标准的Fast Paxos基于算法的实现TCP协议进行选举。
FastLeaderElection分别有三个重要的内部类别Notification、ToSend、Messenger。
... static ByteBuffer buildMsg(int state, long leader, long zxid, long electionEpoch, long epoch) { byte requestBytes[] = new byte[40]; ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes); /* * Building notification packet to send */ requestBuffer.clear(); requestBuffer.putInt(state); requestBuffer.putLong(leader); requestBuffer.putLong(zxid); requestBuffer.putLong(electionEpoch) requestBuffer.putLong(epoch); requestBuffer.putInt(Notification.CURRENTVERSION); return requestBuffer; } ...
Notification表示收到的选举投票信息(其他服务器发来的选举投票信息),其包含了被选举者的id、zxid、选举周期等信息,其buildMsg方法将选举信息封装至ByteBuffer中再进行发送。
static public class ToSend { static enum mType { crequest, challenge, notification, ack} ToSend(mType type, long leader, long zxid, long electionEpoch, ServerState state, long sid, long peerEpoch) { this.leader = leader; this.zxid = zxid; this.electionEpoch = electionEpoch; this.state = state; this.sid = sid; this.peerEpoch = peerEpoch; } /* * Proposed leader in the case of notification */ //被推举的leader的id long leader; /* * id contains the tag for acks, and zxid for notifications */ // 被推举的leader的最大事务id long zxid; /* * Epoch */ // 推举者的选举周期 long electionEpoch; /* * Current state; */ // 推举者的状态 QuorumPeer.ServerState state; /* * Address of recipient */ // 推举者的id long sid; /* * Leader epoch */ // 被推举的leader的选举周期 long peerEpoch; }
ToSend类是发送给其他服务器的选举投票信息,包含了被选举者的id、zxid、选举周期等信息。
Messenger包含了WorkerReceiver和WorkerSender两个内部类。分别表示选票发送器和选票接收器。
protected class Messenger { // 选票发送器 WorkerSender ws; // 选票接收器 WorkerReceiver wr; }
WorkerReceiver实现了Runnable接口,是选票接收器。其会不断地从QuorumCnxManager中获取其他服务器发来的选举消息,并将其转换成一个选票,然后保存到recvqueue中,在选票接收过程中,如果发现该外部选票的选举轮次小于当前服务器的,那么忽略该外部投票,同时立即发送自己的内部投票。
WorkerReceiver的主要逻辑在run方法中实现,首先会从QuorumCnxManager中的recvQueue队列中取出其他服务器发来的选举消息,消息封装在Message数据结构中。
然后判断消息中的服务器id是否包含在可以投票的服务器集合中,若不是,则会将本服务器的内部投票发送给该服务器,代码流程如下:
if(!self.getVotingView().containsKey(response.sid)){ // 当前的投票者集合不包含服务器 // 获取自己的投票 Vote current = self.getCurrentVote(); // 构造ToSend消息 ToSend notmsg = new ToSend(ToSend.mType.notification, current.getId(), current.getZxid(), logicalclock, self.getPeerState(), response.sid, current.getPeerEpoch()); // 放入sendqueue队列,等待发送 sendqueue.offer(notmsg); }
如果是包含在可投票的服务器集合中,则根据消息(Message)解析出投票服务器的投票信息并将其封装为Notification。
1) 如果当前服务器为LOOKING状态,则直接将Notification放入FastLeaderElection的recvqueue(区别于recvQueue)中。接下来,如果其选举周期小于当前服务器的逻辑时钟,则将当前服务器的内部投票发送给从队列中取出的服务器,否则,直接忽略掉该投票。代码实现流程如下:
if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){ // 本服务器为LOOKING状态 // 将消息放入recvqueue中 recvqueue.offer(n); /* * Send a notification back if the peer that sent this * message is also looking and its logical clock is * lagging behind. */ if((ackstate == QuorumPeer.ServerState.LOOKING) // 推选者服务器为LOOKING状态 && (n.electionEpoch < logicalclock)){ // 选举周期小于逻辑时钟 // 创建新的投票 Vote v = getVote(); // 构造新的发送消息(本服务器自己的投票) ToSend notmsg = new ToSend(ToSend.mType.notification, v.getId(), v.getZxid(), logicalclock, self.getPeerState(), response.sid, v.getPeerEpoch()); // 将发送消息放置于队列,等待发送 sendqueue.offer(notmsg); } }
2) 如果当前服务器的状态不为LOOKING,则会根据投票服务器中解析的version信息来构造ToSend消息,放入sendqueue,等待发送,流程如下:
else { // 本服务器状态不为LOOKING /* * If this server is not looking, but the one that sent the ack * is looking, then send back what it believes to be the leader. */ // 获取当前投票 Vote current = self.getCurrentVote(); if(ackstate == QuorumPeer.ServerState.LOOKING){ // 为LOOKING状态 if(LOG.isDebugEnabled()){ LOG.debug("Sending new notification. My id = " + self.getId() + " recipient=" + response.sid + " zxid=0x" + Long.toHexString(current.getZxid()) + " leader=" + current.getId()); } ToSend notmsg; if(n.version > 0x0) { // 版本号大于0 // 构造ToSend消息 notmsg = new ToSend( ToSend.mType.notification, current.getId(), current.getZxid(), current.getElectionEpoch(), self.getPeerState(), response.sid, current.getPeerEpoch()); } else { // 版本号不大于0 // 构造ToSend消息 Vote bcVote = self.getBCVote(); notmsg = new ToSend( ToSend.mType.notification, bcVote.getId(), bcVote.getZxid(), bcVote.getElectionEpoch(), self.getPeerState(), response.sid, bcVote.getPeerEpoch()); } // 将发送消息放置于队列,等待发送 sendqueue.offer(notmsg); } }
class WorkerSender implements Runnable { // 是否终止 volatile boolean stop; // 服务器之间的连接 QuorumCnxManager manager; // 构造器 WorkerSender(QuorumCnxManager manager){ // 初始化属性 this.stop = false; this.manager = manager; } public void run() { while (!stop) { // 不终止 try { // 从sendqueue中取出ToSend消息 ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS); // 若为空,则跳过 if(m == null) continue; // 不为空,则进行处理 process(m); } catch (InterruptedException e) { break; } } LOG.info("WorkerSender is down"); } /** * Called by run() once there is a new message to send. * * @param m message to send */ void process(ToSend m) { // 构建消息 ByteBuffer requestBuffer = buildMsg(m.state.ordinal(), m.leader, m.zxid, m.electionEpoch, m.peerEpoch); // 发送消息 manager.toSend(m.sid, requestBuffer); } }
WorkerSender也实现了Runnable接口,为选票发送器,其会不断地从sendqueue中获取待发送的选票,并将其传递到底层QuorumCnxManager中。
-
private void sendNotifications() { for (QuorumServer server : self.getVotingView().values()) { // 遍历投票参与者集合 long sid = server.id; // 构造发送消息 ToSend notmsg = new ToSend(ToSend.mType.notification, proposedLeader, proposedZxid, logicalclock, QuorumPeer.ServerState.LOOKING, sid, proposedEpoch); if(LOG.isDebugEnabled()){ LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" + Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock) + " (n.round), " + sid + " (recipient), " + self.getId() + " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)"); } // 将发送消息放置于队列 sendqueue.offer(notmsg); } }
该方法会遍历所有的参与者投票集合,然后将自己的选票信息发送至上述所有的投票者集合,为提升性能,这里并非同步发送,而是将ToSend消息放置于sendqueue中,之后由WorkerSender进行发送。
-
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) { LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" + Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid)); if(self.getQuorumVerifier().getWeight(newId) == 0){ // 使用计票器判断当前服务器的权重是否为0 return false; } /* * We return true if one of the following three cases hold: * 1- New epoch is higher * 2- New epoch is the same as current epoch, but new zxid is higher * 3- New epoch is the same as current epoch, new zxid is the same * as current zxid, but server id is higher. */ // 1. 判断消息里的epoch是不是比当前的大,如果大则消息中id对应的服务器就是leader // 2. 如果epoch相等则判断zxid,如果消息里的zxid大,则消息中id对应的服务器就是leader // 3. 如果前面两个都相等那就比较服务器id,如果大,则其就是leader return ((newEpoch > curEpoch) || ((newEpoch == curEpoch) && ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId))))); }
该接口逻辑是将接收的投票与自身投票进行PK判断,查看是否消息中包含的服务器id是否更优,其按照epoch、zxid、id的优先级进行PK。
-
protected boolean termPredicate( HashMap<Long, Vote> votes, Vote vote) { HashSet<Long> set = new HashSet<Long>(); /* * First make the views consistent. Sometimes peers will have * different zxids for a server depending on timing. */ for (Map.Entry<Long,Vote> entry : votes.entrySet()) { // 遍历已经接收的投票集合 if (vote.equals(entry.getValue())){ // 将等于当前投票的项放入set set.add(entry.getKey()); } } //统计set,查看投某个id的票数是否超过一半 return self.getQuorumVerifier().containsQuorum(set); }
这个接口作用是用于判断Leader选举是否结束,即是否有一半以上的服务器选出了相同的Leader,其过程是将收到的选票与当前选票进行对比,选票相同的放入同一个集合,之后判断选票相同的集合是否超过了半数。
-
protected boolean checkLeader( HashMap<Long, Vote> votes, long leader, long electionEpoch){ boolean predicate = true; /* * If everyone else thinks I'm the leader, I must be the leader. * The other two checks are just for the case in which I'm not the * leader. If I'm not the leader and I haven't received a message * from leader stating that it is leading, then predicate is false. */ if(leader != self.getId()){ // 自己不为leader if(votes.get(leader) == null) predicate = false; // 还未选出leader else if(votes.get(leader).getState() != ServerState.LEADING) predicate = false; // 选出的leader还未给出ack信号,其他服务器还不知道leader } else if(logicalclock != electionEpoch) { // 逻辑时钟不等于选举周期 predicate = false; } return predicate; }
该接口会根据相关条件检查是否已经完成了Leader的选举,此时Leader的状态应该是LEADING状态。
-
该接口用于开始新一轮的Leader主节点选举,首先会将逻辑时钟做自增处理,然后更新本服务器上的选票(即初始化选票),之后将选票信息放入sendqueue队列中等待发送给其他服务器,代码如下:
synchronized(this){ // 更新逻辑时钟,每进行一轮新的leader选举,都需要更新逻辑时钟 logicalclock++; // 更新选票(初始化选票) updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); } LOG.info("New election. My id = " + self.getId() + ", proposed zxid=0x" + Long.toHexString(proposedZxid)); // 向其他服务器发送自己的选票(已更新的选票) sendNotifications();
之后每台服务器会不断地从recvqueue队列中获取外部选票。如果服务器发现无法获取到任何的外部投票,就会立即确认自己是否和集群中其他服务器保持着有效的连接,如果没有连接,则马上建立连接,如果已经建立了连接,则再次发送自己当前的内部投票。代码如下:
// 从recvqueue接收队列中取出投票 Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS); /* * Sends more notifications if haven't received enough. * Otherwise processes new notification. */ if(n == null){ // 无法获取选票 if(manager.haveDelivered()){ // manager已经发送了所有选票消息(表示有连接) // 向所有其他服务器发送消息 sendNotifications(); } else { // 还未发送所有消息(表示无连接) // 连接其他每个服务器 manager.connectAll(); } /* * Exponential backoff */ int tmpTimeOut = notTimeout*2; notTimeout = (tmpTimeOut < maxNotificationInterval? tmpTimeOut : maxNotificationInterval); LOG.info("Notification time out: " + notTimeout); }
在发送完初始化选票之后,接下来开始处理外部投票。在处理外部投票时,会根据选举轮次来进行不同的处理:
- 外部投票的选举轮次大于内部投票:若服务器自身的选举轮次落后于该外部投票对应服务器的选举轮次,那么就会立即更新自己的选举轮次(logicalclock),并且清空所有已经收到的投票,然后使用初始化的投票与外部投票做比对,判定是否变更内部投票。处理完成之后,再将内部投票发送出去。
- 外部投票的选举轮次小于内部投票:若服务器接收的外选票的选举轮次落后于自身的选举那么Zookeeper就会直接忽略该外部投票,不做任何处理。
- 外部投票的选举轮次等于内部投票:此时可以开始进行选票PK,如果外部消息中的选票更优,则需要更新本服务器内部选票,再发送给其他服务器。
无论是否变更了投票,都会将刚收到的外部投票放入选票集合recvset中进行归档,其中recvset用于记录当前服务器在本轮次的Leader选举中收到的所有外部投票,然后开始统计投票,统计投票目的是为了统计集群中是否已经有过半的服务器认可了当前的内部投票,如果确定已经有过半服务器认可了该投票,然后再进行最后一次确认,判断是否又有更优的选票产生,若无,则终止投票,确定最终的选票。
// recvset用于记录当前服务器在本轮次的Leader选举中收到的所有外部投票 recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, logicalclock, proposedEpoch))) { // 若能选出leader // Verify if there is any change in the proposed leader while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null){ // 遍历已经接收的投票集合 if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)){ // 选票有变更,比之前提议的Leader有更好的选票加入 // 将更优的选票放在recvset中 recvqueue.put(n); break; } } /* * This predicate is true once we don't read any new * relevant message from the reception queue */ if (n == null) { // 表示之前提议的Leader已经是最优的 // 设置服务器状态 self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING: learningState()); // 最终的选票 Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock, proposedEpoch); // 清空recvqueue队列的选票 leaveInstance(endVote); // 返回选票 return endVote; }
2. ZK的持久化机制实现
-
:
· TxnLog,接口类型,读取事务性日志的接口。
· FileTxnLog,实现TxnLog接口,添加了访问该事务性日志的API。
· Snapshot,接口类型,持久层快照接口。
· FileSnap,实现Snapshot接口,负责存储、序列化、反序列化、访问快照。
· FileTxnSnapLog,封装了TxnLog和SnapShot。
· Util,工具类,提供持久化所需的API。
-
public interface TxnLog { /** * roll the current * log being appended to * @throws IOException */ // 回滚日志 void rollLog() throws IOException; /** * Append a request to the transaction log * @param hdr the transaction header * @param r the transaction itself * returns true iff something appended, otw false * @throws IOException */ // 添加一个请求至事务性日志 boolean append(TxnHeader hdr, Record r) throws IOException; /** * Start reading the transaction logs * from a given zxid * @param zxid * @return returns an iterator to read the * next transaction in the logs. * @throws IOException */ // 读取事务性日志 TxnIterator read(long zxid) throws IOException; /** * the last zxid of the logged transactions. * @return the last zxid of the logged transactions. * @throws IOException */ // 事务性操作的最新zxid long getLastLoggedZxid() throws IOException; /** * truncate the log to get in sync with the * leader. * @param zxid the zxid to truncate at. * @throws IOException */ // 清空日志,与Leader保持同步 boolean truncate(long zxid) throws IOException; /** * the dbid for this transaction log. * @return the dbid for this transaction log. * @throws IOException */ // 获取数据库的id long getDbId() throws IOException; /** * commmit the trasaction and make sure * they are persisted * @throws IOException */ // 提交事务并进行确认 void commit() throws IOException; /** * close the transactions logs */ // 关闭事务性日志 void close() throws IOException; /** * an iterating interface for reading * transaction logs. */ // 读取事务日志的迭代器接口 public interface TxnIterator { /** * return the transaction header. * @return return the transaction header. */ // 获取事务头部 TxnHeader getHeader(); /** * return the transaction record. * @return return the transaction record. */ // 获取事务 Record getTxn(); /** * go to the next transaction record. * @throws IOException */ // 下个事务 boolean next() throws IOException; /** * close files and release the * resources * @throws IOException */ // 关闭文件释放资源 void close() throws IOException; } }
TxnLog除了提供读写事务日志的API外,还提供了一个用于读取日志的迭代器接口TxnIterator。
-
public class FileTxnLog implements TxnLog { private static final Logger LOG; // 预分配大小 64M static long preAllocSize = 65536 * 1024; // 魔术数字,默认为1514884167 public final static int TXNLOG_MAGIC = ByteBuffer.wrap("ZKLG".getBytes()).getInt(); // 版本号 public final static int VERSION = 2; /** Maximum time we allow for elapsed fsync before WARNing */ // 进行同步时,发出warn之前所能等待的最长时间 private final static long fsyncWarningThresholdMS; // 静态属性,确定Logger、预分配空间大小和最长时间 static { LOG = LoggerFactory.getLogger(FileTxnLog.class); String size = System.getProperty("zookeeper.preAllocSize"); if (size != null) { try { preAllocSize = Long.parseLong(size) * 1024; } catch (NumberFormatException e) { LOG.warn(size + " is not a valid value for preAllocSize"); } } fsyncWarningThresholdMS = Long.getLong("fsync.warningthresholdms", 1000); } // 最大(新)的zxid long lastZxidSeen; // 存储数据相关的流 volatile BufferedOutputStream logStream = null; volatile OutputArchive oa; volatile FileOutputStream fos = null; // log目录文件 File logDir; // 是否强制同步 private final boolean forceSync = !System.getProperty("zookeeper.forceSync", "yes").equals("no");; // 数据库id long dbId; // 流列表 private LinkedList<FileOutputStream> streamsToFlush = new LinkedList<FileOutputStream>(); // 当前大小 long currentSize; // 写日志文件 File logFileWrite = null; }
核心函数,append接口实现:
public synchronized boolean append(TxnHeader hdr, Record txn) throws IOException { if (hdr != null) { // 事务头部不为空 if (hdr.getZxid() <= lastZxidSeen) { // 事务的zxid小于等于最后的zxid LOG.warn("Current zxid " + hdr.getZxid() + " is <= " + lastZxidSeen + " for " + hdr.getType()); } if (logStream==null) { // 日志流为空 if(LOG.isInfoEnabled()){ LOG.info("Creating new log file: log." + Long.toHexString(hdr.getZxid())); } // logFileWrite = new File(logDir, ("log." + Long.toHexString(hdr.getZxid()))); fos = new FileOutputStream(logFileWrite); logStream=new BufferedOutputStream(fos); oa = BinaryOutputArchive.getArchive(logStream); // FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId); // 序列化 fhdr.serialize(oa, "fileheader"); // Make sure that the magic number is written before padding. // 刷新到磁盘 logStream.flush(); // 当前通道的大小 currentSize = fos.getChannel().position(); // 添加fos streamsToFlush.add(fos); } // 填充文件 padFile(fos); // Serializes transaction header and transaction data into a byte buffer. // 将事务头和事务数据序列化成Byte Buffer byte[] buf = Util.marshallTxnEntry(hdr, txn); if (buf == null || buf.length == 0) { // 为空,抛出异常 throw new IOException("Faulty serialization for header " + "and txn"); } // 生成一个验证算法 Checksum crc = makeChecksumAlgorithm(); // Updates the current checksum with the specified array of bytes // 使用Byte数组来更新当前的Checksum crc.update(buf, 0, buf.length); // 写long类型数据 oa.writeLong(crc.getValue(), "txnEntryCRC"); // Write the serialized transaction record to the output archive. // 将序列化的事务记录写入OutputArchive Util.writeTxnBytes(oa, buf); return true; } return false; }
append函数主要用做向事务日志中添加一个条目,其大体步骤如下:
① 检查TxnHeader是否为空,若不为空,则进入②,否则,直接返回false
② 检查logStream是否为空(初始化为空),若不为空,则进入③,否则,进入⑤
③ 初始化写数据相关的流和FileHeader,并序列化FileHeader至指定文件,进入④
④ 强制刷新(保证数据存到磁盘),并获取当前写入数据的大小。进入⑤
⑤ 填充数据,填充0,进入⑥
⑥ 将事务头和事务序列化成ByteBuffer(使用Util.marshallTxnEntry函数),进入⑦
⑦ 使用Checksum算法更新步骤⑥的ByteBuffer。进入⑧
⑧ 将更新的ByteBuffer写入磁盘文件,返回true。
3. ZK的Watcher机制实现
-
类结构说明:
Watcher,接口类型,其定义了process方法,需子类实现。
Event,接口类型,Watcher的内部类,无任何方法。
KeeperState,枚举类型,Event的内部类,表示Zookeeper所处的状态。
EventType,枚举类型,Event的内部类,表示Zookeeper中发生的事件类型。
WatchedEvent,表示对ZooKeeper上发生变化后的反馈,包含了KeeperState和EventType。
ClientWatchManager,接口类型,表示客户端的Watcher管理者,其定义了materialized方法,需子类实现。
ZKWatchManager,Zookeeper的内部类,继承ClientWatchManager。
MyWatcher,ZooKeeperMain的内部类,继承Watcher。
ServerCnxn,接口类型,继承Watcher,表示客户端与服务端的一个连接。
WatchManager,管理Watcher。
-
1) 接口方法:
abstract public void process(WatchedEvent event);
代表了实现Watcher接口时必须实现的的方法,即定义进行处理,WatchedEvent表示观察的事件。
2) 内部类:
public interface Event { }
Event,它是接口类型,Event接口并没有定义任何属性和方法,但是其包含了KeeperState和EventType两个内部枚举类。
public enum KeeperState { // 事件发生时Zookeeper的状态 /** Unused, this state is never generated by the server */ @Deprecated // 未知状态,不再使用,服务器不会产生此状态 Unknown (-1), /** The client is in the disconnected state - it is not connected * to any server in the ensemble. */ // 断开 Disconnected (0), /** Unused, this state is never generated by the server */ @Deprecated // 未同步连接,不再使用,服务器不会产生此状态 NoSyncConnected (1), /** The client is in the connected state - it is connected * to a server in the ensemble (one of the servers specified * in the host connection parameter during ZooKeeper client * creation). */ // 同步连接状态 SyncConnected (3), /** * Auth failed state */ // 认证失败状态 AuthFailed (4), /** * The client is connected to a read-only server, that is the * server which is not currently connected to the majority. * The only operations allowed after receiving this state is * read operations. * This state is generated for read-only clients only since * read/write clients aren't allowed to connect to r/o servers. */ // 只读连接状态 ConnectedReadOnly (5), /** * SaslAuthenticated: used to notify clients that they are SASL-authenticated, * so that they can perform Zookeeper actions with their SASL-authorized permissions. */ // SASL认证通过状态 SaslAuthenticated(6), /** The serving cluster has expired this session. The ZooKeeper * client connection (the session) is no longer valid. You must * create a new client connection (instantiate a new ZooKeeper * instance) if you with to access the ensemble. */ // 过期状态 Expired (-112); // 代表状态的整形值 private final int intValue; // Integer representation of value // for sending over wire // 构造函数 KeeperState(int intValue) { this.intValue = intValue; } // 返回整形值 public int getIntValue() { return intValue; } // 从整形值构造相应的状态 public static KeeperState fromInt(int intValue) { switch(intValue) { case -1: return KeeperState.Unknown; case 0: return KeeperState.Disconnected; case 1: return KeeperState.NoSyncConnected; case 3: return KeeperState.SyncConnected; case 4: return KeeperState.AuthFailed; case 5: return KeeperState.ConnectedReadOnly; case 6: return KeeperState.SaslAuthenticated; case -112: return KeeperState.Expired; default: throw new RuntimeException("Invalid integer value for conversion to KeeperState"); } } }
KeeperState是一个枚举类,其定义了在事件发生时Zookeeper所处的各种状态,其还定义了一个从整形值返回对应状态的方法fromInt。
public enum EventType { // 事件类型 // 无 None (-1), // 结点创建 NodeCreated (1), // 结点删除 NodeDeleted (2), // 结点数据变化 NodeDataChanged (3), // 结点子节点变化 NodeChildrenChanged (4); // 代表事件类型的整形 private final int intValue; // Integer representation of value // for sending over wire // 构造函数 EventType(int intValue) { this.intValue = intValue; } // 返回整形 public int getIntValue() { return intValue; } // 从整形构造相应的事件 public static EventType fromInt(int intValue) { switch(intValue) { case -1: return EventType.None; case 1: return EventType.NodeCreated; case 2: return EventType.NodeDeleted; case 3: return EventType.NodeDataChanged; case 4: return EventType.NodeChildrenChanged; default: throw new RuntimeException("Invalid integer value for conversion to EventType"); } } } }
EventType是一个枚举类,其定义了事件的类型(如创建节点、删除节点等事件),同时,其还定义了一个从整形值返回对应事件类型的方法fromInt。
核心方法:
public Set<Watcher> materialize(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String clientPath) { // 新生成结果Watcher集合 Set<Watcher> result = new HashSet<Watcher>(); switch (type) { // 确定事件类型 case None: // 无类型 // 添加默认Watcher result.add(defaultWatcher); // 是否需要清空(提取对zookeeper.disableAutoWatchReset字段进行配置的值、Zookeeper的状态是否为同步连接) boolean clear = ClientCnxn.getDisableAutoResetWatch() && state != Watcher.Event.KeeperState.SyncConnected; synchronized(dataWatches) { // 同步块 for(Set<Watcher> ws: dataWatches.values()) { // 添加至结果集合 result.addAll(ws); } if (clear) { // 是否需要清空 dataWatches.clear(); } } synchronized(existWatches) { // 同步块 for(Set<Watcher> ws: existWatches.values()) { // 添加至结果集合 result.addAll(ws); } if (clear) { // 是否需要清空 existWatches.clear(); } } synchronized(childWatches) { // 同步块 for(Set<Watcher> ws: childWatches.values()) { // 添加至结果集合 result.addAll(ws); } if (clear) { // 是否需要清空 childWatches.clear(); } } // 返回结果 return result; case NodeDataChanged: // 节点数据变化 case NodeCreated: // 创建节点 synchronized (dataWatches) { // 同步块 // 移除clientPath对应的Watcher后全部添加至结果集合 addTo(dataWatches.remove(clientPath), result); } synchronized (existWatches) { // 移除clientPath对应的Watcher后全部添加至结果集合 addTo(existWatches.remove(clientPath), result); } break; case NodeChildrenChanged: // 节点子节点变化 synchronized (childWatches) { // 移除clientPath对应的Watcher后全部添加至结果集合 addTo(childWatches.remove(clientPath), result); } break; case NodeDeleted: // 删除节点 synchronized (dataWatches) { // 移除clientPath对应的Watcher后全部添加至结果集合 addTo(dataWatches.remove(clientPath), result); } // XXX This shouldn't be needed, but just in case synchronized (existWatches) { // 移除clientPath对应的Watcher Set<Watcher> list = existWatches.remove(clientPath); if (list != null) { // 移除clientPath对应的Watcher后全部添加至结果集合 addTo(existWatches.remove(clientPath), result); LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!"); } } synchronized (childWatches) { // 移除clientPath对应的Watcher后全部添加至结果集合 addTo(childWatches.remove(clientPath), result); } break; default: // 缺省处理 String msg = "Unhandled watch event type " + type + " with state " + state + " on path " + clientPath; LOG.error(msg); throw new RuntimeException(msg); } // 返回结果集合 return result; } }
该方法在事件发生后,返回需要被通知的Watcher集合。在该方法中,首先会根据EventType类型确定相应的事件类型,然后根据事件类型的不同做出相应的操作,如针对None类型,即无任何事件,则首先会从三个键值对中删除clientPath对应的Watcher,然后将剩余的Watcher集合添加至结果集合;针对NodeDataChanged和NodeCreated事件而言,其会从dataWatches和existWatches中删除clientPath对应的Watcher,然后将剩余的Watcher集合添加至结果集合。
-
说明:
· ZKWatchManager,Zookeeper的Watcher管理者,其源码在之前已经分析过,不再累赘。
· WatchRegistration,抽象类,用作watch注册。
· ExistsWatchRegistration,存在性watch注册。
· DataWatchRegistration,数据watch注册。
· ChildWatchRegistration,子节点注册。
· States,枚举类型,表示服务器的状态。
4. ZK的网络通信实现
-
Stats,表示ServerCnxn上的统计数据。
-
Watcher,表示时间处理器。
-
ServerCnxn,表示服务器连接,表示一个从客户端到服务器的连接。
-
NIOServerCnxn,基于NIO的连接的具体实现。
-
NettyServerCnxn,基于Netty的连接的具体实现。
-
NIOServerCnxn继承了ServerCnxn抽象类,使用NIO来处理与客户端之间的通信,使用单线程处理。
public void sendBuffer(ByteBuffer bb) { try { if (bb != ServerCnxnFactory.closeConn) { // 不关闭连接 // We check if write interest here because if it is NOT set, // nothing is queued, so we can try to send the buffer right // away without waking up the selector // 首先检查interestOps中是否存在WRITE操作,如果没有 // 则表示直接发送缓冲而不必先唤醒selector if ((sk.interestOps() & SelectionKey.OP_WRITE) == 0) { // 不为write操作 try { // 将缓冲写入socket sock.write(bb); } catch (IOException e) { // we are just doing best effort right now } } // if there is nothing left to send, we are done if (bb.remaining() == 0) { // bb中的内容已经被全部读取 // 统计发送包信息(调用ServerCnxn方法) packetSent(); return; } } synchronized(this.factory){ // 同步块 // Causes the first selection operation that has not yet returned to return immediately // 让第一个还没返回(阻塞)的selection操作马上返回结果 sk.selector().wakeup(); if (LOG.isTraceEnabled()) { LOG.trace("Add a buffer to outgoingBuffers, sk " + sk + " is valid: " + sk.isValid()); } // 将缓存添加至队列 outgoingBuffers.add(bb); if (sk.isValid()) { // key是否合法 // 将写操作添加至感兴趣的集合中 sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE); } } } catch(Exception e) { LOG.error("Unexpected Exception: ", e); } }
该接口将缓冲写入socket中,其大致处理可以分为两部分:
首先会判断ByteBuffer是否为关闭连接的信号,并且当感兴趣的集合中没有写操作时,其会立刻将缓存写入socket;
其次在synchronized同步块中,会唤醒上个被阻塞的selection操作,然后将缓冲添加至outgoingBuffers队列中,后续再进行发送。
-
NettyServerCnxn继承了ServerCnxn抽象类,使用Netty框架来高效处理与客户端之间的通信。
该接口用于接收ChannelBuffer中的数据,接口在while循环体中,当writerIndex大于readerIndex(表示ChannelBuffer中还有可读内容)且throttled为false时执行while循环体,该接口大致可以分为两部分,首先是当bb不为空时,表示已经准备好读取ChannelBuffer中的内容,其流程如下:
if (bb != null) { // 不为null,表示已经准备好读取message if (LOG.isTraceEnabled()) { LOG.trace("message readable " + message.readableBytes() + " bb len " + bb.remaining() + " " + bb); ByteBuffer dat = bb.duplicate(); dat.flip(); LOG.trace(Long.toHexString(sessionId) + " bb 0x" + ChannelBuffers.hexDump( ChannelBuffers.copiedBuffer(dat))); } if (bb.remaining() > message.readableBytes()) { // bb剩余空间大于message中可读字节大小 // 确定新的limit int newLimit = bb.position() + message.readableBytes(); bb.limit(newLimit); } // 将message写入bb中 message.readBytes(bb); // 重置bb的limit bb.limit(bb.capacity()); if (LOG.isTraceEnabled()) { LOG.trace("after readBytes message readable " + message.readableBytes() + " bb len " + bb.remaining() + " " + bb); ByteBuffer dat = bb.duplicate(); dat.flip(); LOG.trace("after readbytes " + Long.toHexString(sessionId) + " bb 0x" + ChannelBuffers.hexDump( ChannelBuffers.copiedBuffer(dat))); } if (bb.remaining() == 0) { // 已经读完message,表示内容已经全部接收 // 统计接收信息 packetReceived(); // 翻转,可读 bb.flip(); ZooKeeperServer zks = this.zkServer; if (zks == null) { // Zookeeper服务器为空 throw new IOException("ZK down"); } if (initialized) { // 未被初始化 // 处理bb中包含的包信息 zks.processPacket(this, bb); if (zks.shouldThrottle(outstandingCount.incrementAndGet())) { // 是否已经节流 // 不接收数据 disableRecvNoWait(); } } else { // 已经初始化 LOG.debug("got conn req request from " + getRemoteSocketAddress()); // 处理连接请求 zks.processConnectRequest(this, bb); initialized = true; } bb = null; } }
其中主要的部分是判断bb的剩余空间是否大于message中的内容,简单而言,就是判断bb是否还有足够空间存储message内容,然后设置bb的limit,之后将message内容读入bb缓冲中,之后再次确定时候已经读完message内容,统计接收信息,再根据是否已经初始化来处理包或者是连接请求,其中的请求内容都存储在bb中。
public void sendResponse(ReplyHeader h, Record r, String tag) throws IOException { if (!channel.isOpen()) { return; } ByteArrayOutputStream baos = new ByteArrayOutputStream(); // Make space for length BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos); try { // 向baos中写入四个字节(空) baos.write(fourBytes); // 写入记录 bos.writeRecord(h, "header"); if (r != null) { // 写入记录 bos.writeRecord(r, tag); } // 关闭 baos.close(); } catch (IOException e) { LOG.error("Error serializing response"); } // 转化为Byte Array byte b[] = baos.toByteArray(); // 将Byte Array封装成ByteBuffer ByteBuffer bb = ByteBuffer.wrap(b); bb.putInt(b.length - 4).rewind(); // 发送缓冲 sendBuffer(bb); if (h.getXid() > 0) { // zks cannot be null otherwise we would not have gotten here! if (!zkServer.shouldThrottle(outstandingCount.decrementAndGet())) { enableRecv(); } } }
其首先会将header和record都写入baos,之后再将baos转化为ByteBuffer,之后在调用sendBuffer来发送缓冲,而sendBuffer完成的操作是将ByteBuffer写入ChannelBuffer中。
public void process(WatchedEvent event) { // 创建响应头 ReplyHeader h = new ReplyHeader(-1, -1L, 0); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK, "Deliver event " + event + " to 0x" + Long.toHexString(this.sessionId) + " through " + this); } // Convert WatchedEvent to a type that can be sent over the wire WatcherEvent e = event.getWrapper(); try { // 发送响应 sendResponse(h, e, "notification"); } catch (IOException e1) { if (LOG.isDebugEnabled()) { LOG.debug("Problem sending to " + getRemoteSocketAddress(), e1); } close(); } }
首先创建ReplyHeader,然后再调用sendResponse来发送响应,最后调用close函数进行后续关闭处理。