nest系列-event事件总线
设计思想
- 以系统思维为外部提供对外异步事件通知
- 支持系统间的异步事件请求,由分布式消息中间件提供
- 支持系统内的同步事件请求,由内部观察者模型提供(单线程)
- 通过事件总线统一发起事件及注册事件
- 通过可配置的扩展为事件提供消息通道
- 可通过插件方式为指定的消息中间件提供消息提供者
- 异步事件通过消息标识来提供消费者幂等性
- 异步事件支持最终一致性,即当工作单元的实体提交成功后再发起异步消息的提交,如果异步提交失败使用补偿机制重发消息
设计草图
-
-
数据流程
发起事件
应用程序通过事件总线服务发布事件,发布服务用事件名称去事件配置管理器找到该事件需要使用的消息通道。使用消息通道找到如果没有找到指定的消息通道,默认使用同步通道发送事件。如果找到通道,则使用通道提供的生产者执行发送消息的动作。
生产者分为同步类的生产者和异步类的生产者
同步类生产者直接使用观察者模式将消息发送给监听者,监听者调用具体的事件处理器处理事件消息
异步类生产者将事件信息构建为分布式事件信息,并且把这个分布式事件信息放入工作单元待发队列中。
当工作单元的实体提交完成将发起事件提交,事件提交将待发队列中的分布式事件信息提出,写入提交事件管理器中开始提交事件,事件信息的提交行为调用具体的分布式事件生产者的提交消息到消息中间件成功后更新事件提交管理器中的事件状态。
事件提交管理器提供补偿机制将事件提交状态为失败的事件重新发布直到成功
-
订阅事件
通过事件产总线服务发起事件注册,注册时需要提交一个事件处理器,事件处理器包括事件名称及处理方法。
总线服务收到事件注册信息后,通过事件配置管理器找到具体的事件通道提供的消费者
如果消费者是一个同步类消费者时直接创建一个消息监听器并添加到观察者中。
如果消费者是一个异步类消息者时,将启动一个单独的线程获取消息中间件的消息。从消息中间件中获取到的消息根据消息标识去事件记录管理器中查询是否已经处理过,如果已经处理过的消息直接返回给消息中间件,如果没有处理的消息调起事件处理器指定的处理方法。
扩展消息通道
-
继承并实现消息通道、生产者、消费者。可参考activemq消息通道的实现插件
代码地址:https://github.com/jovezhao/nest/tree/master/nest-ddd/src/main/java/com/jovezhao/nest/ddd/event
最后更新:2017-11-13 18:34:14