差異處

這裏顯示兩個版本的差異處。

連向這個比對檢視

Both sides previous revision 前次修改
下次修改
前次修改
java:apache_camel:aggregator:helloworld [2019/03/17 21:42]
tony [AggregatorGroupRouteBuilder]
java:apache_camel:aggregator:helloworld [2023/06/25 09:48] (目前版本)
行 1: 行 1:
 +{{tag>​camel}}
 ====== Camel - Aggregator Hello World ====== ====== Camel - Aggregator Hello World ======
 ===== Introduction ===== ===== Introduction =====
-Camel Aggregator提供開發人員能夠將大量訊息合成一個的功能。假設你的系統會接受外來通知,並將內容寫進資料庫中;一次連線寫一筆資料會比較快,還是透過一次連線寫入五筆資料快呢?​ 正常來說,一次連線寫入五筆同屬性資料會是比較快的。它與[[java:​apache_camel:​throttler:​helloworld|Throttler]]的最大區別在於,Throttler限制了client的請求,而Aggreator則是收集起來一次請求。因此,如果使用Aggreator,你的程式必要有對應的處理方式。\\+Camel Aggregator提供開發人員能夠將大量訊息合成一個的功能。假設你的系統會接受外來通知,並將內容寫進資料庫中;一次連線寫一筆資料會比較快,還是透過一次連線寫入五筆資料快呢?​ 正常來說,一次連線寫入五筆同屬性資料會是比較快的。它與[[java:​apache_camel:​throttler:​helloworld|Throttler]]的最大區別在於,Throttler限制了client的請求,而Aggreator則是收集起來一次"​後送"​。因此,如果使用Aggreator,你的程式必要有對應的處理方式。\\
 \\ \\
 我將透過HTTP GET請求/​events/​{id}做為範例,說明如何使用Aggregator去將單位時間內的請求,以{id}去分組並Aggredate成各別分組訊息。本篇文章中使用到兩個RouteBuilder,分別為RestRouteBuilder與AggregatorGroupRouteBuilder。RestRouteBuilder負責REST核心相關設定,可參考[[java:​apache_camel:​throttler:​helloworld|先前文章]]。接下來直接說明AggregatorGroupRouteBuilder。 我將透過HTTP GET請求/​events/​{id}做為範例,說明如何使用Aggregator去將單位時間內的請求,以{id}去分組並Aggredate成各別分組訊息。本篇文章中使用到兩個RouteBuilder,分別為RestRouteBuilder與AggregatorGroupRouteBuilder。RestRouteBuilder負責REST核心相關設定,可參考[[java:​apache_camel:​throttler:​helloworld|先前文章]]。接下來直接說明AggregatorGroupRouteBuilder。
 +\\
 +\\
 +(程式碼可參考[[https://​github.com/​frank007love/​CamelPractice|link]])
 ===== AggregatorGroupRouteBuilder ===== ===== AggregatorGroupRouteBuilder =====
 <code java> <code java>
行 42: 行 46:
 </​code>​ </​code>​
 以下是在configure中的幾個重點:​ 以下是在configure中的幾個重點:​
-  * aggregate(new GroupedExchangeAggregationStrategy()):​ 使用GroupedExchangeAggregationStrategy去做aggregate,aggregate的內容為Exchange。如果有任何條件限制,以Exchange去操作。 +  * aggregate(new GroupedExchangeAggregationStrategy()):​ 使用GroupedExchangeAggregationStrategy去做aggregate,aggregate的內容為Exchange。如果要加入條件限制,以Exchange去操作。 
-  * header("​id"​):​ 與aggregate一起使用,代表以header id去分組在這就是屬於eventid。 +  * header("​id"​):​ 與aggregate一起使用,代表以header id去分組在這範例中,指得就是event id。 
-  * completionInterval(period): ​做aggregate的單位時間。+  * completionInterval(period): ​以每period的單位時間去做aggregate
   * bean(eventHandler):​ 請求的處理者,必須要有能力處理aggregate後的訊息。   * bean(eventHandler):​ 請求的處理者,必須要有能力處理aggregate後的訊息。
 +接下來讓我們透過單元測試展示效果。
 +===== Unit Test =====
 +測試有兩個目標,testGroupedExchange用以確認aggregate group後的結果,另外一個testCompletionInterval則是確認單位時間設定的作用。以下為程式碼主要結構,主要測試程式碼稍後做說明:​
 +<code java>
 +package org.tonylin.practice.camel.aggregator;​
  
 +import static com.google.common.base.Preconditions.checkState;​
  
 +import java.util.Arrays;​
 +import java.util.List;​
 +import java.util.Map;​
 +import java.util.concurrent.ConcurrentHashMap;​
 +import java.util.concurrent.CountDownLatch;​
 +import java.util.concurrent.TimeUnit;​
 +import java.util.stream.Collectors;​
 +
 +import org.apache.camel.Exchange;​
 +import org.apache.camel.Handler;​
 +import org.apache.camel.RoutesBuilder;​
 +import org.apache.camel.test.junit4.CamelTestSupport;​
 +import org.apache.http.HttpResponse;​
 +import org.apache.http.client.HttpClient;​
 +import org.apache.http.client.methods.HttpGet;​
 +import org.apache.http.impl.client.HttpClientBuilder;​
 +import org.junit.Test;​
 +import org.tonylin.practice.camel.rest.RestRouteBuilder;​
 +
 +public class AggregatorGroupRouteBuilderTest extends CamelTestSupport ​ {
 +
 + private RestHandler hander = new RestHandler();​
 +
 + private HttpClient client = HttpClientBuilder.create().build();​
 +
 + @Override
 + protected RoutesBuilder[] createRouteBuilders() throws Exception {
 + AggregatorGroupRouteBuilder aggregatorGroupRouteBuilder = new AggregatorGroupRouteBuilder(hander);​
 + aggregatorGroupRouteBuilder.setPeriod(500);​
 +
 + return new RoutesBuilder[] {
 + new RestRouteBuilder(),​
 + aggregatorGroupRouteBuilder
 + };
 + }
 +
 + @Test
 + public void testGroupedExchange() throws Exception {
 + // skip
 + }
 + @Test
 + public void testCompletionInterval() throws Exception {
 + // skip
 + }
 +}
 +</​code>​
 +此測試中的RestHandler程式碼如下,它會透過GROUPED_EXCHANGE取得aggregator處理後的內容,並根據header id儲存到map中,供測試程式碼做assertion。除此之外,在這裡使用CountDownLatch了去確認有收到預期訊息數量:​
 +<code java>
 +public static class RestHandler {
 + private Map<​String,​ List<​Exchange>>​ requestData = new ConcurrentHashMap<​String,​ List<​Exchange>>​();​
 + private CountDownLatch countDownLatch;​
 +
 + @SuppressWarnings("​unchecked"​)
 + @Handler
 + public void handle(Exchange exchange) {
 + List<​Exchange>​ groupExchanges = exchange.getProperty(Exchange.GROUPED_EXCHANGE,​ List.class);​
 + String id = groupExchanges.get(0).getIn().getHeader("​id",​ String.class);​
 + requestData.put(id,​ groupExchanges);​
 +
 + if( countDownLatch != null )
 + countDownLatch.countDown();​
 + }
 +
 + public Map<​String,​ List<​Exchange>>​ getRequestData(){
 + return requestData;​
 + }
 +
 + public void expectNum(int num) {
 + countDownLatch = new CountDownLatch(num);​
 + }
 +
 + public void waitCompletion(int timeout) throws InterruptedException {
 + checkState(countDownLatch!=null);​
 + countDownLatch.await(timeout,​ TimeUnit.MILLISECONDS);​
 + }
 +
 + public void clear() {
 + requestData.clear();​
 + }
 +}
 +</​code>​
 +首先讓我說明testGroupedExchange。測試程式碼中,送了四個訊息,其中包含了三組不同id;這樣的請求,我們預期RestHandler中,總共會收到三組訊息,其中id 123那組,應包含兩個訊息:​
 +<code java>
 +private HttpResponse requestWithEventId(String id) {
 + try {
 + HttpGet httpGet = new HttpGet("​http://​localhost:​8080/​events/"​ + id);
 + return client.execute(httpGet);​
 + } catch( Exception e ) {
 + throw new RuntimeException(e);​
 + }
 +}
 +
 +@Test
 +public void testGroupedExchange() throws Exception {
 + // given request event id
 + List<​String>​ eventIds = Arrays.asList("​123",​ "​123",​ "​456",​ "​789"​);​
 + hander.expectNum(3);​
 +
 + // when
 + List<​HttpResponse>​ responses = eventIds.parallelStream().map(this::​requestWithEventId).collect(Collectors.toList());​
 +
 + // then
 + responses.forEach(response->​{
 + assertEquals(200,​ response.getStatusLine().getStatusCode());​
 + });
 +
 + hander.waitCompletion(1000);​
 +
 + Map<​String,​ List<​Exchange>>​ requestData = hander.getRequestData();​
 + assertEquals(3,​ requestData.size());​
 + assertEquals(2,​ requestData.get("​123"​).size());​
 + assertEquals(1,​ requestData.get("​456"​).size());​
 + assertEquals(1,​ requestData.get("​789"​).size());​
 +}
 +</​code>​
 +從上述程式碼中,可以發現hander.waitCompletion(1000)是放在確認response之後;所以client只要將訊息發送到aggregator後,就會拿到response code,並不會等到全部處理結束才回去。\\
 +\\
 +接著是testCompletionInterval。這裡我直接透過請求相同id兩次,並分兩輪送出。這樣做可以確認aggregator有做到根據completionInterval的分批效果:​
 +<code java>
 +private void requestTwiceWithId(String id) throws Exception {
 + // request event {id} twice
 + List<​String>​ eventIds = Arrays.asList(id,​ id);
 + hander.expectNum(1);​
 + List<​HttpResponse>​ responses = eventIds.parallelStream().map(this::​requestWithEventId).collect(Collectors.toList());​
 + responses.forEach(response->​{
 + assertEquals(200,​ response.getStatusLine().getStatusCode());​
 + });
 +
 + hander.waitCompletion(1000);​
 +
 + // confirm request result
 + Map<​String,​ List<​Exchange>>​ requestData = hander.getRequestData();​
 + assertEquals(1,​ requestData.size());​
 + assertEquals(2,​ requestData.get("​123"​).size());​
 +
 + // clear first request data
 + hander.clear();​
 + assertTrue(hander.getRequestData().isEmpty());​
 +}
 +
 +@Test
 +public void testCompletionInterval() throws Exception {
 + requestTwiceWithId("​123"​);​
 + requestTwiceWithId("​123"​);​
 +}
 +</​code>​
 +Camel Aggregator其實提供了很多細部操作,你也可以根據自身需求做AggregationStrategy;但時間有限,請容我以後有機會再分享。
 +===== Library Info (Gradle Config) =====
 +以下是我在寫這篇文章時,所使用的libraries版本:​
 +<​code>​
 +ext {
 + camelVersion='​2.23.1'​
 + nettyAllVersion='​4.1.34.Final'​
 + guavaVersion='​27.1-jre'​
 + log4jVersion='​1.2.17'​
 + slf4jVersion='​1.7.26'​
 + httpClientVersion='​4.5.7'​
 +}
 +
 +dependencies {
 +    compile group: '​org.apache.camel',​ name: '​camel-core',​ version: "​$camelVersion"​
 +    compile group: '​org.apache.camel',​ name: '​camel-netty4-http',​ version: "​$camelVersion"​
 +    compile group: '​org.apache.camel',​ name: '​camel-http-common',​ version: "​$camelVersion"​
 +    compile group: '​org.apache.camel',​ name: '​camel-netty4',​ version: "​$camelVersion"​
 +    compile group: '​io.netty',​ name: '​netty-all',​ version: "​$nettyAllVersion"​
 +    compile group: '​com.google.guava',​ name: '​guava',​ version: "​$guavaVersion"​
 +    compile group: '​log4j',​ name: '​log4j',​ version: "​$log4jVersion"​
 +    compile group: '​org.slf4j',​ name: '​slf4j-api',​ version: "​$slf4jVersion"​
 +    runtime group: '​org.slf4j',​ name: '​slf4j-log4j12',​ version: "​$slf4jVersion"​
 +    testCompile group: '​org.apache.camel',​ name: '​camel-test',​ version: "​$camelVersion"​
 +    testCompile group: '​org.apache.httpcomponents',​ name: '​httpclient',​ version: "​$httpClientVersion"​
 +    testCompile '​junit:​junit:​4.12'​
 +}
 +</​code>​
  
 ===== Reference ===== ===== Reference =====