阅读724 返回首页    go 魔兽


简单了解Disruptor(二)

2.  Disruptor什么时候用

Disruptor适用于两个独立的处理过程(两个线程)之间交换数据。下面以两个简单场景举例:

例如场景一:

停车批量入场数据上报,数据上报后需要对每条入场数据存入DB,还需要发送kafka消息给其他业务系统。如果执行完所有的操作,再返回,那么接口耗时比较长,我们可以批量上报后验证数据正确性,通过后按单条入场数据写入环形队列,然后直接返回成功。

实现方式一:启 动2个消费者线程,一个消费者去执行db入库,一个消费者去发送kafka消息。

实现方式二:启动4个消费者,2个消费者并发执行db入库,两个消费者并发发送kafka消息,充分利用cpu多核特性,提高执行效率。

实现方式三:如果要求写入DB和kafka后,需要给用户发送短信。那么可以启动三个消费者线程,一个执行db插入,一个执行kafka消息发布,最后一个依赖前两个线程执行成功,前两个线程都执行成功后,该线程执行短信发送。

例如场景二:

你在网上使用信用卡下订单。一个简单的零售系统将获取您的订单信息,使用信用卡验证服务,以检查您的信用卡号码,然后确认您的订单 – 所有这些都在一个单一过程中操作。当进行信用卡有效性检查时,服务器这边的线程会阻塞等待,当然这个对于用户来说停顿不会太长。

在MAX架构中,你将此单一操作过程分为两个,第一部分将获取订单信息,然后输出事件(请求信用卡检查有效性的请求事件)给信用卡公司. 业务逻辑处理器将继续处理其他客户的订单,直至它在输入事件中发现了信用卡已经检查有效的事件,然后获取该事件来确认该订单有效。

 

3.  Disruptor为什么快

2.1数组实现

用数组实现, 解决了链表节点分散, 不利于cache预读问题,可以预分配用于存储事件内容的内存空间;并且解决了节点每次需要分配和释放, 需要大量的垃圾回收GC问题 (数组内元素的内存地址的连续性存储的,在硬件级别,数组中的元素是会被预加载的,因为只要一个元素被加载到缓存行,其他相邻的几个元素也会被加载进同一个缓存行)

2.2求余操作优化

求余操作本身也是一种高耗费的操作, 所以ringbuffer的size设成2的n次方, 可以利用位操作来高效实现求余。要找到数组中当前序号指向的元素,可以通过mod操作,正常通过sequence mod array length = array index,优化后可以通过:sequence & (array length-1) = array index实现。比如一共有8槽,3&(8-1)=3,HashMap就是用这个方式来定位数组元素的,这种方式比取模的速度更快。

 

2.3 预读与批量

相比链表队列,实现数组预读,减少结点操作空间释放和申请,从而减少gc次数。生产者支持单生产,多生产者模式,单生产者cursor使用普通long实现,无锁加快速度,多生产者才使用Sequence(AtomicLong)

生产和消费元素支持单线程批量操作数据。

2.4 Lock-Free

系统态的锁会导致线程cache丢失. 锁竞争的时候需要进行仲裁. 这个仲裁会涉及到操作系统的内核切换, 并且在此过程中操作系统需要做一系列操作, 导致原有线程的指令缓存和数据缓很可能被丢掉

– 用户态的锁往往是通过自旋锁来实现(自旋即忙等), 而自旋在竞争激烈的时候开销是很大的(一直在消耗CPU资源)

disruptor不使用锁, 使用CAS(Compare And Swap/Set),严格意义上说仍然是使用锁, 因为CAS本质上也是一种乐观锁, 只不过是CPU级别指令, 不涉及到操作系统, 所以效率很高(AtomicLong实现Sequence)

CAS说明:

  • CAS依赖于处理器的支持, 当然大部分现代处理器都支持.
  • CAS相对于锁是非常高效的, 因为它不需要涉及内核上下文切换进行仲裁.
  • CAS并不是免费的, 它会涉及到对指令pipeline加锁, 并且会用到内存barrier(用来刷新内存状态,简单理解就是把缓存中,寄存器中的数据同步到内存中去)

 

2.5 解决伪共享{False Sharing}

Cpu cache简单示意图:

 

 

上面谈到lock的耗费, 主要也是由于内核的切换导致cache的丢失

所以cache是优化的关键, cache越接近core就越快,也越小 。

其中L1,L2,L3等级缓存都是由缓存行组成的, 通常是64字节, 一个Java的long类型是8字节,因此在一个缓存行中可以存8个long类型的变量. 缓存行是缓存更新的基本单位, 就算你只读一个变量, 系统也会预读其余7个, 并cache这一行, 并且这行中的任一变量发生改变, 都需要重新加载整行, 而非仅仅重新加载一个变量.

伪共享举例:

比如在链表中往往会连续定义head和tail指针, 所以对于cache-line的预读, 很有可能会导致head和tail在同一cache-line。在实际使用中, 往往producer线程会持续更改tail指针, 而consumer线程会持续更改head指针

当producer线程和consumer线程分别被分配到core2和core1, 就会出现以下状况,由于core1不断改变h, 导致该cache-line过期, 对于core2, 虽然他不需要读h, 或者t也没有改变, 但是由于cache-line的整行更新, 所以core2仍然需要不停的更新它的cache,core2的缓存未命中被一个和它本身完全不相干的值h, 而被大大提高, 导致cache效率底下,而实际情况下, core1会不断更新h, 而core2会不断更新t, 导致core1和core2都需要频繁的重新load cache, 这就是伪共享问题

 

在Disruptor里我们对RingBuffer的cursor和BatchEventProcessor的序列进行了缓存行填充,如下:

 

class LhsPadding {
    protected long p1;
    protected long p2;
    protected long p3;
    protected long p4;
    protected long p5;
    protected long p6;
    protected long p7;

    LhsPadding() {
    }
}

2.6 使用内存屏障

内存屏障本身不是一种优化方式, 而是你使用lock-free(CAS)的时候, 必须要配合使用内存屏障,因为CPU和memory之间有多级cache, CPU core只会更新cache-line, 而cache-line什么时候flush到memory, 这个是有一定延时的 ,在这个延时当中, 其他CPU core是无法得知你的更新的, 因为只有把cache-line flush到memory后, 其他core中的相应的cache-line才会被置为过期数据,所以如果要保证使用CAS能保证线程间互斥, 即乐观锁, 必须当一个core发生更新后, 其他所有core立刻知道并把相应的cache-line设为过期, 否则在这些core上执行CAS读到的都是过期数据.

内存屏障 = “立刻将cache-line flush到memory, 没有延时”

注:可参考java中volatile的原理,同样实现了内存屏障。

 

4.  使用Disruptor开发

下面以车辆入场为例,入场后需要存入数据库,需要发送kafka消息,两步执行完后,给用户发送短信。代码实现如下:(参见代码运行)

开发步骤:

  1. 定义事件
/**
 * @author mawming
 * @version 1.0
 * @create date:2016-9-12.
 */
public class InParkingDataEvent {
    private String carLicense = "";

    public void setCarLicense(String carLicense)
    {
        this.carLicense = carLicense;
    }
    public String getCarLicense()
    {
        return carLicense;
    }
}
  1. 定义事件处理的具体实现

 

/**
 * @author mawming
 * @version 1.0
 * @create date:2016-9-12.
 */
public class ParkingDataInDbHandler implements EventHandler<InParkingDataEvent>,WorkHandler<InParkingDataEvent>{

   @Override
   public void onEvent(InParkingDataEvent event) throws Exception {
      long threadId = Thread.currentThread().getId();
        String carLicense = event.getCarLicense();
      System.out.println(String.format("Thread Id %s save %s into db ....",threadId,carLicense));
   }

   @Override
   public void onEvent(InParkingDataEvent event, long sequence, boolean endOfBatch) throws Exception {
      // TODO Auto-generated method stub
      this.onEvent(event);  
   }

}

public class ParkingDataSmsHandler implements EventHandler<InParkingDataEvent> {


    @Override
    public void onEvent(InParkingDataEvent event, long sequence, boolean endOfBatch) throws Exception {
            long threadId = Thread.currentThread().getId();
            String carLicense = event.getCarLicense();
            System.out.println(String.format("Thread Id %s send %s in plaza sms to user",threadId,carLicense));
    }
}

public class ParkingDataToKafkaHandler implements EventHandler<InParkingDataEvent> {
     
    @Override  
    public void onEvent(InParkingDataEvent event, long sequence,
            boolean endOfBatch) throws Exception {  
       long threadId = Thread.currentThread().getId();
        String carLicense = event.getCarLicense();
        System.out.println(String.format("Thread Id %s send %s in plaza messsage to kafka...",threadId,carLicense));
    }  
}

3.发布事件类实现(Disruptor 要求 RingBuffer.publish 必须得到调用,如果发生异常也一样要调用 publish ,那么,很显然这个时候需要调用者在事件处理的实现上来判断事件携带的数据是否是正确的或者完整的)
public class InParkingDataEventPublisher implements Runnable{
    Disruptor<InParkingDataEvent> disruptor;
    private CountDownLatch latch;  
    //private static int LOOP=10000;//模拟一万车辆入场
    private static int LOOP=10;//模拟10车辆入场


    public InParkingDataEventPublisher(CountDownLatch latch,Disruptor<InParkingDataEvent> disruptor) {
        this.disruptor=disruptor;  
        this.latch=latch;  
    }  
  
    @Override  
    public void run() {
        InParkingDataEventTranslator tradeTransloator=new InParkingDataEventTranslator();
        for(int i=0;i<LOOP;i++){

            disruptor.publishEvent(tradeTransloator);
            try {
                Thread.currentThread().sleep(1000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }  
        latch.countDown();
        System.out.println("生产者写完" +LOOP + "个消息");
    }  
      
}  
  
class InParkingDataEventTranslator implements EventTranslator<InParkingDataEvent>{

    @Override  
    public void translateTo(InParkingDataEvent event, long sequence) {
        this.generateTradeTransaction(event);  
    }  
    private InParkingDataEvent generateTradeTransaction(InParkingDataEvent event){
       int num =  (int)(Math.random()*8000);
        num = num + 1000;
       event.setCarLicense("京Z" + num);
        System.out.println("Thread Id " + Thread.currentThread().getId() + " 写完一个event");
        return event;
    }
}
  1. 定义用于事件处理的线程池, 指定等待策略, 启动 Disruptor,执行完毕后关闭Disruptor
/**
 * @author mawming
 * @version 1.0
 * @create date:2016-9-12.
 * 测试 P1生产消息,C1,C2消费消息,C1和C2会共享所有的event元素! C3依赖C1,C2处理结果
 */
public class TestP1c12c3 {
   public static void main(String[] args) throws InterruptedException {  
        long beginTime=System.currentTimeMillis();  
          
        int bufferSize=1024;
        //Disruptor交给线程池来处理,共计 p1,c1,c2,c3四个线程
        ExecutorService executor=Executors.newFixedThreadPool(4);
        //构造缓冲区与事件生成
        Disruptor<InParkingDataEvent> disruptor=new Disruptor<InParkingDataEvent>(new EventFactory<InParkingDataEvent>() {
            @Override  
            public InParkingDataEvent newInstance() {
                return new InParkingDataEvent();
            }  
        }, bufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());
          
        //使用disruptor创建消费者组C1,C2  
        EventHandlerGroup<InParkingDataEvent> handlerGroup=disruptor
                .handleEventsWith(new ParkingDataToKafkaHandler(),new ParkingDataInDbHandler());
          
        ParkingDataSmsHandler smsHandler=new ParkingDataSmsHandler();
        //声明在C1,C2完事之后执行JMS消息发送操作 也就是流程走到C3  
        handlerGroup.then(smsHandler);

        disruptor.start();//启动  
        CountDownLatch latch=new CountDownLatch(1);  
        //生产者准备  
        executor.submit(new InParkingDataEventPublisher(latch, disruptor));
        latch.await();//等待生产者结束
        disruptor.shutdown();  
        executor.shutdown();  
          
        System.out.println("总耗时:"+(System.currentTimeMillis()-beginTime));  
    }  
}

 

 

5.   相关资料

 Disruptor源码地址: https://github.com/LMAX-Exchange/disruptor

转载自 并发编程网 - ifeve.com

最后更新:2017-05-19 10:25:18

  上一篇:go  Storm-源码分析-Topology Submit-Task
  下一篇:go  《云周刊》第122期:勒索病毒防护全攻略,不再让服务器裸奔