歡迎光臨
每天分享高質量文章

【死磕【Sharding-jdbc】—EventBus-輕量級行程內事件分發元件

點選上方“Java技術驛站”,選擇“置頂公眾號”。

有內涵、有價值的文章第一時間送達!

EventBus來自於google-guava包中。原始碼註釋如下:

Dispatches events to listeners, and provides ways for listeners to register themselves. The EventBus allows publish-subscribe-style communication between components  without requiring the components to explicitly register with one another (and thus be  aware of each other). It is designed exclusively to replace traditional Java in-process  event distribution using explicit registration. It is not a general-purpose publish-subscribe system, nor is it intended for interprocess communication.

翻譯:將事件分派給監聽器,併為監聽器提供註冊自己的方法。EventBus允許元件之間的釋出 – 訂閱式通訊,而不需要元件彼此明確註冊(並且因此彼此意識到)。 它專門用於使用顯式註冊替換傳統的Java行程內事件分發。 它不是一個通用的釋出 – 訂閱系統,也不是用於行程間通訊。

使用參考

關於EventBus的用例程式碼提取自sharding-jdbc原始碼,並結合lombok最大限度的簡化:

  • EventBusInstance–用於獲取EventBus實體(餓漢式單例樣式)

  1. @NoArgsConstructor(access = AccessLevel.PRIVATE)

  2. public final class EventBusInstance {

  3.    private static final EventBus INSTANCE = new EventBus();

  4.    public static EventBus getInstance() {

  5.        return INSTANCE;

  6.    }

  7. }

  • DMLExecutionEvent--釋出訂閱事件模型

  1. @Getter

  2. @Setter

  3. public class DMLExecutionEvent {

  4.    private String id;

  5.    private String dataSource;

  6.    private Date sendTime;

  7. }

  • DMLExecutionEventListener--事件監聽器

  1. public final class DMLExecutionEventListener {

  2.    @Subscribe

  3.    @AllowConcurrentEvents

  4.    public void listener(final DMLExecutionEvent event) {

  5.        System.out.println("監聽的DML執行事件: " + JSON.toJSONString(event));

  6.        // do something

  7.    }

  8. }

-- Main--主方法:註冊訂閱者監聽事件,以及釋出事件。

  1. /**

  2. * @author wangzhenfei9

  3. * @version 1.0.0

  4. * @since 2018年04月24日

  5. */

  6. public class Main {

  7.    static{

  8.        System.out.println("register listener...");

  9.        EventBusInstance.getInstance().register(new DMLExecutionEventListener());

  10.    }

  11.    public static void main(String[] args) throws InterruptedException {

  12.        for (int i=0; i<10; i++) {

  13.            pub();

  14.            Thread.sleep(3000);

  15.        }

  16.    }

  17.    private static void pub(){

  18.        DMLExecutionEvent event = new DMLExecutionEvent();

  19.        event.setId(String.valueOf(new Random().nextInt(1000)));

  20.        event.setDataSource("sj_db_1");

  21.        event.setSendTime(new Date());

  22.        System.out.println("釋出的DML執行事件: " + JSON.toJSONString(event));

  23.        EventBusInstance.getInstance().post(event);

  24.    }

  25. }

核心方法

EventBus一些重要方法解釋如下:

  • post(Object):Posts an event to all registered subscribers. This method will return successfully after the event has been posted to all subscribers, and regardless of any exceptions thrown by subscribers.

  • register(Object): Registers all subscriber methods on object to receive events.Subscriber methods are selected and classified using this EventBus's SubscriberFindingStrategy; the default strategy is the AnnotatedSubscriberFinder.

  • unregister(Object):Unregisters all subscriber methods on a registered object.

原始碼分析

主要分析釋出事件以及訂閱的核心原始碼;

釋出原始碼分析

  1. public void post(Object event) {

  2.    // 得到所有該類已經它的所有父類(因為有些註冊監聽器是監聽其父類)

  3.    Set<Class>> dispatchTypes = flattenHierarchy(event.getClass());

  4.    boolean dispatched = false;

  5.    // 遍歷類本身以及所有父類

  6.    for (Class> eventType : dispatchTypes) {

  7.        // 重入讀鎖先鎖住

  8.        subscribersByTypeLock.readLock().lock();

  9.        try {

  10.            // 得到類的所有訂閱者,例如DMLExecutionEvent的訂閱者就是DMLExecutionEventListener(EventSubscriber有兩個屬性:重要的屬性target和method,target就是監聽器即DMLExecutionEventListener,method就是監聽器方法即listener;從而知道DMLExecutionEvent這個事件由哪個類的哪個方法監聽處理)

  11.            Set<EventSubscriber> wrappers = subscribersByType.get(eventType);

  12.            if (!wrappers.isEmpty()) {

  13.                // 如果有時間訂閱者,那麼dispatched = true,表示該事件可以分發

  14.                dispatched = true;

  15.                // 遍歷所有的時間訂閱者,每個訂閱者的佇列都增加該事件

  16.                for (EventSubscriber wrapper : wrappers) {

  17.                    enqueueEvent(event, wrapper);

  18.                }

  19.            }

  20.        } finally {

  21.            subscribersByTypeLock.readLock().unlock();

  22.        }

  23.    }

  24.    if (!dispatched && !(event instanceof DeadEvent)) {

  25.        post(new DeadEvent(this, event));

  26.    }

  27.    // 分發進入佇列的事件

  28.    dispatchQueuedEvents();

  29. }

  30. /**

  31. * queues of events for the current thread to dispatch;

  32. * 核心資料結構為LinkedList,儲存的是EventBus.EventWithSubscriber型別資料

  33. */

  34. private final ThreadLocal<Queue<EventBus.EventWithSubscriber>> eventsToDispatch =

  35.        new ThreadLocal<Queue<EventBus.EventWithSubscriber>>() {

  36.            @Override protected Queue<EventBus.EventWithSubscriber> initialValue() {

  37.                return new LinkedList<EventBus.EventWithSubscriber>();

  38.            }

  39.        };

  40. void enqueueEvent(Object event, EventSubscriber subscriber) {

  41.    // 資料結構為new LinkedList(),EventWithSubscriber就是對event和subscriber的封裝,LinkedList資料結構保證進入佇列和消費佇列順序一致

  42.    eventsToDispatch.get().offer(new EventBus.EventWithSubscriber(event, subscriber));

  43. }

  44. /**

  45. * Drain the queue of events to be dispatched. As the queue is being drained,

  46. * new events may be posted to the end of the queue.

  47. * 排乾要被分發的事件佇列,正在排乾的過程中,可能有新的事件被追加到佇列尾部

  48. */

  49. void dispatchQueuedEvents() {

  50.    // don't dispatch if we're already dispatching, that would allow reentrancy

  51.    // and out-of-order events. Instead, leave the events to be dispatched

  52.    // after the in-progress dispatch is complete.

  53.    // 如果正在排乾佇列,則不分發

  54.    if (isDispatching.get()) {

  55.        return;

  56.    }

  57.    // ThreadLocal設定正在分發即isDispatching為true

  58.    isDispatching.set(true);

  59.    try {

  60.        Queue<EventBus.EventWithSubscriber> events = eventsToDispatch.get();

  61.        EventBus.EventWithSubscriber eventWithSubscriber;

  62.        while ((eventWithSubscriber = events.poll()) != null) {

  63.            // 呼叫訂閱者處理事件(method.invoke(target, new Object[] { event });,method和target來自訂閱者)

  64.            dispatch(eventWithSubscriber.event, eventWithSubscriber.subscriber);

  65.        }

  66.    } finally {

  67.        // ThreadLocal可能記憶體洩漏,用完需要remove

  68.        isDispatching.remove();

  69.        // 佇列中的事件任務處理完,清空佇列,即所謂的排乾(Drain)

  70.        eventsToDispatch.remove();

  71.    }

  72. }

訂閱原始碼分析

  1. /**

  2. * Registers all subscriber methods on {@code object} to receive events.

  3. * 註冊object上所有訂閱方法,用來接收事件,上面的使用參考,DMLExecutionEventListener就是這裡的object

  4. */

  5. public void register(Object object) {

  6.    // Multimap是guava自定義資料結構,類似Map>,key就是事件型別,例如DMLExecutionEvent,value就是EventSubscriber即事件訂閱者集合(說明,這個的訂閱者集合是指object裡符合訂閱者的所有方法,例如DMLExecutionEventListener.listener(),DMLExecutionEventListener中可以有多個訂閱者,註解@Subscribe即可),

  7.    Multimap<Class>, EventSubscriber> methodsInListener =

  8.            finder.findAllSubscribers(object);

  9.    // 重入寫鎖保證執行緒安全

  10.    subscribersByTypeLock.writeLock().lock();

  11.    try {

  12.        // 把訂閱者資訊放到map中快取起來(釋出事件post()時就會用到)

  13.        subscribersByType.putAll(methodsInListener);

  14.    } finally {

  15.        subscribersByTypeLock.writeLock().unlock();

  16.    }

  17. }

END

贊(0)

分享創造快樂

© 2024 知識星球   網站地圖