怎样建立连接Zookeeper中的服务端
这篇文章给大家介绍Zookeeper之怎样建立连接服务端,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。
创新互联坚持“要么做到,要么别承诺”的工作理念,服务领域包括:网站制作、网站设计、企业官网、英文网站、手机端网站、网站推广等服务,满足客户于互联网时代的黄陵网站设计、移动媒体设计的需求,帮助企业找到有效的互联网解决方案。努力成为您成熟可靠的网络建设合作伙伴!
服务端处理请求的实现有两种:NIOServerCnxnFactory 和 NettyServerCnxnFactory。默认使用 NIOServerCnxnFactory,可以通过指定 zookeeper.serverCnxnFactory 参数来修改。
这两个类的逻辑是一样的,只是一个基于 Java 原生的 NIO,另一个基于 Netty,这里我们只分析 NIOServerCnxnFactory。
NIOServerCnxnFactory 实现了 Runnable 接口,下面看它的 run 方法,它在循环中处理请求:
//NIOServerCnxnFactory.java //第200行 public void run() { while (!ss.socket().isClosed()) { try { selector.select(1000); Setselected; synchronized (this) { selected = selector.selectedKeys(); } ArrayList selectedList = new ArrayList ( selected); Collections.shuffle(selectedList); for (SelectionKey k : selectedList) { //如果是连接请求 if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) { SocketChannel sc = ((ServerSocketChannel) k .channel()).accept(); InetAddress ia = sc.socket().getInetAddress(); //获取IP地址对应的客户端连接数 int cnxncount = getClientCnxnCount(ia); //如果超出则关闭 if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){ LOG.warn("Too many connections from " + ia + " - max is " + maxClientCnxns ); sc.close(); } else { LOG.info("Accepted socket connection from " + sc.socket().getRemoteSocketAddress()); sc.configureBlocking(false); SelectionKey sk = sc.register(selector, SelectionKey.OP_READ); //每一个连接都是一个NIOServerCnxn NIOServerCnxn cnxn = createConnection(sc, sk); sk.attach(cnxn); addCnxn(cnxn); } } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) { //在第二个循环的时候,会进入这里,处理真正的连接请求 NIOServerCnxn c = (NIOServerCnxn) k.attachment(); c.doIO(k); } else { if (LOG.isDebugEnabled()) { LOG.debug("Unexpected ops in select " + k.readyOps()); } } } selected.clear(); } catch (RuntimeException e) { LOG.warn("Ignoring unexpected runtime exception", e); } catch (Exception e) { LOG.warn("Ignoring exception", e); } } closeAll(); LOG.info("NIOServerCnxn factory exited run method"); } //NIOServerCnxn.java //第237行 void doIO(SelectionKey k) throws InterruptedException { try { if (isSocketOpen() == false) { LOG.warn("trying to do i/o on a null socket for session:0x" + Long.toHexString(sessionId)); return; } if (k.isReadable()) { int rc = sock.read(incomingBuffer); if (rc < 0) { throw new EndOfStreamException( "Unable to read additional data from client sessionid 0x" + Long.toHexString(sessionId) + ", likely client has closed socket"); } if (incomingBuffer.remaining() == 0) { 
boolean isPayload; if (incomingBuffer == lenBuffer) { // start of next request incomingBuffer.flip(); isPayload = readLength(k); incomingBuffer.clear(); } else { // continuation isPayload = true; } if (isPayload) { // not the case for 4letterword readPayload(); } else { // four letter words take care // need not do anything else return; } } } //省略部分代码 } catch (CancelledKeyException e) { } catch (CloseRequestException e) { } catch (EndOfStreamException e) { } catch (IOException e) { } } //NIOServerCnxn.java //第194行 private void readPayload() throws IOException, InterruptedException { if (incomingBuffer.remaining() != 0) { // have we read length bytes? int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok if (rc < 0) { throw new EndOfStreamException( "Unable to read additional data from client sessionid 0x" + Long.toHexString(sessionId) + ", likely client has closed socket"); } } if (incomingBuffer.remaining() == 0) { // have we read length bytes? packetReceived(); incomingBuffer.flip(); if (!initialized) { readConnectRequest(); } else { readRequest(); } lenBuffer.clear(); incomingBuffer = lenBuffer; } } //NIOServerCnxn.java //第434行 private void readConnectRequest() throws IOException, InterruptedException { if (!isZKServerRunning()) { throw new IOException("ZooKeeperServer not running"); } zkServer.processConnectRequest(this, incomingBuffer); initialized = true; } //ZookeeperServer.java //第886行 public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException { BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer)); ConnectRequest connReq = new ConnectRequest(); connReq.deserialize(bia, "connect"); if (LOG.isDebugEnabled()) { LOG.debug("Session establishment request from client " + cnxn.getRemoteSocketAddress() + " client's lastZxid is 0x" + Long.toHexString(connReq.getLastZxidSeen())); } boolean readOnly = false; try { readOnly = bia.readBool("readOnly"); cnxn.isOldClient = 
false; } catch (IOException e) { // this is ok -- just a packet from an old client which // doesn't contain readOnly field LOG.warn("Connection request from old client " + cnxn.getRemoteSocketAddress() + "; will be dropped if server is in r-o mode"); } //如果客户端没有设置readOnly,但是服务端是只读的,直接抛出异常关闭连接 if (readOnly == false && this instanceof ReadOnlyZooKeeperServer) { String msg = "Refusing session request for not-read-only client " + cnxn.getRemoteSocketAddress(); LOG.info(msg); throw new CloseRequestException(msg); } if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) { String msg = "Refusing session request for client " + cnxn.getRemoteSocketAddress() + " as it has seen zxid 0x" + Long.toHexString(connReq.getLastZxidSeen()) + " our last zxid is 0x" + Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid()) + " client must try another server"; LOG.info(msg); throw new CloseRequestException(msg); } //协商session超时时间 int sessionTimeout = connReq.getTimeOut(); byte passwd[] = connReq.getPasswd(); int minSessionTimeout = getMinSessionTimeout(); if (sessionTimeout < minSessionTimeout) { sessionTimeout = minSessionTimeout; } int maxSessionTimeout = getMaxSessionTimeout(); if (sessionTimeout > maxSessionTimeout) { sessionTimeout = maxSessionTimeout; } cnxn.setSessionTimeout(sessionTimeout); // We don't want to receive any packets until we are sure that the // session is setup cnxn.disableRecv(); long sessionId = connReq.getSessionId(); if (sessionId != 0) { //如果sessionId不是0,说明是之前已经连接过的客户端因为掉线等原因重新连接的情况 long clientSessionId = connReq.getSessionId(); LOG.info("Client attempting to renew session 0x" + Long.toHexString(clientSessionId) + " at " + cnxn.getRemoteSocketAddress()); serverCnxnFactory.closeSession(sessionId); cnxn.setSessionId(sessionId); reopenSession(cnxn, sessionId, passwd, sessionTimeout); } else { LOG.info("Client attempting to establish new session at " + cnxn.getRemoteSocketAddress()); createSession(cnxn, passwd, sessionTimeout); } } 
//ZookeeperServer.java //第617行 long createSession(ServerCnxn cnxn, byte passwd[], int timeout) { //创建一个session,zookeeper的session管理比较复杂,具体情况下一章分析 long sessionId = sessionTracker.createSession(timeout); Random r = new Random(sessionId ^ superSecret); r.nextBytes(passwd); ByteBuffer to = ByteBuffer.allocate(4); to.putInt(timeout); cnxn.setSessionId(sessionId); //响应客户端 submitRequest(cnxn, sessionId, OpCode.createSession, 0, to, null); return sessionId; } //ZookeeperServer.java //第728行 public void submitRequest(Request si) { //省略部分代码 try { //刷新session的超时时间 touch(si.cnxn); boolean validpacket = Request.isValid(si.type); if (validpacket) { //提交给PrepRequestProcessor进一步处理 firstProcessor.processRequest(si); if (si.cnxn != null) { incInProcess(); } } else { LOG.warn("Received packet at server of unknown type " + si.type); new UnimplementedRequestProcessor().processRequest(si); } } catch (MissingSessionException e) { if (LOG.isDebugEnabled()) { LOG.debug("Dropping request: " + e.getMessage()); } } catch (RequestProcessorException e) { LOG.error("Unable to process request:" + e.getMessage(), e); } } //PrepRequestProcessor.java //第294行 protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) throws KeeperException, IOException, RequestProcessorException { request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), type); switch (type) { //省略部分代码 case OpCode.createSession: request.request.rewind(); int to = request.request.getInt(); request.txn = new CreateSessionTxn(to); request.request.rewind(); //这里又调用了一次addSession,但是之前的代码其实已经新增过了,不太明白为什么 zks.sessionTracker.addSession(request.sessionId, to); zks.setOwner(request.sessionId, request.getOwner()); break; //省略部分代码 default: LOG.error("Invalid OpCode: {} received by PrepRequestProcessor", type); } }
关于Zookeeper之怎样建立连接服务端就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。
分享名称:怎样建立连接Zookeeper中的服务端
文章网址:http://pcwzsj.com/article/jdecgp.html