E81086713E446D36F62B2AA2A3502B5EB155

Java杂家

杂七杂八。。。一家之言

BlogJava 首页 新随笔 联系 聚合 管理
  141 Posts :: 1 Stories :: 174 Comments :: 0 Trackbacks
前一篇博客,我简单提了下怎么为NIO2增加TransmitFile支持,文件传送吞吐量是一个性能关注点,此外,并发连接数也是重要的关注点。

不过JDK7在这里又一次只做了简单的实现:不支持同时投递多个AcceptEx请求,一次只能投递一个,返回后才能再投递。这样,客户端连接的接受速度必然大打折扣。不知道为什么Sun会做这样的实现。WSASend()/WSARecv()一次只允许一个还可以理解,毕竟这样简化了编程,不用考虑封包乱序问题,也降低了内存耗尽的风险。AcceptEx却没有这样的理由。

于是,再一次为了性能,我增加了同时投递多个AcceptEx请求的支持。

另外,在JDK7的默认实现中,AcceptEx返回后,设置远程和本地InetSocketAddress的方式效率也很低:要通过JNI调用4次getsockname,其中2次取sockaddr,2次取port。这些操作我改用GetAcceptExSockaddrs一次完成,进一步提高了效率。


先看Java部分的代码,框架跟JDK7的一样,细节处理不一样:

/**
 * 
 
*/
package sun.nio.ch;

import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.AcceptPendingException;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.CompletionHandler;
import java.nio.channels.NotYetBoundException;
import java.nio.channels.ShutdownChannelGroupException;
import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import sun.misc.Unsafe;

/**
 * This class enable multiple 'AcceptEx' post on the completion port, hence improve the concurrent connection number.
 * 
@author Yvon
 *
 
*/
public class WindowsMultiAcceptSupport {

    WindowsAsynchronousServerSocketChannelImpl schannel;

    
private static final Unsafe unsafe = Unsafe.getUnsafe();

    
// 2 * (sizeof(SOCKET_ADDRESS) + 16)
    private static final int ONE_DATA_BUFFER_SIZE = 88;

    
private long handle;
    
private Iocp iocp;

    
// typically there will be zero, or one I/O operations pending. In rare
    
// cases there may be more. These rare cases arise when a sequence of accept
    
// operations complete immediately and handled by the initiating thread.
    
// The corresponding OVERLAPPED cannot be reused/released until the completion
    
// event has been posted.
    private PendingIoCache ioCache;

    
private Queue<Long> dataBuffers;
    
// the data buffer to receive the local/remote socket address
    
//        private final long dataBuffer;

    
private AtomicInteger pendingAccept;
    
private int maxPending;

    Method updateAcceptContextM;
    Method acceptM;

    WindowsMultiAcceptSupport() {
        
//dummy for JNI code.
    }

    
public void close() throws IOException {

        schannel.close();

        
for (int i = 0; i < maxPending + 1; i++)//assert there is maxPending+1 buffer in the queue
        {
            
long addr = dataBuffers.poll();
            
// release  resources
            unsafe.freeMemory(addr);
        }

    }

    
/**
     * 
     
*/
    
public WindowsMultiAcceptSupport(AsynchronousServerSocketChannel ch, int maxPost) {
        
if (maxPost <= 0 || maxPost > 1024)
            
throw new IllegalStateException("maxPost can't less than 1 and greater than 1024");
        
this.schannel = (WindowsAsynchronousServerSocketChannelImpl) ch;
        maxPending 
= maxPost;
        dataBuffers 
= new ConcurrentLinkedQueue<Long>();
        
for (int i = 0; i < maxPending + 1; i++) {
            dataBuffers.add(unsafe.allocateMemory(ONE_DATA_BUFFER_SIZE));
        }

        pendingAccept 
= new AtomicInteger(0);
        
try {
            Field f 
= WindowsAsynchronousServerSocketChannelImpl.class.getDeclaredField("handle");
            f.setAccessible(
true);
            handle 
= f.getLong(schannel);


            f 
= WindowsAsynchronousServerSocketChannelImpl.class.getDeclaredField("iocp");
            f.setAccessible(
true);
            iocp 
= (Iocp) f.get(schannel);

            f 
= WindowsAsynchronousServerSocketChannelImpl.class.getDeclaredField("ioCache");
            f.setAccessible(
true);
            ioCache 
= (PendingIoCache) f.get(schannel);

            f 
= WindowsAsynchronousServerSocketChannelImpl.class.getDeclaredField("accepting");
            f.setAccessible(
true);
            AtomicBoolean accepting 
= (AtomicBoolean) f.get(schannel);

            accepting.set(
true);//disable accepting by origin channel.

        } 
catch (Exception e) {
            e.printStackTrace();
        }

    }

    @SuppressWarnings(
"unchecked")
    
public final <A> void accept(A attachment,
        CompletionHandler
<AsynchronousSocketChannel, ? super A> handler) {
        
if (handler == null)
            
throw new NullPointerException("'handler' is null");
        implAccept(attachment, (CompletionHandler
<AsynchronousSocketChannel, Object>) handler);
    }

    
/**
     * Task to initiate accept operation and to handle result.
     
*/
    
private class AcceptTask implements Runnable, Iocp.ResultHandler {

        
private final WindowsAsynchronousSocketChannelImpl channel;
        
private final AccessControlContext acc;
        
private final PendingFuture<AsynchronousSocketChannel, Object> result;
        
private final long dataBuffer;

        AcceptTask(WindowsAsynchronousSocketChannelImpl channel, AccessControlContext acc,
            
long dataBuffer, PendingFuture<AsynchronousSocketChannel, Object> result) {
            
this.channel = channel;
            
this.acc = acc;
            
this.result = result;
            
this.dataBuffer = dataBuffer;
        }

        
void enableAccept() {
            pendingAccept.decrementAndGet();
            dataBuffers.add(dataBuffer);
        }

        
void closeChildChannel() {
            
try {
                channel.close();
            } 
catch (IOException ignore) {
            }
        }

        
// caller must have acquired read lock for the listener and child channel.
        void finishAccept() throws IOException {
            
/**
             * JDK7 use 4 calls to getsockname  to setup
             * local& remote address, this is very inefficient.
             * 
             * I change this to use GetAcceptExSockaddrs
             
*/

            InetAddress[] socks 
= new InetAddress[2];
            
int[] ports = new int[2];
            updateAcceptContext(handle, channel.handle(), socks, ports, dataBuffer);
            InetSocketAddress local 
= new InetSocketAddress(socks[0], ports[0]);
            
final InetSocketAddress remote = new InetSocketAddress(socks[1], ports[1]);
            channel.setConnected(local, remote);

            
// permission check (in context of initiating thread)
            if (acc != null) {
                AccessController.doPrivileged(
new PrivilegedAction<Void>() {

                    
public Void run() {
                        SecurityManager sm 
= System.getSecurityManager();
                        sm.checkAccept(remote.getAddress().getHostAddress(), remote.getPort());

                        
return null;
                    }
                }, acc);
            }
        }

        
/**
         * Initiates the accept operation.
         
*/
        @Override
        
public void run() {
            
long overlapped = 0L;

            
try {
                
// begin usage of listener socket
                schannel.begin();
                
try {
                    
// begin usage of child socket (as it is registered with
                    
// completion port and so may be closed in the event that
                    
// the group is forcefully closed).
                    channel.begin();

                    
synchronized (result) {
                        overlapped 
= ioCache.add(result);

                      
                        
int n = accept0(handle, channel.handle(), overlapped, dataBuffer);//Be careful for the buffer address
                        if (n == IOStatus.UNAVAILABLE) {
                            
return;
                        }

                        
// connection accepted immediately
                        finishAccept();

                        
// allow another accept before the result is set
                        enableAccept();
                        result.setResult(channel);
                    }
                } 
finally {
                    
// end usage on child socket
                    channel.end();
                }
            } 
catch (Throwable x) {
                
// failed to initiate accept so release resources
                if (overlapped != 0L)
                    ioCache.remove(overlapped);
                closeChildChannel();
                
if (x instanceof ClosedChannelException)
                    x 
= new AsynchronousCloseException();
                
if (!(x instanceof IOException) && !(x instanceof SecurityException))
                    x 
= new IOException(x);
                enableAccept();
                result.setFailure(x);
            } 
finally {
                
// end of usage of listener socket
                schannel.end();
            }

            
// accept completed immediately but may not have executed on
            
// initiating thread in which case the operation may have been
            
// cancelled.
            if (result.isCancelled()) {
                closeChildChannel();
            }

            
// invoke completion handler
            Invoker.invokeIndirectly(result);
        }

        
/**
         * Executed when the I/O has completed
         
*/
        @Override
        
public void completed(int bytesTransferred, boolean canInvokeDirect) {
            
try {
                
// connection accept after group has shutdown
                if (iocp.isShutdown()) {
                    
throw new IOException(new ShutdownChannelGroupException());
                }

                
// finish the accept
                try {
                    schannel.begin();
                    
try {
                        channel.begin();
                        finishAccept();
                    } 
finally {
                        channel.end();
                    }
                } 
finally {
                    schannel.end();
                }

                
// allow another accept before the result is set
                enableAccept();
                result.setResult(channel);
            } 
catch (Throwable x) {
                enableAccept();
                closeChildChannel();
                
if (x instanceof ClosedChannelException)
                    x 
= new AsynchronousCloseException();
                
if (!(x instanceof IOException) && !(x instanceof SecurityException))
                    x 
= new IOException(x);
                result.setFailure(x);
            }

            
// if an async cancel has already cancelled the operation then
            
// close the new channel so as to free resources
            if (result.isCancelled()) {
                closeChildChannel();
            }

            
// invoke handler (but not directly)
            Invoker.invokeIndirectly(result);
        }

