从JDK 1.4开始,Java的标准库中就包含了NIO,
即所谓的“New
IO”。其中最重要的功能就是提供了“非阻塞”的IO,当然包括了Socket。NonBlocking的IO就是对select(Unix平台下)以及
WaitForMultipleObjects(Windows平台)的封装,提供了高性能、易伸缩的服务架构。
说来惭愧,直到JDK1.4才有这种功能,但迟到者不一定没有螃蟹吃,NIO就提供了优秀的面向对象的解决方案,可以很方便地编写高性能的服务器。
话说回来,传统的Server/Client实现是基于Thread per request,即服务器为每个客户端请求建立一个线程处理,单独负责处理一个客户的请求。比如像Tomcat(新版本也会提供NIO方案)、Resin等Web服务器就是这样实现的。当然为了减少瞬间峰值问题,服务器一般都使用线程池,规定了同时并发的最大数量,避免了线程的无限增长。
但这样有一个问题:如果线程池的大小为100,当有100个用户同时通过HTTP现在一个大文件时,服务器的线程池会用完,因为所有的线程都在传输大文件了,即使第101个请求者仅仅请求一个只有10字节的页面,服务器也无法响应了,只有等到线程池中有空闲的线程出现。
另外,线程的开销也是很大的,特别是达到了一个临界值后,性能会显著下降,这也限制了传统的Socket方案无法应对并发量大的场合,而“非阻塞”的IO就能轻松解决这个问题。
下面只是一个简单的例子:服务器提供了下载大型文件的功能,客户端连接上服务器的12345端口后,就可以读取服务器发送的文件内容信息了。注意这里的服务器只有一个主线程,没有其他任何派生线程,让我们看看NIO是如何用一个线程处理N个请求的。
NIO服务器最核心的一点就是反应器模式:当有感兴趣的事件发生的,就通知对
应的事件处理器去处理这个事件,如果没有,则不处理。所以使用一个线程做轮询就可以了。当然这里这是个例子,如果要获得更高性能,可以使用少量的线程,一
个负责接收请求,其他的负责处理请求,特别是对于多CPU时效率会更高。
关于使用NIO过程中出现的问题,最为普遍的就是为什么没有请求时CPU的占
用率为100%?出现这种问题的主要原因是注册了不感兴趣的事件,比如如果没有数据要发到客户端,而又注册了写事件(OP_WRITE),则在
Selector.select()上就会始终有事件出现,CPU就一直处理了,而此时select()应该是阻塞的。
另外一个值得注意的问题是:由于只使用了一个线程(多个线程也如此)处理用户请求,所以要避免线程被阻塞,解决方法是事件的处理者必须要即刻返回,不能陷入循环中,否则会影响其他用户的请求速度。
具体到本例子中,由于文件比较大,如果一次性发送整个文件(这里的一次性不是
指send整个文件内容,而是通过while循环不间断的发送分组包),则主线程就会阻塞,其他用户就不能响应了。这里的解决方法是当有WRITE事件
时,仅仅是发送一个块(比如4K字节)。发完后,继续等待WRITE事件出现,依次处理,直到整个文件发送完毕,这样就不会阻塞其他用户了。
服务器的例子:
package
nio.file;
import
java.io.FileInputStream;
import
java.io.IOException;
import
java.net.InetSocketAddress;
import
java.nio.ByteBuffer;
import
java.nio.CharBuffer;
import
java.nio.channels.FileChannel;
import
java.nio.channels.Selecti;
import
java.nio.channels.Selector;
import
java.nio.channels.ServerSocketChannel;
import
java.nio.channels.SocketChannel;
import
java.nio.charset.Charset;
import
java.nio.charset.CharsetDecoder;
import
java.util.Iterator;
/**
* 测试文件下载的NIOServer
*
*
@author
tenyears.cn
*/
public class
NIOServer
{
static
int
BLOCK =
4096
;
// 处理与客户端的交互
public class
HandleClient
{
protected
FileChannel channel;
protected
ByteBuffer buffer;
public
HandleClient
()
throws
IOException
{
this
.channel =
new
FileInputStream
(
filename
)
.getChannel
()
;
this
.buffer = ByteBuffer.allocate
(
BLOCK
)
;
}
public
ByteBuffer readBlock
() {
try
{
buffer.clear
()
;
int
count = channel.read
(
buffer
)
;
buffer.flip
()
;
if
(
count <=
0
)
return null
;
}
catch
(
IOException e
) {
e.printStackTrace
()
;
}
return
buffer;
}
public
void
close
() {
try
{
channel.close
()
;
}
catch
(
IOException e
) {
e.printStackTrace
()
;
}
}
}
protected
Selector selector;
protected
String filename =
"d:\\bigfile.dat"
;
// a big file
protected
ByteBuffer clientBuffer = ByteBuffer.allocate
(
BLOCK
)
;
protected
CharsetDecoder decoder;
public
NIOServer
(
int
port
)
throws
IOException
{
selector =
this
.getSelector
(
port
)
;
Charset charset = Charset.forName
(
"GB2312"
)
;
decoder = charset.newDecoder
()
;
}
// 获取Selector
protected
Selector getSelector
(
int
port
)
throws
IOException
{
ServerSocketChannel server = ServerSocketChannel.open
()
;
Selector sel = Selector.open
()
;
server.socket
()
.bind
(
new
InetSocketAddress
(
port
))
;
server.configureBlocking
(
false
)
;
server.register
(
sel, Selecti.OP_ACCEPT
)
;
return
sel;
}
// 监听端口
public
void
listen
() {
try
{
for
(
;;
) {
selector.select
()
;
Iterator iter = selector.selectedKeys
()
.iterator
()
;
while
(
iter.hasNext
()) {
Selecti key = iter.next
()
;
iter.remove
()
;
handleKey
(
key
)
;
}
}
}
catch
(
IOException e
) {
e.printStackTrace
()
;
}
}
// 处理事件
protected
void
handleKey
(
Selecti key
)
throws
IOException
{
if
(
key.isAcceptable
()) {
// 接收请求
ServerSocketChannel server =
(
ServerSocketChannel
)
key.channel
()
;
SocketChannel channel = server.accept
()
;
channel.configureBlocking
(
false
)
;
channel.register
(
selector, Selecti.OP_READ
)
;
}
else if
(
key.isReadable
()) {
// 读信息
SocketChannel channel =
(
SocketChannel
)
key.channel
()
;
int
count = channel.read
(
clientBuffer
)
;
if
(
count >
0
) {
clientBuffer.flip
()
;
CharBuffer charBuffer = decoder.decode
(
clientBuffer
)
;
System.out.println
(
"Client >>"
+ charBuffer.toString
())
;
Selecti wKey = channel.register
(
selector,
Selecti.OP_WRITE
)
;
wKey.attach
(
new
HandleClient
())
;
}
else
channel.close
()
;
clientBuffer.clear
()
;
}
else if
(
key.isWritable
()) {
// 写事件
SocketChannel channel =
(
SocketChannel
)
key.channel
()
;
HandleClient handle =
(
HandleClient
)
key.attachment
()
;
ByteBuffer block = handle.readBlock
()
;
if
(
block !=
null
)
channel.write
(
block
)
;
else
{
handle.close
()
;
channel.close
()
;
}
}
}
public static
void
main
(
String
[]
args
) {
int
port =
12345
;
try
{
NIOServer server =
new
NIOServer
(
port
)
;
System.out.println
(
"Listernint on "
+ port
)
;
while
(
true
) {
server.listen
()
;
}
}
catch
(
IOException e
) {
e.printStackTrace
()
;
}
}
}
|
该代码中,通过一个HandleClient来获取文件的一块数据,每一个客户都会分配一个HandleClient的实例。
下面是客户端请求的代码,也比较简单,模拟100个用户同时下载文件。
package
nio.file;
import
java.io.IOException;
import
java.net.InetSocketAddress;
import
java.nio.ByteBuffer;
import
java.nio.CharBuffer;
import
java.nio.channels.Selecti;
import
java.nio.channels.Selector;
import
java.nio.channels.SocketChannel;
import
java.nio.charset.Charset;
import
java.nio.charset.CharsetEncoder;
import
java.util.Iterator;
import
java.util.concurrent.ExecutorService;
import
java.util.concurrent.Executors;
/**
* 文件下载客户端
*
@author
tenyears.cn
*/
public class
NIOClient
{
static
int
SIZE =
100
;
static
InetSocketAddress ip =
new
InetSocketAddress
(
"localhost"
,
12345
)
;
static
CharsetEncoder encoder = Charset.forName
(
"GB2312"
)
.newEncoder
()
;
static class
Download
implements
Runnable
{
protected
int
index;
public
Download
(
int
index
) {
this
.index = index;
}
public
void
run
() {
try
{
long
start = System.currentTimeMillis
()
;
SocketChannel client = SocketChannel.open
()
;
client.configureBlocking
(
false
)
;
Selector selector = Selector.open
()
;
client.register
(
selector, Selecti.OP_CONNECT
)
;
client.connect
(
ip
)
;
ByteBuffer buffer = ByteBuffer.allocate
(
8
*
1024
)
;
int
total =
0
;
FOR:
for
(
;;
) {
selector.select
()
;
Iterator iter = selector.selectedKeys
()
.iterator
()
;
while
(
iter.hasNext
()) {
Selecti key = iter.next
()
;
iter.remove
()
;
if
(
key.isConnectable
()) {
SocketChannel channel =
(
SocketChannel
)
key
.channel
()
;
if
(
channel.isConnectionPending
())
channel.finishConnect
()
;
channel.write
(
encoder.encode
(
CharBuffer
.wrap
(
"Hello from "
+ index
)))
;
channel.register
(
selector, Selecti.OP_READ
)
;
}
else if
(
key.isReadable
()) {
SocketChannel channel =
(
SocketChannel
)
key
.channel
()
;
int
count = channel.read
(
buffer
)
;
if
(
count >
0
) {
total += count;
buffer.clear
()
;
}
else
{
client.close
()
;
break
FOR;
}
}
}
}
double
last =
(
System.currentTimeMillis
()
- start
)
*
1.0
/
1000
;
System.out.println
(
"Thread "
+ index +
" downloaded "
+ total
+
"bytes in "
+ last +
"s."
)
;
}
catch
(
IOException e
) {
e.printStackTrace
()
;
}
}
}
public static
void
main
(
String
[]
args
)
throws
IOException
{
ExecutorService exec = Executors.newFixedThreadPool
(
SIZE
)
;
for
(
int
index =
0
; index < SIZE; index++
) {
exec.execute
(
new
Download
(
index
))
;
}
exec.shutdown
()
;
}
}
posted on 2007-01-19 00:03
苦笑枯 阅读(464)
评论(0) 编辑 收藏 所属分类:
Java