庄周梦蝶

生活、程序、未来
   :: 首页 ::  ::  :: 聚合  :: 管理

    外面的阳光多么明媚,厦门空气真好,真想去海边走走,可惜我在上班。第一次遇到周6要上班(而非加班,是正常上班)是在广州那家公司,每月两个周6上班,两个周6不用,俗称大小周。刚开始很不习惯,后来也慢慢接受了,至少还有两个双休日。然而金融危机后,公司要求每周6都要上班,从那时候开始俺就坚定了要走的决心了。我很不明白为什么要求周6上班,程序员又不是计件计时的工人,不是时间花的越多,做的就越多,从很大程度上说程序员的工作是很难衡量的,一个人坐在那机器前,你很难知道他是在上网冲浪还是在写代码。
    管理者的逻辑仍然是传统工业的方式来看待程序员,以为人呆在公司,他就能做更多的事情,付出的薪水才值的,然而,他们忘了,程序员这个工作是纯粹的脑力工作(嗯,敲键盘可能要点体力),他们所做的东西依赖于他们的大脑,他们可以把代码写的很高效很健壮,也可以留下一堆bug和极其低效的垃圾代码,周6可能他写了10行代码,接下来的一周他可能需要用3天来修改上周6写下的那10行代码。大脑是需要休息的,超人是有,比如我认识的几个,可以毫无娱乐活动天天上班时下班后就是写代码,然而这样的生活不是我想要的,这样的生活也没办法维持多久,你得到的肯定比你失去的多,代码之外还有好多有趣的事情。

    我现在的生活就是太紧迫了,周6下班后,匆匆忙忙往车站赶,晚上9点到家,周日或者周一早上又匆匆忙忙赶回来上班,几乎没有休息的时间,厦门来了两个多月,一直想去中山路(《疯狂的赛车》拍摄地)和鼓浪屿瞧瞧也抽不出时间,平常下班后人都散了更是懒的动。这样下去我怀疑自己会得抑郁症,最近的效率感觉很低下。郁闷啊,可恨的周6上班。

posted @ 2009-03-14 12:38 dennis 阅读(693) | 评论 (6)编辑 收藏

    xmemcached发布1.0稳定版,下载地址这里
    相比于1.0-beta版本,这个released版本的主要改进如下:
1、xmemcached跟yanf4j都是默认采用common-logging,你可以使用log4j,也可以默认使用jdk的日志库。1.0添加了log4j的配置和依赖包。log4j的性能比jdk自带的日志库性能好多了。

2、添加了BufferAllocator接口,用于分配ByteBufferWrapper,ByteBufferWrapper顾名思义就是ByteBuffer的包装接口,因此BufferAllocator就是ByteBuffer的分配器,有两个实现:SimpleBufferAllocator,直接调用ByteBuffer.allocate(capacity)方法,不做任何缓存;一个是CachedBufferAllocator,采用ThreadLocal缓存ByteBuffer,避免重复创建,如果你对mina熟悉的话,这个概念没什么特别的。默认xmemcached采用的是SimpleBufferAllocator,你可以通过XMemcachedClient的构造方法设置想要采用的BufferAllocator:

public XMemcachedClient(BufferAllocator allocator) throws IOException;
//其他重载构造函数


    经过测试,采用CachedBufferAllocator并没有带来显著的性能提升,需要更多测试,慎用。

