public abstract class Task implements Runnable { public Task(){} } public class TaskEvent { private Task tk; public Task getTask() { return tk; } public void setTask(Task tk) { this.tk = tk; } public final static EventFactory<TaskEvent> EVENT_FACTORY = new EventFactory<TaskEvent>() { public TaskEvent newInstance() { return new TaskEvent(); } }; public class TaskEventHandler implements EventHandler<TaskEvent> { // 执行接口函数onEvent执行 public void onEvent(TaskEvent event, long sequence, boolean endOfBatch) throws Exception { event.getTask().run(); } } } import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import com.zhenhai.bonecp.CustomThreadFactory; import com.zhenhai.disruptor.BatchEventProcessor; import com.zhenhai.disruptor.RingBuffer; import com.zhenhai.disruptor.SequenceBarrier; import com.zhenhai.disruptor.YieldingWaitStrategy; import com.zhenhai.disruptor.dsl.ProducerType; /** * 使用方法 DisruptorHelper.initAndStart(); Task tt=new Taska(); DisruptorHelper.produce(tt); DisruptorHelper.shutdown(); * * */ public class DisruptorHelper { /** * ringbuffer容量,最好是2的N次方 */ private static final int BUFFER_SIZE = 1024 * 1; private static int group=2; private RingBuffer<TaskEvent> ringBuffer[]; private SequenceBarrier sequenceBarrier[]; private TaskEventHandler handler[]; private BatchEventProcessor<TaskEvent> batchEventProcessor[]; private static DisruptorHelper instance; private static boolean inited = false; private static ScheduledExecutorService taskTimer=null; //JDK 创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。 private ExecutorService execute[]; //启动监视线程 static { System.out.println("init DisruptorHelper!!!!!!!!!!!!!!!!!"); instance = new DisruptorHelper(); instance.init(); inited = true; System.out.println("init DisruptorHelper end!!!!!!!!!!!!!!!!!"); } |
**
* 静态类
* @return
*/
private DisruptorHelper(){ }
/**
* 初始化
*/
private void init(){
execute=new ExecutorService[group];
ringBuffer=new RingBuffer[group];
sequenceBarrier=new SequenceBarrier[group];
handler=new TaskEventHandler[group];
batchEventProcessor=new BatchEventProcessor[group];
////////////////定时执行////////////////
//初始化ringbuffer,存放Event
for(int i=0;i<group;i++){
ringBuffer[i] = RingBuffer.create(ProducerType.SINGLE, TaskEvent.EVENT_FACTORY, BUFFER_SIZE, new YieldingWaitStrategy());
sequenceBarrier[i] = ringBuffer[i].newBarrier();
handler[i] = new TaskEventHandler();
batchEventProcessor[i] = new BatchEventProcessor<TaskEvent>(ringBuffer[i], sequenceBarrier[i], handler[i]);
ringBuffer[i].addGatingSequences(batchEventProcessor[i].getSequence());
execute[i]= Executors.newSingleThreadExecutor();
execute[i].submit(instance.batchEventProcessor[i]);
}
this.taskTimer = Executors.newScheduledThreadPool(10, new CustomThreadFactory("DisruptorHelper-scheduler", true));
inited = true;
}
/**
* 执行定时器
* @param tk
*/
private void produce(int index,Task tk){
//System.out.println("index:="+index);
if(index<0||index>=group) {
System.out.println("out of group index:="+index);
return;
}
// if capacity less than 10%, don't use ringbuffer anymore
System.out.println("capacity:="+ringBuffer[index].remainingCapacity());
if(ringBuffer[index].remainingCapacity() < BUFFER_SIZE * 0.1) {
System.out.println("disruptor:ringbuffer avaliable capacity is less than 10 %");
// do something
}else {
long sequence = ringBuffer[index].next();
//将状态报告存入ringBuffer的该序列号中
ringBuffer[index].get(sequence).setTask(tk);
//通知消费者该资源可以消费
ringBuffer[index].publish(sequence);
}
}
/**
* 获得容器的capacity的数量
* @param index
* @return
*/
private long remainingcapacity(int index){
//System.out.println("index:="+index);
if(index<0||index>=group) {
System.out.println("out of group index:="+index);
return 0L;
}
long capacity= ringBuffer[index].remainingCapacity();
return capacity;
}
private void shutdown0(){
for(int i=0;i<group;i++){
execute[i].shutdown();
}
}
////////////////////////////////下面是静态方法提供调用////////////////////////////////////////////////////////
/**
* 直接消费
* @param tk
*/
public static void addTask(int priority,Task tk){
instance.produce(priority,tk);
}
/**
* 定时消费
* @param tk
* @param delay
* @param period
*/
public static void scheduleTask(int priority,Task tk,long delay,long period){
Runnable timerTask = new ScheduledTask(priority, tk);
taskTimer.scheduleAtFixedRate(timerTask, delay, period, TimeUnit.MILLISECONDS);
}
/**
* 定点执行
* @param tk
* @param hourse
* @param minus
* @param sec
* @return
*/
public static Runnable scheduleTask(int priority,Task tk, int hourse,int minus,int sec)
{
Runnable timerTask = new ScheduledTask(priority, tk);
//每天2:30分执行
long delay = Helper.calcDelay(hourse,minus,sec);
long period = Helper.ONE_DAY;
System.out.println("delay:"+(delay/1000)+"secs");
taskTimer.scheduleAtFixedRate(timerTask, delay, period, TimeUnit.MILLISECONDS);
return timerTask;
}
//对定时执行的程序进行分装
private static class ScheduledTask implements Runnable
{
private int priority;
private Task task;
ScheduledTask(int priority, Task task)
{
this.priority = priority;
this.task = task;
}
public void run()
{
try{
instance.produce(priority,task);
}catch(Exception e){
System.out.println("catch exception in DisruptorHelper!");
}
}
}
public static long getRemainingCapatiye(int index){
return instance.getRemainingCapatiye(index);
}
public static void shutdown(){
if(!inited){
throw new RuntimeException("Disruptor还没有初始化!");
}
instance.shutdown0();
}
}