差異處
這裏顯示兩個版本的差異處。
Both sides previous revision 前次修改 下次修改 | 前次修改 | ||
java:apache_camel:aggregator:helloworld [2019/03/17 21:56] tony [Unit Test] |
java:apache_camel:aggregator:helloworld [2023/06/25 09:48] (目前版本) |
||
---|---|---|---|
行 1: | 行 1: | ||
+ | {{tag>camel}} | ||
====== Camel - Aggregator Hello World ====== | ====== Camel - Aggregator Hello World ====== | ||
===== Introduction ===== | ===== Introduction ===== | ||
- | Camel Aggregator提供開發人員能夠將大量訊息合成一個的功能。假設你的系統會接受外來通知,並將內容寫進資料庫中;一次連線寫一筆資料會比較快,還是透過一次連線寫入五筆資料快呢? 正常來說,一次連線寫入五筆同屬性資料會是比較快的。它與[[java:apache_camel:throttler:helloworld|Throttler]]的最大區別在於,Throttler限制了client的請求,而Aggreator則是收集起來一次請求。因此,如果使用Aggreator,你的程式必須要有對應的處理方式。\\ | + | Camel Aggregator提供開發人員能夠將大量訊息合成一個的功能。假設你的系統會接受外來通知,並將內容寫進資料庫中;一次連線寫一筆資料會比較快,還是透過一次連線寫入五筆資料快呢? 正常來說,一次連線寫入五筆同屬性資料會是比較快的。它與[[java:apache_camel:throttler:helloworld|Throttler]]的最大區別在於,Throttler限制了client的請求,而Aggreator則是收集起來一次"後送"。因此,如果使用Aggreator,你的程式必需要有對應的處理方式。\\ |
\\ | \\ | ||
我將透過HTTP GET請求/events/{id}做為範例,說明如何使用Aggregator去將單位時間內的請求,以{id}去分組並Aggredate成各別分組訊息。本篇文章中使用到兩個RouteBuilder,分別為RestRouteBuilder與AggregatorGroupRouteBuilder。RestRouteBuilder負責REST核心相關設定,可參考[[java:apache_camel:throttler:helloworld|先前文章]]。接下來直接說明AggregatorGroupRouteBuilder。 | 我將透過HTTP GET請求/events/{id}做為範例,說明如何使用Aggregator去將單位時間內的請求,以{id}去分組並Aggredate成各別分組訊息。本篇文章中使用到兩個RouteBuilder,分別為RestRouteBuilder與AggregatorGroupRouteBuilder。RestRouteBuilder負責REST核心相關設定,可參考[[java:apache_camel:throttler:helloworld|先前文章]]。接下來直接說明AggregatorGroupRouteBuilder。 | ||
+ | \\ | ||
+ | \\ | ||
+ | (程式碼可參考[[https://github.com/frank007love/CamelPractice|link]]) | ||
===== AggregatorGroupRouteBuilder ===== | ===== AggregatorGroupRouteBuilder ===== | ||
<code java> | <code java> | ||
行 42: | 行 46: | ||
</code> | </code> | ||
以下是在configure中的幾個重點: | 以下是在configure中的幾個重點: | ||
- | * aggregate(new GroupedExchangeAggregationStrategy()): 使用GroupedExchangeAggregationStrategy去做aggregate,aggregate的內容為Exchange。如果有任何條件限制,要以Exchange去操作。 | + | * aggregate(new GroupedExchangeAggregationStrategy()): 使用GroupedExchangeAggregationStrategy去做aggregate,aggregate的內容為Exchange。如果要加入條件限制,是以Exchange去操作。 |
- | * header("id"): 與aggregate一起使用,代表以header id去分組,在這裡就是屬於event的id。 | + | * header("id"): 與aggregate一起使用,代表以header id去分組。在這範例中,指得就是event id。 |
- | * completionInterval(period): 做aggregate的單位時間。 | + | * completionInterval(period): 以每period的單位時間去做aggregate。 |
* bean(eventHandler): 請求的處理者,必須要有能力處理aggregate後的訊息。 | * bean(eventHandler): 請求的處理者,必須要有能力處理aggregate後的訊息。 | ||
接下來讓我們透過單元測試展示效果。 | 接下來讓我們透過單元測試展示效果。 | ||
行 169: | 行 173: | ||
} | } | ||
</code> | </code> | ||
- | 接著是testCompletionInterval,這裡我直接透過請求相同id兩次,並分兩輪送出。這樣做可以確認aggregator有做到根據completionInterval的分批效果: | + | 從上述程式碼中,可以發現hander.waitCompletion(1000)是放在確認response之後;所以client只要將訊息發送到aggregator後,就會拿到response code,並不會等到全部處理結束才回去。\\ |
+ | \\ | ||
+ | 接著是testCompletionInterval。這裡我直接透過請求相同id兩次,並分兩輪送出。這樣做可以確認aggregator有做到根據completionInterval的分批效果: | ||
<code java> | <code java> | ||
private void requestTwiceWithId(String id) throws Exception { | private void requestTwiceWithId(String id) throws Exception { |