3、允许设置网络参数,在多个memcached节点的情况下,强烈推荐将网络层的读线程数(处理OP_READ)设置为接近节点数(具体还是要看场景测试,因为读线程数本质上是启动了一个线程池来处理读事件,太大也会影响效率):
    //XMemcachedClient的getDefaultConfiguration静态方法,获取默认配置
    public static Configuration getDefaultConfiguration() {
        Configuration configuration 
= new Configuration();
        configuration.setTcpRecvBufferSize(TCP_RECV_BUFF_SIZE);
        configuration.setSessionReadBufferSize(READ_BUFF_SIZE);
        configuration.setTcpNoDelay(TCP_NO_DELAY);
        configuration.setReadThreadCount(READ_THREAD_COUNT);
        
return configuration;
    }

    使用方法:
 Configuration conf=XMemcachedClient.getDefaultConfiguration();
  //设置读线程数为节点数,更多设置方法请参见Configuration类                       
 conf.setReadThreadCount(5);

 XMemcachedClient mc = new XMemcachedClient(
                    conf,
new CachedBufferAllocator());
 mc.addServer(ip1, port1);
 mc.addServer(ip2, port2);
 mc.addServer(ip3,port3);
 mc.addServer(ip4,port4);
 mc.addServer(ip5,port5);


4、修复一系列发现的bug,如Command返回结果需要设置成原子引用、更严格的方法参数检查、提示信息的友好、日志的优化、操作超时的时候取消操作等,重构部分代码

5、提供了javadoc文档,这里下载。


posted @ 2009-03-13 22:07 dennis 阅读(1882) | 评论 (2)编辑 收藏

    memcached本身是集中式的缓存系统,要搞多节点分布,只能通过客户端实现。memcached的分布算法一般有两种选择:
1、根据hash(key)的结果,模连接数的余数决定存储到哪个节点,也就是hash(key)% sessions.size(),这个算法简单快速,表现良好。然而这个算法有个缺点,就是在memcached节点增加或者删除的时候,原有的缓存数据将大规模失效,命中率大受影响,如果节点数多,缓存数据多,重建缓存的代价太高,因此有了第二个算法。
2、Consistent Hashing,一致性哈希算法,他的查找节点过程如下:
    首先求出memcached服务器(节点)的哈希值,并将其配置到0~232的圆(continuum)上。然后用同样的方法求出存储数据的键的哈希值,并映射到圆上。然后从数据映射到的位置开始顺时针查找,将数据保存到找到的第一个服务器上。如果超过232仍然找不到服务器,就会保存到第一台memcached服务器上。

    一致性哈希算法来源于P2P网络的路由算法,更多的信息可以读这里

    spymemcached和xmemcached都实现了一致性算法(其实我是照抄的),这里要测试下在使用一致性哈希的情况下,增加节点,看不同散列函数下命中率和数据分布的变化情况,这个测试结果对于spymemcached和xmemcached是一样的,测试场景:
    从一篇英文小说(《黄金罗盘》前三章)进行单词统计,并将最后的统计结果存储到memcached,以单词为key,以次数为value。单词个数为3061,memcached原来节点数为10,运行在局域网内同一台服务器上的不同端口,在存储统计结果后,增加两个memcached节点(也就是从10个节点增加到12个节点),统计此时的缓存命中率并查看数据的分布情况。
    结果如下表格,命中率一行表示增加节点后的命中率情况(增加前为100%),后续的行表示各个节点存储的单词数,CRC32_HASH表示采用CRC32散列函数,KETAMA_HASH是基于md5的散列函数也是默认情况下一致性哈希的推荐算法,FNV1_32_HASH就是FNV 32位散列函数,NATIVE_HASH就是java.lang.String.hashCode()方法返回的long取32位的结果,MYSQL_HASH是xmemcached添加的传说来自于mysql源码中的哈希函数。

   CRC32_HASH  KETAMA_HASH  FNV1_32_HASH  NATIVE_HASH  MYSQL_HASH
命中率
 78.5%  83.3%  78.2%  99.89%  86.9%
 节点1  319  366  546  3596  271
 节点2  399  350  191  1  233
 节点3  413  362  491  0  665
 节点4  393  364  214  1  42
 节点5  464  403  427  1  421
 节点6  472  306  299  0  285
 节点7  283  347  123  0  635
 节点8  382  387  257  2  408
 节点9  238  341  297  0  55
 节点10  239  375  756  0  586
 范围  200~500   300~400
 150~750  0~3600  50~650


结果分析:

1、命中率最高看起来是NATIVE_HASH,然而NATIVE_HASH情况下数据集中存储在第一个节点,显然没有实际使用价值。为什么会集中存储在第一个节点呢?这是由于在查找存储的节点的过程中,会比较hash(key)和hash(节点IP地址),而在采用了NATIVE_HASH的情况下,所有连接的hash值会呈现一个递增状况(因为String.hashCode是乘法散列函数),如:
192.168.0.100:12000 736402923
192.168.0.100:12001 736402924
192.168.0.100:12002 736402925
192.168.0.100:12003 736402926
如果这些值很大的会,那么单词的hashCode()会通常小于这些值的第一个,那么查找就经常只找到第一个节点并存储数据,当然,这里有测试的局限性,因为memcached都跑在一个台机器上只是端口不同造成了hash(节点IP地址)的连续递增,将分布不均匀的问题放大了。

2、从结果上看,KETAMA_HASH维持了一个最佳平衡,在增加两个节点后还能访问到83.3%的单词,并且数据分布在各个节点上的数目也相对平均,难怪作为默认散列算法。

3、最后,单纯比较下散列函数的计算效率:

CRC32_HASH:3266
KETAMA_HASH:7500
FNV1_32_HASH:375
NATIVE_HASH:187
MYSQL_HASH:500

   NATIVE_HASH > FNV1_32_HASH > MYSQL_HASH > CRC32_HASH > KETAMA_HASH


posted @ 2009-03-10 16:31 dennis 阅读(7762) | 评论 (0)编辑 收藏

   xmemcached发布1.0-beta,从0.60直接到1.0-beta,主要改进如下:
1、支持更多协议,在已有协议支持的基础上添加了append、prepend、gets、批量gets、cas协议的支持,具体请查看XMemcachedClient类的实例方法。重点是cas操作,下文将详细描述下。
2、memcached分布支持,支持连接多个memcached server,支持简单的余数分布和一致性哈希分布。
3、0.60版本以来的bug修复。

   memcached 1.2.4之后开始支持cas协议,该协议存储数据同时发送一个版本号,只有当这个版本号与memcached server上该key的最新版本一致时才更新成功,否则返回EXISTS,版本号的获取需要通过gets协议获得,cas全称就是compare and set,如果对hibernate乐观锁和java.util.concurrent.atomic包都比较熟悉的话这个概念应该很了解了。xmemcached 1.0-beta开始支持cas协议,看例子:
XMemcachedClient client = new XMemcachedClient();
client.addServer(
"localhost",11211);
client.set(
"a"01); //设置a为1
GetsResponse result = client.gets("a");
long cas = result.getCas(); //获取当前cas
//尝试更新a成2
if (!client.cas("a"02, cas)) 
    System.err.println(
"cas error");

    XMemcachedClient.cas(final String key, final int exp, Object value, long cas)将尝试更新key的值到value,如果失败就返回false。这样搞好像很麻烦,需要先gets获取cas值,然后再调用cas方法更新,因此XMemcached提供了一个包装类可以帮你搞定这两步,并且提供重试机制:

             /**
             * 合并gets和cas,利用CASOperation
             
*/
            client.cas(
"a"0new CASOperation() {

                @Override
                
public int getMaxTries() {
                    
return 10;
                }

                @Override
                
public Object getNewValue(long currentCAS, Object currentValue) {
                    System.out.println(
"current value " + currentValue);
                    
return 2;
                }

            });
    通过CASOperation,你只要实现两个方法即可,getMaxTries返回最大重试次数,超过这个次数还没有更新成功就抛出TimeoutException;getNewValue方法返回依据当前cas和缓存值,你希望设置的更新值。看一个cas更详细的例子,开100个线程递增缓冲中的变量a,采用cas才能保证最后a会等于100:

import java.util.concurrent.CountDownLatch;

import net.rubyeye.xmemcached.CASOperation;
import net.rubyeye.xmemcached.XMemcachedClient;
/**
 * 测试CAS
 * 
@author dennis
 
*/
class CASThread extends Thread {
    
private XMemcachedClient mc;
    
private CountDownLatch cd;

    
public CASThread(XMemcachedClient mc, CountDownLatch cdl) {
        
super();
        
this.mc = mc;
        
this.cd = cdl;

    }

    
public void run() {
        
try {
            
if (mc.cas("a"0new CASOperation() {
                @Override
                
public int getMaxTries() {
                    
return 50;
                }

                @Override
                
public Object getNewValue(long currentCAS, Object currentValue) {
                    System.out.println(
"currentValue=" + currentValue
                            
+ ",currentCAS=" + currentCAS);
                    
return ((Integer) currentValue).intValue() + 1;
                }

            }))
                
this.cd.countDown();
        } 
catch (Exception e) {
            e.printStackTrace();
        }
    }
}

public class CASTest {
    
static int NUM = 100;

    
public static void main(String[] args) throws Exception {
        XMemcachedClient mc 
= new XMemcachedClient();
        mc.addServer(
"192.168.222.100"11211);
        
// 设置初始值为0
        mc.set("a"00);
        CountDownLatch cdl 
= new CountDownLatch(NUM);
        
// 开NUM个线程递增变量a
        for (int i = 0; i < NUM; i++)
            
new CASThread(mc, cdl).start();

        cdl.await();
        
// 打印结果,最后结果应该为NUM
        System.out.println("result=" + mc.get("a"));
        mc.shutdown();
    }
}

    最高重试次数设置成了50,观察输出你就会知道cas冲突在高并发下非常频繁,这个操作应当慎用。

    说完cas,我们再来看下xmemcached对分布的支持。
1、如何添加多个memcached server?
通过XMemcachdClient.addServer(String ip,int port)方法,
            XMemcachedClient mc = new XMemcachedClient();
            mc.addServer(ip1, port1);
            mc.addServer(ip2, port2);
            mc.addServer(ip3, port3);
            mc.addServer(ip4, port3);

2、怎么分布?
在添加了>=2个memcached server后,对XMemcachdClient的存储、删除等操作都将默认根据key的哈希值连接数的余数做分布,这也是spymemcached默认的分布算法。这个算法简单快速,然而在添加或者移除memcached server后,缓存会大面积失效需要重组,这个代价太高,因此还有所谓Consistent Hashing算法,通过将memcached节点分布在一个0-2^128-1的环上,发送数据到某个节点经过的跳跃次数可以缩减到O(lgn)次,并且在添加或者移除节点时最大限度的降低影响,这个算法的思想其实来源于p2p网络的路由算法,不过路由算法比这个复杂多了,毕竟memcached的分布是在客户端,因此不需要节点之间的通讯和路由表的存储更新等。这个算法在java上的实现可以通过TreeMap红黑树,具体可以参考这里这里
  在xmemcached启动Consistent Hashing如下:

XMemcachedClient client = new XMemcachedClient(new KetamaMemcachedSessionLocator(HashAlgorithm.CRC32_HASH));
client.addServer(ip, 
12000);
client.addServer(ip, 
12001);
client.addServer(ip, 
11211);
client.addServer(ip, 
12003);
client.addServer(ip, 
12004);

  散列函数采用CRC32,你也可以采用其他散列函数,具体看场景测试而定,散列函数决定了你的查找节点效率和缓存重新分布的均衡程度。
 
  在完成1.0-beta这个里程碑版本后,xmemcached将集中于稳定性方面的测试和性能优化。欢迎提交测试报告和建议,我的email killme2008@gmail.com





posted @ 2009-03-09 15:32 dennis 阅读(1937) | 评论 (1)编辑 收藏

    翠花,上图,首先是容器类和自定义对象的get、set在不同并发下的表现





    很明显,在linux下,spymemcached读写复杂对象的效率远远超过在windows下的表现,xmemcached在两个平台之间表现平稳,在linux上get效率低于spymemcached,差距比较大,准备再优化下;set效率略高于spymemcached。

    xmemcached  0.70将支持多服务器功能和简单的分布能力,基于hash key后模节点数的余数值做分布,这也是spymemcached默认的分布方式,一致性哈希暂不实现。下面是在linux下多节点情况下读写简单类型的效率对比





   两者都是在从一个节点到两个节点的变化中效率有一个显著下降,在2个节点到更多节点过程中下降的幅度开始减小,曲线变的相对平稳。

xmemcached路线图
0.70  多服务器和简单分布
0.80  更多memcached协议支持
0.90  一致性哈希算法的实现


posted @ 2009-03-07 10:43 dennis 阅读(1702) | 评论 (1)编辑 收藏


   充分利用jprofile等工具观察性能瓶颈,才能对症下药,盲目的优化只是在浪费时间,并且效果可能恰恰相反
1、 观察到CountDownLatch.await占据最多CPU时间,一开始认为是由于jprofiler带来的影响,导致这个方法调用时间过长,从而忽 略了这一点,导致后面走了不少弯路。实际上await方法占用50%的CPU,而网络层和序列化开销却比较低,这恰恰说明这两者的效率低下,没办法充分利 用CPU时间,后来观察spymemcached的CPU占用情况,await占用的时间低于30%,优化后的结果也是如此。

2、因为没有深入理解这一点,我就盲目地开始优化,先从优化协议匹配算法开始,匹配ByteBuffer一开始用简单匹配(O(m*n)复杂 度),后来替代以KMP算法做匹配,想当然以为会更快,比较了两者效率之后才发现KMP的实现竟然比简单匹配慢了很多,马上google,得知比之kmp 算法效率高上几倍的有BM算法,马上实现之,果然比KMP和简单匹配都快。换了算法后,一测试,有提升,但很少,显然这不是热点。然后开始尝试改线程模型并测试,一开始想的是往上加线程,毕竟序列化是计算密集型,搞cpu个数的线程去发送command,调整读Buffer的线程数,测试效率没有提升甚至 有所降低,期间还测试了将协议处理改成批处理模式等,全部以失败告终。

3、此时才想起应该观察下spymemcached的CPU使用情况,才有了上面1点提到的观察,记的在测试yanf4j的echo server的时候,我发现读Buffer线程数设为0的事情下比之1的效率更高,也就是说仅启动一个线程处理Select、OP_WRITE和 OP_READ的事件,对于echo这样简单的任务来说是非常高效的,难道memcached也如此?立马设置为0并测试,果然提升很多,与 spymemcached的TPS差距一下减小了2000多,进一步观察,由于xmemcached构建在yanf4j的基础上,为了分层清晰导致在发送 和接收消息环节有很多冗余的操作,并且我还多启动了一个线程做command发送和优化get、set操作,如果能磨平这些差异,扩展yanf4j,避免了队列同步开销,这样也不用额外启动线程,效率是否更高呢?得益于yanf4j的模块化,修改工作顺利进行,最后的测试结果也证明了我的猜测,效率已经接近 spymemcached甚至超过。




posted @ 2009-03-06 14:37 dennis 阅读(1497) | 评论 (2)编辑 收藏

测试1:开N个线程读写删各10000次,key是String,Value是Integer,数据单位皆为TPS

 线程数           set            get          delete
   xmemcached  spymemcached  xmemcached spymemcached
 xmemcached spymemcached
 1  3368  3047  3422  3232  3787  3404
 10  12307  11742  15274  12623  13473  13473
 50  22115  23021  30769  22630  24483  23222
 100  22448  25467  32569  24105  25538  28119
 200  24187  26165  35320  21379  26683  28181
 500  24623  28810  36955  14328  27609  29789




观察下结果,明显的一点是xmemcached的get比之spyememcached快得多,考虑到memcached是作为缓存使用,这一点很重要。在set、delete上面仍然比spymemcached稍有不如,但是差距已经很小。



测试2:开N个线程读写各100次,key是String,Value是100个元素的map(map的key和value分别是String和一个自定义类NameClass),memcached内存加大,防止lru起作用。

 线程数           set            get
   xmemcached  spymemcached  xmemcached spymemcached
 1  492  377  581  531
 10  1362  84  831  753
 30  1536  66  1015  872
 50  1608  68  1126  1084
 100  1576  67  989  1347

  
 观察数据结果,难以理解的是spymemcached在写集合方面竟然如此低效,通过jprofiler观察两者的CPU占用,最大头的都是序列化自定义对象;不过我昨天在ubuntu下开发xmemcached的时候随手测过,spymemcached写集合并没有在windows下这么慢。

    以上测试数据使用的memcached是2.2版本,xmemcached是0.6版本,系统是windows xp,AMD双核2G内存,memcached是跑在局域网内的服务器上,版本是1.2.2。linux下的测试数据等晚上回家补上。


  



posted @ 2009-03-06 12:36 dennis 阅读(2030) | 评论 (0)编辑 收藏


   读写简单类型
   测试方法:开N个线程 ,每个线程set(或者get、delete) 10000次,表格如下(数据为tps,仅供参考)
 线程数    spymemcached      xmemcached  
   set  get delete
set
 get  delete
 1  2870  2922  3018  2237  2352  2500
 10  11015  11227  11449  8579  10440  8354
 50  19838  20685  22727  13239  24113  14382
 100  25427  22646  26700  18068  29046  18259
  
  结论:显然在简单类型的读写上,spymemcached全面占优,xmemcached唯一的亮点在于高并发下get的效率超过了spymemcached。对于连续的get操作,xmemcached将合并成一个批量的get操作提交,从而提高效率。

   读写100个元素的map,map的value是个自定义类,启动N个线程,每个线程set(或者get、delete) 100次,表格如下

 线程数    spymemcached    xmemcached
   set  get set
 get
 1  492  492  427  492
 10  159  680  1103  1122
 50  57  1103  1561  1226
 100  71  1308  1530  1223


    结论:在复杂对象的读写上,xmemcached全面占优。两者的CPU和内存占用差不多,肉眼观察做不得准。比较奇怪的是spymemcached的set竟然那么慢。

    测试所用类下载

    xmemcached发布0.50版本,欢迎更多测试和建议,邮箱 killme2008@gmail.com

posted @ 2009-03-04 19:09 dennis 阅读(5084) | 评论 (1)编辑 收藏

1、xmemcached是什么?

xmemcached是基于java nio实现的memcached客户端API。

实际上是基于我实现的一个简单nio框架 http://code.google.com/p/yanf4j/的基础上实现的(目前是基于yanf4j 0.52),核心代码不超过1000行,序列化机制直接挪用spymemcached的Transcoder。

性能方面,在读写简单类型上比之spymemcached还是有差距,在读写比较大的对象(如集合)有效率优势。

当 前0.50-beta版本,仅支持单个memcached服务器,以后考虑扩展。目前已经支持get、set、add、replace、delete、 incr、decr、version这几个协议。API为阻塞模型,而非spymemcached的异步模式,异步模型在批处理的时候有优势,但是阻塞模 式在编程难度和使用上会容易很多。

2、为什么叫xmemcached?

因为我在厦门(XM)混饭......


3、xmemcached的下载和使用

项目主页:http://code.google.com/p/xmemcached/

下载地址:http://code.google.com/p/xmemcached/downloads/list

下载的压缩包中包括了依赖库、源码和打包后的jar,放到项目的lib目录下即可使用。

示例参考:

package net.rubyeye.xmemcached.test;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.io.Serializable;

