posts - 28,  comments - 15,  trackbacks - 0

     本文主要利用nio包中的SocketChannel和Selector模拟实现ping命令(linux),下面的示例是Sun提供的,有助于理解SocketChannel与Selector的使用。对于初学者而言,有明白以下几件事情:
    <1> 调用Selector.selector()方法将会导致当前线程阻塞;
    <2> Selector阻塞后,以下事件将导致Selector活跃:
                (1)其他线程或者当前线程为通道向Selector中注册了感兴趣的事件(Connect、Accept、Read、Write);
                (2)其他线程调用了selector的wakeup方法;


    明白了这些以下代码对你来说,就是小菜一叠,以上是我看代码的一点体会,希望指正!

public class Ping {
     
// The default daytime port
    static int DAYTIME_PORT = 13;

    
// The port we'll actually use
    static int port = DAYTIME_PORT;


    
// Representation of a ping target
    
// 
    static class Target {

        InetSocketAddress address;
        SocketChannel channel;
        Exception failure;
        
long connectStart;
        
long connectFinish = 0;
        
boolean shown = false;

        Target(String host) 
{
            
try {
            address 
= new InetSocketAddress(InetAddress.getByName(host),
                            port);
            }
 catch (IOException x) {
            failure 
= x;
            }

        }

        
/*打印信息*/
        
void show() {
            String result;
            
if (connectFinish != 0)
                result 
= Long.toString(connectFinish - connectStart) + "ms";
            
else if (failure != null)
                result 
= failure.toString();
            
else
                result 
= "Timed out";
            
            System.out.println(address 
+ " : " + result);
            shown 
= true;
        }


    }



    
// Thread for printing targets as they're heard from
    
//
    static class Printer extends Thread
    
{
        LinkedList pending 
= new LinkedList();
    
        Printer() 
{
            setName(
"Printer");
            setDaemon(
true);
        }


        
void add(Target t) {
            
synchronized (pending) {
                pending.add(t);
                pending.notify();
            }

        }


        
public void run() {
            
try {
                
for (;;) {
                    Target t 
= null;
                    
synchronized (pending) {
                        
while (pending.size() == 0)
                            pending.wait();
                        
//唤醒后删除target
                        t = (Target)pending.removeFirst();
                    }

                    
                    t.show();
                }

            }
 catch (InterruptedException x) {
                
return;
            }

        }


    }



    
// Thread for connecting to all targets in parallel via a single selector
    
// 
    static class Connector extends Thread{
        
        Selector sel;
        Printer printer;
    
        
// List of pending targets.  We use this list because if we try to
        
// register a channel with the selector while the connector thread is
        
// blocked in the selector then we will block.
        
//
        LinkedList pending = new LinkedList();
    
