EventBus Basic Usage

最近在看Domain Event的實作方式,剛好看到Teddy分享使用Guava的EventBus來處理事件的註冊與分派,於是花了一些時間實驗API的用法。以下內容分享給大家。Source code可以從link下載。

Event

我的範例事件為MJGotGodGuyCardEvent:

public class MJGotGodGuyCardEvent {
 
    private final String girlName;
 
    public MJGotGodGuyCardEvent(String girlName) {
        this.girlName = girlName;
    }
 
    public String getGirlName() {
        return girlName;
    }
 
}

Event Handler

我的事件處理者有兩個,第一個為MJGoToPronhub。需注意的是:

  • @Subscribe: 用以宣告處理函式,函式只允許一個參數,且要與Event物件相同。
  • @AllowConcurrentEvents: 用以告知EventBus此函式可以接受Concurrent存取,預設會使用循序方式存取此函式。

public class MJGoToPronhub {
    private static Logger logger = LoggerFactory.getLogger(MJGoToPronhub.class);
    private List<MJGotGodGuyCardEvent> receivedEvents = new CopyOnWriteArrayList<>();
 
    @AllowConcurrentEvents
    @Subscribe
    public void handle(MJGotGodGuyCardEvent event) {
        receivedEvents.add(event);
        logger.info("MJGoToPronhub due to {}", event.getGirlName());
    }
 
    public List<MJGotGodGuyCardEvent> getReceivedEvents(){
        return receivedEvents;
    }
 
}
另外一個實作內容類似,不做贅述,名稱為MJGoToWanhua。

EventBus

用法相當簡單,只要透過EventBus的register把處理物件進去後,在負責發送通知的client使用post即可“循序”的讓handler處理訊息:

public class TestEventBus {
 
    private EventBus eventBus = new EventBus();
 
    private MJGoToPronhub MJGoToPronhubHandler = new MJGoToPronhub();
    private MJGoToWanhua MJGoToWanhuaHandler = new MJGoToWanhua();
 
    private void thenTheHandlerShouldReceiveTheEvent() {
        assertEquals(1, MJGoToPronhubHandler.getReceivedEvents().size());
        assertEquals(1, MJGoToWanhuaHandler.getReceivedEvents().size());
    }
 
    private void givenEventBusRegisterTwoGoodHandler(EventBus eventBus) {
        eventBus.register(MJGoToPronhubHandler);
        eventBus.register(MJGoToWanhuaHandler);
    }
 
    @Test
    public void ShouldGetReceivedEventsWhenPostEventToHandlers() {
        givenEventBusRegisterTwoGoodHandler(eventBus);
        MJGotGodGuyCardEvent event = new MJGotGodGuyCardEvent("Nancy");
 
        eventBus.post(event);
 
        thenTheHandlerShouldReceiveTheEvent();
    }
}

AsyncEventBus

假如覺得循序處理太慢,可以使用AsyncEventBus,使用方法與EventBus相同,但它post是non-blocking的:

public class TestAsyncEventBus {
    private AsyncEventBus asyncEventBus = new AsyncEventBus(Executors.newCachedThreadPool());
 
    private MJGoToPronhub MJGoToPronhubHandler = new MJGoToPronhub();
    private MJGoToWanhua MJGoToWanhuaHandler = new MJGoToWanhua();
 
    private void thenTheHandlerShouldReceiveTheEvent() {
        assertEquals(1, MJGoToPronhubHandler.getReceivedEvents().size());
        assertEquals(1, MJGoToWanhuaHandler.getReceivedEvents().size());
    }
 
    private void givenEventBusRegisterTwoGoodHandler(EventBus eventBus) {
        eventBus.register(MJGoToPronhubHandler);
        eventBus.register(MJGoToWanhuaHandler);
    }
 
    private MJGotGodGuyCardEvent givenDelayedMJGotGodGuyCardEvent(CountDownLatch latch) {
        return new MJGotGodGuyCardEvent("Nancy") {
            @Override
            public String getGirlName() {
                try {
                    TimeUnit.SECONDS.sleep(1);
                    return super.getGirlName();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } finally {
                    latch.countDown();
                }
            }
        };
    }
 
    private void thenPostShouldBeNotBlocked(long startTime) {
        long afterTime = System.currentTimeMillis();
        assertTrue((afterTime-startTime)<1000);
    }
 
    private void thenPostShouldBeDoneWithParallel(long startTime, CountDownLatch latch) throws InterruptedException {
        latch.await();
        long afterTime = System.currentTimeMillis();
        assertTrue((afterTime-startTime)>1000);
    }
 
    @Test
    public void testAsyncEventBus() throws InterruptedException {
        givenEventBusRegisterTwoGoodHandler(asyncEventBus);
 
        CountDownLatch latch = new CountDownLatch(2);
        MJGotGodGuyCardEvent event = givenDelayedMJGotGodGuyCardEvent(latch);
 
        long startTime = System.currentTimeMillis();
        asyncEventBus.post(event);
 
        thenPostShouldBeNotBlocked(startTime);
        thenPostShouldBeDoneWithParallel(startTime, latch);
        thenTheHandlerShouldReceiveTheEvent();
    }
}

Notes

  • 我使用的guava版本為30.0-jre。
  • 預設情況下,Handler處理發生例外時,並不影響工作繼續進行。
  • 可以透過實做SubscriberExceptionHandler去達到自己的例外處理需求,可由constructor去注入。