狂奔 lion

自强不息

NIO学习之Web服务器示例

1 根据cpu core数量确定selector数量
2 用一个selector服务accept,其他selector按照core-1分配线程数运行
3 accept selector作为生产者把获得的请求放入队列
4 某个selector作为消费者从blocking queue中取出请求socket channel,并向自己注册
5 当获得read信号时,selector建立工作任务线程worker,并提交给系统线程池
6 worker线程排队后在线程池中执行,把协议头读入缓冲区,然后解析,处理,响应,关闭连接

import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class NioServer {
    
private static NioServer svr = new NioServer();
    
private final int numProcessors = Runtime.getRuntime().availableProcessors();
    
private static LinkedBlockingQueue<SocketChannel> queue = new LinkedBlockingQueue<SocketChannel>();
    
private static ExecutorService es = Executors.newFixedThreadPool(3);
    
    
private NioServer(){}
    
    
public static NioServer getInstance(){return svr;}
    
    
public void init(){
        
try {
            
for(int i=0;i<numProcessors - 1;i++){
                Thread tk 
= new Thread(new Talker());
                tk.start();
            }

            
            Selector acceptSelector 
= Selector.open();

            ServerSocketChannel ssc 
= ServerSocketChannel.open();
            ssc.configureBlocking(
false);

            InetSocketAddress isa 
= new InetSocketAddress("127.0.0.1"8080);
            ssc.socket().bind(isa);

            ssc.register(acceptSelector, SelectionKey.OP_ACCEPT);

            
while (acceptSelector.select() > 0{
                Set
<SelectionKey> readyKeys = acceptSelector.selectedKeys();
                Iterator
<SelectionKey> i = readyKeys.iterator();

                
while (i.hasNext()) {
                    SelectionKey sk 
= i.next();
                    i.remove();
                    
if (sk.isAcceptable()) {
                        ServerSocketChannel nextReady 
= (ServerSocketChannel) sk
                                .channel();
                        SocketChannel s 
= nextReady.accept();
                        s.configureBlocking(
false);
                        queue.offer(s);
                    }

                }

            }

        }
 catch (Exception e) {
            e.printStackTrace();
        }

    }

    
    
private static class Talker implements Runnable{
        
private Selector se = null;

        
        
public Talker(){
            
try {
                
this.se = Selector.open();
            }
 catch (IOException e) {
                e.printStackTrace();
            }

        }

        
        
public void addChannelIfAvailable(){
            
try {
                
if(queue.isEmpty())
                    
return;
                SocketChannel sc 
= queue.poll();
                sc.register(se, SelectionKey.OP_READ);
            }
 catch (ClosedChannelException e) {
                e.printStackTrace();
            }

        }

        
        
public void run() {
            
try {
                
while (true{
                    
int skOps = se.select(20);
                    addChannelIfAvailable();
                    
if(skOps <= 0){
                        
continue;
                    }

                    
                    Set
<SelectionKey> readyKeys = se.selectedKeys();
                    Iterator
<SelectionKey> i = readyKeys.iterator();
                    
while (i.hasNext()) {
                        SelectionKey sk 
= i.next();
                        i.remove();
                        
                        
if (sk.isValid() && sk.isReadable()) {
                            SocketChannel sc 
= (SocketChannel) sk.channel();
                            sc.configureBlocking(
false);
                            
                            Worker worker 
= new Worker(sc);
                            es.execute(worker);    
                        }

                    }

                    Thread.sleep(
300);
                }

            }
 catch (IOException e) {
                e.printStackTrace();
            }
 catch (InterruptedException e) {
                e.printStackTrace();
            }

        }

    }

    
    
private static class Worker implements Runnable{
        ByteBuffer bb 
= ByteBuffer.allocateDirect(1024);
        SocketChannel sc 
= null;
        
        
public Worker(SocketChannel sc){
            Thread.currentThread().setName(
this.toString());
            
this.sc = sc;
        }

        
        
public void run() {
            
try {
                
try{
                    sc.read(bb);
                }
catch(IOException e){
                    e.printStackTrace();
                    sc.finishConnect();
                    sc.close();
                    
return;
                }

                bb.flip();
                
byte[] bs = new byte[bb.limit()];
                bb.get(bs);
                bb.clear();
                
                StringBuilder sb 
= new StringBuilder();
                sb.append(
"HTTP/1.1 200 OK").append("\n").append("Date:" + new Date()).append("\n");
                sb.append(
"Server:NIO Server By Yi Yang\n\n");
                bb.put(sb.toString().getBytes());
                
                bb.flip();
                sc.write(bb);
                bb.clear();
                
                FileInputStream is 
= new FileInputStream("E:/test.html");
                is.getChannel().transferTo(
01024, sc);
                
                is.close();
                sc.finishConnect();
                sc.close();
            }
 catch (IOException e) {
                e.printStackTrace();
            }

        }

    }

    
    
public static void main(String[] args) throws Exception {
        NioServer server 
= NioServer.getInstance();
        server.init();
    }

}
===============06/27/10 
如何解析header?,以行为单位读取,按照header敏感的关键字进行匹配 对于首行取得对方调用的方法GET/POST 地址 和协议版本 
然后根据用户的配置,和解析地址请求,获得响应的servlet,并把通过反射+默认构造函数构造这个servlet,解析地址参数等设置到对象httpservletrequest和httpservletresponse中,然后通过反射invoke对应的get/post/put/delete等方法,并把封装的两个对象作为参数传进去,同时在response的header中传递一个cookie作为session的依据。


 @2008 杨一. 版权所有. 保留所有权利

posted on 2010-06-25 19:19 杨一 阅读(1952) 评论(0)  编辑  收藏 所属分类: Java SE


只有注册用户登录后才能发表评论。


网站导航:
 
<2010年6月>
303112345
6789101112
13141516171819
20212223242526
27282930123
45678910

导航

公告

本人在blogjava上发表的文章及随笔除特别声明外均为原创或翻译,作品受知识产权法保护并被授权遵从 知识分享协议:署名-非商业性使用-相同方式共享 欢迎转载,请在转载时注明作者姓名(杨一)及出处(www.blogjava.net/yangyi)
/////////////////////////////////////////
我的访问者

常用链接

留言簿(5)

随笔分类(55)

随笔档案(55)

相册

Java

其他技术

生活

最新随笔

搜索

积分与排名

最新评论

阅读排行榜

评论排行榜

自强不息


用心 - 珍惜时间,勇于创造