我的家园

我的家园

背景:

      容错重试机制,是系统的一种自我调节,是对系统鲁棒性的一种考量,在很多后台程序中都经常涉及,特别是基于   task的系统中,往往这类系统要处理的事情很多,一个task的完成时间比较长,涉及的环境也比较复杂,出现很多临时错误的概率较大,比如IO读取出错,网络临时不可用等等,同时这种系统往往对响应时间要求不是很高,更加看重系统的稳定和鲁棒性;另外对于依赖于第三方的远程调用,或者说其他资源的获取,也常常涉及要考虑容错重试;让程序在不人工干预的情况下,处理更多的场景;

 

适用场景:

    从上面的描述我们可以总结它的适用场景:

    1、系统对响应时间不太关心;

    2、系统对鲁棒性要求较高;

    3、系统涉及远程调用,资源获取;

 

设计目标:

    1、通用性:希望作为一个独立的模块,为需要的程序提高方便的使用;

    2、轻量级:希望是一个轻量级的实现,不对使用的系统有太强的侵入性;

    3、细粒度:重试以方法为单位,而不用重试整个task;

 

概要设计:

     要保持重试模块的独立性,不侵入到原有的系统中,首先的面临的问题是,需要重试的数据从何而来,我们很容易想到DB,那么整个系统应该分为两大块,一是client,负责将需要重试的数据放入DB,这个也就是各个应用程序要做的事情;第二是server,负责将DB中数据取出,做相应的处理;大家可以看到这是一个典型的生产者-消费者模式;

 

 


 

这里区分应用程序和容错服务器,只是概念上的,因为容错服务器事实上必须依赖于引用程序(需要执行部分引用程序);所以在实际应用中,一般在一台虚拟机上,如果是应用本身在多台服务器上的话,可以通过配置项决定是否启用容错重试功能;

 

详细设计-UML图:

 

 

1、系统的UML类图


 

2、类图说明:

 

   1)、从上图可以看出系统主要可以分为三大块的内容,第一块是TaskExecutor 类以上部分,通过spring的TaskExecutor与下面的模块弱依赖,这一块主要负责从数据库去取出需要处理的、并且应该自己(loadbalance)处理的数据;讲数据封装在一个Notify中,交给NotifyServerServices处理;

   2)、第二块为TaskExecutor和RetryHandlerStrategy之前的部分;这一部分主要关注容错重试处理后的工作;如:成功则删除DB中的记录,否则,负责判断是否还需要重试,间隔时间等;

   3)、第三块为每个系统自己实现的各种RetryHandlerStrategy类;他们负责真正的重试工作;这里所有的类,可以看成是一个sever,对于client端来说,是非常简单的,因为它只需要讲数据插入到数据库,可以通过一个clientService提供一个createNotify方法,供应用调用;

 

详细设计-数据库设计:

 

1、一共需要两张表,task、task_history;两张字段完全一样:

 

字段 类型 描述 可空 默认值
task_id varchar2 PK、UUID NOT
create_time DATE 创建时间 NOT
handle_time DATE 任务待执行的时间 NOT
task_handler varchar2 任务处理器类型 NOT
load_balance_num number 负责均衡值 NOT 0
task_parameter varchar2 执行任务的参数,json格式 YES
retry_count number 已经重试的次数 NOT 0
retry_reason varchar2 失败的原因 YES

 

说明:

1)、load_balance_num:当使用集群的时候可以考虑,可以通过上图的LoadBalanceNumStrategy类来控制他的值,比如平均分布,比如按照机器的性能使用权重分布;

2)、task_parameter:这个用来保存重试的参数,可以约定为一种格式,自己方便解析就好了,比如json、xml等;

3)、retry_count:当系统要求我最多重试5次的时候可以使用这个参数;当5次后还是失败,直接移动要历史表中,人工处理;

4)、handle_time:当然系统要求第二次重试的时候时间间隔30分钟的时候使用;当处理失败的时候更新这个时间;

5)、task_handler:任务处理器类型,比如上面类图中的RetryHandler,通过spring,得到RetryHandler的实例来做处理;

 

关键类伪代码:

 

从上面的设计图可以发现主要只有两个类,即是:NotifyScheduleMainExecutor和NotifyServerServiceImpl,其他的都是一些策略类;这里伪代码描述这个两个类的逻辑,策略类可以自己选择不同的实现;

 

1、NotifyScheduleMainExecutor:

 

 

if(NotifyHandlerStrategy != null){
  获取本机待处理的handler的列表;
}
if(LoadBalanceNumStrategy != null){
  获取本机待处理的loand_balance_num的列表
}
if(NotifyMaxNumPerJobStrategy != null){
  获取本机每次调度的处理的最大的notify记录数
}
执行轮询语句,提取待处理的任务的列表
for(对每一个notify){
  if(NotifyIdCacheStrategy已经包含该ID){
     说明线程已经在执行,  
   }else{
      放入cache;
      TaskExecutor.excute(new notifyExecutor(notify,notifyServerService))
   }
}

  分析一下这里的查询sql:

 

基础的sql =  select * from notify where handle_time <=sysdate ;
if(handlerlist 不为空)
{
  sql+=sql+ and hander in (handlerlist)
} 
if(loadbalancenumlist不为空)
{
  sql+=sql+ and load_balance_num in (loadbalancenumlist)
} 
if(maxnum不为空)
{
  sql+=sql+ and rownum<=maxnum
} 

 

2、 NotifyServerServiceImpl

 

处理结果=success;
errormessage=null;
根据notify的task_handler得到处理的handler;
try{
    handler.invoke(notify.getparameter())返回notifyHandlerResult
    if(notifyHandlerResult == null){
       throw exception;
    }else if(notifyHandlerResult==失败){
        处理结果=fail;
        errormessage=原因;
    }
}cath(){
    使用NotifyHandlerExceptionStrategy处理;返回notifyHandlerResult
    if(notifyHandlerResult == null){
         处理结果=exception;
        errormessage=原因;
    }else if(notifyHandlerResult==失败){
        处理结果=fail;
        errormessage=原因;
    }
}
try{
    if(notifyHandlerResult=success){
        清除DB的数据;
    }else{
        得到已经重试的次数oldRetryCount;
        得到上一次执行的时间oldExecuteTime;
        根据NotifyRetryStrategy类返回重试策略的结果 notifyRetryResult;
        if(需要重试){
             重试次数+1;
             计算下一次时间;
             设置上一次失败原因;
             更新DB;
        }else{
             移动到历史表中;
        }    
   }
}cath{
   
}finally{
    cache的操作;
}

 

使用及client配置:

    现在假设有一个应用需要使用容错机制,需要的操作:

1、引入二方库;

2、在需要容错的方法里面调用clientService提供的createNotify方法,插入项目的数据;

3、编写重试处理类;必须继承RetryHandlerStrategy接口;

4、编写配置文件:整个系统依赖spring,可以分为三个配置文件,一个是client,一个是server,另外是handler,下面给出一个例子:

 

 

client.xml

 

<?xml version="1.0" encoding="UTF-8"?>
<beans>
	<bean id="notifyClientService" class="com.*.service.impl.NotifyClientServiceImpl">
		<!-- notify.load_balance_num字段值生成、以及调度时where条件中取值的策略实现类,可自行扩展 -->
		<!-- 当有多台notify服务器时才有用,用于平衡各台server间的压力;一般不用配置 -->
		<property name="loadBalanceNumStrategy" ref="alternateLoadBalanceNumStrategy" />
		<property name="notifyDao" ref="notifyDao" />
	</bean>

	<!-- 生成0、1交替的LOAD_BALANCE_NUM值,适用于2台Notify服务器 -->
	<bean id="alternateLoadBalanceNumStrategy" class="com.*.strategy.impl.AlternateLoadBalanceNumStrategyImpl">
		<!-- 主机名对应的LOAD_BALANCE_NUM列表,多个用,隔开 -->
		<property name="lbnMapByHostname">
			<map>
				<entry key=“dev1" value="0"/>
				<entry key="dev2" value="1"/>
			</map>
		</property>
	</bean>
	
	<bean id="notifyDao" class="com.*.dao.impl.NotifyDaoImpl">
		<!-- ref可以修改为自己应用中已经配置过的sqlMapClientTemplate bean,要求内部已经嵌入datasource -->
		<property name="sqlMapClientTemplate" ref="sqlMapTemplate" />
		<property name="namespace" value="com.*.notify" />
	</bean>
</beans>
 

server.xml

 

 

<?xml version="1.0" encoding="UTF-8"?>
<beans >
	<!-- 该文件是Notify Server运行时需要的配置文件,加载了该文件,调度就会自动执行; -->
	<!-- 若Client/Server都在同一个应用中,则请在部署时区别加载该文件,否则会导致多台服务器执行相同的调度任务 -->
	
	<!-- 添加对Notify Client的配置支持 -->
	<import resource="billing-spring-notify-client.xml" />
	<!-- end for Notify Client -->
	
	<!-- 任务从此处开始加载 -->
	<bean id="notifySpringScheduledExecutorFactoryBean" class="org.springframework.scheduling.concurrent.ScheduledExecutorFactoryBean">
		<property name="scheduledExecutorTasks">
			<list>
				<ref bean="retryScheduledExecutorTask" />
			</list>
		</property>
	</bean>

	<!-- 容错任务 -->
	<bean id="retryScheduledExecutorTask" class="org.springframework.scheduling.concurrent.ScheduledExecutorTask">
		<property name="runnable" ref="retryScheduledMainExecutor" />
		<!-- 初次执行任务delay时间,单位为ms,默认值为0,代表首次加载任务时立即执行;比如1min -->
		<property name="delay" value="5000" />
		<!-- 间隔时间,单位为ms,默认值为0,代表任务只执行一次;比如2min -->
		<property name="period" value="1000" />
		<!-- 是否采用fixedRate方式进行任务调度,默认为false,即采用fixedDelay方式 -->
		<!-- fixedRate:定时间隔执行,不管上次任务是否已执行完毕;fixedDelay:每次任务执行完毕之后delay固定的时间 -->
		<property name="fixedRate" value="true" />
	</bean>
	
	<!-- 容错主线程 -->
	<bean id="retryScheduledMainExecutor" class="com.*.NotifyScheduledMainExecutor">
		<!-- 针对Notify服务端的Service,用于更新Notify重试信息等 -->
		<property name="notifyServerService" ref="notifyServerService" />
		<!-- notify.notifyId缓存策略实现类,可自行扩展 -->
		<property name="notifyIdCacheStrategy" ref="defaultNotifyIdCacheStrategy" />
		<!-- notify.load_balance_num字段值生成、以及调度时where条件中取值的策略实现类,可自行扩展 -->
		<!-- 当有多台notify服务器时才有用,用于平衡各台server间的压力;一般不用配置 -->
		<!-- <property name="loadBalanceNumStrategy" ref="alternateLoadBalanceNumStrategy" /> -->
		<!-- notify.handler字段值在调度时where条件中取值的策略实现类,可自行扩展 -->
		<!-- 当有多台notify服务器时才有用,用于表明某台server可执行哪些handler;一般不用配置 -->
		<property name="notifyHandlerStrategy" ref="handerNameByRetry" />
		<!-- 当有多台notify服务器时才有用,用于设置某台server调度时每次读取的Notify最大数,用于覆盖maxNum;一般不用配置 -->
		<!-- <property name="notifyMaxNumPerJobStrategy" ref="defaultNotifyMaxNumPerJobStrategy" /> -->
		<!-- 用于并发的线程池 -->
		<property name="notifyTaskExecutor" ref="syncTaskExecutor" />
		<!-- 每次调度读取的Notify最大记录数,默认为1000 -->
		<property name="maxNum" value="1000" />
		<property name="notifyDao" ref="notifyDao" />
	</bean>
	
	<!-- 同步处理 -->
	<bean id="syncTaskExecutor" class="org.springframework.core.task.SyncTaskExecutor">
	</bean>
	
	<bean id="notifyServerService" class="com.*.impl.NotifyServerServiceImpl">
		<!-- 针对任务执行失败后Notify如何重试的策略实现类,可自行扩展 -->
		<property name="notifyRetryStrategy" ref="defaultNotifyRetryStrategy" />
		<!-- 针对任务执行失败后异常处理策略实现类,可自行扩展 -->
		<!-- 默认不对异常进行补救,具体handler实现类中若返回NULL或抛出异常,则均按异常处理,直接将Notify记录迁移到历史表中,不进行重试; -->
		<!-- <property name="notifyHandlerExceptionStrategy" ref="defaultNotifyHandlerExceptionStrategy" /> -->
		<!-- 描述见notifyScheduledMainExecutor -->
		<property name="notifyIdCacheStrategy" ref="defaultNotifyIdCacheStrategy" />
		<!-- 事务模板,需保证能够找到对应的bean -->
		<property name="transactionTemplate" ref="transactionTemplate" />
		<property name="notifyDao" ref="notifyDao" />
	</bean>

	<!-- 以下几个default*的bean为系统提供的默认实现,若有需要,可自行扩展,但必须实现相应接口 -->
	<bean id="defaultNotifyIdCacheStrategy" class="com.*.DefaultNotifyIdCacheStrategyImpl" />
	<bean id="defaultNotifyHandlerExceptionStrategy" class="com*.impl.DefaultNotifyHandlerExceptionStrategyImpl" />
	<!--容错handler-->
	<bean id="handerNameByRetry" class="com.*.asyn.HandlerNameFilter">
		<property name="handerNames">
			<list>
				<value>retryHandler</value>
			</list>
		</property>
	</bean>
	
	<bean id="defaultNotifyMaxNumPerJobStrategy" class="com.*.DefaultNotifyMaxNumPerJobStrategyImpl">
		<!-- 主机名对应的每次调度读取Notify记录的最大值 -->
		<property name="maxNumPerJobMapByHostname">
			<map>
				<entry key="dev1" value="500"/>
				<entry key="dev2" value="800"/>
			</map>
		</property>
	</bean>
 	<bean id="defaultNotifyRetryStrategy" class="com.*.DefaultNotifyRetryStrategyImpl">
		<!-- 任务执行失败之后每次重试的间隔ms数 -->
		<property name="retryIntervals">
			<list>
				<!-- 依次为第一次间隔1min,第二次5min,第三次10min,第四次30min,第五次1h -->
				<value>60000</value>
				<value>300000</value>
				<value>600000</value>
				<value>1800000</value>
				<value>3600000</value>
			</list>
		</property>
	</bean>
	<!-- end default* -->
</beans>

handler.xml

 

<?xml version="1.0" encoding="UTF-8"?>
<beans >
	
	<bean id="retryHandler" class="com.*asyn.RetryHandler" />
</beans>
 

总结:

现在回去看系统的目标实现情况:

1、通用性:整个模块对应用系统的侵入性是很小了,可以打包为一个二方库,在公司范围的使用;对于应用来说只增加几个配置文件,在需要重试的地方,通过通过接口,完全于模块解耦;

2、轻量级:很明显,模块只是依赖spring,

3、细粒度:在上面的设计中,并没有特别强调细粒度,是因为对于选择多大粒度完全由应用自己决定,应用在自己的重试实现类和方法之间平衡,对模块来讲,没有任何限制;

 

 






只有注册用户登录后才能发表评论。


网站导航: