本文主要利用nio包中的SocketChannel和Selector模拟实现ping命令(linux),下面的示例是Sun提供的,有助于理解SocketChannel与Selector的使用。对于初学者而言,有明白以下几件事情:
<1> 调用Selector.selector()方法将会导致当前线程阻塞;
<2> Selector阻塞后,以下事件将导致Selector活跃:
(1)其他线程或者当前线程为通道向Selector中注册了感兴趣的事件(Connect、Accept、Read、Write);
(2)其他线程调用了selector的wakeup方法;
明白了这些以下代码对你来说,就是小菜一叠,以上是我看代码的一点体会,希望指正!

public class Ping
{
// The default daytime port
static int DAYTIME_PORT = 13;

// The port we'll actually use
static int port = DAYTIME_PORT;


// Representation of a ping target
//

static class Target
{

InetSocketAddress address;
SocketChannel channel;
Exception failure;
long connectStart;
long connectFinish = 0;
boolean shown = false;


Target(String host)
{

try
{
address = new InetSocketAddress(InetAddress.getByName(host),
port);

} catch (IOException x)
{
failure = x;
}
}

/**//*打印信息*/

void show()
{
String result;
if (connectFinish != 0)
result = Long.toString(connectFinish - connectStart) + "ms";
else if (failure != null)
result = failure.toString();
else
result = "Timed out";
System.out.println(address + " : " + result);
shown = true;
}

}


// Thread for printing targets as they're heard from
//
static class Printer extends Thread

{
LinkedList pending = new LinkedList();

Printer()
{
setName("Printer");
setDaemon(true);
}


void add(Target t)
{

synchronized (pending)
{
pending.add(t);
pending.notify();
}
}


public void run()
{

try
{

for (;;)
{
Target t = null;

synchronized (pending)
{
while (pending.size() == 0)
pending.wait();
//唤醒后删除target
t = (Target)pending.removeFirst();
}
t.show();
}

} catch (InterruptedException x)
{
return;
}
}

}


// Thread for connecting to all targets in parallel via a single selector
//

static class Connector extends Thread
{
Selector sel;
Printer printer;
// List of pending targets. We use this list because if we try to
// register a channel with the selector while the connector thread is
// blocked in the selector then we will block.
//
LinkedList pending = new LinkedList();

Connector(Printer pr) throws IOException
{
printer = pr;
sel = Selector.open();
setName("Connector");
}

// Initiate a connection sequence to the given target and add the
// target to the pending-target list

void add(Target t)
{
SocketChannel sc = null;

try
{
// Open the channel, set it to non-blocking, initiate connect
//在收到添加一个target的请求时,打开通道
sc = SocketChannel.open();
//把通道注册为非阻塞的
sc.configureBlocking(false);
boolean connected = sc.connect(t.address);
System.out.println("if connected:"+connected);
// Record the time we started
t.channel = sc;
t.connectStart = System.currentTimeMillis();

if (connected)
{
t.connectFinish = t.connectStart;
sc.close();
printer.add(t);

} else
{
// Add the new channel to the pending list
// synchronized (pending) {
// pending.add(t);
// }
// Nudge the selector so that it will process the pending list
sel.wakeup();
}

} catch (IOException x)
{

if (sc != null)
{

try
{
sc.close();

} catch (IOException xx)
{ }
}
t.failure = x;
printer.add(t);
}
}

// Process any targets in the pending list
//

void processPendingTargets() throws IOException
{

synchronized (pending)
{

while (pending.size() > 0)
{
Target t = (Target)pending.removeFirst();

try
{
// Register the channel with the selector, indicating
// interest in connection completion and attaching the
// target object so that we can get the target back
// after the key is added to the selector's
// selected-key set
t.channel.register(sel, SelectionKey.OP_CONNECT, t);

} catch (IOException x)
{
// Something went wrong, so close the channel and
// record the failure
t.channel.close();
t.failure = x;
printer.add(t);
}
}
}
}

// Process keys that have become selected
//

void processSelectedKeys() throws IOException
{

for (Iterator i = sel.selectedKeys().iterator(); i.hasNext();)
{
// Retrieve the next key and remove it from the set
SelectionKey sk = (SelectionKey)i.next();
i.remove();
// Retrieve the target and the channel
Target t = (Target)sk.attachment();
SocketChannel sc = (SocketChannel)sk.channel();
// Attempt to complete the connection sequence

try
{

if (sc.finishConnect())
{
sk.cancel();
t.connectFinish = System.currentTimeMillis();
sc.close();
printer.add(t);
}

} catch (IOException x)
{
sc.close();
t.failure = x;
printer.add(t);
}
}
}

volatile boolean shutdown = false;

// Invoked by the main thread when it's time to shut down

void shutdown()
{
shutdown = true;
sel.wakeup();
}

// connector线程执行的过程描述
// 启动-->调用select()-->导致线程阻塞
// -->add()方法调用selecor.wakeup激活阻塞-->n=0-->执行注册感兴趣的事件-->select()-->n=1-->处理target连接-->select()-->线程阻塞
// -->main函数中的sleep过后,调用shutdown()方法,shutdown()中调用selector.wakeup(),唤醒线程,线程执行完成。

public void run()
{

for (;;)
{

try
{
int n = sel.select();//这里需要注意:线程启动后,select方法的执行将会导致线程阻塞,直到有感兴趣的事件或者selector.wakeup()调用激活。
if (n > 0)processSelectedKeys();//由于通道没有注册感兴趣事件,即使是selector.wakeup()激活阻塞,这里的n也等于0
processPendingTargets();

if (shutdown)
{
sel.close();
return;
}

} catch (IOException x)
{
x.printStackTrace();
}
}
}

}



public static void main(String[] args)throws InterruptedException, IOException
{
// if (args.length < 1) {
// System.err.println("Usage: java Ping [port] host
");
// return;
// }
//
int firstArg = 1;
//
// // If the first argument is a string of digits then we take that
// // to be the port number to use
// if (Pattern.matches("[0-9]+", args[0])) {
// port = Integer.parseInt(args[0]);
// firstArg = 1;
// }
port = new Integer(80);
// Create the threads and start them up
Printer printer = new Printer();
printer.start();
Connector connector = new Connector(printer);
connector.start();
// Create the targets and add them to the connector
LinkedList targets = new LinkedList();

for (int i = firstArg; i < 2; i++)
{
Target t = new Target("localhost");
targets.add(t);
connector.add(t);
}
//等待指定的时间关闭connector线程
Thread.sleep(50000);
connector.shutdown();//设置connector线程关闭标识位,同时唤醒阻塞的selector
//阻塞当前调用线程,等待connector线程执行完毕(join的经典用法)
connector.join();
// Print status of targets that have not yet been shown

for (Iterator i = targets.iterator(); i.hasNext();)
{
Target t = (Target)i.next();
if (!t.shown)
t.show();
}

}
打印结果:
/10.10.10.153:80 : 50024ms
posted on 2012-02-07 16:53
zhangxl 阅读(912)
评论(0) 编辑 收藏 所属分类:
java concurrency