庄周梦蝶

生活、程序、未来
   :: 首页 ::  ::  :: 聚合  :: 管理

yanf4j引入了客户端非阻塞API

Posted on 2009-02-19 00:15 dennis 阅读(1769) 评论(2)  编辑  收藏 所属分类: javamy open-source
    yanf4j发布一个0.50-beta2版本,这个版本最重要的改进就是引入了客户端连接非阻塞API,主要最近的工作要用到,所以添加了。两个核心类TCPConnectorControllerUDPConnectorController分别用于TCP和UDP的客户端连接控制。例如,现在的UDP echo client可以写成:

     //客户端echo handler
     class EchoClientHandler extends HandlerAdapter {

        
public void onReceive(Session udpSession, Object t) {
            DatagramPacket datagramPacket 
= (DatagramPacket) t;
            System.out.println(
"recv:" + new String(datagramPacket.getData()));
        }

        @Override
        
public void onMessageSent(Session session, Object t) {
            System.out.println(
"send:" + new String((byte[]) t));
        }

    }

       //连接代码,并发送UDP包

        UDPConnectorController connector 
= new UDPConnectorController();
        connector.setSoTimeout(
1000);
        connector.setHandler(
new EchoClientHandler());
        connector.connect(
new InetSocketAddress(InetAddress.getByName(host),
                port));
        
for (int i = 0; i < 10000; i++) {
            String s 
= "hello " + i;
            DatagramPacket packet 
= new DatagramPacket(s.getBytes(), s.length());
            connector.send(packet);
        }

    UDP不是面向连接的,因此connect方法仅仅是调用了底层DatagramChannel.connect方法,用来限制接收和发送的packet的远程端点。

    再来看看TCPConnectorController的使用,同样看Echo Client的实现:
//客户端的echo handler
class EchoHandler extends HandlerAdapter<String> {

        @Override
        
public void onConnected(Session session) {
            
try {
                
//一连接就发送NUM个字符串
                for (int i = 0; i < NUM; i++)
                    session.send(generateString(i));
             } 
catch (Exception e) {

             }
        }

        
public String generateString(int len) {
            StringBuffer sb 
= new StringBuffer();
            
for (int i = 0; i < MESSAGE_LEN; i++)
                sb.append(i);
            
return sb.toString();
        }

        @Override
        
public void onReceive(Session session, String t) {
            //打印接收到字符串
            if (DEBUG)
                System.out.println(
"recv:" + t);
            
        }

    }


//...连接API,TCPConnectorController示例
    Configuration configuration = new Configuration();
        configuration.setTcpSessionReadBufferSize(
256 * 1024); // 设置读的缓冲区大小
    TCPConnectorController    connector = new TCPConnectorController(configuration,
                
new StringCodecFactory());
    connector.setHandler(
new EchoHandler());
    connector.setCodecFactory(
new StringCodecFactory());
   
try {
            connector.Connect(
new InetSocketAddress("localhost"8080));
    } 
catch (IOExceptione) {
            e.printStackTrace();
    }

    注意,connect方法并不阻塞,而是立即返回,连接是否建立可以通过TCPConnectorController.isConnected()方法来判断,因此通常你可能会这样使用:

try {
            connector.Connect(
new InetSocketAddress("localhost"8080));
            
while(!connector.isConnected())
                ;
        } 
catch (Exception e) {
            e.printStackTrace();
        }

    来强制确保后面对connector的使用是已经连接上的connector,然而更好的做法是在Handler的onConnected()回调方法中处理逻辑,因为这个方法仅仅在连接建立后才会被调用。
    两个ConnectorController都有系列send方法,用于发送数据:
TCPConnectorController.send(Object msg) throws InterruptedException
UDPConnectorController.send(DatagramPacket packet) 
throws InterruptedException
UDPConnectorController.send(SocketAddress targetAddr, Object msg)
throws InterruptedException


    0.50-beta2带来的另一个修改就是Session接口添加setReadBufferByteOrder方法,用于设置session接收缓冲区的字节序,默认是网络字节序,也就是大端法。这个方法建议在Handler的onSessionStarted回调方法中调用。

    在0.50-beta最重要的修改是引入了session发送队列缓冲区的流量控制选项。默认情况下,session的发送缓冲队列是无界的,队列的push和pop也全然不会阻塞。在设置了缓冲队列的高低水位选项后即引入了发送流量控制,规则如下:
a)当发送队列中的数据总量大于高水位标记(highWaterMark),Session.send将阻塞
b)在条件a的作用下,Session.send的阻塞将持续到发送队列中的数据总量小于于低水位标记(lowWaterMark)才解除。


缓冲队列高低水位的设置通过Controller的下列方法设置:
     public void setSessionWriteQueueHighWaterMark(int highWaterMark);

     
public void setSessionWriteQueueLowWaterMark(int lowWaterMark);
 
缓冲队列的流量控制想法来自ACE的ACE_Message_Queue,是通过com.google.code.yanf4j.util.MessageQueue类实现的。

   0.50-beta还引入了Session.send(Object msg)的重载版本 Session.send(Object msg,long timeout),在超过timeout时间后send仍然阻塞时即终止send。注意,现在Session.send的这两个方法都返回一个bool值来表示send成功与否,并且都将响应中断(仅限启动了流量控制选项)抛出InterruptedException。

评论

# re: yanf4j引入了客户端非阻塞API  回复  更多评论   

2009-02-19 09:40 by Arbow
呃。。。工作不仅仅要用到ACE,连Java也有啊?

# re: yanf4j引入了客户端非阻塞API  回复  更多评论   

2009-02-19 09:46 by dennis
@Arbow
用java写测试客户端,毕竟比较熟悉

只有注册用户登录后才能发表评论。


网站导航: