java io以及unix io模型

IO分两个阶段:
1.通知内核准备数据。2.数据从内核缓冲区拷贝到应用缓冲区

根据这2点IO类型可以分成:
1.阻塞IO,在两个阶段上面都是阻塞的。
2.非阻塞IO,在第1阶段,程序不断的轮询直到数据准备好,第2阶段还是阻塞的
3.IO复用,在第1阶段,当一个或者多个IO准备就绪时,通知程序,第2阶段还是阻塞的,在第1阶段还是轮询实现的,只是
所有的IO都集中在一个地方,这个地方进行轮询
4.信号IO,当数据准备完毕的时候,信号通知程序数据准备完毕,第2阶段阻塞
5.异步IO,1,2都不阻塞,windows的iocp是真正的异步IO


Java NIO
java NIO在linux上面是用epoll实现的,属于IO复用类型。
Selector:IO的多路复用器,通道需要向其注册,在数据准备阶段由他进行状态的轮询
SelectionKey:通道向selector注册后会创建一个SelectionKey,SelectionKey维系通道和selector的关系.SelectionKey包含两个整数集一个为interest集合,一个为ready集合.interest集合指定Selector需要监听的事件.ready集合为Selector为SelectorKey监听后已经准备就绪的可以进行操作的事件.ready集合特别需要注意,这个里面可能有阻塞的行为,如OP_READ事件,只是暗示可读,但是真正的数据此时还没有到来,此时就会阻塞了。

epoll有三个方法epoll_create(),epoll_ctl(),epoll_wait():
int epoll_create(int size)
        创建epoll文件,用于存放epoll_ctl注册的event。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
        注册fd到epfd中,op为操作数(add,mod,delete) ,epoll_event为注册感兴趣的事件,这个里面也注册了回调函数,当对应的fd的设备ready时,就调用回调函数,将这个fd加入epfd的ready set当中,epoll_wait()一直就在那里等待ready set。
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
        等待ready set,当准备好有数据的时候返回数据的个数,epoll_event为感兴趣的事件集合,maxevents为事件集合的个数。

JDK中向Selector注册事件流程(以linux epoll为例):
关键代码如下:
AbstractSelectableChannel.register():
    
public final SelectionKey register(Selector sel, int ops,
                                       Object att)
        
throws ClosedChannelException
    {
        
if (!isOpen()) //channel关闭抛异常
            throw new ClosedChannelException();
        
if ((ops & ~validOps()) != 0)//不合理的注册值,抛异常
            throw new IllegalArgumentException();
        
synchronized (regLock) {//有锁就有可能有线程的阻塞和切换  关键点1
            if (blocking)
                
throw new IllegalBlockingModeException();
            SelectionKey k 
= findKey(sel); //查看原来有没有注册过
            if (k != null) {//注册过直接设置后返回
                k.interestOps(ops);
                k.attach(att);
            }
            
if (k == null) {
                
// New registration
                k = ((AbstractSelector)sel).register(this, ops, att);//没有注册的话执行selector的register
                addKey(k);
            }
            
return k;
        }
    }


SelectionImpl.register():
    
protected final SelectionKey register(AbstractSelectableChannel ch,
                                          
int ops,
                                          Object attachment)
    {
        
if (!(ch instanceof SelChImpl))
            
throw new IllegalSelectorException();
        SelectionKeyImpl k 
= new SelectionKeyImpl((SelChImpl)ch, this);
        k.attach(attachment);
        
synchronized (publicKeys) {//又是锁,可能阻塞或者线程切换
            implRegister(k);
        }
        k.interestOps(ops);
        
return k;
    }


EPollSelectorImpl.implRegister():
protected void implRegister(SelectionKeyImpl ski) {
        SelChImpl ch 
= ski.channel;
        fdToKey.put(Integer.valueOf(ch.getFDVal()), ski);
//关键点2 fd和Selectionkey对应的map
        pollWrapper.add(ch);
        keys.add(ski);
}

EPollArrayWrapper.add():
void add(SelChImpl channel) {
        
synchronized (updateList) {
            updateList.add(
new Updator(channel, EPOLL_CTL_ADD));//关键点3
        }
}

关键点1.register()方法中有同步语句,可能当前线程就阻塞了,线程切换会有性能的损耗。
关键点2.fdToKey是个key为fd,value为selectionKey的map
关键点3.Updator是个内部类:
    private static class Updator {
        SelChImpl channel;
        
int opcode;
        
int events;
        Updator(SelChImpl channel, 
int opcode, int events) {
            
this.channel = channel;
            
this.opcode = opcode;
            
this.events = events;
        }
        Updator(SelChImpl channel, 
int opcode) {
            
this(channel, opcode, 0);
        }
    }
代表channel以及对channel的操作类型以及操作的事件,新注册的操作类型为add,操作事件为0,代表没有事件。
在SelectionImpl.register()中最后还要执行SelectionKeyImpl.interestOps()方法注册操作事件(前面添加的时候操作事件是为0的)
SelectionKeyImpl:
  
public SelectionKey interestOps(int ops) {
        ensureValid();
//检测是否可用
        return nioInterestOps(ops);
    }


    SelectionKey nioInterestOps(
int ops) {      // package-private
        if ((ops & ~channel().validOps()) != 0)
            
throw new IllegalArgumentException();
        
//真正的进行epoll感兴趣事件的注册
        channel.translateAndSetInterestOps(ops, this);
        interestOps 
= ops;
        
return this;
    }


    
public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
        
int newOps = 0;
        
//进行java的ops和epoll的ops转换,每个具体的channel是有区别的
        if ((ops & SelectionKey.OP_READ) != 0)
            newOps 
|= PollArrayWrapper.POLLIN;
        
if ((ops & SelectionKey.OP_WRITE) != 0)
            newOps 
|= PollArrayWrapper.POLLOUT;
        
if ((ops & SelectionKey.OP_CONNECT) != 0)
            newOps 
|= PollArrayWrapper.POLLCONN;
        sk.selector.putEventOps(sk, newOps);
    }

   
void setInterest(SelChImpl channel, int mask) {

        
synchronized (updateList) {
            
// if the previous pending operation is to add this file descriptor
            
// to epoll then update its event set
            if (updateList.size() > 0) {//关键点1
                Updator last = updateList.getLast();
                
//这个肯定是刚才那个注册的channel,直接进行事件的更新
                if (last.channel == channel && last.opcode == EPOLL_CTL_ADD) {
                    last.events 
= mask;
                    
return;
                }
            }

            
// update existing registration 程序运行到这里的话,说明前面已经有更新的updator加入了,这里只好新加入一个
            updateList.add(new Updator(channel, EPOLL_CTL_MOD, mask));
        }
    }
关键点1.这里是考虑到并发的问题了,但是我有个疑问为什么注册要分两个步骤执行,为什么不直接在EPollSelectorImpl.implRegister()加入 updateList.add(new Updator(channel, EPOLL_CTL_ADD, mask))呢?
至此register()动作已经完成。


select()



代码如下:
    public int select(long timeout)
        
throws IOException
    {
        
if (timeout < 0)
            
throw new IllegalArgumentException("Negative timeout");
        
return lockAndDoSelect((timeout == 0? -1 : timeout);
    }

    
private int lockAndDoSelect(long timeout) throws IOException {
        
synchronized (this) {
            
if (!isOpen())
                
throw new ClosedSelectorException();
            
synchronized (publicKeys) {
                
synchronized (publicSelectedKeys) {
                    
return doSelect(timeout);
                }
            }
        }
    }
   
protected int doSelect(long timeout)
        
throws IOException
    {
        
if (closed)
            
throw new ClosedSelectorException();

        
//反注册过程 删除取消的通道的key(interest keys,ready keys(也就是selected keys),channel的key set)
        processDeregisterQueue();
        
try {
            begin();
            pollWrapper.poll(timeout);
//关键点1
        } finally {
            end();
        }
        processDeregisterQueue();
        
int numKeysUpdated = updateSelectedKeys();
        
if (pollWrapper.interrupted()) {
            
// Clear the wakeup pipe
            pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
            
synchronized (interruptLock) {
                pollWrapper.clearInterrupted();
                IOUtil.drain(fd0);
                interruptTriggered 
= false;
            }
        }
        
return numKeysUpdated;
    }


      
void processDeregisterQueue() throws IOException {
        
// Precondition: Synchronized on this, keys, and selectedKeys
        
//遍历取消的key
        Set cks = cancelledKeys();
        
synchronized (cks) {
            Iterator i 
= cks.iterator();
            
while (i.hasNext()) {
                SelectionKeyImpl ski 
= (SelectionKeyImpl)i.next();
                
try {
                     //底层的取消实现
                    implDereg(ski);
                } 
catch (SocketException se) {
                    IOException ioe 
= new IOException(
                        
"Error deregistering key");
                    ioe.initCause(se);
                    
throw ioe;
                } 
finally {
                    
//删除取消的key
                    i.remove();
                }
            }
        }
    }

       
protected void implDereg(SelectionKeyImpl ski) throws IOException {
        
assert (ski.getIndex() >= 0);
        SelChImpl ch 
= ski.channel;
        
int fd = ch.getFDVal();
        fdToKey.remove(
new Integer(fd));//hashmap中去除
        pollWrapper.release(ch);//这步很关键
        ski.setIndex(-1);
        keys.remove(ski);
//总的key set去除
        selectedKeys.remove(ski);//已经准备好的set去除
        deregister((AbstractSelectionKey)ski);//移除channel的集合
        SelectableChannel selch = ski.channel();
        
if (!selch.isOpen() && !selch.isRegistered())
            ((SelChImpl)selch).kill();
    }



    
void release(SelChImpl channel) {

        
//空闲队列删除了
        synchronized (updateList) {
            
// flush any pending updates
            for (Iterator<Updator> it = updateList.iterator(); it.hasNext();) {
                
if (it.next().channel == channel) {
                    it.remove();
                }
            }

            
// remove from the idle set (if present)
            idleSet.remove(channel);

             
//调用native 通知本channel对应的fd被被删除
            
// remove from epoll (if registered)
            epollCtl(epfd, EPOLL_CTL_DEL, channel.getFDVal(), 0);
        }
    }
看关键点1的代码:
    int poll(long timeout) throws IOException {
        updateRegistrations();
//在poll前 先把update里面不需要的条目处理掉
        updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
        
for (int i=0; i<updated; i++) {
            
if (getDescriptor(i) == incomingInterruptFD) {
                interruptedIndex 
= i;
                interrupted 
= true;
                
break;
            }
        }
        
return updated;
    }


    
void updateRegistrations() {
        
synchronized (updateList) {
            Updator u 
= null;
            
while ((u = updateList.poll()) != null) {
                SelChImpl ch 
= u.channel;
                
if (!ch.isOpen())
                    
continue;

                
// if the events are 0 then file descriptor is put into "idle
                
// set" to prevent it being polled
                if (u.events == 0) {//这个表示interest事件为0
                    boolean added = idleSet.add(u.channel); //关键点1
                    
//先加入到idleSet里面 如果是这次加入的 而且操作行为是mod,那么就是个删除这个channel对应的fd
                    
// if added to idle set then remove from epoll if registered
                    if (added && (u.opcode == EPOLL_CTL_MOD))
                        epollCtl(epfd, EPOLL_CTL_DEL, ch.getFDVal(), 
0);
                } 
else {
                    
//关键点 2
                    
// events are specified. If file descriptor was in idle set
                    
// it must be re-registered (by converting opcode to ADD)
                    boolean idle = false;
                    
//如果idleSet不为空而且有这个Updator  说明关键点1处代码返回true,操作行为为add,mod的话在epollCtl会被删除掉
                    if (!idleSet.isEmpty())
                        idle 
= idleSet.remove(u.channel);
                    
int opcode = (idle) ? EPOLL_CTL_ADD : u.opcode;
                    epollCtl(epfd, opcode, ch.getFDVal(), u.events);
                }
            }
        }
    }

updateList是在EPollArrayWrapper.setInterest()和add()方法中添加的,当有多个线程的时候,通过channel register()有可能刚加入的Updator会被updateRegistrations()得到,得到的就是Channel第一次register的updatot,这个时候events为0,被加入到idleSet接着setInterest()被调用(channel register()最后一步),多了一个updator,这个时候再执行Selector.select(),显然会到关键点2操作行为add,是没有问题的。执行完updateRegistrations()方法,然后就epollWait()方法的调用,这个就是epoll的native方法了


总结:
1.Channel的register方法最后加入channel的感兴趣的事件到updatorList中
2.Selector的select的方法主要是对updatorList进行运作,首先去除所有cancelkey(),也就删除了对应的底层的updatorList的条目,然后迭代updatorList根据updator的event事件进行处理,也就是执行epoll的epollCtl方法,之后就是执行epollWait等待epollCtl的channel对应的callback函数的执行了。

posted on 2011-09-29 13:15 nod0620 阅读(1621) 评论(2)  编辑  收藏

评论

# re: java io以及unix io模型 2016-02-27 20:09 tianapple

这么好的文章,没有顶  回复  更多评论   

# re: java io以及unix io模型 2016-02-27 20:09 tianapple

//反注册过程 删除取消的通道的key(interest keys,ready keys(也就是selected keys),channel的key set)
processDeregisterQueue();
这段调用是合意,实在没看明白,能否说名一下
  回复  更多评论   


只有注册用户登录后才能发表评论。


网站导航:
 
<2025年1月>
2930311234
567891011
12131415161718
19202122232425
2627282930311
2345678

导航

统计

常用链接

留言簿

随笔分类

随笔档案

文章分类

文章档案

搜索

最新评论

阅读排行榜

评论排行榜