jinfeng_wang

G-G-S,D-D-U!

BlogJava 首页 新随笔 联系 聚合 管理
  400 Posts :: 0 Stories :: 296 Comments :: 0 Trackbacks

ZooKeeper.class

 1 public String create(final String path, byte data[], List<ACL> acl,
 2             CreateMode createMode)
 3         throws KeeperException, InterruptedException
 4     {
 5         final String clientPath = path;
 6         PathUtils.validatePath(clientPath, createMode.isSequential());
 7 
 8         final String serverPath = prependChroot(clientPath);
 9 
10         RequestHeader h = new RequestHeader();
11         h.setType(ZooDefs.OpCode.create);
12         CreateRequest request = new CreateRequest();
13         CreateResponse response = new CreateResponse();
14         request.setData(data);
15         request.setFlags(createMode.toFlag());
16         request.setPath(serverPath);
17         if (acl != null && acl.size() == 0) {
18             throw new KeeperException.InvalidACLException();
19         }
20         request.setAcl(acl);
21         ReplyHeader r = cnxn.submitRequest(h, request, response, null);
22         if (r.getErr() != 0) {
23             throw KeeperException.create(KeeperException.Code.get(r.getErr()),
24                     clientPath);
25         }
26         if (cnxn.chrootPath == null) {
27             return response.getPath();
28         } else {
29             return response.getPath().substring(cnxn.chrootPath.length());
30         }
31     }


ClientCnxn.class, 放到队列中,循环等到packet标识位finished。

 1 public ReplyHeader submitRequest(RequestHeader h, Record request,
 2             Record response, WatchRegistration watchRegistration)
 3             throws InterruptedException {
 4         ReplyHeader r = new ReplyHeader();
 5         Packet packet = queuePacket(h, r, request, response, nullnullnull,
 6                     null, watchRegistration);
 7         synchronized (packet) {
 8             while (!packet.finished) {
 9                 packet.wait();
10             }
11         }
12         return r;
13     }


 1 Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
 2             Record response, AsyncCallback cb, String clientPath,
 3             String serverPath, Object ctx, WatchRegistration watchRegistration)
 4     {
 5         Packet packet = null;
 6 
 7         // Note that we do not generate the Xid for the packet yet. It is
 8         // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
 9         // where the packet is actually sent.
10         synchronized (outgoingQueue) {
11             packet = new Packet(h, r, request, response, watchRegistration);
12             packet.cb = cb;
13             packet.ctx = ctx;
14             packet.clientPath = clientPath;
15             packet.serverPath = serverPath;
16             if (!state.isAlive() || closing) {
17                 conLossPacket(packet);
18             } else {
19                 // If the client is asking to close the session then
20                 // mark as closing
21                 if (h.getType() == OpCode.closeSession) {
22                     closing = true;
23                 }
24                 outgoingQueue.add(packet);
25             }
26         }
27         sendThread.getClientCnxnSocket().wakeupCnxn();
28         return packet;
29     }


ClientCnxnSocket.class

 1 @Override
 2     void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
 3                      ClientCnxn cnxn)
 4             throws IOException, InterruptedException {
 5         selector.select(waitTimeOut);
 6         Set<SelectionKey> selected;
 7         synchronized (this) {
 8             selected = selector.selectedKeys();
 9         }
10         // Everything below and until we get back to the select is
11         // non blocking, so time is effectively a constant. That is
12         // Why we just have to do this once, here
13         updateNow();
14         for (SelectionKey k : selected) {
15             SocketChannel sc = ((SocketChannel) k.channel());
16             if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
17                 if (sc.finishConnect()) {
18                     updateLastSendAndHeard();
19                     sendThread.primeConnection();
20                 }
21             } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
22                 doIO(pendingQueue, outgoingQueue, cnxn);
23             }
24         }
25         if (sendThread.getZkState().isConnected()) {
26             synchronized(outgoingQueue) {
27                 if (findSendablePacket(outgoingQueue,
28                         cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
29                     enableWrite();
30                 }
31             }
32         }
33         selected.clear();
34     }



 1 void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
 2       throws InterruptedException, IOException {
 3         SocketChannel sock = (SocketChannel) sockKey.channel();
 4         if (sock == null) {
 5             throw new IOException("Socket is null!");
 6         }
 7         if (sockKey.isReadable()) {
 8             int rc = sock.read(incomingBuffer);
 9             if (rc < 0) {
10                 throw new EndOfStreamException(
11                         "Unable to read additional data from server sessionid 0x"
12                                 + Long.toHexString(sessionId)
13                                 + ", likely server has closed socket");
14             }
15             if (!incomingBuffer.hasRemaining()) {
16                 incomingBuffer.flip();
17                 if (incomingBuffer == lenBuffer) {
18                     recvCount++;
19                     readLength();
20                 } else if (!initialized) {
21                     readConnectResult();
22                     enableRead();
23                     if (findSendablePacket(outgoingQueue,
24                             cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
25                         // Since SASL authentication has completed (if client is configured to do so),
26                         // outgoing packets waiting in the outgoingQueue can now be sent.
27                         enableWrite();
28                     }
29                     lenBuffer.clear();
30                     incomingBuffer = lenBuffer;
31                     updateLastHeard();
32                     initialized = true;
33                 } else {
34                     sendThread.readResponse(incomingBuffer);
35                     lenBuffer.clear();
36                     incomingBuffer = lenBuffer;
37                     updateLastHeard();
38                 }
39             }
40         }
41         if (sockKey.isWritable()) {
42             synchronized(outgoingQueue) {
43                 Packet p = findSendablePacket(outgoingQueue,
44                         cnxn.sendThread.clientTunneledAuthenticationInProgress());
45 
46                 if (p != null) {
47                     updateLastSend();
48                     // If we already started writing p, p.bb will already exist
49                     if (p.bb == null) {
50                         if ((p.requestHeader != null) &&
51                                 (p.requestHeader.getType() != OpCode.ping) &&
52                                 (p.requestHeader.getType() != OpCode.auth)) {
53                             p.requestHeader.setXid(cnxn.getXid());
54                         }
55                         p.createBB();
56                     }
57                     sock.write(p.bb);
58                     if (!p.bb.hasRemaining()) {
59                         sentCount++;
60                         outgoingQueue.removeFirstOccurrence(p);
61                         if (p.requestHeader != null
62                                 && p.requestHeader.getType() != OpCode.ping
63                                 && p.requestHeader.getType() != OpCode.auth) {
64                             synchronized (pendingQueue) {
65                                 pendingQueue.add(p);
66                             }
67                         }
68                     }
69                 }
70                 if (outgoingQueue.isEmpty()) {
71                     // No more packets to send: turn off write interest flag.
72                     // Will be turned on later by a later call to enableWrite(),
73                     // from within ZooKeeperSaslClient (if client is configured
74                     // to attempt SASL authentication), or in either doIO() or
75                     // in doTransport() if not.
76                     disableWrite();
77                 } else if (!initialized && p != null && !p.bb.hasRemaining()) {
78                     // On initial connection, write the complete connect request
79                     // packet, but then disable further writes until after
80                     // receiving a successful connection response.  If the
81                     // session is expired, then the server sends the expiration
82                     // response and immediately closes its end of the socket.  If
83                     // the client is simultaneously writing on its end, then the
84                     // TCP stack may choose to abort with RST, in which case the
85                     // client would never receive the session expired event.  See
86                     // http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html
87                     disableWrite();
88                 } else {
89                     // Just in case
90                     enableWrite();
91                 }
92             }
93         }
94     }



SendThread.class

  1 void readResponse(ByteBuffer incomingBuffer) throws IOException {
  2             ByteBufferInputStream bbis = new ByteBufferInputStream(
  3                     incomingBuffer);
  4             BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
  5             ReplyHeader replyHdr = new ReplyHeader();
  6 
  7             replyHdr.deserialize(bbia, "header");
  8             if (replyHdr.getXid() == -2) {
  9                 // -2 is the xid for pings
 10                 if (LOG.isDebugEnabled()) {
 11                     LOG.debug("Got ping response for sessionid: 0x"
 12                             + Long.toHexString(sessionId)
 13                             + " after "
 14                             + ((System.nanoTime() - lastPingSentNs) / 1000000)
 15                             + "ms");
 16                 }
 17                 return;
 18             }
 19             if (replyHdr.getXid() == -4) {
 20                 // -4 is the xid for AuthPacket               
 21                 if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
 22                     state = States.AUTH_FAILED;                    
 23                     eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, 
 24                             Watcher.Event.KeeperState.AuthFailed, null) );                                        
 25                 }
 26                 if (LOG.isDebugEnabled()) {
 27                     LOG.debug("Got auth sessionid:0x"
 28                             + Long.toHexString(sessionId));
 29                 }
 30                 return;
 31             }
 32             if (replyHdr.getXid() == -1) {
 33                 // -1 means notification
 34                 if (LOG.isDebugEnabled()) {
 35                     LOG.debug("Got notification sessionid:0x"
 36                         + Long.toHexString(sessionId));
 37                 }
 38                 WatcherEvent event = new WatcherEvent();
 39                 event.deserialize(bbia, "response");
 40 
 41                 // convert from a server path to a client path
 42                 if (chrootPath != null) {
 43                     String serverPath = event.getPath();
 44                     if(serverPath.compareTo(chrootPath)==0)
 45                         event.setPath("/");
 46                     else if (serverPath.length() > chrootPath.length())
 47                         event.setPath(serverPath.substring(chrootPath.length()));
 48                     else {
 49                         LOG.warn("Got server path " + event.getPath()
 50                                 + " which is too short for chroot path "
 51                                 + chrootPath);
 52                     }
 53                 }
 54 
 55                 WatchedEvent we = new WatchedEvent(event);
 56                 if (LOG.isDebugEnabled()) {
 57                     LOG.debug("Got " + we + " for sessionid 0x"
 58                             + Long.toHexString(sessionId));
 59                 }
 60 
 61                 eventThread.queueEvent( we );
 62                 return;
 63             }
 64 
 65             // If SASL authentication is currently in progress, construct and
 66             // send a response packet immediately, rather than queuing a
 67             // response as with other packets.
 68             if (clientTunneledAuthenticationInProgress()) {
 69                 GetSASLRequest request = new GetSASLRequest();
 70                 request.deserialize(bbia,"token");
 71                 zooKeeperSaslClient.respondToServer(request.getToken(),
 72                   ClientCnxn.this);
 73                 return;
 74             }
 75 
 76             Packet packet;
 77             synchronized (pendingQueue) {
 78                 if (pendingQueue.size() == 0) {
 79                     throw new IOException("Nothing in the queue, but got "
 80                             + replyHdr.getXid());
 81                 }
 82                 packet = pendingQueue.remove();
 83             }
 84             /*
 85              * Since requests are processed in order, we better get a response
 86              * to the first request!
 87              */
 88             try {
 89                 if (packet.requestHeader.getXid() != replyHdr.getXid()) {
 90                     packet.replyHeader.setErr(
 91                             KeeperException.Code.CONNECTIONLOSS.intValue());
 92                     throw new IOException("Xid out of order. Got Xid "
 93                             + replyHdr.getXid() + " with err " +
 94                             + replyHdr.getErr() +
 95                             " expected Xid "
 96                             + packet.requestHeader.getXid()
 97                             + " for a packet with details: "
 98                             + packet );
 99                 }
100 
101                 packet.replyHeader.setXid(replyHdr.getXid());
102                 packet.replyHeader.setErr(replyHdr.getErr());
103                 packet.replyHeader.setZxid(replyHdr.getZxid());
104                 if (replyHdr.getZxid() > 0) {
105                     lastZxid = replyHdr.getZxid();
106                 }
107                 if (packet.response != null && replyHdr.getErr() == 0) {
108                     packet.response.deserialize(bbia, "response");
109                 }
110 
111                 if (LOG.isDebugEnabled()) {
112                     LOG.debug("Reading reply sessionid:0x"
113                             + Long.toHexString(sessionId) + ", packet:: " + packet);
114                 }
115             } finally {
116                 finishPacket(packet);
117             }
118         }


posted on 2016-12-27 13:51 jinfeng_wang 阅读(485) 评论(0)  编辑  收藏 所属分类: 2016-zookeeper

只有注册用户登录后才能发表评论。


网站导航: