背景:
容错重试机制,是系统的一种自我调节,是对系统鲁棒性的一种考量,在很多后台程序中都经常涉及,特别是基于 task的系统中,往往这类系统要处理的事情很多,一个task的完成时间比较长,涉及的环境也比较复杂,出现很多临时错误的概率较大,比如IO读取出错,网络临时不可用等等,同时这种系统往往对响应时间要求不是很高,更加看重系统的稳定和鲁棒性;另外对于依赖于第三方的远程调用,或者说其他资源的获取,也常常涉及要考虑容错重试;让程序在不人工干预的情况下,处理更多的场景;
适用场景:
从上面的描述我们可以总结它的适用场景:
1、系统对响应时间不太关心;
2、系统对鲁棒性要求较高;
3、系统涉及远程调用,资源获取;
设计目标:
1、通用性:希望作为一个独立的模块,为需要的程序提高方便的使用;
2、轻量级:希望是一个轻量级的实现,不对使用的系统有太强的侵入性;
3、细粒度:重试以方法为单位,而不用重试整个task;
概要设计:
要保持重试模块的独立性,不侵入到原有的系统中,首先的面临的问题是,需要重试的数据从何而来,我们很容易想到DB,那么整个系统应该分为两大块,一是client,负责将需要重试的数据放入DB,这个也就是各个应用程序要做的事情;第二是server,负责将DB中数据取出,做相应的处理;大家可以看到这是一个典型的生产者-消费者模式;
这里区分应用程序和容错服务器,只是概念上的,因为容错服务器事实上必须依赖于引用程序(需要执行部分引用程序);所以在实际应用中,一般在一台虚拟机上,如果是应用本身在多台服务器上的话,可以通过配置项决定是否启用容错重试功能;
详细设计-UML图:
1、系统的UML类图
2、类图说明:
1)、从上图可以看出系统主要可以分为三大块的内容,第一块是TaskExecutor 类以上部分,通过spring的TaskExecutor与下面的模块弱依赖,这一块主要负责从数据库去取出需要处理的、并且应该自己(loadbalance)处理的数据;讲数据封装在一个Notify中,交给NotifyServerServices处理;
2)、第二块为TaskExecutor和RetryHandlerStrategy之前的部分;这一部分主要关注容错重试处理后的工作;如:成功则删除DB中的记录,否则,负责判断是否还需要重试,间隔时间等;
3)、第三块为每个系统自己实现的各种RetryHandlerStrategy类;他们负责真正的重试工作;这里所有的类,可以看成是一个sever,对于client端来说,是非常简单的,因为它只需要讲数据插入到数据库,可以通过一个clientService提供一个createNotify方法,供应用调用;
详细设计-数据库设计:
1、一共需要两张表,task、task_history;两张字段完全一样:
字段 |
类型 |
描述 |
可空 |
默认值 |
task_id |
varchar2 |
PK、UUID |
NOT |
|
create_time |
DATE |
创建时间 |
NOT |
|
handle_time |
DATE |
任务待执行的时间 |
NOT |
|
task_handler |
varchar2 |
任务处理器类型 |
NOT |
|
load_balance_num |
number |
负责均衡值 |
NOT |
0 |
task_parameter |
varchar2 |
执行任务的参数,json格式 |
YES |
|
retry_count |
number |
已经重试的次数 |
NOT |
0 |
retry_reason |
varchar2 |
失败的原因 |
YES |
|
说明:
1)、load_balance_num:当使用集群的时候可以考虑,可以通过上图的LoadBalanceNumStrategy类来控制他的值,比如平均分布,比如按照机器的性能使用权重分布;
2)、task_parameter:这个用来保存重试的参数,可以约定为一种格式,自己方便解析就好了,比如json、xml等;
3)、retry_count:当系统要求我最多重试5次的时候可以使用这个参数;当5次后还是失败,直接移动要历史表中,人工处理;
4)、handle_time:当然系统要求第二次重试的时候时间间隔30分钟的时候使用;当处理失败的时候更新这个时间;
5)、task_handler:任务处理器类型,比如上面类图中的RetryHandler,通过spring,得到RetryHandler的实例来做处理;
关键类伪代码:
从上面的设计图可以发现主要只有两个类,即是:NotifyScheduleMainExecutor和NotifyServerServiceImpl,其他的都是一些策略类;这里伪代码描述这个两个类的逻辑,策略类可以自己选择不同的实现;
1、NotifyScheduleMainExecutor:
if(NotifyHandlerStrategy != null){
获取本机待处理的handler的列表;
}
if(LoadBalanceNumStrategy != null){
获取本机待处理的loand_balance_num的列表
}
if(NotifyMaxNumPerJobStrategy != null){
获取本机每次调度的处理的最大的notify记录数
}
执行轮询语句,提取待处理的任务的列表
for(对每一个notify){
if(NotifyIdCacheStrategy已经包含该ID){
说明线程已经在执行,
}else{
放入cache;
TaskExecutor.excute(new notifyExecutor(notify,notifyServerService))
}
}
分析一下这里的查询sql:
基础的sql = select * from notify where handle_time <=sysdate ;
if(handlerlist 不为空)
{
sql+=sql+ and hander in (handlerlist)
}
if(loadbalancenumlist不为空)
{
sql+=sql+ and load_balance_num in (loadbalancenumlist)
}
if(maxnum不为空)
{
sql+=sql+ and rownum<=maxnum
}
2、 NotifyServerServiceImpl
处理结果=success;
errormessage=null;
根据notify的task_handler得到处理的handler;
try{
handler.invoke(notify.getparameter())返回notifyHandlerResult
if(notifyHandlerResult == null){
throw exception;
}else if(notifyHandlerResult==失败){
处理结果=fail;
errormessage=原因;
}
}cath(){
使用NotifyHandlerExceptionStrategy处理;返回notifyHandlerResult
if(notifyHandlerResult == null){
处理结果=exception;
errormessage=原因;
}else if(notifyHandlerResult==失败){
处理结果=fail;
errormessage=原因;
}
}
try{
if(notifyHandlerResult=success){
清除DB的数据;
}else{
得到已经重试的次数oldRetryCount;
得到上一次执行的时间oldExecuteTime;
根据NotifyRetryStrategy类返回重试策略的结果 notifyRetryResult;
if(需要重试){
重试次数+1;
计算下一次时间;
设置上一次失败原因;
更新DB;
}else{
移动到历史表中;
}
}
}cath{
}finally{
cache的操作;
}
使用及client配置:
现在假设有一个应用需要使用容错机制,需要的操作:
1、引入二方库;
2、在需要容错的方法里面调用clientService提供的createNotify方法,插入项目的数据;
3、编写重试处理类;必须继承RetryHandlerStrategy接口;
4、编写配置文件:整个系统依赖spring,可以分为三个配置文件,一个是client,一个是server,另外是handler,下面给出一个例子:
client.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans>
<bean id="notifyClientService" class="com.*.service.impl.NotifyClientServiceImpl">
<!-- notify.load_balance_num字段值生成、以及调度时where条件中取值的策略实现类,可自行扩展 -->
<!-- 当有多台notify服务器时才有用,用于平衡各台server间的压力;一般不用配置 -->
<property name="loadBalanceNumStrategy" ref="alternateLoadBalanceNumStrategy" />
<property name="notifyDao" ref="notifyDao" />
</bean>
<!-- 生成0、1交替的LOAD_BALANCE_NUM值,适用于2台Notify服务器 -->
<bean id="alternateLoadBalanceNumStrategy" class="com.*.strategy.impl.AlternateLoadBalanceNumStrategyImpl">
<!-- 主机名对应的LOAD_BALANCE_NUM列表,多个用,隔开 -->
<property name="lbnMapByHostname">
<map>
<entry key=“dev1" value="0"/>
<entry key="dev2" value="1"/>
</map>
</property>
</bean>
<bean id="notifyDao" class="com.*.dao.impl.NotifyDaoImpl">
<!-- ref可以修改为自己应用中已经配置过的sqlMapClientTemplate bean,要求内部已经嵌入datasource -->
<property name="sqlMapClientTemplate" ref="sqlMapTemplate" />
<property name="namespace" value="com.*.notify" />
</bean>
</beans>
server.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans >
<!-- 该文件是Notify Server运行时需要的配置文件,加载了该文件,调度就会自动执行; -->
<!-- 若Client/Server都在同一个应用中,则请在部署时区别加载该文件,否则会导致多台服务器执行相同的调度任务 -->
<!-- 添加对Notify Client的配置支持 -->
<import resource="billing-spring-notify-client.xml" />
<!-- end for Notify Client -->
<!-- 任务从此处开始加载 -->
<bean id="notifySpringScheduledExecutorFactoryBean" class="org.springframework.scheduling.concurrent.ScheduledExecutorFactoryBean">
<property name="scheduledExecutorTasks">
<list>
<ref bean="retryScheduledExecutorTask" />
</list>
</property>
</bean>
<!-- 容错任务 -->
<bean id="retryScheduledExecutorTask" class="org.springframework.scheduling.concurrent.ScheduledExecutorTask">
<property name="runnable" ref="retryScheduledMainExecutor" />
<!-- 初次执行任务delay时间,单位为ms,默认值为0,代表首次加载任务时立即执行;比如1min -->
<property name="delay" value="5000" />
<!-- 间隔时间,单位为ms,默认值为0,代表任务只执行一次;比如2min -->
<property name="period" value="1000" />
<!-- 是否采用fixedRate方式进行任务调度,默认为false,即采用fixedDelay方式 -->
<!-- fixedRate:定时间隔执行,不管上次任务是否已执行完毕;fixedDelay:每次任务执行完毕之后delay固定的时间 -->
<property name="fixedRate" value="true" />
</bean>
<!-- 容错主线程 -->
<bean id="retryScheduledMainExecutor" class="com.*.NotifyScheduledMainExecutor">
<!-- 针对Notify服务端的Service,用于更新Notify重试信息等 -->
<property name="notifyServerService" ref="notifyServerService" />
<!-- notify.notifyId缓存策略实现类,可自行扩展 -->
<property name="notifyIdCacheStrategy" ref="defaultNotifyIdCacheStrategy" />
<!-- notify.load_balance_num字段值生成、以及调度时where条件中取值的策略实现类,可自行扩展 -->
<!-- 当有多台notify服务器时才有用,用于平衡各台server间的压力;一般不用配置 -->
<!-- <property name="loadBalanceNumStrategy" ref="alternateLoadBalanceNumStrategy" /> -->
<!-- notify.handler字段值在调度时where条件中取值的策略实现类,可自行扩展 -->
<!-- 当有多台notify服务器时才有用,用于表明某台server可执行哪些handler;一般不用配置 -->
<property name="notifyHandlerStrategy" ref="handerNameByRetry" />
<!-- 当有多台notify服务器时才有用,用于设置某台server调度时每次读取的Notify最大数,用于覆盖maxNum;一般不用配置 -->
<!-- <property name="notifyMaxNumPerJobStrategy" ref="defaultNotifyMaxNumPerJobStrategy" /> -->
<!-- 用于并发的线程池 -->
<property name="notifyTaskExecutor" ref="syncTaskExecutor" />
<!-- 每次调度读取的Notify最大记录数,默认为1000 -->
<property name="maxNum" value="1000" />
<property name="notifyDao" ref="notifyDao" />
</bean>
<!-- 同步处理 -->
<bean id="syncTaskExecutor" class="org.springframework.core.task.SyncTaskExecutor">
</bean>
<bean id="notifyServerService" class="com.*.impl.NotifyServerServiceImpl">
<!-- 针对任务执行失败后Notify如何重试的策略实现类,可自行扩展 -->
<property name="notifyRetryStrategy" ref="defaultNotifyRetryStrategy" />
<!-- 针对任务执行失败后异常处理策略实现类,可自行扩展 -->
<!-- 默认不对异常进行补救,具体handler实现类中若返回NULL或抛出异常,则均按异常处理,直接将Notify记录迁移到历史表中,不进行重试; -->
<!-- <property name="notifyHandlerExceptionStrategy" ref="defaultNotifyHandlerExceptionStrategy" /> -->
<!-- 描述见notifyScheduledMainExecutor -->
<property name="notifyIdCacheStrategy" ref="defaultNotifyIdCacheStrategy" />
<!-- 事务模板,需保证能够找到对应的bean -->
<property name="transactionTemplate" ref="transactionTemplate" />
<property name="notifyDao" ref="notifyDao" />
</bean>
<!-- 以下几个default*的bean为系统提供的默认实现,若有需要,可自行扩展,但必须实现相应接口 -->
<bean id="defaultNotifyIdCacheStrategy" class="com.*.DefaultNotifyIdCacheStrategyImpl" />
<bean id="defaultNotifyHandlerExceptionStrategy" class="com*.impl.DefaultNotifyHandlerExceptionStrategyImpl" />
<!--容错handler-->
<bean id="handerNameByRetry" class="com.*.asyn.HandlerNameFilter">
<property name="handerNames">
<list>
<value>retryHandler</value>
</list>
</property>
</bean>
<bean id="defaultNotifyMaxNumPerJobStrategy" class="com.*.DefaultNotifyMaxNumPerJobStrategyImpl">
<!-- 主机名对应的每次调度读取Notify记录的最大值 -->
<property name="maxNumPerJobMapByHostname">
<map>
<entry key="dev1" value="500"/>
<entry key="dev2" value="800"/>
</map>
</property>
</bean>
<bean id="defaultNotifyRetryStrategy" class="com.*.DefaultNotifyRetryStrategyImpl">
<!-- 任务执行失败之后每次重试的间隔ms数 -->
<property name="retryIntervals">
<list>
<!-- 依次为第一次间隔1min,第二次5min,第三次10min,第四次30min,第五次1h -->
<value>60000</value>
<value>300000</value>
<value>600000</value>
<value>1800000</value>
<value>3600000</value>
</list>
</property>
</bean>
<!-- end default* -->
</beans>
handler.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans >
<bean id="retryHandler" class="com.*asyn.RetryHandler" />
</beans>
总结:
现在回去看系统的目标实现情况:
1、通用性:整个模块对应用系统的侵入性是很小了,可以打包为一个二方库,在公司范围的使用;对于应用来说只增加几个配置文件,在需要重试的地方,通过通过接口,完全于模块解耦;
2、轻量级:很明显,模块只是依赖spring,
3、细粒度:在上面的设计中,并没有特别强调细粒度,是因为对于选择多大粒度完全由应用自己决定,应用在自己的重试实现类和方法之间平衡,对模块来讲,没有任何限制;