摘要: 1、 为什么使用Nexus        如果没有私服,我们所需的所有构件都需要通过maven的中央仓库和第三方的Maven仓库下载到本地,而一个团队中的所有人都重复的从maven仓库下载构件无疑加大了仓库的负载和浪费了外网带宽,如果网速慢的话,还会影响项目的进程。很多情况下项目的开发都是在内网进行的,连接不到maven仓库怎么...  阅读全文
posted @ 2014-05-13 11:18 小马歌 阅读(34931) | 评论 (0)编辑 收藏
 

生产环境下,jvm参数可以设置如下:

-server –Xms256m –Xmx2G -XX:MaxPermSize=256m -XX:-UseGCOverheadLimit -XX:+UseParallelOldGC -XX:+UseParallelGC -XX:CompileThreshold=10 -XX:MaxInlineSize=1024 -Djava.awt.headless=true -Djmagick.systemclassloader=no -Djava.net.preferIPv4Stack=true -Dsun.net.client.defaultConnectTimeout=60000 -Dsun.net.client.defaultReadTimeout=60000 -Dnetworkaddress.cache.ttl=300 -Dsun.net.inetaddr.ttl=300


顺便说一句:–Xms和–Xmx如果配置成一样容易出现fullgc
posted @ 2014-05-09 17:02 小马歌 阅读(570) | 评论 (0)编辑 收藏
 

Redis常用数据类型

Redis最为常用的数据类型主要有以下五种:

  • String
  • Hash
  • List
  • Set
  • Sorted set

在具体描述这几种数据类型之前,我们先通过一张图了解下Redis内部内存管理中是如何描述这些不同数据类型的:

首先Redis内部使用一个redisObject对象来表示所有的key和value,redisObject最主要的信息如上图所示:type代表一个value对象具体是何种数据类型,encoding是不同数据类型在redis内部的存储方式,比如:type=string代表value存储的是一个普通字符串,那么对应的encoding可以是raw或者是int,如果是int则代表实际redis内部是按数值型类存储和表示这个字符串的,当然前提是这个字符串本身可以用数值表示,比如:"123" "456"这样的字符串。

这里需要特殊说明一下vm字段,只有打开了Redis的虚拟内存功能,此字段才会真正的分配内存,该功能默认是关闭状态的,该功能会在后面具体描述。通过上图我们可以发现Redis使用redisObject来表示所有的key/value数据是比较浪费内存的,当然这些内存管理成本的付出主要也是为了给Redis不同数据类型提供一个统一的管理接口,实际作者也提供了多种方法帮助我们尽量节省内存使用,我们随后会具体讨论。

下面我们先来逐一的分析下这五种数据类型的使用和内部实现方式:

  • String

    常用命令:

    set,get,decr,incr,mget 等。

    应用场景:

    String是最常用的一种数据类型,普通的key/value存储都可以归为此类,这里就不所做解释了。

    实现方式:

    String在redis内部存储默认就是一个字符串,被redisObject所引用,当遇到incr,decr等操作时会转成数值型进行计算,此时redisObject的encoding字段为int。

  • Hash

    常用命令:

    hget,hset,hgetall 等。

    应用场景:

    我们简单举个实例来描述下Hash的应用场景,比如我们要存储一个用户信息对象数据,包含以下信息:

    用户ID为查找的key,存储的value用户对象包含姓名,年龄,生日等信息,如果用普通的key/value结构来存储,主要有以下2种存储方式:

    第一种方式将用户ID作为查找key,把其他信息封装成一个对象以序列化的方式存储,这种方式的缺点是,增加了序列化/反序列化的开销,并且在需要修改其中一项信息时,需要把整个对象取回,并且修改操作需要对并发进行保护,引入CAS等复杂问题。

    第二种方法是这个用户信息对象有多少成员就存成多少个key-value对儿,用用户ID+对应属性的名称作为唯一标识来取得对应属性的值,虽然省去了序列化开销和并发问题,但是用户ID为重复存储,如果存在大量这样的数据,内存浪费还是非常可观的。

    那么Redis提供的Hash很好的解决了这个问题,Redis的Hash实际是内部存储的Value为一个HashMap,并提供了直接存取这个Map成员的接口,如下图:

    也就是说,Key仍然是用户ID, value是一个Map,这个Map的key是成员的属性名,value是属性值,这样对数据的修改和存取都可以直接通过其内部Map的Key(Redis里称内部Map的key为field), 也就是通过 key(用户ID) + field(属性标签) 就可以操作对应属性数据了,既不需要重复存储数据,也不会带来序列化和并发修改控制的问题。很好的解决了问题。

    这里同时需要注意,Redis提供了接口(hgetall)可以直接取到全部的属性数据,但是如果内部Map的成员很多,那么涉及到遍历整个内部Map的操作,由于Redis单线程模型的缘故,这个遍历操作可能会比较耗时,而另其它客户端的请求完全不响应,这点需要格外注意。

    实现方式:

    上面已经说到Redis Hash对应Value内部实际就是一个HashMap,实际这里会有2种不同实现,这个Hash的成员比较少时Redis为了节省内存会采用类似一维数组的方式来紧凑存储,而不会采用真正的HashMap结构,对应的value redisObject的encoding为zipmap,当成员数量增大时会自动转成真正的HashMap,此时encoding为ht。

  • List

    常用命令:

    lpush,rpush,lpop,rpop,lrange等。

    应用场景:

    Redis list的应用场景非常多,也是Redis最重要的数据结构之一,比如twitter的关注列表,粉丝列表等都可以用Redis的list结构来实现,比较好理解,这里不再重复。

    实现方式:

    Redis list的实现为一个双向链表,即可以支持反向查找和遍历,更方便操作,不过带来了部分额外的内存开销,Redis内部的很多实现,包括发送缓冲队列等也都是用的这个数据结构。

  • Set

    常用命令:

    sadd,spop,smembers,sunion 等。

    应用场景:

    Redis set对外提供的功能与list类似是一个列表的功能,特殊之处在于set是可以自动排重的,当你需要存储一个列表数据,又不希望出现重复数据时,set是一个很好的选择,并且set提供了判断某个成员是否在一个set集合内的重要接口,这个也是list所不能提供的。

    实现方式:

    set 的内部实现是一个 value永远为null的HashMap,实际就是通过计算hash的方式来快速排重的,这也是set能提供判断一个成员是否在集合内的原因。

  • Sorted set

    常用命令:

    zadd,zrange,zrem,zcard等

    使用场景:

    Redis sorted set的使用场景与set类似,区别是set不是自动有序的,而sorted set可以通过用户额外提供一个优先级(score)的参数来为成员排序,并且是插入有序的,即自动排序。当你需要一个有序的并且不重复的集合列表,那么可以选择sorted set数据结构,比如twitter 的public timeline可以以发表时间作为score来存储,这样获取时就是自动按时间排好序的。

    实现方式:

    Redis sorted set的内部使用HashMap和跳跃表(SkipList)来保证数据的存储和有序,HashMap里放的是成员到score的映射,而跳跃表里存放的是所有的成员,排序依据是HashMap里存的score,使用跳跃表的结构可以获得比较高的查找效率,并且在实现上比较简单。

常用内存优化手段与参数

通过我们上面的一些实现上的分析可以看出redis实际上的内存管理成本非常高,即占用了过多的内存,作者对这点也非常清楚,所以提供了一系列的参数和手段来控制和节省内存,我们分别来讨论下。

首先最重要的一点是不要开启Redis的VM选项,即虚拟内存功能,这个本来是作为Redis存储超出物理内存数据的一种数据在内存与磁盘换入换出的一个持久化策略,但是其内存管理成本也非常的高,并且我们后续会分析此种持久化策略并不成熟,所以要关闭VM功能,请检查你的redis.conf文件中 vm-enabled 为 no。

其次最好设置下redis.conf中的maxmemory选项,该选项是告诉Redis当使用了多少物理内存后就开始拒绝后续的写入请求,该参数能很好的保护好你的Redis不会因为使用了过多的物理内存而导致swap,最终严重影响性能甚至崩溃。

另外Redis为不同数据类型分别提供了一组参数来控制内存使用,我们在前面详细分析过Redis Hash是value内部为一个HashMap,如果该Map的成员数比较少,则会采用类似一维线性的紧凑格式来存储该Map, 即省去了大量指针的内存开销,这个参数控制对应在redis.conf配置文件中下面2项:

hash-max-zipmap-entries 64  hash-max-zipmap-value 512  hash-max-zipmap-entries 

含义是当value这个Map内部不超过多少个成员时会采用线性紧凑格式存储,默认是64,即value内部有64个以下的成员就是使用线性紧凑存储,超过该值自动转成真正的HashMap。

hash-max-zipmap-value 含义是当 value这个Map内部的每个成员值长度不超过多少字节就会采用线性紧凑存储来节省空间。

以上2个条件任意一个条件超过设置值都会转换成真正的HashMap,也就不会再节省内存了,那么这个值是不是设置的越大越好呢,答案当然是否定的,HashMap的优势就是查找和操作的时间复杂度都是O(1)的,而放弃Hash采用一维存储则是O(n)的时间复杂度,如果

成员数量很少,则影响不大,否则会严重影响性能,所以要权衡好这个值的设置,总体上还是最根本的时间成本和空间成本上的权衡。

同样类似的参数还有:

list-max-ziplist-entries 512

说明:list数据类型多少节点以下会采用去指针的紧凑存储格式。

list-max-ziplist-value 64 

说明:list数据类型节点值大小小于多少字节会采用紧凑存储格式。

set-max-intset-entries 512 

说明:set数据类型内部数据如果全部是数值型,且包含多少节点以下会采用紧凑格式存储。

最后想说的是Redis内部实现没有对内存分配方面做过多的优化,在一定程度上会存在内存碎片,不过大多数情况下这个不会成为Redis的性能瓶颈,不过如果在Redis内部存储的大部分数据是数值型的话,Redis内部采用了一个shared integer的方式来省去分配内存的开销,即在系统启动时先分配一个从1~n 那么多个数值对象放在一个池子中,如果存储的数据恰好是这个数值范围内的数据,则直接从池子里取出该对象,并且通过引用计数的方式来共享,这样在系统存储了大量数值下,也能一定程度上节省内存并且提高性能,这个参数值n的设置需要修改源代码中的一行宏定义REDIS_SHARED_INTEGERS,该值默认是10000,可以根据自己的需要进行修改,修改后重新编译就可以了。

Redis的持久化机制

Redis由于支持非常丰富的内存数据结构类型,如何把这些复杂的内存组织方式持久化到磁盘上是一个难题,所以Redis的持久化方式与传统数据库的方式有比较多的差别,Redis一共支持四种持久化方式,分别是:

  • 定时快照方式(snapshot)
  • 基于语句追加文件的方式(aof)
  • 虚拟内存(vm)
  • Diskstore方式

在设计思路上,前两种是基于全部数据都在内存中,即小数据量下提供磁盘落地功能,而后两种方式则是作者在尝试存储数据超过物理内存时,即大数据量的数据存储,截止到本文,后两种持久化方式仍然是在实验阶段,并且vm方式基本已经被作者放弃,所以实际能在生产环境用的只有前两种,换句话说Redis目前还只能作为小数据量存储(全部数据能够加载在内存中),海量数据存储方面并不是Redis所擅长的领域。下面分别介绍下这几种持久化方式:

定时快照方式(snapshot):

该持久化方式实际是在Redis内部一个定时器事件,每隔固定时间去检查当前数据发生的改变次数与时间是否满足配置的持久化触发的条件,如果满足则通过操作系统fork调用来创建出一个子进程,这个子进程默认会与父进程共享相同的地址空间,这时就可以通过子进程来遍历整个内存来进行存储操作,而主进程则仍然可以提供服务,当有写入时由操作系统按照内存页(page)为单位来进行copy-on-write保证父子进程之间不会互相影响。

该持久化的主要缺点是定时快照只是代表一段时间内的内存映像,所以系统重启会丢失上次快照与重启之间所有的数据。

基于语句追加方式(aof):

aof方式实际类似mysql的基于语句的binlog方式,即每条会使Redis内存数据发生改变的命令都会追加到一个log文件中,也就是说这个log文件就是Redis的持久化数据。

aof的方式的主要缺点是追加log文件可能导致体积过大,当系统重启恢复数据时如果是aof的方式则加载数据会非常慢,几十G的数据可能需要几小时才能加载完,当然这个耗时并不是因为磁盘文件读取速度慢,而是由于读取的所有命令都要在内存中执行一遍。另外由于每条命令都要写log,所以使用aof的方式,Redis的读写性能也会有所下降。

虚拟内存方式:

虚拟内存方式是Redis来进行用户空间的数据换入换出的一个策略,此种方式在实现的效果上比较差,主要问题是代码复杂,重启慢,复制慢等等,目前已经被作者放弃。

diskstore方式:

diskstore方式是作者放弃了虚拟内存方式后选择的一种新的实现方式,也就是传统的B-tree的方式,目前仍在实验阶段,后续是否可用我们可以拭目以待。

Redis持久化磁盘IO方式及其带来的问题

有Redis线上运维经验的人会发现Redis在物理内存使用比较多,但还没有超过实际物理内存总容量时就会发生不稳定甚至崩溃的问题,有人认为是基于快照方式持久化的fork系统调用造成内存占用加倍而导致的,这种观点是不准确的,因为fork 调用的copy-on-write机制是基于操作系统页这个单位的,也就是只有有写入的脏页会被复制,但是一般你的系统不会在短时间内所有的页都发生了写入而导致复制,那么是什么原因导致Redis崩溃的呢?

答案是Redis的持久化使用了Buffer IO造成的,所谓Buffer IO是指Redis对持久化文件的写入和读取操作都会使用物理内存的Page Cache,而大多数数据库系统会使用Direct IO来绕过这层Page Cache并自行维护一个数据的Cache,而当Redis的持久化文件过大(尤其是快照文件),并对其进行读写时,磁盘文件中的数据都会被加载到物理内存中作为操作系统对该文件的一层Cache,而这层Cache的数据与Redis内存中管理的数据实际是重复存储的,虽然内核在物理内存紧张时会做Page Cache的剔除工作,但内核很可能认为某块Page Cache更重要,而让你的进程开始Swap ,这时你的系统就会开始出现不稳定或者崩溃了。我们的经验是当你的Redis物理内存使用超过内存总容量的3/5时就会开始比较危险了。

下图是Redis在读取或者写入快照文件dump.rdb后的内存数据图:

总结:

  1. 根据业务需要选择合适的数据类型,并为不同的应用场景设置相应的紧凑存储参数。
  2. 当业务场景不需要数据持久化时,关闭所有的持久化方式可以获得最佳的性能以及最大的内存使用量。
  3. 如果需要使用持久化,根据是否可以容忍重启丢失部分数据在快照方式与语句追加方式之间选择其一,不要使用虚拟内存以及diskstore方式。
  4. 不要让你的Redis所在机器物理内存使用超过实际内存总量的3/5。

转载自:http://www.infoq.com/cn/articles/tq-redis-memory-usage-optimization-storage

posted @ 2014-05-09 17:00 小马歌 阅读(309) | 评论 (0)编辑 收藏
 

由于Dubbo底层采用Socket进行通信,自己对通信理理论也不是很清楚,所以顺便把通信的知识也学习一下。

n  通信理论

计算机与外界的信息交换称为通信。基本的通信方法有并行通信和串行通信两种。

1.一组信息(通常是字节)的各位数据被同时传送的通信方法称为并行通信。并行通信依靠并行I/O接口实现。并行通信速度快,但传输线根数多,只适用于近距离(相距数公尺)的通信。

2.一组信息的各位数据被逐位顺序传送的通信方式称为串行通信。串行通信可通过串行接口来实现。串行通信速度慢,但传输线少,适宜长距离通信。

串行通信按信息传送方向分为以下3种:

1)   单工

只能一个方向传输数据

【原创】Alibaba Dubbo框架同步调用原理分析-1 - sun - 学无止境

2)   半双工

信息能双向传输,但不能同时双向传输

【原创】Alibaba Dubbo框架同步调用原理分析-1 - sun - 学无止境

3)   全双工

能双向传输并且可以同时双向传输

【原创】Alibaba Dubbo框架同步调用原理分析-1 - sun - 学无止境 

n  Socket

Socket 是一种应用接口, TCP/IP 是网络传输协议,虽然接口相同, 但是不同的协议会有不同的服务性质。创建Socket 连接时,可以指定使用的传输层协议,Socket 可以支持不同的传输层协议(TCP 或UDP ),当使用TCP 协议进行连接时,该Socket 连接就是一个TCP 连接。Soket 跟TCP/IP 并没有必然的联系。Socket 编程接口在设计的时候,就希望也能适应其他的网络协议。所以,socket 的出现只是可以更方便的使用TCP/IP 协议栈而已。

引自:http://hi.baidu.com/lewutian/blog/item/b28e27fd446d641d09244d08.html

上一个通信理论其实是想说Socket(TCP)通信是全双工的方式

n  Dubbo远程同步调用原理分析

从Dubbo开源文档上了解到一个调用过程如下图

http://code.alibabatech.com/wiki/display/dubbo/User+Guide#UserGuide-APIReference

另外文档里有说明:Dubbo缺省协议采用单一长连接和NIO异步通讯,适合于小数据量大并发的服务调用,以及服务消费者机器数远大于服务提供者机器数的情况。

【原创】Alibaba Dubbo框架同步调用原理分析-1 - sun - 学无止境

Dubbo缺省协议,使用基于mina1.1.7+hessian3.2.1的tbremoting交互。

  • 连接个数:单连接
  • 连接方式:长连接
  • 传输协议:TCP
  • 传输方式:NIO异步传输
  • 序列化:Hessian二进制序列化
  • 适用范围:传入传出参数数据包较小(建议小于100K),消费者比提供者个数多,单一消费者无法压满提供者,尽量不要用dubbo协议传输大文件或超大字符串
  • 适用场景:常规远程服务方法调用

 通常,一个典型的同步远程调用应该是这样的:

【原创】Alibaba Dubbo框架同步调用原理分析-1 - sun - 学无止境

1, 客户端线程调用远程接口,向服务端发送请求,同时当前线程应该处于“暂停“状态,即线程不能向后执行了,必需要拿到服务端给自己的结果后才能向后执行

2, 服务端接到客户端请求后,处理请求,将结果给客户端
3, 客户端收到结果,然后当前线程继续往后执行

Dubbo里使用到了Socket(采用apache mina框架做底层调用)来建立长连接,发送、接收数据,底层使用apache mina框架的IoSession进行发送消息。

查看Dubbo文档及源代码可知,Dubbo底层使用Socket发送消息的形式进行数据传递,结合了mina框架,使用IoSession.write()方法,这个方法调用后对于整个远程调用(从发出请求到接收到结果)来说是一个异步的,即对于当前线程来说,将请求发送出来,线程就可以往后执行了,至于服务端的结果,是服务端处理完成后,再以消息的形式发送给客户端的。于是这里出现了2个问题:
  • 当前线程怎么让它“暂停”,等结果回来后,再向后执行?
  • 正如前面所说,Socket通信是一个全双工的方式,如果有多个线程同时进行远程方法调用,这时建立在client server之间的socket连接上会有很多双方发送的消息传递,前后顺序也可能是乱七八糟的,server处理完结果后,将结果消息发送给client,client收到很多消息,怎么知道哪个消息结果是原先哪个线程调用的?

分析源代码,基本原理如下:
  1. client一个线程调用远程接口,生成一个唯一的ID(比如一段随机字符串,UUID等),Dubbo是使用AtomicLong从0开始累计数字的
  2. 将打包的方法调用信息(如调用的接口名称,方法名称,参数值列表等),和处理结果的回调对象callback,全部封装在一起,组成一个对象object
  3. 向专门存放调用信息的全局ConcurrentHashMap里面put(ID, object)
  4. 将ID和打包的方法调用信息封装成一对象connRequest,使用IoSession.write(connRequest)异步发送出去
  5. 当前线程再使用callback的get()方法试图获取远程返回的结果,在get()内部,则使用synchronized获取回调对象callback的锁, 再先检测是否已经获取到结果,如果没有,然后调用callback的wait()方法,释放callback上的锁,让当前线程处于等待状态。
  6. 服务端接收到请求并处理后,将结果(此结果中包含了前面的ID,即回传)发送给客户端,客户端socket连接上专门监听消息的线程收到消息,分析结果,取到ID,再从前面的ConcurrentHashMap里面get(ID),从而找到callback,将方法调用结果设置到callback对象里。
  7. 监听线程接着使用synchronized获取回调对象callback的锁(因为前面调用过wait(),那个线程已释放callback的锁了),再notifyAll(),唤醒前面处于等待状态的线程继续执行(callback的get()方法继续执行就能拿到调用结果了),至此,整个过程结束。
这里还需要画一个大图来描述,后面再补了
需要注意的是,这里的callback对象是每次调用产生一个新的,不能共享,否则会有问题;另外ID必需至少保证在一个Socket连接里面是唯一的。

现在,前面两个问题已经有答案了,
  • 当前线程怎么让它“暂停”,等结果回来后,再向后执行?
     答:先生成一个对象obj,在一个全局map里put(ID,obj)存放起来,再用synchronized获取obj锁,再调用obj.wait()让当前线程处于等待状态,然后另一消息监听线程等到服务端结果来了后,再map.get(ID)找到obj,再用synchronized获取obj锁,再调用obj.notifyAll()唤醒前面处于等待状态的线程。
  • 正如前面所说,Socket通信是一个全双工的方式,如果有多个线程同时进行远程方法调用,这时建立在client server之间的socket连接上会有很多双方发送的消息传递,前后顺序也可能是乱七八糟的,server处理完结果后,将结果消息发送给client,client收到很多消息,怎么知道哪个消息结果是原先哪个线程调用的?
     答:使用一个ID,让其唯一,然后传递给服务端,再服务端又回传回来,这样就知道结果是原先哪个线程的了。

这种做法不是第一次见了,10年在上一公司里,也是远程接口调用,不过走的消息中间件rabbitmq,同步调用的原理跟这类似,详见:rabbitmq 学习-9- RpcClient发送消息和同步接收消息原理

关键代码:

com.taobao.remoting.impl.DefaultClient.java

//同步调用远程接口

public Object invokeWithSync(Object appRequest, RequestControl control) throws RemotingException, InterruptedException {

        byte protocol = getProtocol(control);

        if (!TRConstants.isValidProtocol(protocol)) {

            throw new RemotingException("Invalid serialization protocol [" + protocol + "] on invokeWithSync.");

        }

        ResponseFuture future = invokeWithFuture(appRequest, control);

        return future.get();  //获取结果时让当前线程等待,ResponseFuture其实就是前面说的callback

}

public ResponseFuture invokeWithFuture(Object appRequest, RequestControl control) {

         byte protocol = getProtocol(control);

         long timeout = getTimeout(control);

         ConnectionRequest request = new ConnectionRequest(appRequest);

         request.setSerializeProtocol(protocol);

         Callback2FutureAdapter adapter = new Callback2FutureAdapter(request);

         connection.sendRequestWithCallback(request, adapter, timeout);

         return adapter;

}

 

Callback2FutureAdapter implements ResponseFuture

public Object get() throws RemotingException, InterruptedException {

synchronized (this) {  // 旋锁

   while (!isDone) {  // 是否有结果了

wait(); //没结果是释放锁,让当前线程处于等待状态

   }

}

if (errorCode == TRConstants.RESULT_TIMEOUT) {

   throw new TimeoutException("Wait response timeout, request["

   + connectionRequest.getAppRequest() + "].");

}

else if (errorCode > 0) {

   throw new RemotingException(errorMsg);

}

else {

   return appResp;

}

}

客户端收到服务端结果后,回调时相关方法,即设置isDone = true并notifyAll()

public void handleResponse(Object _appResponse) {

         appResp = _appResponse; //将远程调用结果设置到callback中来

         setDone();

}

public void onRemotingException(int _errorType, String _errorMsg) {

         errorCode = _errorType;

         errorMsg = _errorMsg;

         setDone();

}

private void setDone() {

         isDone = true;

         synchronized (this) { //获取锁,因为前面wait()已经释放了callback的锁了

             notifyAll(); // 唤醒处于等待的线程

         }

}

 

com.taobao.remoting.impl.DefaultConnection.java

 

// 用来存放请求和回调的MAP

private final ConcurrentHashMap<Long, Object[]> requestResidents;

 

//发送消息出去

void sendRequestWithCallback(ConnectionRequest connRequest, ResponseCallback callback, long timeoutMs) {

         long requestId = connRequest.getId();

         long waitBegin = System.currentTimeMillis();

         long waitEnd = waitBegin + timeoutMs;

         Object[] queue = new Object[4];

         int idx = 0;

         queue[idx++] = waitEnd;

         queue[idx++] = waitBegin;   //用于记录日志

         queue[idx++] = connRequest; //用于记录日志

         queue[idx++] = callback;

         requestResidents.put(requestId, queue); // 记录响应队列

         write(connRequest);

 

         // 埋点记录等待响应的Map的大小

         StatLog.addStat("TBRemoting-ResponseQueues", "size", requestResidents.size(),

                   1L);

}

public void write(final Object connectionMsg) {

//mina里的IoSession.write()发送消息

         WriteFuture writeFuture = ioSession.write(connectionMsg);

         // 注册FutureListener,当请求发送失败后,能够立即做出响应

         writeFuture.addListener(new MsgWrittenListener(this, connectionMsg));

}

 

/**

* 在得到响应后,删除对应的请求队列,并执行回调

* 调用者:MINA线程

*/

public void putResponse(final ConnectionResponse connResp) {

         final long requestId = connResp.getRequestId();

         Object[] queue = requestResidents.remove(requestId);

         if (null == queue) {

             Object appResp = connResp.getAppResponse();

             String appRespClazz = (null == appResp) ? "null" : appResp.getClass().getName();

             StringBuilder sb = new StringBuilder();

             sb.append("Not found response receiver for requestId=[").append(requestId).append("],");

             sb.append("from [").append(connResp.getHost()).append("],");

             sb.append("response type [").append(appRespClazz).append("].");

             LOGGER.warn(sb.toString());

             return;

         }

         int idx = 0;

         idx++;

         long waitBegin = (Long) queue[idx++];

         ConnectionRequest connRequest = (ConnectionRequest) queue[idx++];

         ResponseCallback callback = (ResponseCallback) queue[idx++];

         // ** 把回调任务交给业务提供的线程池执行 **

         Executor callbackExecutor = callback.getExecutor();

         callbackExecutor.execute(new CallbackExecutorTask(connResp, callback));

 

         long duration = System.currentTimeMillis() - waitBegin; // 实际读响应时间

         logIfResponseError(connResp, duration, connRequest.getAppRequest());

}

 

CallbackExecutorTask

static private class CallbackExecutorTask implements Runnable {

         final ConnectionResponse resp;

         final ResponseCallback callback;

         final Thread createThread;

 

         CallbackExecutorTask(ConnectionResponse _resp, ResponseCallback _cb) {

             resp = _resp;

             callback = _cb;

             createThread = Thread.currentThread();

         }

 

         public void run() {

             // 预防这种情况:业务提供的Executor,让调用者线程来执行任务

             if (createThread == Thread.currentThread()

                       && callback.getExecutor() != DIYExecutor.getInstance()) {

                   StringBuilder sb = new StringBuilder();

                   sb.append("The network callback task [" + resp.getRequestId() + "] cancelled, cause:");

                   sb.append("Can not callback task on the network io thhread.");

                   LOGGER.warn(sb.toString());

                   return;

             }

 

             if (TRConstants.RESULT_SUCCESS == resp.getResult()) {

                   callback.handleResponse(resp.getAppResponse()); //设置调用结果

             }

             else {

                   callback.onRemotingException(resp.getResult(), resp

                            .getErrorMsg());  //处理调用异常

             }

         }

}

 

另外:

1, 服务端在处理客户端的消息,然后再处理时,使用了线程池来并行处理,不用一个一个消息的处理

同样,客户端接收到服务端的消息,也是使用线程池来处理消息,再回调

 

转载自:http://sunjun041640.blog.163.com/blog/static/256268322011111882453405/

posted @ 2014-05-09 15:49 小马歌 阅读(4672) | 评论 (0)编辑 收藏
 
     摘要: 一、Redis服务器端的安装和客户端Jedis的安装1.下载Redis   下载地址:http://redis.googlecode.com/files/redis-2.4.8.tar.gz 2.安装Redis在linux下运行如下命令进行安装。Shell代码  $ tar xzf redis-2.4.8.tar.gz...  阅读全文
posted @ 2014-05-09 15:21 小马歌 阅读(2184) | 评论 (0)编辑 收藏
 

javapns是一个java实现的APNs的provider库,利用这个库可以向apple的APNs服务器推送基本的以及自定义的APNs消息、从APNs服务器接收详细发送情况报告(error packet)和查询反馈信息(feedback)。下面介绍其基本的用法。

一、下载javapns库和其依赖的库:

[codesyntax lang="bash"]

svn checkout http://javapns.googlecode.com/svn/trunk/ javapns-read-only

[/codesyntax]

依赖库:

· commons-lang-2.4.jar

· commons-io-1.4.jar

· bcprov-jdk15-146.jar

· log4j-1.2.15.jar

这几个都是开源库。

在工程中导入这几个库。

 

二、推送通知的方法:

Push.alert:推送一条仅包含alert的消息

Push.sound:推送一条仅包含sound的消息

Push.badge:推送一条仅包含badge的消息

Push.combine:推送一条包含alertsoundbadge的消息

也可以自己构造Payload,然后传递给Push.payload方法发送一个payload,给一个或多个设备。或者调用Push.payloads方法把payload一对一的发送给设备,这个需要预先构造PayloadPerDevice类的实例。

自己构造构造Payload的实例的基本方法:

 

[codesyntax lang="java5" lines="fancy"]

PushNotificationPayload payload = new PushNotificationPayload();  //声明一个空的payload  payload.setExpiry(1);  //设置payload的过期时间  payload.addAlert("alert message");  //设置alert消息  payload.addBadge(3);  //设置badge值  payload.addSound("beep.wav");  //设置声音  payload.addCustomAlertActionLocKey("launch apns");  //设置ActionLocKey  payload.addCustomAlertLocKey("locKey");  //设置LocKey  payload.addCustomDictionary("custom1", "value1");  //增加一个自定义键值对  List<PushedNotification> notifications = Push.payload(payload, "apns-key+cert-exported-from-mac.p12", "hadoop",   false,   "def981279b88b3a858b9dc9ea35b893175d5d190e2a271d448ee0679ad5bd880");  //调用Push.payload方法发送这个payload,发回一个已发送的notification的列表

[/codesyntax]

 

三、处理APNs服务器的反馈

苹果的推送服务器提供两个的反馈系统,实际生产过程中,要对两个反馈系统中的反馈消息都进行检查,不能只用其一。这两个反馈系统是:Feedback Service vs Error-response packets 。

javapns系统已经对这两种反馈系统提供的良好的支持。

(1)Error-response packets

在发送消息之后返回的PushedNotificationresponse成员中,会保存有苹果返回的Error-response packets的信息,若消息推送为发生错误,则该成员为空null。可通过如下方法使用:

 

[codesyntax lang="java5" lines="fancy"]

for (PushedNotification notification : notifications) {  response = notification.getResponse();  if(response != null)  {  response.getMessage();  System.out.println(response.getMessage());  }              if (notification.isSuccessful())   {                    System.out.println("Push notification sent successf ully to: " + notification.getDevice().getToken());              }   else   {                      String invalidToken =   notification.getDevice().getToken();              }  }

[/codesyntax]

(2)feedback service

feedback service会列出apple 服务器认为永远不可达的设备(可能是由于你的client端的应用程序已被删除等原因)。可通过如下方法使用feedback:

 

[codesyntax lang="java5" lines="fancy"]

List<Device> devList = Push.feedback("apns-key+cert-exported-from- mac.p12", "hadoop", false);  for(Device basicDevice: devList)  {  System.out.println(basicDevice.getToken());  System.out.println(basicDevice.getDeviceId());  }

[/codesyntax]

posted @ 2014-05-06 18:08 小马歌 阅读(454) | 评论 (0)编辑 收藏
 

from:http://www.oschina.net/p/atlas/

Atlas是由 Qihoo 360,  Web平台部基础架构团队开发维护的一个基于MySQL协议的数据中间层项目。它在MySQL官方推出的MySQL-Proxy 0.8.2版本的基础上,修改了大量bug,添加了很多功能特性。目前该项目在360公司内部得到了广泛应用,很多MySQL业务已经接入了Atlas平台,每天承载的读写请求数达几十亿条。

主要功能:
* 读写分离
* 从库负载均衡
* IP过滤
* SQL语句黑白名单
* 自动分表

Q & A
-------------------
Q: 是否支持多字符集?
A: 这是我们对原版MySQL-Proxy的第一项改进,符合国情是必须的

Q: 自动读写分离挺好,但有时候我写完马上就想读,万一主从同步延迟怎么办?
A: SQL语句前增加 /*master*/ 就可以将读请求强制发往主库

Q: 主库宕机,读操作受影响么?
A: 在atlas中是不会的! 能问这样的问题, 说明你用过官方的mysql-proxy, 很遗憾官方版本并未解决这个问题

Q: 检测后端DB状态会阻塞正常请求么?
A: 不会, atlas中检测线程是异步进行检测的,即使有db宕机,也不会阻塞主流程。在atlas中没有什么异常会让主流程阻塞! 同上,官方版本也会让你失望

Q: 想下线一台DB, 又不想停掉mysql server, 怎么办?
A: 可以通过管理接口手动上下线后端db, atlas会优先考虑管理员的意愿

Q: 想给集群中增加一台DB, 不想影响线上正常访问可以吗?
A: 通过管理接口可以轻松实现

Q: 相比官方mysql-proxy, atlas还有哪些改进?
A: 这实在是个难以回答的问题,性能,稳定性,可靠性,易维护性,我们做过几十项的改进,下面会尽量列一些较大的改动

VS 官方MySQL-Proxy
-------------------
1. 将主流程中所有Lua代码改为纯C实现,Lua仅用在管理接口
2. 重写网络模型、线程模型
3. 实现了真正意义的连接池
4. 优化了锁机制,性能提高数十倍
......

附名字来源:
    Atlas,希腊神话中双肩撑天的巨人,普罗米修斯的兄弟,最高大强壮的神之一,因反抗宙斯失败而被罚顶天。我们期望这个系统能够脚踏后端DB,为前端应用撑起一片天。

二、配置文件示例
-------------------
[mysql-proxy]    #不需要改
plugins = admin, proxy    #Atlas加载的模块名称,不需要改

admin-username = user    #管理接口的用户名
admin-password = pwd    #管理接口的密码
admin-lua-script = /usr/local/mysql-proxy/lib/mysql-proxy/lua/admin.lua    #实现管理接口的Lua脚本所在路径

proxy-backend-addresses = 127.0.0.1:3306    #Atlas后端连接的MySQL主库的IP和端口,可设置多项,用逗号分隔
proxy-read-only-backend-addresses = 127.0.0.1:3305@2    #Atlas后端连接的MySQL从库的IP和端口,2代表权重,用来作负载均衡,若省略则默认为1,可设置多项,用逗号分隔

daemon = false    #设置Atlas的运行方式,设为true时为守护进程方式,设为false时为前台方式,一般开发调试时设为false,线上运行时设为true
keepalive = false    #设置Atlas的运行方式,设为true时Atlas会启动两个进程,一个为monitor,一个为worker,monitor在worker意外退出后会自动将其重启,设为false时只有worker,没有monitor,一般开发调试时设为false,线上运行时设为true

event-threads = 4    #工作线程数,推荐设置与系统的CPU核数相等
log-level = message    #日志级别,分为message、warning、critical、error、debug五个级别
log-path = /usr/local/mysql-proxy/log    #日志存放的路径
instance = test    #实例名称,用于同一台机器上多个Atlas实例间的区分

proxy-address = 0.0.0.0:1234    #Atlas监听的工作接口IP和端口
admin-address = 0.0.0.0:2345    #Atlas监听的管理接口IP和端口

min-idle-connections = 128    #连接池的最小空闲连接数,可根据业务请求量大小适当调大或调小
tables = person.mt.id.3    #分表设置,此例中person为库名,mt为表名,id为分表字段,3为子表数量,可设置多项,以逗号分隔
pwds = user1:+jKsgB3YAG8=, user2:GS+tr4TPgqc=    #用户名与其对应的加密过的密码,密码使用加密程序encrypt加密,此设置项用于多个用户名同时访问同一个Atlas实例的情况,若只有一个用户名则不需要设置该项
charset = utf8    #默认字符集,若不设置该项,则默认字符集为latin1

三、编译安装
-------------------
依赖:glib(2.32.0以上)、libevent(1.4以上)、Lua(5.1以上)、OpenSSL(0.9.8以上)

./bootstrap.sh    #可能需要修改其中的路径
make
sudo make install

四、启动与停止
-------------------
进入PREFIX/conf目录,编辑instance.conf,此处instance的实际名称应与其中instance设置项相同,其他设置项含义见第二节。

启动:
PREFIX/bin/mysql-proxyd instance start

停止:
PREFIX/bin/mysql-proxyd instance stop

重启:
PREFIX/bin/mysql-proxyd instance restart

查看运行状态:
PREFIX/bin/mysql-proxyd instance status

posted @ 2014-05-06 13:33 小马歌 阅读(245) | 评论 (0)编辑 收藏
 
先来看下MYSQL异步复制的概念: 
  异步复制:MySQL本身支持单向的、异步的复制。异步复制意味着在把数据从一台机器拷贝到另一台机器时有一个延时 – 最重要的是这意味着当应用系统的事务提交已经确认时数据并不能在同一时刻拷贝/应用到从机。通常这个延时是由网络带宽、资源可用性和系统负载决定的。然而,使用正确的组件并且调优,复制能做到接近瞬时完成。 
  
   当主库有更新的时候,主库会把更新操作的SQL写入二进制日志(Bin log),并维护一个二进制日志文件的索引,以便于日志文件轮回(Rotate)。在从库启动异步复制的时候,从库会开启两个I/O线程,其中一个线程连接主库,要求主库把二进制日志的变化部分传给从库,并把传回的日志写入本地磁盘。另一个线程则负责读取本地写入的二进制日志,并在本地执行,以反映出这种变化。较老的版本在复制的时候只启用一个I/O线程,实现这两部分的功能。 




同步复制:同步复制可以定义为数据在同一时刻被提交到一台或多台机器,通常这是通过众所周知的“两阶段提交”做到的。虽然这确实给你在多系统中保持一致性,但也由于增加了额外的消息交换而造成性能下降。 



使用MyISAM或者InnoDB存储引擎的MySQL本身并不支持同步复制,然而有些技术,例如分布式复制块设备(简称DRBD),可以在下层的文件系统提供同步复制,允许第二个MySQL服务器在主服务器丢失的情况下接管(使用第二服务器的复本)。要了解更多信息, 

  MYSQL 5。5开始,支持半自动复制。之前版本的MySQL Replication都是异步(asynchronous)的,主库在执行完一些事务后,是不会管备库的进度的。如果备库不幸落后,而更不幸的是主库此时又出现Crash(例如宕机),这时备库中的数据就是不完整的。简而言之,在主库发生故障的时候,我们无法使用备库来继续提供数据一致的服务了。 

Semisynchronous Replication则一定程度上保证提交的事务已经传给了至少一个备库。 

   
Semi synchronous中,仅仅保证事务的已经传递到备库上,但是并不确保已经在备库上执行完成了。 

此外,还有一种情况会导致主备数据不一致。在某个session中,主库上提交一个事务后,会等待事务传递给至少一个备库,如果在这个等待过程中主库Crash,那么也可能备库和主库不一致,这是很致命的。(在主库恢复后,可以通过参数Rpl_semi_sync_master_no_tx观察) 

     如果主备网络故障或者备库挂了,主库在事务提交后等待10秒(rpl_semi_sync_master_timeout的默认值)后,就会继续。这时,主库就会变回原来的异步状态。 

    MySQL在加载并开启Semi-sync插件后,每一个事务需等待备库接收日志后才返回给客户端。如果做的是小事务,两台主机的延迟又较小,则Semi-sync可以实现在性能很小损失的情况下的零数据丢失。 

   主机Crash时的处理 

备库Crash时,主库会在某次等待超时后,关闭Semi-sync的特性,降级为普通的异步复制,这种情况比较简单。 

主库Crash后,那么可能存在一些事务已经在主库Commit,但是还没有传给任何备库,我们姑且称这类事务为"墙头事务"。"墙头事务"都是没有返回给客户端的,所以发起事务的客户端并不知道这个事务是否已经完成。 

这时,如果客户端不做切换,只是等Crash的主库恢复后,继续在主库进行操作,客户端会发现前面的"墙头事务"都已经完成,可以继续进行后续的业务处理;另一种情况,如果客户端Failover到备库上,客户端会发现前面的“墙头事务”都没有成功,则需要重新做这些事务,然后继续进行后续的业务处理。 

   可以做多个备库,任何一个备库接收完成日志后,主库就可以返回给客户端了。 

网络传输在并发线程较多时,一次可能传输很多日志,事务的平均延迟会降低。 

"墙头事务"在墙头上的时候,是可以被读取的,但是这些事务在上面Failover的场景下,是被认为没有完成的。 

     默认情况下MySQL的复制是异步的,Master上所有的更新操作写入Binlog之后并不确保所有的更新都被复制到Slave之上。异步操作虽然效率高,但是在Master/Slave出现问题的时候,存在很高数据不同步的风险,甚至可能丢失数据。 
MySQL5.5引入半同步复制功能的目的是为了保证在master出问题的时候,至少有一台Slave的数据是完整的。在超时的情况下也可以临时转入异步复制,保障业务的正常使用,直到一台salve追赶上之后,继续切换到半同步模式。 
Master: 
INSTALL PLUGIN rpl_semi_sync_master SONAME ‘semisync_master.so’; 
SET GLOBAL rpl_semi_sync_master_enabled=1; 
SET GLOBAL rpl_semi_sync_master_timeout=1000; (1s, default 10s) 
Slave: 
INSTALL PLUGIN rpl_semi_sync_slave SONAME ‘semisync_slave.so’; 
SET GLOBAL rpl_semi_sync_slave_enabled=1; 
复制心跳(用户检测复制是否中断) 

MySQL5.5提供的新的配置master_heartbeat_period,能够在复制停止工作和出现网络中断的时候帮助我们迅速发现问题。 

启用方法: 

STOP SLAVE; 

CHANGE MASTER TO master_heartbeat_period= milliseconds; 

START SLAVE; 

Slave自动恢复同步 

在MySQL5.5版本之前,MySQL Slave实例在异常终止服务之后,可能导致复制中断,并且relay binlog可能损坏,在MySQL再次启动之后并不能正常恢复复制。在MySQL5.5中这一问题得到了解决,MySQL可以自行丢弃顺坏的而未处理的数据,重新从master上获取源数据,进而回复复制。 

跳过指定复制事件 

在多Master或环形复制的情况下,处于复制链条中间的服务器异常,可以通过 

CHANGE MASTER TO MASTER_HOST=xxx IGNORE_SERVER_IDS=y 

跳过出问题的MySQL实例。 

自动转换字段类型 

MySQL5.1在基于语句的复制下,支持部分的字段转换,但是行级的会报错。MySQL5.5语句和行级复制都已支持。还可以通过 SLAVE_TYPE_CONVERSIONS 控制转换的方向。 
posted @ 2014-05-06 13:21 小马歌 阅读(260) | 评论 (0)编辑 收藏
 

摘要

超级负载均衡旨在为解决服务不断扩展、机器不断增多、机器性能差异等问题,以增强系统的稳定性,自动分配请求压力。算法实现了多个模型和均衡策略,能通过配置实现随机、轮询、一致hash等。同时也能实现跨机房的相关分配。现已经在多个系统中使用。

TAG

负载均衡

内容

现有系统中存在的问题:

1. 慢连接、瞬时访问慢。

场景一:

如果后端新增加机器,cache命中率低,因此响应速度慢,但是能连接上且不超时。如果ui持续访问就会把ui夯住。

场景二:

如果后端模块某一台机器响应较慢。如果前端持续访问就会被夯住。

2. 死机。

场景一:

能断断续续响应请求,不过速度很慢。造成ui夯住。

3. 混合部署。

场景一:

多个模块在同一机器上,项目影响。

4. 机器权重。

场景一:

老机器,性能差;新机器,性能彪悍。因此他们应该承载不同的压力。

5. 跨机房冗余。

场景一:

后端对cache依赖很高的模块,因为采用的是一致hash算法,如果挂掉一台机器,对另外的机器cache命中率冲击很大。因此希望将对这个机器的请求均衡到另外一个机房。

6. php和c使用同样的策略。

现在php和c希望能使用的策略实际上是有很大的一致。为了避免重复开发,php和c希望采用同样的负载均衡库。

要解决的问题:

设计思路:

1. 根据均衡策略计算出的均衡值对Server进行逆序排序。

2. 负载选择。对步骤1排序后的Server按以下顺序进行选择:

a、按连接失败概率进行选择。

注:横轴代表失败次数,纵轴代表选择的概率。

Cconn:一段区间内失败次数

f(Cconn):连接概率,取值范围在(0,100]

b、按健康状态选择。

整个模型基于服务处理时间的收敛性。

分析:

1) 如果机器状态良好,则平均处理时间会保持在一个稳定水平;即使是小波动,也会较快平稳在一个状态。

2) 如果机器开始出现问题,处理时间会开始增长。如果增长持续超过一段时间,则说明有可能会影响服务;如果一段时间后稳定了,说明对请求没有太多影响。

f(healthy):机器健康状态,取值范围[0,1]

select(healthy):机器选择概率,取值范围[R,1]

c、如果所有机器都没选中,则随机选择一台机器进行服务。

3. 机器流量均分。

不同的机器处理能力是不一样的。当按照步骤2选择了某台机器,需要将其他处理时间为他的1/T(T>=2)的机器也选取出来,将部分压力分给对应的机器。

设k台机器的处理时间分别是t1, t2,…,tk, 选中的机器id=i,比该机器处理能力高的机器时间分别为p1,p2,..,pr, (其中pj × T <= ti)。设一段时间总访问量为Y,每台机器理论上的访问量应该为Vg=Y/k。而实际的Vr=Y/(ti * (1/t1+1/t2+…+1/tk))。则应该分出Vg-Vr的流量给pj。pj的流量比例为1/p1:1/p2:…:1/pr

算法设计:

A、均衡算法

1. 一致hash算法。

将每个server的ip和port加上balance_key三者做字符串拼接后,做md5签名。

value(server) = md5(server_ip + server_port + balance_key)

2. 随机算法。

value(server) = random();

3. 轮询算法。

value(server) =((server.id – (rounds % server_count)) + server_count) % server_count

4. 多个选一算法。

rank初始化为1, 如果默认的server失败,则rank+1

value(server) =((server.id – (rank % server_count)) + server_count) % server_count

B、负载算法

1. 连接状态算法。

a、对每一个server开辟一个状态队列。bool queue[K] 用来统计失败次数。每次有坏状态进队,计数加一。如果有坏状态出队,则计数减一。

b、按照f(Cconn)公式计算出选择概率。

c、利用rand()%100是否在[0,f(Cconn)]来决定是否选择该机器。

2. 健康状态算法。

a、每台机器维持一个一秒钟内的处理时间T和次数C。

b、当一秒过去以后,将T、C计算为平均处理时间R。

c、每M秒,统计每台机器最近一段时间的平均处理时间, 按照公式select(healthy)算出选择概率。

d、利用rand()%100是否在[0, select(healthy)*100]来决定是否选择该机器。

C、流量均分

按照策略选出满足要求的机器,按照流量均分公式进行流量分配。

分配时按照balance_key+server方式和random()来分配机器, 尽量保证请求落在同一台机器。

by wangbo

posted @ 2014-05-06 10:22 小马歌 阅读(214) | 评论 (0)编辑 收藏
 
     摘要: 2005年,我开始和朋友们开始拉活儿做网站,当时第一个网站是在linux上用jsp搭建的,到后来逐步的引入了多种框架,如webwork、hibernate等。在到后来,进入公司,开始用c/c++,做分布式计算和存储。(到那时才解开了我的一个疑惑:C语言除了用来写HelloWorld,还能干嘛?^_^)。总而言之,网站根据不同的需求,不同的请求压力,不同的业务模型,需要不同的架构来给予支持。我从我的...  阅读全文
posted @ 2014-05-06 10:07 小马歌 阅读(463) | 评论 (1)编辑 收藏
仅列出标题
共95页: First 上一页 20 21 22 23 24 25 26 27 28 下一页 Last