import net.rubyeye.xmemcached.XMemcachedClient;

class Name implements Serializable {
    String firstName;
    String lastName;
    
int age;
    
int money;

    
public Name(String firstName, String lastName, int age, int money) {
        
super();
        
this.firstName = firstName;
        
this.lastName = lastName;
        
this.age = age;
        
this.money = money;
    }

    
public String toString() {
        
return "[" + firstName + " " + lastName + ",age=" + age + ",money="
                
+ money + "]";
    }

}

public class Example {
    
public static void main(String[] args) {
        
try {
            String ip 
= "192.168.222.100";

            
int port = 11211;
            XMemcachedClient client 
= new XMemcachedClient(ip, port);
            
// 存储操作
            if (!client.set("hello"0"dennis")) {
                System.err.println(
"set error");
            }
            client.add(
"hello"0"dennis");
            client.replace(
"hello"0"dennis");

            
// get操作
            String name = (String) client.get("hello");
            System.out.println(name);

            
// 批量获取
            List<String> keys = new ArrayList<String>();
            keys.add(
"hello");
            keys.add(
"test");
            Map
<String, Object> map = client.get(keys);
            System.out.println(
"map size:"+map.size());

            
// delete操作
            if (!client.delete("hello"1000)) {
                System.err.println(
"delete error");
            }

            
// incr,decr
            client.incr("a"4);
            client.decr(
"a"4);

            
// version
            String version = client.version();
            System.out.println(version);
            
// 增删改查自定义对象
            Name dennis = new Name("dennis""zhuang"26-1);
            System.out.println(
"dennis:" + dennis);
            client.set(
"dennis"0, dennis);

            Name cachedPerson 
= (Name) client.get("dennis");
            System.out.println(
"cachedPerson:" + cachedPerson);
            cachedPerson.money 
= -10000;

            client.replace(
"dennis"0, cachedPerson);
            Name cachedPerson2 
= (Name) client.get("dennis");
            System.out.println(
"cachedPerson2:" + cachedPerson2);

            
// delete
            client.delete("dennis");
            System.out.println(
"after delete:" + client.get("dennis"));
            client.shutdown();
        } 
catch (Exception e) {
            e.printStackTrace();
        }

    }
}

4、xmemcached的计划?

1)、添加多服务器和集群支持

2)、性能优化、重构

3)、添加cas原子操作以及更多协议支持

   
    有兴趣的瞧瞧,提提建议。

posted @ 2009-03-03 16:31 dennis 阅读(3240) | 评论 (6)编辑 收藏

    yanf4j发布一个0.50-beta2版本,这个版本最重要的改进就是引入了客户端连接非阻塞API,主要最近的工作要用到,所以添加了。两个核心类TCPConnectorControllerUDPConnectorController分别用于TCP和UDP的客户端连接控制。例如,现在的UDP echo client可以写成:

     //客户端echo handler
     class EchoClientHandler extends HandlerAdapter {

        
public void onReceive(Session udpSession, Object t) {
            DatagramPacket datagramPacket 
= (DatagramPacket) t;
            System.out.println(
"recv:" + new String(datagramPacket.getData()));
        }

        @Override
        
public void onMessageSent(Session session, Object t) {
            System.out.println(
"send:" + new String((byte[]) t));
        }

    }

       //连接代码,并发送UDP包

        UDPConnectorController connector 
= new UDPConnectorController();
        connector.setSoTimeout(
1000);
        connector.setHandler(
new EchoClientHandler());
        connector.connect(
new InetSocketAddress(InetAddress.getByName(host),
                port));
        
for (int i = 0; i < 10000; i++) {
            String s 
= "hello " + i;
            DatagramPacket packet 
= new DatagramPacket(s.getBytes(), s.length());
            connector.send(packet);
        }

    UDP不是面向连接的,因此connect方法仅仅是调用了底层DatagramChannel.connect方法,用来限制接收和发送的packet的远程端点。

    再来看看TCPConnectorController的使用,同样看Echo Client的实现:
//客户端的echo handler
class EchoHandler extends HandlerAdapter<String> {

        @Override
        
public void onConnected(Session session) {
            
try {
                
//一连接就发送NUM个字符串
                for (int i = 0; i < NUM; i++)
                    session.send(generateString(i));
             } 
catch (Exception e) {

             }
        }

        
public String generateString(int len) {
            StringBuffer sb 
= new StringBuffer();
            
for (int i = 0; i < MESSAGE_LEN; i++)
                sb.append(i);
            
return sb.toString();
        }

        @Override
        
public void onReceive(Session session, String t) {
            //打印接收到字符串
            if (DEBUG)
                System.out.println(
"recv:" + t);
            
        }

    }


//...连接API,TCPConnectorController示例
    Configuration configuration = new Configuration();
        configuration.setTcpSessionReadBufferSize(
256 * 1024); // 设置读的缓冲区大小
    TCPConnectorController    connector = new TCPConnectorController(configuration,
                
new StringCodecFactory());
    connector.setHandler(
new EchoHandler());
    connector.setCodecFactory(
new StringCodecFactory());
   
try {
            connector.Connect(
new InetSocketAddress("localhost"8080));
    } 
catch (IOExceptione) {
            e.printStackTrace();
    }

    注意,connect方法并不阻塞,而是立即返回,连接是否建立可以通过TCPConnectorController.isConnected()方法来判断,因此通常你可能会这样使用:

try {
            connector.Connect(
new InetSocketAddress("localhost"8080));
            
while(!connector.isConnected())
                ;
        } 
catch (Exception e) {
            e.printStackTrace();
        }

    来强制确保后面对connector的使用是已经连接上的connector,然而更好的做法是在Handler的onConnected()回调方法中处理逻辑,因为这个方法仅仅在连接建立后才会被调用。
    两个ConnectorController都有系列send方法,用于发送数据:
TCPConnectorController.send(Object msg) throws InterruptedException
UDPConnectorController.send(DatagramPacket packet) 
throws InterruptedException
UDPConnectorController.send(SocketAddress targetAddr, Object msg)
throws InterruptedException


    0.50-beta2带来的另一个修改就是Session接口添加setReadBufferByteOrder方法,用于设置session接收缓冲区的字节序,默认是网络字节序,也就是大端法。这个方法建议在Handler的onSessionStarted回调方法中调用。

    在0.50-beta最重要的修改是引入了session发送队列缓冲区的流量控制选项。默认情况下,session的发送缓冲队列是无界的,队列的push和pop也全然不会阻塞。在设置了缓冲队列的高低水位选项后即引入了发送流量控制,规则如下:
a)当发送队列中的数据总量大于高水位标记(highWaterMark),Session.send将阻塞
b)在条件a的作用下,Session.send的阻塞将持续到发送队列中的数据总量小于于低水位标记(lowWaterMark)才解除。


缓冲队列高低水位的设置通过Controller的下列方法设置:
     public void setSessionWriteQueueHighWaterMark(int highWaterMark);

     
public void setSessionWriteQueueLowWaterMark(int lowWaterMark);
 
缓冲队列的流量控制想法来自ACE的ACE_Message_Queue,是通过com.google.code.yanf4j.util.MessageQueue类实现的。

   0.50-beta还引入了Session.send(Object msg)的重载版本 Session.send(Object msg,long timeout),在超过timeout时间后send仍然阻塞时即终止send。注意,现在Session.send的这两个方法都返回一个bool值来表示send成功与否,并且都将响应中断(仅限启动了流量控制选项)抛出InterruptedException。

posted @ 2009-02-19 00:15 dennis 阅读(1768) | 评论 (2)编辑 收藏

仅列出标题
共56页: First 上一页 18 19 20 21 22 23 24 25 26 下一页 Last