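How to Understand ClientCnxnSocket, the zk-client Communication Layer
This article walks through ClientCnxnSocket, the communication layer of the ZooKeeper client, and its default NIO implementation. It is meant as a reference for readers who want to understand how the client exchanges packets with the server at the socket level.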
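Structure of the ClientCnxnSocket abstract class: it defines the low-level socket communication interface; the default implementation is ClientCnxnSocketNIO
readConnectResult: reads the server's response to the connect request
readLength: reads the length prefix from the buffer and allocates an incomingBuffer of that size
ClientCnxnSocketNIO implementation
findSendablePacket: picks the next packet to send from outgoingQueue
doIO: handles reads and writes
doTransport
If the connection is ready, calls sendThread to complete the connection setup
If the channel is ready for reading or writing, calls doIO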
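The length-prefixed framing that readLength supports can be illustrated in isolation. The following is a minimal sketch, not ZooKeeper's code: FrameReader, feed, frames and the MAX_FRAME limit are hypothetical names chosen for illustration. Only the idea of pointing incomingBuffer at a 4-byte lenBuffer first, and at a body buffer afterwards, follows the fields named in this article.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

final class FrameReader {
    // Hypothetical upper bound on a frame body; not a value from the article.
    private static final int MAX_FRAME = 4 * 1024 * 1024;

    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    // Points at lenBuffer while the prefix is being read, at a body buffer afterwards.
    private ByteBuffer incomingBuffer = lenBuffer;
    private final List<byte[]> frames = new ArrayList<>();

    /** Feeds bytes as they arrive; completed frame bodies accumulate in frames(). */
    void feed(ByteBuffer in) {
        while (true) {
            // Copy as many bytes as the current target buffer can still take.
            while (in.hasRemaining() && incomingBuffer.hasRemaining()) {
                incomingBuffer.put(in.get());
            }
            if (incomingBuffer.hasRemaining()) {
                return; // target not full yet; wait for more bytes
            }
            incomingBuffer.flip();
            if (incomingBuffer == lenBuffer) {
                // The prefix is complete: size the body buffer from it.
                int len = incomingBuffer.getInt();
                if (len < 0 || len >= MAX_FRAME) {
                    throw new IllegalArgumentException("Frame length " + len + " is out of range");
                }
                incomingBuffer = ByteBuffer.allocate(len);
            } else {
                // The body is complete: hand it off and go back to reading a prefix.
                byte[] body = new byte[incomingBuffer.remaining()];
                incomingBuffer.get(body);
                frames.add(body);
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
            }
        }
    }

    List<byte[]> frames() {
        return frames;
    }
}
```

Because the reader keeps its position between calls, a frame split across several reads is reassembled without any extra bookkeeping, which is what a non-blocking channel requires.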
ClientCnxnSocket
Fields

ClientCnxnSocketNIO subclass
Fields
// the NIO selector
private final Selector selector = Selector.open();
/**
 * the NIO SelectionKey
 */
private SelectionKey sockKey;
private SocketAddress localSocketAddress;
private SocketAddress remoteSocketAddress;
Methods
@Override
void connect(InetSocketAddress addr) throws IOException {
    SocketChannel sock = createSock();
    try {
        registerAndConnect(sock, addr);
    } catch (IOException e) {
        LOG.error("Unable to open socket to " + addr);
        sock.close();
        throw e;
    }
    // Connection initiated, but no connect response received from the server yet
    initialized = false;
    /*
     * Reset incomingBuffer
     */
    lenBuffer.clear();
    incomingBuffer = lenBuffer;
}
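The article does not show createSock or registerAndConnect. As a rough illustration of what a non-blocking connect with a Selector involves, here is a minimal sketch using only java.nio. NonBlockingConnectDemo, connect and connectToLocalListener are hypothetical names, and the socket options and timeout are assumptions rather than ZooKeeper's settings.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

final class NonBlockingConnectDemo {

    /** Returns true once the connection completes, false if timeoutMs elapses first. */
    static boolean connect(InetSocketAddress addr, long timeoutMs) {
        try (Selector selector = Selector.open();
             SocketChannel sock = SocketChannel.open()) {
            sock.configureBlocking(false);
            sock.socket().setTcpNoDelay(true); // assumed option, for illustration
            // Register interest in connect completion before starting the connect.
            SelectionKey key = sock.register(selector, SelectionKey.OP_CONNECT);
            if (sock.connect(addr)) {
                return true; // completed immediately, which can happen on loopback
            }
            long deadline = System.currentTimeMillis() + timeoutMs;
            long remaining;
            while ((remaining = deadline - System.currentTimeMillis()) > 0) {
                selector.select(remaining);
                if (key.isConnectable() && sock.finishConnect()) {
                    return true;
                }
                selector.selectedKeys().clear();
            }
            return false;
        } catch (IOException e) {
            // A refused or failed connection surfaces here.
            throw new UncheckedIOException(e);
        }
    }

    /** Opens a listener on an ephemeral loopback port and connects to it. */
    static boolean connectToLocalListener() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            return connect((InetSocketAddress) server.getLocalAddress(), 2000);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("connected: " + connectToLocalListener());
    }
}
```

The OP_CONNECT branch of doTransport later in this article plays the role of the select loop here: it calls finishConnect when the key reports the connect operation as ready.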
The main client-server interaction function
@Override
void doTransport(
        int waitTimeOut,
        Queue<Packet> pendingQueue,
        ClientCnxn cnxn) throws IOException, InterruptedException {
    selector.select(waitTimeOut);
    Set<SelectionKey> selected;
    synchronized (this) {
        selected = selector.selectedKeys();
    }
    // Everything below and until we get back to the select is
    // non blocking, so time is effectively a constant. That is
    // Why we just have to do this once, here
    updateNow();
    for (SelectionKey k : selected) {
        SocketChannel sc = ((SocketChannel) k.channel());
        if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
            if (sc.finishConnect()) {
                updateLastSendAndHeard();
                updateSocketAddresses();
                sendThread.primeConnection();
            }
        } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
            doIO(pendingQueue, cnxn);
        }
    }
    if (sendThread.getZkState().isConnected()) {
        if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
            enableWrite();
        }
    }
    selected.clear();
}
doIO handles two cases, reading and writing.
Read:
If initialization has not completed, complete it
Read the length, then allocate an incomingBuffer of that size
Read the corresponding response
Write:
Find a packet that can be sent
If the packet's ByteBuffer has not been created yet, set the request header's xid and create it
Write the ByteBuffer to the SocketChannel
Once the packet is fully written, remove it from outgoingQueue and, unless it is a ping or auth packet, add it to pendingQueue
/**
 * @throws InterruptedException
 * @throws IOException
 */
void doIO(Queue<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {
    SocketChannel sock = (SocketChannel) sockKey.channel();
    if (sock == null) {
        throw new IOException("Socket is null!");
    }
    if (sockKey.isReadable()) {
        int rc = sock.read(incomingBuffer);
        if (rc < 0) {
            throw new EndOfStreamException("Unable to read additional data from server sessionid 0x" + Long.toHexString(sessionId) + ", likely server has closed socket");
        }
        if (!incomingBuffer.hasRemaining()) {
            incomingBuffer.flip();
            if (incomingBuffer == lenBuffer) {
                recvCount.getAndIncrement();
                readLength();
            } else if (!initialized) {
                readConnectResult();
                enableRead();
                if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
                    // Since SASL authentication has completed (if client is configured to do so),
                    // outgoing packets waiting in the outgoingQueue can now be sent.
                    enableWrite();
                }
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
                updateLastHeard();
                initialized = true;
            } else {
                sendThread.readResponse(incomingBuffer);
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
                updateLastHeard();
            }
        }
    }
    if (sockKey.isWritable()) {
        Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress());
        if (p != null) {
            updateLastSend();
            // If we already started writing p, p.bb will already exist
            if (p.bb == null) {
                if ((p.requestHeader != null)
                        && (p.requestHeader.getType() != OpCode.ping)
                        && (p.requestHeader.getType() != OpCode.auth)) {
                    p.requestHeader.setXid(cnxn.getXid());
                }
                p.createBB();
            }
            sock.write(p.bb);
            if (!p.bb.hasRemaining()) {
                sentCount.getAndIncrement();
                outgoingQueue.removeFirstOccurrence(p);
                if (p.requestHeader != null
                        && p.requestHeader.getType() != OpCode.ping
                        && p.requestHeader.getType() != OpCode.auth) {
                    synchronized (pendingQueue) {
                        pendingQueue.add(p);
                    }
                }
            }
        }
        if (outgoingQueue.isEmpty()) {
            // No more packets to send: turn off write interest flag.
            // Will be turned on later by a later call to enableWrite(),
            // from within ZooKeeperSaslClient (if client is configured
            // to attempt SASL authentication), or in either doIO() or
            // in doTransport() if not.
            disableWrite();
        } else if (!initialized && p != null && !p.bb.hasRemaining()) {
            // On initial connection, write the complete connect request
            // packet, but then disable further writes until after
            // receiving a successful connection response. If the
            // session is expired, then the server sends the expiration
            // response and immediately closes its end of the socket. If
            // the client is simultaneously writing on its end, then the
            // TCP stack may choose to abort with RST, in which case the
            // client would never receive the session expired event. See
            // http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html
            disableWrite();
        } else {
            // Just in case
            enableWrite();
        }
    }
}
Finding a sendable packet in the outgoing queue
private Packet findSendablePacket(LinkedBlockingDeque<Packet> outgoingQueue, boolean tunneledAuthInProgres) {
    // Nothing to send: return null
    if (outgoingQueue.isEmpty()) {
        return null;
    }
    // If we've already starting sending the first packet, we better finish
    if (outgoingQueue.getFirst().bb != null || !tunneledAuthInProgres) {
        // Send the packet at the head of the queue
        return outgoingQueue.getFirst();
    }
    // Since client's authentication with server is in progress,
    // send only the null-header packet queued by primeConnection().
    // This packet must be sent so that the SASL authentication process
    // can proceed, but all other packets should wait until
    // SASL authentication completes.
    // Authentication is in progress: send only the packet with a null request header
    Iterator<Packet> iter = outgoingQueue.iterator();
    while (iter.hasNext()) {
        Packet p = iter.next();
        if (p.requestHeader == null) {
            // We've found the priming-packet. Move it to the beginning of the queue.
            iter.remove();
            outgoingQueue.addFirst(p);
            return p;
        } else {
            // Non-priming packet: defer it until later, leaving it in the queue
            // until authentication completes.
            LOG.debug("deferring non-priming packet {} until SASL authentation completes.", p);
        }
    }
    return null;
}
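The selection rule in findSendablePacket can be exercised without a ZooKeeper connection. The sketch below is a simplified model under stated assumptions: SendableDemo and Pkt are hypothetical names, with hasHeader standing in for requestHeader != null and started standing in for bb != null. It is not the library's Packet class.

```java
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingDeque;

final class SendableDemo {
    /** Hypothetical stand-in for Packet, reduced to what the selection rule reads. */
    static final class Pkt {
        final String name;
        final boolean hasHeader; // false models requestHeader == null (the priming packet)
        boolean started;         // true models bb != null (a write has already begun)

        Pkt(String name, boolean hasHeader) {
            this.name = name;
            this.hasHeader = hasHeader;
        }
    }

    static Pkt findSendable(LinkedBlockingDeque<Pkt> queue, boolean authInProgress) {
        if (queue.isEmpty()) {
            return null;
        }
        Pkt first = queue.getFirst();
        // A partially written head must be finished; without auth, the head always goes next.
        if (first.started || !authInProgress) {
            return first;
        }
        // During authentication only the header-less priming packet may be sent.
        Iterator<Pkt> it = queue.iterator();
        while (it.hasNext()) {
            Pkt p = it.next();
            if (!p.hasHeader) {
                it.remove();
                queue.addFirst(p); // move it to the head so the next call returns it directly
                return p;
            }
        }
        return null; // everything else waits until authentication completes
    }

    public static void main(String[] args) {
        LinkedBlockingDeque<Pkt> q = new LinkedBlockingDeque<>();
        q.add(new Pkt("getData", true));
        q.add(new Pkt("connect", false));
        System.out.println(findSendable(q, true).name);
    }
}
```

In this model, a queue holding only header-carrying packets yields null while authentication is in progress, which is why doTransport and doIO re-check for a sendable packet before enabling write interest.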
Packet send/receive diagram
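The send and receive path of a packet, as described for doIO above, can also be sketched in code. This is a simplified model, not ZooKeeper's implementation: PacketFlowDemo, Pkt, queue, writeOne and onReply are hypothetical names. It assumes replies arrive in the order requests were written, so each reply is matched by xid against the head of pendingQueue. The article does not show readResponse, so that matching step should be read as an assumption.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Queue;

final class PacketFlowDemo {
    /** Hypothetical stand-in for Packet. */
    static final class Pkt {
        final boolean expectsReply; // false models ping/auth packets
        int xid = -1;               // assigned when the packet is written
        String reply;               // set when the matching reply arrives

        Pkt(boolean expectsReply) {
            this.expectsReply = expectsReply;
        }
    }

    private final Deque<Pkt> outgoingQueue = new ArrayDeque<>();
    private final Queue<Pkt> pendingQueue = new ArrayDeque<>();
    private int nextXid = 1;

    /** Application side: enqueue a request for the I/O loop to send. */
    Pkt queue(boolean expectsReply) {
        Pkt p = new Pkt(expectsReply);
        outgoingQueue.addLast(p);
        return p;
    }

    /** Write side: send the head packet and park it in pendingQueue if it expects a reply. */
    boolean writeOne() {
        Pkt p = outgoingQueue.pollFirst();
        if (p == null) {
            return false;
        }
        if (p.expectsReply) {
            p.xid = nextXid++;
            pendingQueue.add(p);
        }
        return true;
    }

    /** Read side: match a reply against the oldest pending request. */
    void onReply(int xid, String payload) {
        Pkt p = pendingQueue.poll();
        if (p == null || p.xid != xid) {
            throw new IllegalStateException("Reply xid " + xid + " does not match the pending head");
        }
        p.reply = payload;
    }

    int pendingSize() {
        return pendingQueue.size();
    }

    int outgoingSize() {
        return outgoingQueue.size();
    }
}
```

Packets that expect no reply never enter pendingQueue in this model, which corresponds to the ping and auth checks in doIO's write branch.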

That concludes this walkthrough of ClientCnxnSocket, the zk-client communication layer.