
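This article shows how to write a network application (both client and server) with Netty2. It introduces a simple protocol, SumUp, which adds up integers. Walking through the source code step by step, you will get to know each feature of Netty2.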

The SumUp protocol

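The SumUp server adds up the values carried by all the ADD messages a client sends, and returns one RESULT message for each ADD message. Every message consists of two parts, a header and a body: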

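The header contains two fields, type and sequence. type is the message type (0 for a RESULT message, 1 for an ADD message). sequence pairs an ADD with its RESULT: when the server answers an ADD message, the RESULT must carry the same sequence value as that ADD.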
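The header layout can be sketched with a plain java.nio.ByteBuffer. This is only an illustration under stated assumptions: the class and method names are hypothetical and not part of Netty2, and the field widths (2-byte type, 4-byte sequence) are inferred from the getShort() and getInt() calls in the AbstractMessage listing later in this article.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not part of Netty2: encodes and decodes a SumUp header.
final class SumUpHeaderSketch {
    static final int TYPE_RESULT = 0;      // from the text: 0 is RESULT
    static final int TYPE_ADD = 1;         // from the text: 1 is ADD
    static final int HEADER_LEN = 2 + 4;   // assumed: 2-byte type + 4-byte sequence

    // Writes a header into a fresh buffer and flips it so it is ready for reading.
    static ByteBuffer encode(int type, int sequence) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LEN);
        buf.putShort((short) type);
        buf.putInt(sequence);
        buf.flip();
        return buf;
    }

    // Returns {type, sequence}, or null if the header has not fully arrived yet.
    static int[] decode(ByteBuffer buf) {
        if (buf.remaining() < HEADER_LEN) {
            return null;
        }
        int type = buf.getShort();
        int sequence = buf.getInt();
        return new int[] { type, sequence };
    }

    public static void main(String[] args) {
        int[] header = decode(encode(TYPE_ADD, 42));
        System.out.println("type=" + header[0] + ", sequence=" + header[1]);
    }
}
```

A RESULT that answers this ADD would be encoded with TYPE_RESULT and the same sequence value, 42.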

The ADD message

The ADD message carries the value to be added to the sum.

The RESULT message

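The RESULT message has a variable-length body. The body starts with a 2-byte result code. When the computation succeeds, the code is followed by the sum (4 bytes); when an error or overflow occurs, the body is the 2-byte code alone.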
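The two body shapes can be sketched as follows. The class is hypothetical, and the numeric values chosen for the OK and error codes are assumptions: the article never states what Constants.RESULT_OK and Constants.RESULT_ERROR actually are.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not part of Netty2: builds the body of a RESULT message.
final class ResultBodySketch {
    // Assumed values; the real constants are not shown in the article.
    static final short RESULT_OK = 0;
    static final short RESULT_ERROR = 1;

    // Success: 2-byte code followed by the 4-byte sum. Failure: 2-byte code only.
    static ByteBuffer encodeBody(boolean ok, int value) {
        ByteBuffer buf = ByteBuffer.allocate(ok ? 2 + 4 : 2);
        buf.putShort(ok ? RESULT_OK : RESULT_ERROR);
        if (ok) {
            buf.putInt(value);
        }
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        System.out.println("ok body length:    " + encodeBody(true, 28).remaining());
        System.out.println("error body length: " + encodeBody(false, 0).remaining());
    }
}
```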

 

Implementing MessageRecognizer

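A MessageRecognizer reassembles Message objects from the incoming data. Here we implement a SumUpMessageRecognizer that is used for message recognition on both the client and the server.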
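Recognition only needs to look at the type field. How Netty2 treats the buffer position after recognition is not described in this article, so the following stand-alone sketch simply shows one way to inspect the type without consuming it, using ByteBuffer's absolute getShort(int). The class name and the -1 return convention are hypothetical.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not part of Netty2: looks at the type field without moving the position.
final class PeekTypeSketch {
    static final int TYPE_LEN = 2; // assumed width of the type field

    // Returns the message type, or -1 if fewer than TYPE_LEN bytes have arrived.
    static int peekType(ByteBuffer buf) {
        if (buf.remaining() < TYPE_LEN) {
            return -1;
        }
        // The absolute get leaves the buffer position untouched.
        return buf.getShort(buf.position());
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(6);
        buf.putShort((short) 1).putInt(5);
        buf.flip();
        System.out.println("type=" + peekType(buf) + ", position=" + buf.position());
    }
}
```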

import java.nio.ByteBuffer;

import net.gleamynode.netty2.Message;
import net.gleamynode.netty2.MessageParseException;
import net.gleamynode.netty2.MessageRecognizer;

public class SumUpMessageRecognizer implements MessageRecognizer {

    public static final int CLIENT_MODE = 1;

    public static final int SERVER_MODE = 2;

    private int mode;

    public SumUpMessageRecognizer(int mode) {
        switch (mode) {
            case CLIENT_MODE:
            case SERVER_MODE:
                this.mode = mode;
                break;
            default:
                throw new IllegalArgumentException("invalid mode: " + mode);
        }
    }

    public Message recognize(ByteBuffer buf) throws MessageParseException {
        // return null if message type is not arrived yet.
        if (buf.remaining() < Constants.TYPE_LEN)
            return null;

        int type = buf.getShort();
        switch (mode) {
            // In server mode, accept ADD messages only.
            case SERVER_MODE:
                switch (type) {
                    case Constants.ADD:
                        return new AddMessage();
                    default:
                        throw new MessageParseException("unknown type: " + type);
                }
            // In client mode, accept RESULT messages only.
            case CLIENT_MODE:
                switch (type) {
                    case Constants.RESULT:
                        return new ResultMessage();
                    default:
                        throw new MessageParseException("unknown type: " + type);
                }
            default:
                throw new InternalError(); // this cannot happen
        }
    }
}

Implementing the ADD and RESULT messages

We have to implement two messages: ADD and RESULT. They share a common header, so the best approach is to implement an AbstractMessage and derive the Add and Result messages from it.
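The classes in this article refer to a Constants class that is not listed here. A minimal sketch of what it might contain is given below. The message type values come from the protocol description, and the lengths are inferred from the getShort() and getInt() calls in the message classes. SEQUENCE_LEN is a name introduced only for this sketch, and the values of RESULT_OK and RESULT_ERROR are assumptions, since the article never states them.

```java
// Sketch of the Constants class referenced by the article; values marked "assumed" are guesses.
final class Constants {
    static final int TYPE_LEN = 2;                           // type is read with getShort()
    static final int SEQUENCE_LEN = 4;                       // hypothetical name; sequence is read with getInt()
    static final int HEADER_LEN = TYPE_LEN + SEQUENCE_LEN;

    static final int RESULT = 0;                             // from the text
    static final int ADD = 1;                                // from the text

    static final int ADD_BODY_LEN = 4;                       // value is read with getInt()
    static final int RESULT_CODE_LEN = 2;                    // result code is read with getShort()
    static final int RESULT_VALUE_LEN = 4;                   // sum is read with getInt()

    static final int RESULT_OK = 0;                          // assumed
    static final int RESULT_ERROR = 1;                       // assumed

    private Constants() {
    }
}

// Prints the total message sizes that follow from the constants above.
final class ConstantsDemo {
    public static void main(String[] args) {
        System.out.println("ADD message: "
                + (Constants.HEADER_LEN + Constants.ADD_BODY_LEN) + " bytes");
        System.out.println("RESULT (ok): "
                + (Constants.HEADER_LEN + Constants.RESULT_CODE_LEN + Constants.RESULT_VALUE_LEN) + " bytes");
        System.out.println("RESULT (error): "
                + (Constants.HEADER_LEN + Constants.RESULT_CODE_LEN) + " bytes");
    }
}
```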

Source code:

  • AbstractMessage.java

    /*
     * @(#) $Id: AbstractMessage.java 11 2005-04-18 03:42:45Z trustin $
     */
    package net.gleamynode.netty2.example.sumup;

    import java.nio.ByteBuffer;

    import net.gleamynode.netty2.Message;
    import net.gleamynode.netty2.MessageParseException;

    public abstract class AbstractMessage implements Message {

        private final int type;

        private int sequence;

        private boolean readHeader;

        private boolean wroteHeader;

        protected AbstractMessage(int type) {
            this.type = type;
        }

        public int getSequence() {
            return sequence;
        }

        public void setSequence(int sequence) {
            this.sequence = sequence;
        }

        public final boolean read(ByteBuffer buf) throws MessageParseException {
            // read a header if not read yet.
            if (!readHeader) {
                readHeader = readHeader(buf);
                if (!readHeader)
                    return false;
            }

            // Header is read, now try to read body
            if (readBody(buf)) {
                // finished reading single complete message
                readHeader = false; // reset state
                return true;
            } else
                return false;
        }

        private boolean readHeader(ByteBuffer buf) throws MessageParseException {
            // if header is not fully read, don't read it.
            if (buf.remaining() < Constants.HEADER_LEN)
                return false;

            // read header and validate the message
            int readType = buf.getShort();
            if (type != readType)
                throw new MessageParseException("type mismatches: " + readType
                        + " (expected: " + type + ')');

            // read sequence number of the message
            sequence = buf.getInt();
            return true;
        }

        protected abstract boolean readBody(ByteBuffer buf)
                throws MessageParseException;

        public boolean write(ByteBuffer buf) {
            // write a header if not written yet.
            if (!wroteHeader) {
                wroteHeader = writeHeader(buf);
                if (!wroteHeader)
                    return false; // buffer is almost full perhaps
            }

            // Header is written, now try to write body
            if (writeBody(buf)) {
                // finished writing single complete message
                wroteHeader = false;
                return true;
            } else {
                return false;
            }
        }

        private boolean writeHeader(ByteBuffer buf) {
            // check if there is enough space to write header
            if (buf.remaining() < Constants.HEADER_LEN)
                return false;
            buf.putShort((short) type);
            buf.putInt(sequence);
            return true;
        }

        protected abstract boolean writeBody(ByteBuffer buf);
    }
    




  • AddMessage.java

    package net.gleamynode.netty2.example.sumup;

    import java.nio.ByteBuffer;

    import net.gleamynode.netty2.MessageParseException;

    public class AddMessage extends AbstractMessage {

        private int value;

        public AddMessage() {
            super(Constants.ADD);
        }

        public int getValue() {
            return value;
        }

        public void setValue(int value) {
            this.value = value;
        }

        protected boolean readBody(ByteBuffer buf) throws MessageParseException {
            // don't read body if it is partially readable
            if (buf.remaining() < Constants.ADD_BODY_LEN)
                return false;
            value = buf.getInt();
            return true;
        }

        protected boolean writeBody(ByteBuffer buf) {
            // check if there is enough space to write body
            if (buf.remaining() < Constants.ADD_BODY_LEN)
                return false;

            buf.putInt(value);

            return true;
        }

        public String toString() {
            // it is a good practice to create toString() method on message classes.
            return getSequence() + ":ADD(" + value + ')';
        }
    }
    


     

  • ResultMessage.java

    package net.gleamynode.netty2.example.sumup;

    import java.nio.ByteBuffer;

    import net.gleamynode.netty2.MessageParseException;

    public class ResultMessage extends AbstractMessage {

        private boolean ok;

        private int value;

        private boolean processedResultCode;

        public ResultMessage() {
            super(Constants.RESULT);
        }

        public boolean isOk() {
            return ok;
        }

        public void setOk(boolean ok) {
            this.ok = ok;
        }

        public int getValue() {
            return value;
        }

        public void setValue(int value) {
            this.value = value;
        }

        protected boolean readBody(ByteBuffer buf) throws MessageParseException {
            if (!processedResultCode) {
                processedResultCode = readResultCode(buf);
                if (!processedResultCode)
                    return false;
            }

            if (ok) {
                if (readValue(buf)) {
                    processedResultCode = false;
                    return true;
                } else
                    return false;
            } else {
                processedResultCode = false;
                return true;
            }
        }

        private boolean readResultCode(ByteBuffer buf) {
            if (buf.remaining() < Constants.RESULT_CODE_LEN)
                return false;
            ok = buf.getShort() == Constants.RESULT_OK;
            return true;
        }

        private boolean readValue(ByteBuffer buf) {
            if (buf.remaining() < Constants.RESULT_VALUE_LEN)
                return false;
            value = buf.getInt();
            return true;
        }

        protected boolean writeBody(ByteBuffer buf) {
            // check if there is enough space to write body
            if (buf.remaining() < Constants.RESULT_CODE_LEN
                    + Constants.RESULT_VALUE_LEN)
                return false;

            buf.putShort((short) (ok ? Constants.RESULT_OK
                    : Constants.RESULT_ERROR));
            if (ok)
                buf.putInt(value);

            return true;
        }

        public String toString() {
            if (ok) {
                return getSequence() + ":RESULT(" + value + ')';
            } else {
                return getSequence() + ":RESULT(ERROR)";
            }
        }
    }
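The message classes above all follow the same contract: read() returns false while the message is still incomplete and true once a whole message has been consumed, so it can be called repeatedly as data trickles in. The stand-alone sketch below imitates that contract for an ADD message without depending on Netty2. The class is hypothetical, the lengths are the ones inferred from the listings, and, unlike AbstractMessage, it does not validate the type field.

```java
import java.nio.ByteBuffer;

// Stand-alone imitation of the read() contract; not part of Netty2.
final class PartialReadSketch {
    static final int HEADER_LEN = 6;   // assumed: 2-byte type + 4-byte sequence
    static final int ADD_BODY_LEN = 4; // assumed: 4-byte value

    private boolean readHeader;
    private int sequence;
    private int value;

    // Returns true only when a complete message has been consumed from buf.
    boolean read(ByteBuffer buf) {
        if (!readHeader) {
            if (buf.remaining() < HEADER_LEN) {
                return false;
            }
            buf.getShort();            // type (not validated in this sketch)
            sequence = buf.getInt();
            readHeader = true;
        }
        if (buf.remaining() < ADD_BODY_LEN) {
            return false;              // header is remembered; wait for the rest of the body
        }
        value = buf.getInt();
        readHeader = false;            // reset state for the next message
        return true;
    }

    int getSequence() {
        return sequence;
    }

    int getValue() {
        return value;
    }

    public static void main(String[] args) {
        // One ADD message: type=1, sequence=3, value=7.
        byte[] bytes = ByteBuffer.allocate(10)
                .putShort((short) 1).putInt(3).putInt(7).array();

        PartialReadSketch message = new PartialReadSketch();
        ByteBuffer in = ByteBuffer.allocate(10);

        in.put(bytes, 0, 8);           // first fragment: header plus half of the body
        in.flip();
        System.out.println("after first fragment:  " + message.read(in));

        in.compact();                  // keep the 2 unread body bytes
        in.put(bytes, 8, 2);           // second fragment: rest of the body
        in.flip();
        System.out.println("after second fragment: " + message.read(in));
        System.out.println(message.getSequence() + ":ADD(" + message.getValue() + ")");
    }
}
```

Keeping the readHeader flag inside the message object is what lets the second call resume at the body instead of parsing the header again.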


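  • Implementing the protocol handling flow

    With the MessageRecognizer and the Message classes in place, implementing the server and the client is straightforward. The code below shows how the protocol handling flow is put together.

    Implementing the server

    The server consists of two main classes: Server and ServerSessionListener. Server starts the main program and listens for connections, while ServerSessionListener handles incoming messages and sends the replies.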

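    import java.net.InetSocketAddress;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;

    import net.gleamynode.netty2.IoProcessor;
    import net.gleamynode.netty2.MessageRecognizer;
    import net.gleamynode.netty2.OrderedEventDispatcher;
    import net.gleamynode.netty2.Session;
    import net.gleamynode.netty2.ThreadPooledEventDispatcher;

    public class Server {

        private static final int SERVER_PORT = 8080;
        private static final int DISPATCHER_THREAD_POOL_SIZE = 16;

        public static void main(String[] args) throws Throwable {
            // initialize the I/O processor and the event dispatcher
            IoProcessor ioProcessor = new IoProcessor();
            ThreadPooledEventDispatcher eventDispatcher = new OrderedEventDispatcher();

            // start the default number of I/O worker threads
            ioProcessor.start();

            // start the specified number of event dispatcher threads
            eventDispatcher.setThreadPoolSize(DISPATCHER_THREAD_POOL_SIZE);
            eventDispatcher.start();

            // prepare the message recognizer
            MessageRecognizer recognizer = new SumUpMessageRecognizer(
                    SumUpMessageRecognizer.SERVER_MODE);

            // prepare the session listener that handles the communication
            ServerSessionListener listener = new ServerSessionListener();

            // open the server socket channel
            ServerSocketChannel ssc = ServerSocketChannel.open();
            ssc.socket().bind(new InetSocketAddress(SERVER_PORT));

            // listen for connections and start communicating
            System.out.println("listening on port " + SERVER_PORT);
            for (;;) {
                // accept a connection
                SocketChannel channel = ssc.accept();

                // create a new session
                Session session = new Session(ioProcessor, channel, recognizer, eventDispatcher);

                // add the session listener
                session.addSessionListener(listener);

                // start communicating
                session.start();
            }
        }
    }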
     

    import java.nio.ByteBuffer;

    import net.gleamynode.netty2.Message;
    import net.gleamynode.netty2.MessageParseException;
    import net.gleamynode.netty2.Session;
    import net.gleamynode.netty2.SessionListener;

    public class ServerSessionListener implements SessionListener {

        public ServerSessionListener() {
        }

        public void connectionEstablished(Session session) {
            System.out.println(session.getSocketAddress() + " connected");

            // set the idle time to 60 seconds
            session.getConfig().setIdleTime(60);

            // the sum starts at 0
            session.setAttachment(new Integer(0));
        }

        public void connectionClosed(Session session) {
            System.out.println(session.getSocketAddress() + " closed");
        }

        // called when a message arrives from the client
        public void messageReceived(Session session, Message message) {
            System.out.println(session.getSocketAddress() + " RCVD: " + message);

            // The client sends AddMessage only, so a plain cast is enough here.
            // Any other case would need separate handling.
            AddMessage am = (AddMessage) message;

            // add the value in the received message to the current sum
            int sum = ((Integer) session.getAttachment()).intValue();
            int value = am.getValue();
            long expectedSum = (long) sum + value;
            if (expectedSum > Integer.MAX_VALUE || expectedSum < Integer.MIN_VALUE) {
                // return an error message on overflow
                ResultMessage rm = new ResultMessage();
                rm.setSequence(am.getSequence()); // copy the sequence from the ADD message
                rm.setOk(false);
                session.write(rm);
            } else {
                // sum up
                sum = (int) expectedSum;
                session.setAttachment(new Integer(sum));

                // return the result message
                ResultMessage rm = new ResultMessage();
                rm.setSequence(am.getSequence()); // copy the sequence from the ADD message
                rm.setOk(true);
                rm.setValue(sum);
                session.write(rm);
            }
        }

        public void messageSent(Session session, Message message) {
            System.out.println(session.getSocketAddress() + " SENT: " + message);
        }

        public void sessionIdle(Session session) {
            System.out.println(session.getSocketAddress()
                    + " disconnecting the idle");

            // close the idle session
            session.close();
        }

        // called when an exception occurs
        public void exceptionCaught(Session session, Throwable cause) {
            System.out.println(Thread.currentThread().getName()
                    + session.getSocketAddress() + " exception:");
            cause.printStackTrace(System.out);

            if (cause instanceof MessageParseException) {
                // print the content of the offending buffer to ease debugging
                MessageParseException mpe = (MessageParseException) cause;
                ByteBuffer buf = mpe.getBuffer();
                System.out.println(buf);
                System.out.print("Buffer Content: ");
                while (buf.remaining() > 0) {
                    System.out.print(buf.get() & 0xFF);
                    System.out.print(' ');
                }
                System.out.println();
            }

            // close the session
            session.close();
        }
    }

     

    When the server runs, its output looks like this:

    listening on port 8080
    /127.0.0.1:4753 connected
    /127.0.0.1:4753 RCVD: 0:ADD(4)
    /127.0.0.1:4753 RCVD: 1:ADD(6)
    /127.0.0.1:4753 RCVD: 2:ADD(2)
    /127.0.0.1:4753 RCVD: 3:ADD(7)
    /127.0.0.1:4753 RCVD: 4:ADD(8)
    /127.0.0.1:4753 RCVD: 5:ADD(1)
    /127.0.0.1:4753 SENT: 0:RESULT(4)
    /127.0.0.1:4753 SENT: 1:RESULT(10)
    /127.0.0.1:4753 SENT: 2:RESULT(12)
    /127.0.0.1:4753 SENT: 3:RESULT(19)
    /127.0.0.1:4753 SENT: 4:RESULT(27)
    /127.0.0.1:4753 SENT: 5:RESULT(28)
    /127.0.0.1:4753 closed
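The running sums in the sample output can be reproduced with a small stand-alone sketch of the overflow check used in messageReceived(). The class and its null-on-overflow convention are hypothetical; in the article, an overflow is reported with a ResultMessage whose ok flag is false.

```java
// Stand-alone sketch of the server's overflow-checked running sum; not part of Netty2.
final class RunningSumSketch {
    private int sum;

    // Returns the new sum, or null if adding value would overflow an int (sum is left unchanged).
    Integer add(int value) {
        long expectedSum = (long) sum + value;   // widen to long so the check itself cannot overflow
        if (expectedSum > Integer.MAX_VALUE || expectedSum < Integer.MIN_VALUE) {
            return null;
        }
        sum = (int) expectedSum;
        return Integer.valueOf(sum);
    }

    public static void main(String[] args) {
        RunningSumSketch session = new RunningSumSketch();
        int[] values = { 4, 6, 2, 7, 8, 1 };     // the ADD values from the sample output
        for (int i = 0; i < values.length; i++) {
            System.out.println(i + ":RESULT(" + session.add(values[i]) + ")");
        }
        // Adding Integer.MAX_VALUE to 28 overflows, so this prints "overflow".
        System.out.println(session.add(Integer.MAX_VALUE) == null ? "overflow" : "ok");
    }
}
```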
    Implementing the client

    Like the server, the client consists mainly of two classes: Client and ClientSessionListener.


    import java.net.InetSocketAddress;

    import net.gleamynode.netty2.IoProcessor;
    import net.gleamynode.netty2.MessageRecognizer;
    import net.gleamynode.netty2.OrderedEventDispatcher;
    import net.gleamynode.netty2.Session;
    import net.gleamynode.netty2.ThreadPooledEventDispatcher;

    public class Client {

        private static final String HOSTNAME = "localhost";

        private static final int PORT = 8080;

        private static final int CONNECT_TIMEOUT = 30; // seconds

        private static final int DISPATCHER_THREAD_POOL_SIZE = 4;

        public static void main(String[] args) throws Throwable {
            // prepare the values to be summed up
            int[] values = new int[args.length];
            for (int i = 0; i < args.length; i++) {
                values[i] = Integer.parseInt(args[i]);
            }

            // initialize the I/O processor and the event dispatcher
            IoProcessor ioProcessor = new IoProcessor();
            ThreadPooledEventDispatcher eventDispatcher = new OrderedEventDispatcher();

            // start the default number of I/O worker threads
            ioProcessor.start();

            // start the specified number of event dispatcher threads
            eventDispatcher.setThreadPoolSize(DISPATCHER_THREAD_POOL_SIZE);
            eventDispatcher.start();

            // prepare the message recognizer
            MessageRecognizer recognizer = new SumUpMessageRecognizer(
                    SumUpMessageRecognizer.CLIENT_MODE);

            // prepare the client session
            Session session = new Session(ioProcessor, new InetSocketAddress(
                    HOSTNAME, PORT), recognizer, eventDispatcher);

            session.getConfig().setConnectTimeout(CONNECT_TIMEOUT);

            // start the session and listen with ClientSessionListener
            ClientSessionListener listener = new ClientSessionListener(values);
            session.addSessionListener(listener);
            session.start();

            // wait until the summation is complete
            while (!listener.isComplete()) {
                Thread.sleep(1000);
            }

            // stop the I/O processor and the event dispatcher
            eventDispatcher.stop();
            ioProcessor.stop();
        }
    }

    import java.net.ConnectException;

    import net.gleamynode.netty2.Message;
    import net.gleamynode.netty2.Session;
    import net.gleamynode.netty2.SessionListener;

    public class ClientSessionListener implements SessionListener {

        private final int[] values;

        // written by an event dispatcher thread and polled by the main thread,
        // so it is declared volatile to make the update visible
        private volatile boolean complete;

        public ClientSessionListener(int[] values) {
            this.values = values;
        }

        public boolean isComplete() {
            return complete;
        }

        // called once the connection is established
        public void connectionEstablished(Session session) {
            System.out.println("connected to " + session.getSocketAddress());

            // send the summation requests
            for (int i = 0; i < values.length; i++) {
                AddMessage m = new AddMessage();
                m.setSequence(i);
                m.setValue(values[i]);
                session.write(m);
            }
        }

        public void connectionClosed(Session session) {
            System.out.println("disconnected from " + session.getSocketAddress());
        }

        // called when a reply arrives from the server
        public void messageReceived(Session session, Message message) {
            System.out.println("RCVD: " + message);

            // The server sends ResultMessage only. Otherwise the type
            // would have to be checked with instanceof.
            ResultMessage rm = (ResultMessage) message;
            if (rm.isOk()) {
                // The ResultMessage is OK. If its sequence is that of the
                // last ADD message sent, the summation is finished.
                if (rm.getSequence() == values.length - 1) {
                    // print the result
                    System.out.println("The sum: " + rm.getValue());
                    // close the session
                    session.close();
                    complete = true;
                }
            } else {
                // on error, print a message and end the session
                System.out.println("server error, disconnecting...");
                session.close();
                complete = true;
            }
        }

        public void messageSent(Session session, Message message) {
            System.out.println("SENT: " + message);
        }

        public void sessionIdle(Session session) {
        }

        public void exceptionCaught(Session session, Throwable cause) {
            cause.printStackTrace(System.out);

            if (cause instanceof ConnectException) {
                // if connecting to the server failed, retry after 5 seconds
                System.out.println("sleeping...");
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    // ignored; the reconnect is attempted right away
                }

                System.out.println("reconnecting... " + session.getSocketAddress());
                session.start();
            } else {
                session.close();
            }
        }
    }
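The client's main thread polls isComplete() once per second. As an alternative that is not part of the article's code, completion can be signalled with java.util.concurrent.CountDownLatch, which wakes the waiting thread immediately and takes care of visibility between threads. The class and method names below are hypothetical; the sketch is independent of Netty2 and uses a plain thread to stand in for the event dispatcher.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical alternative to polling a boolean flag; not the article's approach.
final class CompletionLatchSketch {
    private final CountDownLatch done = new CountDownLatch(1);

    // Would be called where the listener currently sets complete = true.
    void markComplete() {
        done.countDown();
    }

    // Waits until markComplete() is called or the timeout elapses; returns true if completed.
    boolean awaitComplete(long timeout, TimeUnit unit) {
        try {
            return done.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        CompletionLatchSketch completion = new CompletionLatchSketch();

        // Stands in for the event dispatcher thread that receives the last RESULT.
        Thread worker = new Thread(completion::markComplete);
        worker.start();

        System.out.println("completed: " + completion.awaitComplete(5, TimeUnit.SECONDS));
    }
}
```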

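    As the example above shows, implementing a custom protocol turns out to be quite simple. Implementing your own SMTP or POP protocol with Netty should not be difficult either.

    The Netty2 home page is at http://gleamynode.net/dev/projects/netty2/index.html, where you can find the full source code for this article.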

  • posted on 2006-06-20 19:45 by happytian
