SelectionKey:通道向selector注册后会创建一个SelectionKey,SelectionKey维系通道和selector的关系.SelectionKey包含两个整数集一个为interest集合,一个为ready集合.interest集合指定Selector需要监听的事件.ready集合为Selector为SelectorKey监听后已经准备就绪的可以进行操作的事件.ready集合特别需要注意,这个里面可能有阻塞的行为,如OP_READ事件,只是暗示可读,但是真正的数据此时还没有到来,此时就会阻塞了。
epoll有三个方法epoll_create(),epoll_ctl(),epoll_wait():
int epoll_create(int size)
创建epoll文件,用于存放epoll_ctl注册的event。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
注册fd到epfd中,op为操作数(add,mod,delete) ,epoll_event为注册感兴趣的事件,这个里面也注册了回调函数,当对应的fd的设备ready时,就调用回调函数,将这个fd加入epfd的ready set当中,epoll_wait()一直就在那里等待ready set。
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
等待ready set,当准备好有数据的时候返回数据的个数,epoll_event为感兴趣的事件集合,maxevents为事件集合的个数。
JDK中向Selector注册事件流程(以linux epoll为例):
关键代码如下:
AbstractSelectableChannel.register():
public final SelectionKey register(Selector sel, int ops,
Object att)
throws ClosedChannelException
{
if (!isOpen()) //channel关闭抛异常
throw new ClosedChannelException();
if ((ops & ~validOps()) != 0)//不合理的注册值,抛异常
throw new IllegalArgumentException();
synchronized (regLock) {//有锁就有可能有线程的阻塞和切换 关键点1
if (blocking)
throw new IllegalBlockingModeException();
SelectionKey k = findKey(sel); //查看原来有没有注册过
if (k != null) {//注册过直接设置后返回
k.interestOps(ops);
k.attach(att);
}
if (k == null) {
// New registration
k = ((AbstractSelector)sel).register(this, ops, att);//没有注册的话执行selector的register
addKey(k);
}
return k;
}
}
SelectionImpl.register():
protected final SelectionKey register(AbstractSelectableChannel ch,
int ops,
Object attachment)
{
if (!(ch instanceof SelChImpl))
throw new IllegalSelectorException();
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
k.attach(attachment);
synchronized (publicKeys) {//又是锁,可能阻塞或者线程切换
implRegister(k);
}
k.interestOps(ops);
return k;
}
EPollSelectorImpl.implRegister():
protected void implRegister(SelectionKeyImpl ski) {
SelChImpl ch = ski.channel;
fdToKey.put(Integer.valueOf(ch.getFDVal()), ski);//关键点2 fd和Selectionkey对应的map
pollWrapper.add(ch);
keys.add(ski);
}
EPollArrayWrapper.add():
void add(SelChImpl channel) {
synchronized (updateList) {
updateList.add(new Updator(channel, EPOLL_CTL_ADD));//关键点3
}
}
关键点1.register()方法中有同步语句,可能当前线程就阻塞了,线程切换会有性能的损耗。
关键点2.fdToKey是个key为fd,value为selectionKey的map
关键点3.
Updator是个内部类:
private static class Updator {
SelChImpl channel;
int opcode;
int events;
Updator(SelChImpl channel, int opcode, int events) {
this.channel = channel;
this.opcode = opcode;
this.events = events;
}
Updator(SelChImpl channel, int opcode) {
this(channel, opcode, 0);
}
}
代表channel以及对channel的操作类型以及操作的事件,新注册的操作类型为add,操作事件为0,代表没有事件。
在SelectionImpl.register()中最后还要执行SelectionKeyImpl.interestOps()方法注册操作事件(前面添加的时候操作事件是为0的)
SelectionKeyImpl:
public SelectionKey interestOps(int ops) {
ensureValid();//检测是否可用
return nioInterestOps(ops);
}
SelectionKey nioInterestOps(int ops) { // package-private
if ((ops & ~channel().validOps()) != 0)
throw new IllegalArgumentException();
//真正的进行epoll感兴趣事件的注册
channel.translateAndSetInterestOps(ops, this);
interestOps = ops;
return this;
}
public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
int newOps = 0;
//进行java的ops和epoll的ops转换,每个具体的channel是有区别的
if ((ops & SelectionKey.OP_READ) != 0)
newOps |= PollArrayWrapper.POLLIN;
if ((ops & SelectionKey.OP_WRITE) != 0)
newOps |= PollArrayWrapper.POLLOUT;
if ((ops & SelectionKey.OP_CONNECT) != 0)
newOps |= PollArrayWrapper.POLLCONN;
sk.selector.putEventOps(sk, newOps);
}
void setInterest(SelChImpl channel, int mask) {
synchronized (updateList) {
// if the previous pending operation is to add this file descriptor
// to epoll then update its event set
if (updateList.size() > 0) {//关键点1
Updator last = updateList.getLast();
//这个肯定是刚才那个注册的channel,直接进行事件的更新
if (last.channel == channel && last.opcode == EPOLL_CTL_ADD) {
last.events = mask;
return;
}
}
// update existing registration 程序运行到这里的话,说明前面已经有更新的updator加入了,这里只好新加入一个
updateList.add(new Updator(channel, EPOLL_CTL_MOD, mask));
}
}
关键点1.这里是考虑到并发的问题了,但是我有个疑问为什么注册要分两个步骤执行,为什么不直接在EPollSelectorImpl.implRegister()加入 updateList.add(new Updator(channel, EPOLL_CTL_ADD, mask))呢?
至此register()动作已经完成。
select()
代码如下:
public int select(long timeout)
throws IOException
{
if (timeout < 0)
throw new IllegalArgumentException("Negative timeout");
return lockAndDoSelect((timeout == 0) ? -1 : timeout);
}
private int lockAndDoSelect(long timeout) throws IOException {
synchronized (this) {
if (!isOpen())
throw new ClosedSelectorException();
synchronized (publicKeys) {
synchronized (publicSelectedKeys) {
return doSelect(timeout);
}
}
}
}
protected int doSelect(long timeout)
throws IOException
{
if (closed)
throw new ClosedSelectorException();
//反注册过程 删除取消的通道的key(interest keys,ready keys(也就是selected keys),channel的key set)
processDeregisterQueue();
try {
begin();
pollWrapper.poll(timeout);//关键点1
} finally {
end();
}
processDeregisterQueue();
int numKeysUpdated = updateSelectedKeys();
if (pollWrapper.interrupted()) {
// Clear the wakeup pipe
pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
synchronized (interruptLock) {
pollWrapper.clearInterrupted();
IOUtil.drain(fd0);
interruptTriggered = false;
}
}
return numKeysUpdated;
}
void processDeregisterQueue() throws IOException {
// Precondition: Synchronized on this, keys, and selectedKeys
//遍历取消的key
Set cks = cancelledKeys();
synchronized (cks) {
Iterator i = cks.iterator();
while (i.hasNext()) {
SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
try {
//底层的取消实现
implDereg(ski);
} catch (SocketException se) {
IOException ioe = new IOException(
"Error deregistering key");
ioe.initCause(se);
throw ioe;
} finally {
//删除取消的key
i.remove();
}
}
}
}
protected void implDereg(SelectionKeyImpl ski) throws IOException {
assert (ski.getIndex() >= 0);
SelChImpl ch = ski.channel;
int fd = ch.getFDVal();
fdToKey.remove(new Integer(fd));//hashmap中去除
pollWrapper.release(ch);//这步很关键
ski.setIndex(-1);
keys.remove(ski);//总的key set去除
selectedKeys.remove(ski);//已经准备好的set去除
deregister((AbstractSelectionKey)ski);//移除channel的集合
SelectableChannel selch = ski.channel();
if (!selch.isOpen() && !selch.isRegistered())
((SelChImpl)selch).kill();
}
void release(SelChImpl channel) {
//空闲队列删除了
synchronized (updateList) {
// flush any pending updates
for (Iterator<Updator> it = updateList.iterator(); it.hasNext();) {
if (it.next().channel == channel) {
it.remove();
}
}
// remove from the idle set (if present)
idleSet.remove(channel);
//调用native 通知本channel对应的fd被被删除
// remove from epoll (if registered)
epollCtl(epfd, EPOLL_CTL_DEL, channel.getFDVal(), 0);
}
}