实现了Messagerecognizer和Message之后,要实现Server和Client是非常容易的事情,通过下面的代码,你会很容易理解如何去实现协议的处理流程。
实现服务端两个主要的类,一个是Server类,另一个是ServerSessionListener. Server类负责启动主程序并监听连接。而ServerSessionListener用于处理和发送消息。
public
class ServerSessionListener implements SessionListener {
public ServerSessionListener() {
}
public
void connectionEstablished(Session session) {
System.out.println(session.getSocketAddress() + " connected");
// 设置空闲时间为60秒
session.getConfig().setIdleTime(60);
// 设置sum的初始值为0。
session.setAttachment(new Integer(0));
}
public
void connectionClosed(Session session) {
System.out.println(session.getSocketAddress() + " closed");
}
// 当收到client发来的消息时,此方法被调用
public
void messageReceived(Session session, Message message) {
System.out.println(session.getSocketAddress() + " RCVD: " + message);
// client端只发送AddMessage. 其它情况要另作处理
// 在这里只是简单的进行类型转换处理
AddMessage am = (AddMessage) message;
// 将收到的消息里的值加上当前sum的值.
int sum = ((Integer) session.getAttachment()).intValue();
int value = am.getValue();
long expectedSum = (long) sum + value;
if (expectedSum > Integer.MAX_VALUE || expectedSum < Integer.MIN_VALUE) {
// 如果溢位返回错误消息
ResultMessage rm = newResultMessage();
rm.setSequence(am.getSequence()); // 从送来的Add消息中得到sequence值。
rm.setOk(false);
session.write(rm);
} else {
// 加总
sum = (int) expectedSum;
session.setAttachment(new Integer(sum));
// 返回结果消息
ResultMessage rm = newResultMessage();
rm.setSequence(am.getSequence()); // 从送来的Add消息中得到sequence值。
rm.setOk(true);
rm.setValue(sum);
session.write(rm);
}
}
public
void messageSent(Session session, Message message) {
System.out.println(session.getSocketAddress() + " SENT: " + message);
}
public
void sessionIdle(Session session) {
System.out.println(session.getSocketAddress()
+ " disconnecting the idle");
// 关闭空闲的会话。
session.close();
}
// 异常发生时,将调用此方法
public
void exceptionCaught(Session session, Throwable cause) {
System.out.println(Thread.currentThread().getName()
+ session.getSocketAddress() + " exception:");
cause.printStackTrace(System.out);
if (cause instanceof MessageParseException) {
// 印出错误信息内容,便于调试
MessageParseException mpe = (MessageParseException) cause;
ByteBuffer buf = mpe.getBuffer();
System.out.println(buf);
System.out.print("Buffer Content: ");
while (buf.remaining() > 0) {
System.out.print(buf.get() & 0xFF);
System.out.print(' ');
}
System.out.println();
}
// 关闭会话
session.close();
}
}
服务端运行后,其输出的内容示例如下:
listening on port 8080
/127.0.0.1:4753 connected
/127.0.0.1:4753 RCVD: 0:ADD(4)
/127.0.0.1:4753 RCVD: 1:ADD(6)
/127.0.0.1:4753 RCVD: 2:ADD(2)
/127.0.0.1:4753 RCVD: 3:ADD(7)
/127.0.0.1:4753 RCVD: 4:ADD(8)
/127.0.0.1:4753 RCVD: 5:ADD(1)
/127.0.0.1:4753 SENT: 0:RESULT(4)
/127.0.0.1:4753 SENT: 1:RESULT(10)
/127.0.0.1:4753 SENT: 2:RESULT(12)
/127.0.0.1:4753 SENT: 3:RESULT(19)
/127.0.0.1:4753 SENT: 4:RESULT(27)
/127.0.0.1:4753 SENT: 5:RESULT(28)
/127.0.0.1:4753 closed
实现客户端
跟服务端对应,主要由Client和ClientSessionListener组成。
public
class Client {
private
static
final String HOSTNAME = "localhost";
private
static
final
int PORT = 8080;
private
static
final
int CONNECT_TIMEOUT = 30; // seconds
private
static
final
int DISPATCHER_THREAD_POOL_SIZE = 4;
public
static
void main(String[] args) throws Throwable {
// 预备要加总的值。
int[] values = newint[args.length];
for (int i = 0; i < args.length; i++) {
values[i] = Integer.parseInt(args[i]);
}
// 初始化 I/O processor 和 event dispatcher
IoProcessor ioProcessor = new IoProcessor();
ThreadPooledEventDispatcher eventDispatcher = new OrderedEventDispatcher();
// 开始缺省数量的I/O工作线程
ioProcessor.start();
// 启动指定数量的event dispatcher线程
eventDispatcher.setThreadPoolSize(DISPATCHER_THREAD_POOL_SIZE
eventDispatcher.start();
// 准备 message recognizer
MessageRecognizer recognizer = new SumUpMessageRecognizer(
SumUpMessageRecognizer.CLIENT_MODE);
// 准备客户端会话。
Session session = new Session(ioProcessor, new InetSocketAddress(
HOSTNAME, PORT), recognizer, eventDispatcher);
session.getConfig().setConnectTimeout(CONNECT_TIMEOUT);
// 开始会话,并使用ClientSessionListener监听。
ClientSessionListener listener = new ClientSessionListener(values);
session.addSessionListener(listener);
session.start();
// 一直等到加总完成
while ( !listener.isComplete() ) {
Thread.sleep(1000);
}
// 停止 I/O processor 和 event dispatcher
eventDispatcher.stop();
ioProcessor.stop();
}
}
public
class ClientSessionListener implements SessionListener {
private
final
int[] values;
private
boolean complete;
public ClientSessionListener(int[] values) {
this.values = values;
}
public
boolean isComplete() {
return complete;
}
// 当连接建立好后会调用此方法。
public
void connectionEstablished(Session session) {
System.out.println("connected to " + session.getSocketAddress());
// 发送加总请求。
for (int i = 0; i < values.length; i++) {
AddMessage m = new AddMessage();
m.setSequence(i);
m.setValue(values[i]);
session.write(m);
}
}
public
void connectionClosed(Session session) {
System.out.println("disconnected from " + session.getSocketAddress());
}
// 当收到server的回应信息时,会调用此方法
public
void messageReceived(Session session, Message message) {
System.out.println("RCVD: " + message);
// 服务端只发送ResultMessage. 其它情况下
// 要通过instanceOf来判断它的类型.
ResultMessage rm = (ResultMessage) message;
if (rm.isOk()) {
// 如果ResultMessage是OK的.
// 根据ResultMessage的sequence值来判断如果,
// 一次消息的sequence值,则
if (rm.getSequence() == values.length - 1) {
// 打印出结果.
System.out.println("The sum: " + rm.getValue());
// 关闭会话
session.close();
complete = true;
}
} else {
// 如有错误,则打印错误信息,并结束会话.
System.out.println("server error, disconnecting...");
session.close();
complete = true;
}
}
public
void messageSent(Session session, Message message) {
System.out.println("SENT: " + message);
}
public
void sessionIdle(Session session) {
}
public
void exceptionCaught(Session session, Throwable cause) {
cause.printStackTrace(System.out);
if (cause instanceof ConnectException) {
// 如果连接server失败, 则间隔5秒重试连接.
System.out.println("sleeping...");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
}
System.out.println("reconnecting... " + session.getSocketAddress());
session.start();
} else {
session.close();
}
}
}
通过上面的例子,你也许会发现实现一个自定义的协议原来如此简单。你如果用Netty试着去实现自己的smtp或pop协议,我想也不会是一件难事了。
Netty2的首页在http://gleamynode.net/dev/projects/netty2/index.html,你可以在这找到本文的全部源码。