qileilove

blog已经转移至github,大家请访问 http://qaseven.github.io/

谈disruptor的单线程数据库操作

对远程数据库的操作,采用disruptor把所有任务交给单个消费者线程串行执行,能够很好地避免死锁。
  首先是定义一个抽象类,实现Runnable接口:
public abstract class  Task implements Runnable  {
public Task(){}
}
public class TaskEvent {
private Task tk;
public Task getTask() {
return tk;
}
public void setTask(Task tk) {
this.tk = tk;
}
public final static EventFactory<TaskEvent> EVENT_FACTORY = new EventFactory<TaskEvent>() {
public TaskEvent newInstance() {
return new TaskEvent();
}
};
public class TaskEventHandler implements EventHandler<TaskEvent> {
//  执行接口函数onEvent执行
public void onEvent(TaskEvent event, long sequence,
boolean endOfBatch) throws Exception {
event.getTask().run();
}
}
}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import com.zhenhai.bonecp.CustomThreadFactory;
import com.zhenhai.disruptor.BatchEventProcessor;
import com.zhenhai.disruptor.RingBuffer;
import com.zhenhai.disruptor.SequenceBarrier;
import com.zhenhai.disruptor.YieldingWaitStrategy;
import com.zhenhai.disruptor.dsl.ProducerType;
/**
*     使用方法
DisruptorHelper.initAndStart();
Task tt=new Taska();
DisruptorHelper.produce(tt);
DisruptorHelper.shutdown();
*
*
*/
public class DisruptorHelper {
/**
* ringbuffer容量,最好是2的N次方
*/
private static final int BUFFER_SIZE = 1024 * 1;
private static int group=2;
private RingBuffer<TaskEvent> ringBuffer[];
private SequenceBarrier sequenceBarrier[];
private TaskEventHandler handler[];
private BatchEventProcessor<TaskEvent> batchEventProcessor[];
private  static DisruptorHelper instance;
private static boolean inited = false;
private static ScheduledExecutorService taskTimer=null;
//JDK 创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。
private    ExecutorService execute[];
//启动监视线程
static {
System.out.println("init DisruptorHelper!!!!!!!!!!!!!!!!!");
instance = new DisruptorHelper();
instance.init();
inited = true;
System.out.println("init DisruptorHelper end!!!!!!!!!!!!!!!!!");
}
**
* 静态类
* @return
*/
private DisruptorHelper(){ }
/**
* 初始化
*/
private void init(){
execute=new ExecutorService[group];
ringBuffer=new RingBuffer[group];
sequenceBarrier=new SequenceBarrier[group];
handler=new TaskEventHandler[group];
batchEventProcessor=new BatchEventProcessor[group];
////////////////定时执行////////////////
//初始化ringbuffer,存放Event
for(int i=0;i<group;i++){
ringBuffer[i] = RingBuffer.create(ProducerType.SINGLE, TaskEvent.EVENT_FACTORY, BUFFER_SIZE, new YieldingWaitStrategy());
sequenceBarrier[i] = ringBuffer[i].newBarrier();
handler[i] = new TaskEventHandler();
batchEventProcessor[i] = new BatchEventProcessor<TaskEvent>(ringBuffer[i], sequenceBarrier[i], handler[i]);
ringBuffer[i].addGatingSequences(batchEventProcessor[i].getSequence());
execute[i]= Executors.newSingleThreadExecutor();
execute[i].submit(instance.batchEventProcessor[i]);
}
this.taskTimer =  Executors.newScheduledThreadPool(10, new CustomThreadFactory("DisruptorHelper-scheduler", true));
inited = true;
}
/**
* 执行定时器
* @param tk
*/
private void produce(int index,Task tk){
//System.out.println("index:="+index);
if(index<0||index>=group) {
System.out.println("out of group index:="+index);
return;
}
// if capacity less than 10%, don't use ringbuffer anymore
System.out.println("capacity:="+ringBuffer[index].remainingCapacity());
if(ringBuffer[index].remainingCapacity() < BUFFER_SIZE * 0.1) {
System.out.println("disruptor:ringbuffer avaliable capacity is less than 10 %");
// do something
}else {
long sequence = ringBuffer[index].next();
//将状态报告存入ringBuffer的该序列号中
ringBuffer[index].get(sequence).setTask(tk);
//通知消费者该资源可以消费
ringBuffer[index].publish(sequence);
}
}
/**
* 获得容器的capacity的数量
* @param index
* @return
*/
private long  remainingcapacity(int index){
//System.out.println("index:="+index);
if(index<0||index>=group) {
System.out.println("out of group index:="+index);
return 0L;
}
long capacity= ringBuffer[index].remainingCapacity();
return capacity;
}
private void shutdown0(){
for(int i=0;i<group;i++){
execute[i].shutdown();
}
}
////////////////////////////////下面是静态方法提供调用////////////////////////////////////////////////////////
/**
* 直接消费
* @param tk
*/
public static void addTask(int priority,Task tk){
instance.produce(priority,tk);
}
/**
* 定时消费
* @param tk
* @param delay
* @param period
*/
public static void scheduleTask(int priority,Task tk,long delay,long period){
Runnable timerTask = new ScheduledTask(priority, tk);
taskTimer.scheduleAtFixedRate(timerTask, delay, period, TimeUnit.MILLISECONDS);
}
/**
* 定点执行
* @param tk
* @param hourse
* @param minus
* @param sec
* @return
*/
public static Runnable scheduleTask(int priority,Task tk, int hourse,int minus,int sec)
{
Runnable timerTask = new ScheduledTask(priority, tk);
//每天2:30分执行
long delay = Helper.calcDelay(hourse,minus,sec);
long period = Helper.ONE_DAY;
System.out.println("delay:"+(delay/1000)+"secs");
taskTimer.scheduleAtFixedRate(timerTask, delay, period, TimeUnit.MILLISECONDS);
return timerTask;
}
//对定时执行的程序进行分装
private static class ScheduledTask implements Runnable
{
private int priority;
private Task task;
ScheduledTask(int priority, Task task)
{
this.priority = priority;
this.task = task;
}
public void run()
{
try{
instance.produce(priority,task);
}catch(Exception e){
System.out.println("catch exception in DisruptorHelper!");
}
}
}
public static long getRemainingCapatiye(int index){
return instance.getRemainingCapatiye(index);
}
public static void shutdown(){
if(!inited){
throw new RuntimeException("Disruptor还没有初始化!");
}
instance.shutdown0();
}
}

posted on 2014-05-15 11:53 顺其自然EVO 阅读(801) 评论(0)  编辑  收藏 所属分类: 测试学习专栏


只有注册用户登录后才能发表评论。


网站导航:
 
<2014年5月>
27282930123
45678910
11121314151617
18192021222324
25262728293031
1234567

导航

统计

常用链接

留言簿(55)

随笔分类

随笔档案

文章分类

文章档案

搜索

最新评论

阅读排行榜

评论排行榜