honzeland

记录点滴。。。

常用链接

统计

Famous Websites

Java

Linux

P2P

最新评论

实现java UDP Server --2008农历新年第一贴(原创)

一、UDP Server
项目的需要,需要利用java实现一个udp server,主要的功能是侦听来自客户端的udp请求,客户请求可能是大并发量的,对于每个请求Server端的处理很简单,处理每个请求的时间大约在1ms左右,但是Server端需要维护一个对立于请求的全局变量Cache,项目本身已经采用Mina架构(http://mina.apache.org/),我要开发的Server作为整个项目的一个模块,由于之前没有开发UDP Server,受TCP Server的影响,很自然的想利用多线程来实现,对于每个客户请求,新建一个线程来处理相应的逻辑,在实现的过程中,利用Mina的Thread Model,实现了一个多线程的UDP Server,但是由于要维护一个全局Cache,需要在各线程之间同步,加之处理请求的时间很短,很快就发现在此利用多线程,并不能提高性能,于是决定采用单线程来实现,在动手之前,还是发现有两种方案来实现单线程UDP Server:
1) 采用JDK的DatagramSocket和DatagramPacket来实现
public class UDPServerUseJDK extends Thread{
    /**
     * Constructor
     * @param port port used to listen incoming connection
     * @throws SocketException error to consturct a DatagramSocket with this port
     */
    public MediatorServerUseJDK(int port) throws SocketException{
        this("MediatorServerThread", port);
    }
    
    /**
     * Constructor
     * @param name the thread name
     * @param port port used to listen incoming connection
     * @throws SocketException error to consturct a DatagramSocket with this port
     */
    public MediatorServerUseJDK(String name, int port) throws SocketException{
        super(name);
        socket = new DatagramSocket(port);
        System.out.println("Mediator server started on JDK model...");
        System.out.println("Socket buffer size: " + socket.getReceiveBufferSize());
    }
    
    public void run(){
        long startTime = 0;
        while(true){
            try {
                buf = new byte[1024];
                // receive request
                packet = new DatagramPacket(buf, buf.length);
                socket.receive(packet);
                .........
            }catch (IOException e) {
            .........
            }
        }
    }

2) 采用Mina的DatagramAcceptor来实现,在创建Exector的时候,只传递单个线程
public class MediatorServerUseMina {
    private DatagramAcceptor acceptor;

    public MediatorServerUseMina() {

    }

    public void startListener(InetSocketAddress address, IoHandler handler) {
        // create an acceptor with a single thread
        this.acceptor = new DatagramAcceptor(Executors.newSingleThreadExecutor());
        // configure the thread models
        DatagramAcceptorConfig acceptorConfig = acceptor.getDefaultConfig();
        acceptorConfig.setThreadModel(ThreadModel.MANUAL);
        // set the acceptor to reuse the address
        acceptorConfig.getSessionConfig().setReuseAddress(true);
        // add io filters
        DefaultIoFilterChainBuilder filterChainBuilder = acceptor.getFilterChain();
        // add CPU-bound job first,
        filterChainBuilder.addLast("codec", new ProtocolCodecFilter(new StringCodecFactory()));
        try {
            // bind
            acceptor.bind(address, handler);
            System.out.println("Mediator Server started on mina model...");
            System.out.println("Socket buffer size: " + acceptorConfig.getSessionConfig().getReceiveBufferSize());
        } catch (IOException e) {
            System.err.println("Error starting component listener on port(UDP) " + address.getPort() + ": "
                    + e.getMessage());
        }
    }
}
二、Performance Test
为了测试两个Server的性能,写了个简单的测试客户端
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketException;
import java.net.UnknownHostException;

/**
 * @author Herry Hong
 *
 */

public class PerformanceTest {
    /** Number of threads to be created */
    static final int THREADS = 100;
    /** Packets to be sent per thread */
    static final int PACKETS = 500;
    /** The interval of two packets been sent for each thread */
    private static final int INTERVAL = 80;
    private static final String DATA = "5a76d93cb435fc54eba0b97156fe38f432a4e1da3a87cce8a222644466ed1317";

    private class Sender implements Runnable {
        private InetAddress address = null;
        private DatagramSocket socket = null;
        private String msg = null;
        private String name = null;
        private int packet_sent = 0;

        public Sender(String addr, String msg, String name) throws SocketException,
                UnknownHostException {
            this.address = InetAddress.getByName(addr);
            this.socket = new DatagramSocket();
            this.msg = msg;
            this.name = name;
        }

        @Override
        public void run() {
            // send request
            byte[] buf = msg.getBytes();
            DatagramPacket packet = new DatagramPacket(buf, buf.length,
                    address, 8000);
            try {
                for (int i = 0; i < PerformanceTest.PACKETS; i++) {
                    socket.send(packet);
                    packet_sent++;
                    Thread.sleep(INTERVAL);
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //System.out.println("Thread " + name + " sends " + packet_sent + " packets.");
            //System.out.println("Thread " + name + " end!");
        }
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        if (args.length != 1) {
            System.out.println("Usage: java PerformanceTest <hostname>");
            return;
        }
        String msg;
        for (int i = 0; i < THREADS; i++) {
            if(i % 2 == 0){
                msg = i + "_" + (i+1) + ""r"n" + (i+1) + "_" + i + ""r"n" + DATA;
            }else{
                msg = i + "_" + (i-1) + ""r"n" + (i-1) + "_" + i + ""r"n" + DATA;
            }
            try {
                new Thread(new PerformanceTest().new Sender(args[0], msg, "" + i)).start();
            } catch (SocketException e) {
                e.printStackTrace();
            } catch (UnknownHostException e) {
                e.printStackTrace();
            }
        }
    }
}
三、测试结果
测试环境:
Server:AMD Athlon(tm) 64 X2 Dual Core Processor 4000+,1G memory
Client:AMD Athlon(tm) 64 X2 Dual Core Processor 4000+,1G memory
在测试的过程中,当INTERVAL设置的太小时,服务器端会出现丢包现象,INTERVAL越小,丢包越严重,为了提高Server的性能,特将Socket的ReceiveBufferSize设置成默认大小的两倍,
对于JDK实现:
    public MediatorServerUseJDK(String name, int port) throws SocketException{
        super(name);
        socket = new DatagramSocket(port);
        // set the receive buffer size to double default size
        socket.setReceiveBufferSize(socket.getReceiveBufferSize() * 2);
        System.out.println("Mediator server started on JDK model...");
        System.out.println("Socket buffer size: " + socket.getReceiveBufferSize());
    }
对于Mina实现:
        DatagramAcceptorConfig acceptorConfig = acceptor.getDefaultConfig();
        acceptorConfig.setThreadModel(ThreadModel.MANUAL);
        // set the acceptor to reuse the address
        acceptorConfig.getSessionConfig().setReuseAddress(true);
        // set the receive buffer size to double default size
        int recBufferSize = acceptorConfig.getSessionConfig().getReceiveBufferSize();
        acceptorConfig.getSessionConfig().setReceiveBufferSize(recBufferSize * 2);
此时,相同的INTERVAL,丢包现象明显减少。
接下来,再测试不同实现的性能差异:
UDP server started on JDK model...
Socket buffer size: 110592
INTERVAL = 100ms,没有出现丢包,
Process time: 49988
Process time: 49982
Process time: 49984
Process time: 49986
Process time: 49984
INTERVAL = 80ms,仍然没有丢包,不管Server是不是初次启动
Process time: 40006
Process time: 40004
Process time: 40003
Process time: 40005
Process time: 40013
UDP Server started on mina model...
Socket buffer size: 110592
INTERVAL = 80ms,Server初次启动时,经常会出现丢包,当第一次(指服务器初次启动时)没有丢包时,随后基本不丢包,
Process time: 39973
Process time: 40006
Process time: 40007
Process time: 40008
Process time: 40008
INTERVAL = 100ms,没有出现丢包
Process time: 49958
Process time: 49985
Process time: 49983
Process time: 49988
四、结论
在该要求下,采用JDK和Mina实现性能相当,但是在Server初次启动时JDK实现基本不会出现丢包,而Mina实现则在Server初次启动时经常出现丢包现象,在经历第一次测试后,两种实现处理时间相近,请求并发量大概为每ms一个请求时,服务器不会出现丢包。

posted on 2008-02-15 16:19 honzeland 阅读(7859) 评论(4)  编辑  收藏 所属分类: Java

评论

# re: 实现java UDP Server --2008农历新年第一贴(原创) 2008-04-24 10:19 yoson

你用的Mina什么版本?Mina2.0-M1中能this.acceptor = new DatagramAcceptor(Executors.newSingleThreadExecutor());
这样初始化吗?  回复  更多评论   

# re: 实现java UDP Server --2008农历新年第一贴(原创) 2008-04-29 11:50 honzeland

我的mina是1.1.5,2中就不知道了。呵呵,可以查看一下它的官方文档  回复  更多评论   

# re: 实现java UDP Server --2008农历新年第一贴(原创) 2008-09-14 16:11 hankesi2000

我在用1.1.7做测试时,也遇到了mina启动时经常丢包的情况,而且客户端如果使用IO方式的UDP则丢包率会比使用MINA的UDP丢包率少。估计mina的UDP客户端在启动时也不稳定。  回复  更多评论   

# re: 实现java UDP Server --2008农历新年第一贴(原创)[未登录] 2011-11-01 15:27

非常感谢你的分享 给我的帮助很大很大  回复  更多评论   


只有注册用户登录后才能发表评论。


网站导航: