EventBus Basic Usage

Introduction

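I have recently been looking into how Domain Events are implemented, and I happened to see Teddy share how to use Guava's EventBus to handle event registration and dispatch, so I spent some time experimenting with the API. I am sharing what I found below. The source code can be downloaded from link.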

How to?

Event

My example event is MJGotGodGuyCardEvent:

public class MJGotGodGuyCardEvent {
 
    private final String girlName;
 
    public MJGotGodGuyCardEvent(String girlName) {
        this.girlName = girlName;
    }
 
    public String getGirlName() {
        return girlName;
    }
 
}

Event Handler

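I have two event handlers. The first is MJGoToPronhub. Two things to note: the handler method is annotated with @Subscribe so that the bus can find it when the object is registered, and @AllowConcurrentEvents declares the method thread-safe, so the bus may call it from several threads at once (which is why the received events are kept in a CopyOnWriteArrayList):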

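import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class MJGoToPronhub {
    private static final Logger logger = LoggerFactory.getLogger(MJGoToPronhub.class);
    // Thread-safe list, because handle() may be called from several threads at once.
    private final List<MJGotGodGuyCardEvent> receivedEvents = new CopyOnWriteArrayList<>();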
 
    @AllowConcurrentEvents
    @Subscribe
    public void handle(MJGotGodGuyCardEvent event) {
        receivedEvents.add(event);
        logger.info("MJGoToPronhub due to {}", event.getGirlName());
    }
 
    public List<MJGotGodGuyCardEvent> getReceivedEvents(){
        return receivedEvents;
    }
 
}
The other handler, MJGoToWanhua, is implemented in much the same way, so I will not repeat it here.
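To see why the handler stores its events in a CopyOnWriteArrayList, here is a small sketch that uses only the JDK. RecordingHandler is a hypothetical stand-in for the handler above with the Guava annotations left out, and the thread pool plays the role of a bus that calls handle() from several threads at once. It is an illustration under those assumptions, not part of the downloadable source.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ConcurrentHandlerSketch {

    /** Hypothetical handler: same shape as the one above, without the Guava annotations. */
    static final class RecordingHandler {
        private final List<String> receivedEvents = new CopyOnWriteArrayList<>();

        void handle(String event) {
            receivedEvents.add(event); // safe to call from many threads
        }

        List<String> getReceivedEvents() {
            return receivedEvents;
        }
    }

    /** Calls handle() from several threads at once and returns how many events were stored. */
    static int run(int threads, int eventsPerThread) {
        RecordingHandler handler = new RecordingHandler();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch done = new CountDownLatch(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < eventsPerThread; i++) {
                    handler.handle("Nancy");
                }
                done.countDown();
            });
        }
        try {
            done.await(10, TimeUnit.SECONDS); // wait until every thread has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return handler.getReceivedEvents().size();
    }

    public static void main(String[] args) {
        System.out.println(run(4, 250)); // prints 1000: no event is lost
    }
}
```

With a plain ArrayList in place of the CopyOnWriteArrayList, concurrent calls to add could lose events or throw, so the count would no longer be reliable.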

EventBus

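Usage is quite simple. Register the handler objects with EventBus's register, and then have the client that sends the notification call post. The handlers process the event "sequentially": post runs them one after another on the calling thread and returns only after they have all finished: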

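// Imports assume JUnit 4; adjust them if you use JUnit 5.
import com.google.common.eventbus.EventBus;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

public class TestEventBus {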
 
    private EventBus eventBus = new EventBus();
 
    private MJGoToPronhub MJGoToPronhubHandler = new MJGoToPronhub();
    private MJGoToWanhua MJGoToWanhuaHandler = new MJGoToWanhua();
 
    private void thenTheHandlerShouldReceiveTheEvent() {
        assertEquals(1, MJGoToPronhubHandler.getReceivedEvents().size());
        assertEquals(1, MJGoToWanhuaHandler.getReceivedEvents().size());
    }
 
    private void givenEventBusRegisterTwoGoodHandler(EventBus eventBus) {
        eventBus.register(MJGoToPronhubHandler);
        eventBus.register(MJGoToWanhuaHandler);
    }
 
    @Test
    public void shouldGetReceivedEventsWhenPostEventToHandlers() {
        givenEventBusRegisterTwoGoodHandler(eventBus);
        MJGotGodGuyCardEvent event = new MJGotGodGuyCardEvent("Nancy");
 
        eventBus.post(event);
 
        thenTheHandlerShouldReceiveTheEvent();
    }
}
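To make the register and post steps less magical, here is a rough sketch of the general idea using only the JDK: scan the registered object for annotated one-argument methods, then call the matching ones on the caller's thread. Everything in it (MiniBusSketch, the @Listen annotation, Recorder) is a hypothetical name made up for this illustration. It is not how Guava is implemented, and it leaves out caching, error handling, and thread safety.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

final class MiniBusSketch {

    /** Hypothetical marker annotation, playing the role that @Subscribe plays above. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Listen {}

    /** Pairs a registered object with one of its annotated methods. */
    private static final class Subscriber {
        final Object target;
        final Method method;

        Subscriber(Object target, Method method) {
            this.target = target;
            this.method = method;
        }
    }

    private final List<Subscriber> subscribers = new ArrayList<>();

    /** Scans the object for one-argument @Listen methods and remembers them. */
    void register(Object handler) {
        for (Method m : handler.getClass().getDeclaredMethods()) {
            if (m.isAnnotationPresent(Listen.class) && m.getParameterCount() == 1) {
                m.setAccessible(true);
                subscribers.add(new Subscriber(handler, m));
            }
        }
    }

    /** Calls every subscriber whose parameter type accepts the event, on the caller's thread. */
    void post(Object event) {
        for (Subscriber s : subscribers) {
            if (s.method.getParameterTypes()[0].isInstance(event)) {
                try {
                    s.method.invoke(s.target, event);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    throw new IllegalStateException("handler failed", e);
                }
            }
        }
    }

    /** Example handler: appends every String event it receives to a shared log. */
    static final class Recorder {
        private final String label;
        private final List<String> log;

        Recorder(String label, List<String> log) {
            this.label = label;
            this.log = log;
        }

        @Listen
        public void handle(String event) {
            log.add(label + ":" + event);
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        MiniBusSketch bus = new MiniBusSketch();
        bus.register(new Recorder("first", log));
        bus.register(new Recorder("second", log));
        bus.post("Nancy"); // both recorders run, in registration order
        bus.post(42);      // no handler takes an Integer, so nothing happens
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [first:Nancy, second:Nancy]
    }
}
```

Because post only returns after the loop has finished, the assertions in a test like the one above can run right after post without any waiting.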

AsyncEventBus

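If sequential processing is too slow, you can use AsyncEventBus. It is used the same way as EventBus, but its post is non-blocking: the handlers run on the Executor passed to the constructor, so post returns without waiting for them: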

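// Imports assume JUnit 4; adjust them if you use JUnit 5.
import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;
import org.junit.Test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class TestAsyncEventBus {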
    private AsyncEventBus asyncEventBus = new AsyncEventBus(Executors.newCachedThreadPool());
 
    private MJGoToPronhub MJGoToPronhubHandler = new MJGoToPronhub();
    private MJGoToWanhua MJGoToWanhuaHandler = new MJGoToWanhua();
 
    private void thenTheHandlerShouldReceiveTheEvent() {
        assertEquals(1, MJGoToPronhubHandler.getReceivedEvents().size());
        assertEquals(1, MJGoToWanhuaHandler.getReceivedEvents().size());
    }
 
    private void givenEventBusRegisterTwoGoodHandler(EventBus eventBus) {
        eventBus.register(MJGoToPronhubHandler);
        eventBus.register(MJGoToWanhuaHandler);
    }
 
    private MJGotGodGuyCardEvent givenDelayedMJGotGodGuyCardEvent(CountDownLatch latch) {
        return new MJGotGodGuyCardEvent("Nancy") {
            @Override
            public String getGirlName() {
                try {
                    TimeUnit.SECONDS.sleep(1);
                    return super.getGirlName();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the interrupt flag
                    throw new RuntimeException(e);
                } finally {
                    latch.countDown();
                }
            }
        };
    }
 
    private void thenPostShouldBeNotBlocked(long startTime) {
        long afterTime = System.currentTimeMillis();
        assertTrue((afterTime-startTime)<1000);
    }
 
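    private void thenPostShouldBeDoneWithParallel(long startTime, CountDownLatch latch) throws InterruptedException {
        latch.await();
        long elapsed = System.currentTimeMillis() - startTime;
        // Each handler sleeps 1 s: about 1 s in total if they ran in parallel, about 2 s if they ran one after another.
        assertTrue(elapsed >= 1000 && elapsed < 2000);
    }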
 
    @Test
    public void testAsyncEventBus() throws InterruptedException {
        givenEventBusRegisterTwoGoodHandler(asyncEventBus);
 
        CountDownLatch latch = new CountDownLatch(2);
        MJGotGodGuyCardEvent event = givenDelayedMJGotGodGuyCardEvent(latch);
 
        long startTime = System.currentTimeMillis();
        asyncEventBus.post(event);
 
        thenPostShouldBeNotBlocked(startTime);
        thenPostShouldBeDoneWithParallel(startTime, latch);
        thenTheHandlerShouldReceiveTheEvent();
    }
}
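The non-blocking behaviour comes from handing each handler call to an Executor. The sketch below shows that idea with the JDK only. TinyBus is a hypothetical stand-in written for this illustration, not Guava's AsyncEventBus, and instead of sleeping it uses a latch as a gate, so the result does not depend on timing.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class ExecutorDispatchSketch {

    /** Hypothetical stand-in for a bus: hands each handler call to the given Executor. */
    static final class TinyBus<E> {
        private final Executor executor;
        private final List<Consumer<E>> handlers = new CopyOnWriteArrayList<>();

        TinyBus(Executor executor) {
            this.executor = executor;
        }

        void register(Consumer<E> handler) {
            handlers.add(handler);
        }

        /** Returns as soon as the handler calls have been submitted to the executor. */
        void post(E event) {
            for (Consumer<E> handler : handlers) {
                executor.execute(() -> handler.accept(event));
            }
        }
    }

    /** Returns {events seen right after post() returned, events seen after waiting}. */
    static int[] run() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            TinyBus<String> bus = new TinyBus<>(pool);
            CountDownLatch gate = new CountDownLatch(1); // keeps the handlers blocked
            CountDownLatch done = new CountDownLatch(2); // one count per handler
            List<String> received = new CopyOnWriteArrayList<>();

            for (int i = 0; i < 2; i++) {
                bus.register(event -> {
                    try {
                        gate.await();        // simulates slow work
                        received.add(event);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        done.countDown();
                    }
                });
            }

            bus.post("Nancy");
            int seenRightAfterPost = received.size(); // handlers are still waiting at the gate
            gate.countDown();                         // let both handlers proceed
            try {
                done.await(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return new int[] {seenRightAfterPost, received.size()};
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println(r[0] + " " + r[1]); // prints 0 2
    }
}
```

The same gate would deadlock a bus that runs handlers on the caller's thread, because post would never return to open it. That is also why the asynchronous test needs a CountDownLatch before it can assert on the received events.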

Notes

Reference