        @Override
        
public void failed(int error, IOException x) {
            enableAccept();
            closeChildChannel();

            
// release waiters
            if (schannel.isOpen()) {
                result.setFailure(x);
            } 
else {
                result.setFailure(
new AsynchronousCloseException());
            }
            Invoker.invokeIndirectly(result);
        }
    }

    Future
<AsynchronousSocketChannel> implAccept(Object attachment,
        
final CompletionHandler<AsynchronousSocketChannel, Object> handler) {
        
if (!schannel.isOpen()) {
            Throwable exc 
= new ClosedChannelException();
            
if (handler == null)
                
return CompletedFuture.withFailure(exc);
            Invoker.invokeIndirectly(schannel, handler, attachment, 
null, exc);
            
return null;
        }
        
if (schannel.isAcceptKilled())
            
throw new RuntimeException("Accept not allowed due to cancellation");

        
// ensure channel is bound to local address
        if (schannel.localAddress == null)
            
throw new NotYetBoundException();

        
// create the socket that will be accepted. The creation of the socket
        
// is enclosed by a begin/end for the listener socket to ensure that
        
// we check that the listener is open and also to prevent the I/O
        
// port from being closed as the new socket is registered.
        WindowsAsynchronousSocketChannelImpl ch = null;
        IOException ioe 
= null;
        
try {
            schannel.begin();
            ch 
= new WindowsAsynchronousSocketChannelImpl(iocp, false);
        } 
catch (IOException x) {
            ioe 
= x;
        } 
finally {
            schannel.end();
        }
        
if (ioe != null) {
            
if (handler == null)
                
return CompletedFuture.withFailure(ioe);
            Invoker.invokeIndirectly(
this.schannel, handler, attachment, null, ioe);
            
return null;
        }

        
// need calling context when there is security manager as
        
// permission check may be done in a different thread without
        
// any application call frames on the stack
        AccessControlContext acc =
            (System.getSecurityManager() 
== null? null : AccessController.getContext();

        PendingFuture
<AsynchronousSocketChannel, Object> result =
            
new PendingFuture<AsynchronousSocketChannel, Object>(schannel, handler, attachment);

        
// check and set flag to prevent concurrent accepting
        if (pendingAccept.get() >= maxPending)
            
throw new AcceptPendingException();
        pendingAccept.incrementAndGet();
        AcceptTask task 
= new AcceptTask(ch, acc, dataBuffers.poll(), result);
        result.setContext(task);

        
// initiate I/O
        if (Iocp.supportsThreadAgnosticIo()) {
            task.run();
        } 
else {
            Invoker.invokeOnThreadInThreadPool(
this.schannel, task);
        }
        
return result;
    }

    
//    //reimplements for performance
    static native void updateAcceptContext(long listenSocket, long acceptSocket,
        InetAddress[] addresses, 
int[] ports, long dataBuffer) throws IOException;

    
static native int accept0(long handle, long handle2, long overlapped, long dataBuffer);

}


对应的CPP代码如下:


/*
 * Class:     sun_nio_ch_WindowsMultiAcceptSupport
 * Method:    updateAcceptContext
 * Signature: (JJ[Ljava/net/InetAddress;[IJ)V
 
*/
JNIEXPORT 
void JNICALL Java_sun_nio_ch_WindowsMultiAcceptSupport_updateAcceptContext
(JNIEnv 
*env , jclass clazz, jlong listenSocket, jlong acceptSocket, jobjectArray sockArray,jintArray portArray,jlong buf)
{
    SOCKET s1 
= (SOCKET)jlong_to_ptr(listenSocket);
    SOCKET s2 
= (SOCKET)jlong_to_ptr(acceptSocket);
    PVOID outputBuffer 
= (PVOID)jlong_to_ptr(buf);
    INT iLocalAddrLen
=0;
    INT iRemoteAddrLen
=0;
    SOCKETADDRESS
* lpLocalAddr;
    SOCKETADDRESS
* lpRemoteAddr;
    jobject localAddr;
    jobject remoteAddr;
    jint ports[
2]={0};

    

    setsockopt(s2, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (
char *)&s1, sizeof(s1));

    (lpGetAcceptExSockaddrs)(outputBuffer,
        
0,
        
sizeof(SOCKETADDRESS)+16,
        
sizeof(SOCKETADDRESS)+16,
        (LPSOCKADDR
*)&lpLocalAddr,
        
&iLocalAddrLen,
        (LPSOCKADDR
*)&lpRemoteAddr,
        
&iRemoteAddrLen);

    localAddr
=lpNET_SockaddrToInetAddress(env,(struct sockaddr *)lpLocalAddr,(int *)ports);
    remoteAddr
=lpNET_SockaddrToInetAddress(env,(struct sockaddr *)lpRemoteAddr,(int *)(ports+1));

    env
->SetObjectArrayElement(sockArray,0,localAddr);
    env
->SetObjectArrayElement(sockArray,1,remoteAddr);
    env
->SetIntArrayRegion(portArray,0,2,ports);

}

/*
 * Class:     sun_nio_ch_WindowsMultiAcceptSupport
 * Method:    accept0
 * Signature: (JJJJ)I
 
*/
jint JNICALL Java_sun_nio_ch_WindowsMultiAcceptSupport_accept0
  (JNIEnv 
*env, jclass clazz, jlong listenSocket, jlong acceptSocket, jlong ov, jlong buf)
{

    BOOL res;
    SOCKET s1 
= (SOCKET)jlong_to_ptr(listenSocket);
    SOCKET s2 
= (SOCKET)jlong_to_ptr(acceptSocket);
    PVOID outputBuffer 
= (PVOID)jlong_to_ptr(buf);

    DWORD nread 
= 0;
    OVERLAPPED
* lpOverlapped = (OVERLAPPED*)jlong_to_ptr(ov);
    ZeroMemory((PVOID)lpOverlapped, 
sizeof(OVERLAPPED));

    

    
//why use SOCKETADDRESS?
    
//because client may use IPv6 to connect to server.
    res = (lpAcceptEx)(s1,
        s2,
        outputBuffer,
        
0,
        
sizeof(SOCKETADDRESS)+16,
        
sizeof(SOCKETADDRESS)+16,
        
&nread,
        lpOverlapped);

    
    
if (res == 0) {
        
int error = WSAGetLastError();
        
        
if (error == ERROR_IO_PENDING) {
            
            
return NIO2_IOS_UNAVAILABLE;
        }
    
    
        
return NIO2_THROWN;
    }



    
    
return 0;

}

这里用到的lpNET_SockaddrToInetAddress是JDK7中NET.DLL暴露的方法,从DLL里加载。相应代码如下:

*
 
* Class:     com_yovn_jabhttpd_utilities_SunPackageFixer
 
* Method:    initFds
 
* Signature: ()V
 
*/
JNIEXPORT 
void JNICALL Java_com_yovn_jabhttpd_utilities_SunPackageFixer_initFds
  (JNIEnv 
*env, jclass clazz)
{


    GUID GuidAcceptEx 
= WSAID_ACCEPTEX;
    GUID GuidTransmitFile 
= WSAID_TRANSMITFILE;
    GUID GuidGetAcceptExSockAddrs 
= WSAID_GETACCEPTEXSOCKADDRS;
    SOCKET s;
    
int rv;
    DWORD dwBytes;
    HMODULE hModule;


    s 
= socket(AF_INET, SOCK_STREAM, 0);
    
if (s == INVALID_SOCKET) {
        JNU_ThrowByName(env,
"java/io/IOException""socket failed");
        
return;
    }
    rv 
= WSAIoctl(s,
        SIO_GET_EXTENSION_FUNCTION_POINTER,
        (LPVOID)
&GuidAcceptEx,
        
sizeof(GuidAcceptEx),
        
&lpAcceptEx,
        
sizeof(lpAcceptEx),
        
&dwBytes,
        NULL,
        NULL);
    
if (rv != 0)
    {
        JNU_ThrowByName(env, 
"java/io/IOException","WSAIoctl failed on get AcceptEx ");
        
goto _ret;
    }
    rv 
= WSAIoctl(s,
        SIO_GET_EXTENSION_FUNCTION_POINTER,
        (LPVOID)
&GuidTransmitFile,
        
sizeof(GuidTransmitFile),
        
&lpTransmitFile,
        
sizeof(lpTransmitFile),
        
&dwBytes,
        NULL,
        NULL);
    
if (rv != 0)
    {
        JNU_ThrowByName(env, 
"java/io/IOException","WSAIoctl failed on get TransmitFile");
        
goto _ret;
    }
    rv 
= WSAIoctl(s,
        SIO_GET_EXTENSION_FUNCTION_POINTER,
        (LPVOID)
&GuidGetAcceptExSockAddrs,
        
sizeof(GuidGetAcceptExSockAddrs),
        
&lpGetAcceptExSockaddrs,
        
sizeof(lpGetAcceptExSockaddrs),
        
&dwBytes,
        NULL,
        NULL);
    
if (rv != 0)
    {
        JNU_ThrowByName(env, 
"java/io/IOException","WSAIoctl failed on get GetAcceptExSockaddrs");
        
goto _ret;
    }

    hModule
=LoadLibrary("net.dll");
    
if(hModule==NULL)
    {
        JNU_ThrowByName(env, 
"java/io/IOException","can't load java net.dll");
        
goto _ret;
    }


    lpNET_SockaddrToInetAddress
=(NET_SockaddrToInetAddress_t)GetProcAddress(hModule,"_NET_SockaddrToInetAddress@12");

    
if(lpNET_SockaddrToInetAddress==NULL)
    {
        JNU_ThrowByName(env, 
"java/io/IOException","can't resolve _NET_SockaddrToInetAddress function ");
        
        
    }

_ret:
    closesocket(s);
    
return;


}

细心的同学可能会发现,在创建socket之前没有初始化WinSock库,因为在这段代码前,我初始化了一个InetSocketAddress对象,这样JVM会加载NET.DLL并初始化WinSock库了。

OK,现在,你可以在支持类上同时发起多个AcceptEx请求了。

PS:基于这个我简单测试了下我的服务器,同时开5000个线程,每个下载3M多点的文件,一分钟内能够全部正确完成。
服务器正在开发中,有兴趣的请加入:http://code.google.com/p/jabhttpd


posted on 2009-12-04 17:57 DoubleH 阅读(3890) 评论(6)  编辑  收藏

Feedback

# re: 基于JDK7 NIO2的高性能web服务器实践之二[未登录] 2011-04-23 20:54 java
很想看看牛人的实现,但报403,说我没有权限,可否传一份给我.邮箱wahel30615571@hotmail.com  回复  更多评论
  

# re: 基于JDK7 NIO2的高性能web服务器实践之二[未登录] 2011-11-16 16:37 随意
哥们,代码能发到zhouchn8@163.com一份吗,谢谢  回复  更多评论
  

# re: 基于JDK7 NIO2的高性能web服务器实践之二[未登录] 2011-12-19 10:47 VV
老师 代码给一份吧,,,, vanlin (AT) 139 dot com  回复  更多评论
  

# re: 基于JDK7 NIO2的高性能web服务器实践之二 2012-02-05 14:39 梦想在飞
兄弟:给我一份,
544286609@qq.com 谢谢  回复  更多评论
  

# re: 基于JDK7 NIO2的高性能web服务器实践之二 2012-03-12 21:14 为梦想奔跑
老师,能否发份代码asshp1790@126.com,谢谢  回复  更多评论
  

# re: 基于JDK7 NIO2的高性能web服务器实践之二 2012-05-08 17:05 xingye
老师,你好,可以给我一份代码吗?liangxingye@163.com,谢谢  回复  更多评论
  


只有注册用户登录后才能发表评论。


网站导航: