public class LinkedBlockingQueueTest {
public static int eventNum = 5000000;
public static void main(String[] args) {
final BlockingQueue queue = new LinkedBlockingQueue();
final long startTime = System.currentTimeMillis();
new Thread(new Runnable() {
@Override
public void run() {
int i = 0;
while (i < eventNum) {
LogEvent logEvent = new LogEvent(i, "c" i);
try {
queue.put(logEvent);
} catch (InterruptedException e) {
e.printStackTrace();
}
i ;
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
int k = 0;
while (k < eventNum) {
try {
queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
k ;
}
long endTime = System.currentTimeMillis();
System.out
.println("costTime = " (endTime - startTime) "ms");
}
}).start();
}
}
public class DisruptorTest {
public static void main(String[] args) {
LogEventFactory factory = new LogEventFactory();
int ringBufferSize = 65536;
final Disruptor disruptor = new Disruptor(factory,
ringBufferSize, DaemonThreadFactory.INSTANCE,
ProducerType.SINGLE, new BusySpinWaitStrategy());
LogEventConsumer consumer = new LogEventConsumer();
disruptor.handleEventsWith(consumer);
disruptor.start();
new Thread(new Runnable() {
@Override
public void run() {
RingBuffer ringBuffer = disruptor.getRingBuffer();
for (int i = 0; i < LinkedBlockingQueueTest.eventNum; i ) {
long seq = ringBuffer.next();
LogEvent logEvent = ringBuffer.get(seq);
logEvent.setLogId(i);
logEvent.setContent("c" i);
ringBuffer.publish(seq);
}
}
}).start();
}
}
public class LogEventConsumer implements EventHandler {
private long startTime;
private int i;
public LogEventConsumer() {
this.startTime = System.currentTimeMillis();
}
public void onEvent(LogEvent logEvent, long seq, boolean bool)
throws Exception {
i ;
if (i == LinkedBlockingQueueTest.eventNum) {
long endTime = System.currentTimeMillis();
System.out.println(" costTime = " (endTime - startTime) "ms");
}
}
}
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
public class FalseSharing implements Runnable {
public final static int NUM_THREADS = 4;
public final static long ITERATIONS = 50000000;
private final int arrayIndex;
private static VolatileLong[] longs = new VolatileLong[NUM_THREADS];
static {
for (int i = 0; i < longs.length; i ) {
longs[i] = new VolatileLong();
}
}
public FalseSharing(final int arrayIndex) {
this.arrayIndex = arrayIndex;
}
public static void main(final String[] args) throws Exception {
final long start = System.currentTimeMillis();
runTest();
System.out.println("costTime = " (System.currentTimeMillis() - start) "ms");
}
private static void runTest() throws InterruptedException {
Thread[] threads = new Thread[NUM_THREADS];
for (int i = 0; i < threads.length; i ) {
threads[i] = new Thread(new FalseSharing(i));
}
for (Thread t : threads) {
t.start();
}
for (Thread t : threads) {
t.join();
}
}
@Override
public void run() {
long i = ITERATIONS 1;
while (0 != --i) {
longs[arrayIndex].value = i;
}
}
public final static class VolatileLong {
public volatile long value = 0L;
public long p1, p2, p3, p4, p5, p6;
}
}