package com.mavsplus.example.disruptor;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
/**
* Disruptor Getting Started example.
*
* <a
* href="https://github.com/LMAX-Exchange/disruptor/wiki/Getting-Started">LMAX Disruptor wiki: Getting Started</a>
*
* @author landon
* @since 1.8.0_25
*/
public class DisruptorExample {

    /** Size of the ring buffer; must be a power of 2. */
    private static final int BUFFER_SIZE = 1024;

    /** Pause between two published events, in milliseconds. */
    private static final long PUBLISH_INTERVAL_MILLIS = 1000L;

    /** Command-line argument that selects the capturing-lambda variant. */
    private static final String CAPTURING_MODE = "capturing";

    /**
     * The event that carries the data through the ring buffer.
     */
    public static class LongEvent {
        private long value;

        /**
         * Stores the payload of this event.
         *
         * @param value the value to carry
         */
        public void set(long value) {
            this.value = value;
        }

        @Override
        public String toString() {
            return "LongEvent [value=" + value + "]";
        }
    }

    /**
     * Runs the example. Both variants publish in an endless loop, so this
     * method only returns by throwing.
     *
     * @param args pass {@code capturing} as the first argument to run the
     *             capturing-lambda variant ({@link #usingJava8()}); with any
     *             other or no argument the method-reference variant
     *             ({@link #usingJava8Another()}) is run
     * @throws Exception if the publishing thread is interrupted while sleeping
     */
    public static void main(String[] args) throws Exception {
        if (args.length > 0 && CAPTURING_MODE.equals(args[0])) {
            usingJava8();
        } else {
            usingJava8Another();
        }
    }

    /**
     * Builds a Disruptor for {@link LongEvent}s, not yet started and without
     * any handler attached.
     *
     * @return a new Disruptor with a ring buffer of {@link #BUFFER_SIZE} slots
     */
    private static Disruptor<LongEvent> newDisruptor() {
        // Executor that will be used to construct new threads for consumers
        Executor executor = Executors.newCachedThreadPool();
        return new Disruptor<>(LongEvent::new, BUFFER_SIZE, executor);
    }

    /**
     * Java 8 variant using lambdas for both the handler and the translator.
     *
     * <p>The translator lambda captures the local {@code ByteBuffer}, i.e. it
     * is a capturing lambda; see {@link #usingJava8Another()} for the variant
     * that avoids the capture.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping
     *                              between two publications
     */
    @SuppressWarnings("unchecked")
    private static void usingJava8() throws InterruptedException {
        Disruptor<LongEvent> disruptor = newDisruptor();
        // Connect the handler
        disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event: " + event));
        // Start the Disruptor, starts all threads running
        disruptor.start();
        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++) {
            bb.putLong(0, l);
            ringBuffer.publishEvent((event, sequence) -> event.set(bb.getLong(0)));
            Thread.sleep(PUBLISH_INTERVAL_MILLIS);
        }
    }

    /**
     * Java 8 variant using method references and passing the buffer as an
     * argument to the translator.
     *
     * <p>The alternative publishing loop
     *
     * <pre>{@code
     * ByteBuffer bb = ByteBuffer.allocate(8);
     * for (long l = 0; true; l++) {
     *     bb.putLong(0, l);
     *     ringBuffer.publishEvent((event, sequence) -> event.set(bb.getLong(0)));
     *     Thread.sleep(1000);
     * }
     * }</pre>
     *
     * would create a capturing lambda, meaning that it would need to
     * instantiate an object to hold the {@code ByteBuffer bb} variable as it
     * passes the lambda through to the {@code publishEvent()} call. This will
     * create additional (unnecessary) garbage, so the call that passes the
     * argument through to the lambda should be preferred if low GC pressure is
     * a requirement. Given that method references can be used instead of
     * anonymous lambdas, it is possible to rewrite the example in this
     * fashion.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping
     *                              between two publications
     */
    @SuppressWarnings("unchecked")
    private static void usingJava8Another() throws InterruptedException {
        Disruptor<LongEvent> disruptor = newDisruptor();
        // Connect the handler
        disruptor.handleEventsWith(DisruptorExample::handleEvent);
        // Start the Disruptor, starts all threads running
        disruptor.start();
        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++) {
            bb.putLong(0, l);
            // The buffer is handed over as an argument instead of being captured.
            ringBuffer.publishEvent(DisruptorExample::translate, bb);
            Thread.sleep(PUBLISH_INTERVAL_MILLIS);
        }
    }

    /**
     * Event handler: prints the received event to standard output.
     *
     * @param event      the event read from the ring buffer
     * @param sequence   the sequence number of the event (unused)
     * @param endOfBatch end-of-batch flag supplied by the Disruptor (unused)
     */
    public static void handleEvent(LongEvent event, long sequence, boolean endOfBatch) {
        System.out.println(event);
    }

    /**
     * Event translator: copies the long stored at index 0 of {@code buffer}
     * into {@code event}.
     *
     * @param event    the ring buffer slot to fill
     * @param sequence the sequence number of the slot (unused)
     * @param buffer   the buffer holding the value at index 0
     */
    public static void translate(LongEvent event, long sequence, ByteBuffer buffer) {
        event.set(buffer.getLong(0));
    }
}
// Originally posted on 2015-06-15 18:34 by landon; views: 3934, comments: 0;
// categories: Program, ServerFramework.