Python 官方文档:入门教程 => 点击学习
目录一、Disruptor简介二、浅聊Disruptor的核心三、Disruptor使用3.1pom.xml3.2事件Event3.3EventFactory3.4EventHand
Disruptor目前是世界上最快的单机消息队列,由英国外汇交易公司LMAX开发,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。2011年,企业应用软件专家Martin Fowler专门撰写长文介绍。同年它还获得了oracle官方的Duke大奖。目前,包括Apache StORM、Camel、Log4j 2在内的很多知名项目都应用了Disruptor以获取高性能。
Disruptor维护了一个环形队列RingBuffer,这个队列本质上是一个首位相连的数组。相比于LinkedBlockdingQueue,RingBuffer的数组结构在查找方面效率更高。此外,LinkedBlockingQueue需要维护一个头节点指针head和一个尾节点指针tail,而RingBuffer只需要维护一个sequence指向下一个可用的位置即可。所以从这两点来说,RingBuffer比LinkedBlockingQueue要快。
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.3</version>
</dependency>
Disruptor是基于事件的生产者消费者模型。其RingBuffer中存放的其实是将消息封装成的事件。这里定义了一个LongEvent,表示消息队列中存放的是long类型的数据。
public class LongEvent {
private long value;
public void set(long value) {
this.value = value;
}
@Override
public String toString() {
return "LongEvent{" +
"value=" + value +
'}';
}
}
实现EventFactory接口,定义Event工厂,用于填充队列。Event工厂其实是为了提高Disruptor的效率,初始化的时候,会调用Event工厂,对RingBuffer进行内存的提前分配,GC的频率会降低。
import com.lmax.disruptor.EventFactory;
public class LongEventFactory implements EventFactory<LongEvent> {
public LongEvent newInstance() {
return new LongEvent();
}
}
实现EventHandler接口,定义EventHandler(消费者),处理容器中的元素。
import com.lmax.disruptor.EventHandler;
public class LongEventHandler implements EventHandler<LongEvent> {
public void onEvent(LongEvent event, long sequence, boolean endOfBatch) {
System.out.println("Event: " + event + ", sequence: " + sequence);
}
}
import cn.flying.space.disruptor.demo.LongEvent;
import com.lmax.disruptor.RingBuffer;
import java.NIO.ByteBuffer;
public class LongEventProducer {
private RingBuffer<LongEvent> ringBuffer;
public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void onData(ByteBuffer byteBuffer) {
// 定位到下一个可存放的位置
long sequence = ringBuffer.next();
try {
// 拿到该位置的event
LongEvent event = ringBuffer.get(sequence);
// 设置event的值
event.set(byteBuffer.getLong(0));
} finally {
// 发布
ringBuffer.publish(sequence);
}
}
}
import cn.flying.space.disruptor.demo.LongEvent;
import cn.flying.space.disruptor.demo.LongEventFactory;
import cn.flying.space.disruptor.demo.LongEventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.nio.ByteBuffer;
import java.util.concurrent.Executors;
public class TestMain {
public static void main(String[] args) throws InterruptedException {
// 定义event工厂
LongEventFactory factory = new LongEventFactory();
// ringBuffer长度
int bufferSize = 1024;
// 构造一个Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, Executors.defaultThreadFactory());
// 绑定handler
disruptor.handleEventsWith(new LongEventHandler());
// 启动Disruptor
disruptor.start();
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
LongEventProducer producer = new LongEventProducer(ringBuffer);
ByteBuffer byteBuffer = ByteBuffer.allocate(8);
for (long i = 0; true; i++) {
byteBuffer.clear();
byteBuffer.putLong(i);
// 投递消息
producer.onData(byteBuffer);
Thread.sleep(1000);
}
}
}
import cn.flying.space.disruptor.demo.LongEvent;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;
public class LongEventProducerUsingTranslator {
private RingBuffer<LongEvent> ringBuffer;
public LongEventProducerUsingTranslator(RingBuffer<LongEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = new EventTranslatorOneArg<LongEvent, ByteBuffer>() {
@Override
public void translateTo(LongEvent longEvent, long l, ByteBuffer byteBuffer) {
longEvent.set(byteBuffer.getLong(0));
}
};
public void onData(ByteBuffer byteBuffer) {
ringBuffer.publishEvent(TRANSLATOR, byteBuffer);
}
}
import cn.flying.space.disruptor.demo.LongEvent;
import cn.flying.space.disruptor.demo.LongEventFactory;
import cn.flying.space.disruptor.demo.LongEventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.nio.ByteBuffer;
public class TestMain {
public static void main(String[] args) throws InterruptedException {
LongEventFactory factory = new LongEventFactory();
int bufferSize = 1024;
Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE);
disruptor.handleEventsWith(new LongEventHandler());
disruptor.start();
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
LongEventProducerUsingTranslator producer = new LongEventProducerUsingTranslator(ringBuffer);
ByteBuffer byteBuffer = ByteBuffer.allocate(8);
for (long i = 0L; true; i++) {
byteBuffer.putLong(0, i);
// 发布
producer.onData(byteBuffer);
Thread.sleep(1000);
}
}
}
到此这篇关于Java多线程之Disruptor入门的文章就介绍到这了,更多相关Java Disruptor入门内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!
--结束END--
本文标题: Java多线程之Disruptor入门
本文链接: https://www.lsjlt.com/news/124880.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
下载Word文档到电脑,方便收藏和打印~
2024-03-01
2024-03-01
2024-03-01
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0