        Connector(Printer pr) 
throws IOException {
            printer 
= pr;
            sel 
= Selector.open();
            setName(
"Connector");
        }


        
// Initiate a connection sequence to the given target and add the
        
// target to the pending-target list
        void add(Target t) {
            SocketChannel sc 
= null;
            
try {
    
                
// Open the channel, set it to non-blocking, initiate connect
                
//在收到添加一个target的请求时,打开通道
                sc = SocketChannel.open();
                
//把通道注册为非阻塞的
                sc.configureBlocking(false);
    
                
boolean connected = sc.connect(t.address);
                System.out.println(
"if connected:"+connected);
                
// Record the time we started
                t.channel = sc;
                t.connectStart 
= System.currentTimeMillis();
    
            
if (connected) {
                t.connectFinish 
= t.connectStart;
                sc.close();
                printer.add(t);
            }
 else {
                
// Add the new channel to the pending list
//                synchronized (pending) {
//                    pending.add(t);
//                }
    
                
// Nudge the selector so that it will process the pending list
                sel.wakeup();
            }

            }
 catch (IOException x) {
                
if (sc != null{
                    
try {
                    sc.close();
                    }
 catch (IOException xx) { }
                }

                t.failure 
= x;
                printer.add(t);
            }

        }


        
// Process any targets in the pending list
        
//
        void processPendingTargets() throws IOException {
            
synchronized (pending) {
                
while (pending.size() > 0{
                    Target t 
= (Target)pending.removeFirst();
                    
try {
        
                    
// Register the channel with the selector, indicating
                    
// interest in connection completion and attaching the
                    
// target object so that we can get the target back
                    
// after the key is added to the selector's
                    
// selected-key set
                    t.channel.register(sel, SelectionKey.OP_CONNECT, t);
        
                    }
 catch (IOException x) {
        
                        
// Something went wrong, so close the channel and
                        
// record the failure
                        t.channel.close();
                        t.failure 
= x;
                        printer.add(t);
        
                    }

        
                }

            }

        }


    
// Process keys that have become selected
    
//
        void processSelectedKeys() throws IOException {
            
for (Iterator i = sel.selectedKeys().iterator(); i.hasNext();) {
    
            
// Retrieve the next key and remove it from the set
            SelectionKey sk = (SelectionKey)i.next();
            i.remove();
    
            
// Retrieve the target and the channel
            Target t = (Target)sk.attachment();
            SocketChannel sc 
= (SocketChannel)sk.channel();
    
            
// Attempt to complete the connection sequence
            try {
                
if (sc.finishConnect()) {
                    sk.cancel();
                    t.connectFinish 
= System.currentTimeMillis();
                    sc.close();
                    printer.add(t);
                }

            }
 catch (IOException x) {
                sc.close();
                t.failure 
= x;
                printer.add(t);
            }

            }

        }


        
volatile boolean shutdown = false;

    
// Invoked by the main thread when it's time to shut down
        void shutdown() {
            shutdown 
= true;
            sel.wakeup();
        }


        
// connector线程执行的过程描述
        
// 启动-->调用select()-->导致线程阻塞
        
//                  -->add()方法调用selecor.wakeup激活阻塞-->n=0-->执行注册感兴趣的事件-->select()-->n=1-->处理target连接-->select()-->线程阻塞
        
//                         -->main函数中的sleep过后,调用shutdown()方法,shutdown()中调用selector.wakeup(),唤醒线程,线程执行完成。  
        public void run() {
            
for (;;) {
                
try {
                    
int n = sel.select();//这里需要注意:线程启动后,select方法的执行将会导致线程阻塞,直到有感兴趣的事件或者selector.wakeup()调用激活。
                    if (n > 0)processSelectedKeys();//由于通道没有注册感兴趣事件,即使是selector.wakeup()激活阻塞,这里的n也等于0
                   
                    processPendingTargets();
                    
                    
if (shutdown) {
                        sel.close();
                        
return;
                    }

                }
 catch (IOException x) {
                    x.printStackTrace();
                }

            }

        }


    }



    
public static void main(String[] args)throws InterruptedException, IOException{
    
//        if (args.length < 1) {
//            System.err.println("Usage: java Ping [port] host");
//            return;
//        }
//        
        int firstArg = 1;
//    
//        // If the first argument is a string of digits then we take that
//        // to be the port number to use
//        if (Pattern.matches("[0-9]+", args[0])) {
//            port = Integer.parseInt(args[0]);
//            firstArg = 1;
//        }
    
        port 
= new Integer(80);
        
// Create the threads and start them up
        Printer printer = new Printer();
        printer.start();
        Connector connector 
= new Connector(printer);
        connector.start();
    
        
// Create the targets and add them to the connector
        LinkedList targets = new LinkedList();
        
for (int i = firstArg; i < 2; i++{
            Target t 
= new Target("localhost");
            targets.add(t);
            connector.add(t);
        }

    
        
//等待指定的时间关闭connector线程
        Thread.sleep(50000);
        connector.shutdown();
//设置connector线程关闭标识位,同时唤醒阻塞的selector
        
//阻塞当前调用线程,等待connector线程执行完毕(join的经典用法)
        connector.join();
    
        
// Print status of targets that have not yet been shown
        for (Iterator i = targets.iterator(); i.hasNext();) {
            Target t 
= (Target)i.next();
            
if (!t.shown)
                t.show();
        }


    }


打印结果:

/10.10.10.153:80 : 50024ms


posted on 2012-02-07 16:53 zhangxl 阅读(902) 评论(0)  编辑  收藏 所属分类: java concurrency

只有注册用户登录后才能发表评论。


网站导航:
 
<2012年2月>
2930311234
567891011
12131415161718
19202122232425
26272829123
45678910

常用链接

留言簿(1)

随笔分类(17)

随笔档案(28)

文章分类(30)

文章档案(30)

相册

收藏夹(2)

hibernate

java基础

mysql

xml

关注

压力测试

算法

最新随笔

搜索

  •  

积分与排名

  • 积分 - 95794
  • 排名 - 601

最新评论

阅读排行榜

评论排行榜