差異處
這裏顯示兩個版本的差異處。
Both sides previous revision 前次修改 下次修改 | 前次修改 | ||
java:apache_camel:aggregator:helloworld [2019/03/17 21:42] tony [AggregatorGroupRouteBuilder] |
java:apache_camel:aggregator:helloworld [2023/06/25 09:48] (目前版本) |
||
---|---|---|---|
行 1: | 行 1: | ||
+ | {{tag>camel}} | ||
====== Camel - Aggregator Hello World ====== | ====== Camel - Aggregator Hello World ====== | ||
===== Introduction ===== | ===== Introduction ===== | ||
- | Camel Aggregator提供開發人員能夠將大量訊息合成一個的功能。假設你的系統會接受外來通知,並將內容寫進資料庫中;一次連線寫一筆資料會比較快,還是透過一次連線寫入五筆資料快呢? 正常來說,一次連線寫入五筆同屬性資料會是比較快的。它與[[java:apache_camel:throttler:helloworld|Throttler]]的最大區別在於,Throttler限制了client的請求,而Aggreator則是收集起來一次請求。因此,如果使用Aggreator,你的程式必須要有對應的處理方式。\\ | + | Camel Aggregator提供開發人員能夠將大量訊息合成一個的功能。假設你的系統會接受外來通知,並將內容寫進資料庫中;一次連線寫一筆資料會比較快,還是透過一次連線寫入五筆資料快呢? 正常來說,一次連線寫入五筆同屬性資料會是比較快的。它與[[java:apache_camel:throttler:helloworld|Throttler]]的最大區別在於,Throttler限制了client的請求,而Aggreator則是收集起來一次"後送"。因此,如果使用Aggreator,你的程式必需要有對應的處理方式。\\ |
\\ | \\ | ||
我將透過HTTP GET請求/events/{id}做為範例,說明如何使用Aggregator去將單位時間內的請求,以{id}去分組並Aggredate成各別分組訊息。本篇文章中使用到兩個RouteBuilder,分別為RestRouteBuilder與AggregatorGroupRouteBuilder。RestRouteBuilder負責REST核心相關設定,可參考[[java:apache_camel:throttler:helloworld|先前文章]]。接下來直接說明AggregatorGroupRouteBuilder。 | 我將透過HTTP GET請求/events/{id}做為範例,說明如何使用Aggregator去將單位時間內的請求,以{id}去分組並Aggredate成各別分組訊息。本篇文章中使用到兩個RouteBuilder,分別為RestRouteBuilder與AggregatorGroupRouteBuilder。RestRouteBuilder負責REST核心相關設定,可參考[[java:apache_camel:throttler:helloworld|先前文章]]。接下來直接說明AggregatorGroupRouteBuilder。 | ||
+ | \\ | ||
+ | \\ | ||
+ | (程式碼可參考[[https://github.com/frank007love/CamelPractice|link]]) | ||
===== AggregatorGroupRouteBuilder ===== | ===== AggregatorGroupRouteBuilder ===== | ||
<code java> | <code java> | ||
行 42: | 行 46: | ||
</code> | </code> | ||
以下是在configure中的幾個重點: | 以下是在configure中的幾個重點: | ||
- | * aggregate(new GroupedExchangeAggregationStrategy()): 使用GroupedExchangeAggregationStrategy去做aggregate,aggregate的內容為Exchange。如果有任何條件限制,要以Exchange去操作。 | + | * aggregate(new GroupedExchangeAggregationStrategy()): 使用GroupedExchangeAggregationStrategy去做aggregate,aggregate的內容為Exchange。如果要加入條件限制,是以Exchange去操作。 |
- | * header("id"): 與aggregate一起使用,代表以header id去分組,在這裡就是屬於event的id。 | + | * header("id"): 與aggregate一起使用,代表以header id去分組。在這範例中,指得就是event id。 |
- | * completionInterval(period): 做aggregate的單位時間。 | + | * completionInterval(period): 以每period的單位時間去做aggregate。 |
* bean(eventHandler): 請求的處理者,必須要有能力處理aggregate後的訊息。 | * bean(eventHandler): 請求的處理者,必須要有能力處理aggregate後的訊息。 | ||
+ | 接下來讓我們透過單元測試展示效果。 | ||
+ | ===== Unit Test ===== | ||
+ | 測試有兩個目標,testGroupedExchange用以確認aggregate group後的結果,另外一個testCompletionInterval則是確認單位時間設定的作用。以下為程式碼主要結構,主要測試程式碼稍後做說明: | ||
+ | <code java> | ||
+ | package org.tonylin.practice.camel.aggregator; | ||
+ | import static com.google.common.base.Preconditions.checkState; | ||
+ | import java.util.Arrays; | ||
+ | import java.util.List; | ||
+ | import java.util.Map; | ||
+ | import java.util.concurrent.ConcurrentHashMap; | ||
+ | import java.util.concurrent.CountDownLatch; | ||
+ | import java.util.concurrent.TimeUnit; | ||
+ | import java.util.stream.Collectors; | ||
+ | |||
+ | import org.apache.camel.Exchange; | ||
+ | import org.apache.camel.Handler; | ||
+ | import org.apache.camel.RoutesBuilder; | ||
+ | import org.apache.camel.test.junit4.CamelTestSupport; | ||
+ | import org.apache.http.HttpResponse; | ||
+ | import org.apache.http.client.HttpClient; | ||
+ | import org.apache.http.client.methods.HttpGet; | ||
+ | import org.apache.http.impl.client.HttpClientBuilder; | ||
+ | import org.junit.Test; | ||
+ | import org.tonylin.practice.camel.rest.RestRouteBuilder; | ||
+ | |||
+ | public class AggregatorGroupRouteBuilderTest extends CamelTestSupport { | ||
+ | |||
+ | private RestHandler hander = new RestHandler(); | ||
+ | |||
+ | private HttpClient client = HttpClientBuilder.create().build(); | ||
+ | |||
+ | @Override | ||
+ | protected RoutesBuilder[] createRouteBuilders() throws Exception { | ||
+ | AggregatorGroupRouteBuilder aggregatorGroupRouteBuilder = new AggregatorGroupRouteBuilder(hander); | ||
+ | aggregatorGroupRouteBuilder.setPeriod(500); | ||
+ | |||
+ | return new RoutesBuilder[] { | ||
+ | new RestRouteBuilder(), | ||
+ | aggregatorGroupRouteBuilder | ||
+ | }; | ||
+ | } | ||
+ | |||
+ | @Test | ||
+ | public void testGroupedExchange() throws Exception { | ||
+ | // skip | ||
+ | } | ||
+ | @Test | ||
+ | public void testCompletionInterval() throws Exception { | ||
+ | // skip | ||
+ | } | ||
+ | } | ||
+ | </code> | ||
+ | 此測試中的RestHandler程式碼如下,它會透過GROUPED_EXCHANGE取得aggregator處理後的內容,並根據header id儲存到map中,供測試程式碼做assertion。除此之外,在這裡使用CountDownLatch了去確認有收到預期訊息數量: | ||
+ | <code java> | ||
+ | public static class RestHandler { | ||
+ | private Map<String, List<Exchange>> requestData = new ConcurrentHashMap<String, List<Exchange>>(); | ||
+ | private CountDownLatch countDownLatch; | ||
+ | |||
+ | @SuppressWarnings("unchecked") | ||
+ | @Handler | ||
+ | public void handle(Exchange exchange) { | ||
+ | List<Exchange> groupExchanges = exchange.getProperty(Exchange.GROUPED_EXCHANGE, List.class); | ||
+ | String id = groupExchanges.get(0).getIn().getHeader("id", String.class); | ||
+ | requestData.put(id, groupExchanges); | ||
+ | |||
+ | if( countDownLatch != null ) | ||
+ | countDownLatch.countDown(); | ||
+ | } | ||
+ | |||
+ | public Map<String, List<Exchange>> getRequestData(){ | ||
+ | return requestData; | ||
+ | } | ||
+ | |||
+ | public void expectNum(int num) { | ||
+ | countDownLatch = new CountDownLatch(num); | ||
+ | } | ||
+ | |||
+ | public void waitCompletion(int timeout) throws InterruptedException { | ||
+ | checkState(countDownLatch!=null); | ||
+ | countDownLatch.await(timeout, TimeUnit.MILLISECONDS); | ||
+ | } | ||
+ | |||
+ | public void clear() { | ||
+ | requestData.clear(); | ||
+ | } | ||
+ | } | ||
+ | </code> | ||
+ | 首先讓我說明testGroupedExchange。測試程式碼中,送了四個訊息,其中包含了三組不同id;這樣的請求,我們預期RestHandler中,總共會收到三組訊息,其中id 123那組,應包含兩個訊息: | ||
+ | <code java> | ||
+ | private HttpResponse requestWithEventId(String id) { | ||
+ | try { | ||
+ | HttpGet httpGet = new HttpGet("http://localhost:8080/events/" + id); | ||
+ | return client.execute(httpGet); | ||
+ | } catch( Exception e ) { | ||
+ | throw new RuntimeException(e); | ||
+ | } | ||
+ | } | ||
+ | |||
+ | @Test | ||
+ | public void testGroupedExchange() throws Exception { | ||
+ | // given request event id | ||
+ | List<String> eventIds = Arrays.asList("123", "123", "456", "789"); | ||
+ | hander.expectNum(3); | ||
+ | |||
+ | // when | ||
+ | List<HttpResponse> responses = eventIds.parallelStream().map(this::requestWithEventId).collect(Collectors.toList()); | ||
+ | |||
+ | // then | ||
+ | responses.forEach(response->{ | ||
+ | assertEquals(200, response.getStatusLine().getStatusCode()); | ||
+ | }); | ||
+ | |||
+ | hander.waitCompletion(1000); | ||
+ | |||
+ | Map<String, List<Exchange>> requestData = hander.getRequestData(); | ||
+ | assertEquals(3, requestData.size()); | ||
+ | assertEquals(2, requestData.get("123").size()); | ||
+ | assertEquals(1, requestData.get("456").size()); | ||
+ | assertEquals(1, requestData.get("789").size()); | ||
+ | } | ||
+ | </code> | ||
+ | 從上述程式碼中,可以發現hander.waitCompletion(1000)是放在確認response之後;所以client只要將訊息發送到aggregator後,就會拿到response code,並不會等到全部處理結束才回去。\\ | ||
+ | \\ | ||
+ | 接著是testCompletionInterval。這裡我直接透過請求相同id兩次,並分兩輪送出。這樣做可以確認aggregator有做到根據completionInterval的分批效果: | ||
+ | <code java> | ||
+ | private void requestTwiceWithId(String id) throws Exception { | ||
+ | // request event {id} twice | ||
+ | List<String> eventIds = Arrays.asList(id, id); | ||
+ | hander.expectNum(1); | ||
+ | List<HttpResponse> responses = eventIds.parallelStream().map(this::requestWithEventId).collect(Collectors.toList()); | ||
+ | responses.forEach(response->{ | ||
+ | assertEquals(200, response.getStatusLine().getStatusCode()); | ||
+ | }); | ||
+ | |||
+ | hander.waitCompletion(1000); | ||
+ | |||
+ | // confirm request result | ||
+ | Map<String, List<Exchange>> requestData = hander.getRequestData(); | ||
+ | assertEquals(1, requestData.size()); | ||
+ | assertEquals(2, requestData.get("123").size()); | ||
+ | |||
+ | // clear first request data | ||
+ | hander.clear(); | ||
+ | assertTrue(hander.getRequestData().isEmpty()); | ||
+ | } | ||
+ | |||
+ | @Test | ||
+ | public void testCompletionInterval() throws Exception { | ||
+ | requestTwiceWithId("123"); | ||
+ | requestTwiceWithId("123"); | ||
+ | } | ||
+ | </code> | ||
+ | Camel Aggregator其實提供了很多細部操作,你也可以根據自身需求做AggregationStrategy;但時間有限,請容我以後有機會再分享。 | ||
+ | ===== Library Info (Gradle Config) ===== | ||
+ | 以下是我在寫這篇文章時,所使用的libraries版本: | ||
+ | <code> | ||
+ | ext { | ||
+ | camelVersion='2.23.1' | ||
+ | nettyAllVersion='4.1.34.Final' | ||
+ | guavaVersion='27.1-jre' | ||
+ | log4jVersion='1.2.17' | ||
+ | slf4jVersion='1.7.26' | ||
+ | httpClientVersion='4.5.7' | ||
+ | } | ||
+ | |||
+ | dependencies { | ||
+ | compile group: 'org.apache.camel', name: 'camel-core', version: "$camelVersion" | ||
+ | compile group: 'org.apache.camel', name: 'camel-netty4-http', version: "$camelVersion" | ||
+ | compile group: 'org.apache.camel', name: 'camel-http-common', version: "$camelVersion" | ||
+ | compile group: 'org.apache.camel', name: 'camel-netty4', version: "$camelVersion" | ||
+ | compile group: 'io.netty', name: 'netty-all', version: "$nettyAllVersion" | ||
+ | compile group: 'com.google.guava', name: 'guava', version: "$guavaVersion" | ||
+ | compile group: 'log4j', name: 'log4j', version: "$log4jVersion" | ||
+ | compile group: 'org.slf4j', name: 'slf4j-api', version: "$slf4jVersion" | ||
+ | runtime group: 'org.slf4j', name: 'slf4j-log4j12', version: "$slf4jVersion" | ||
+ | testCompile group: 'org.apache.camel', name: 'camel-test', version: "$camelVersion" | ||
+ | testCompile group: 'org.apache.httpcomponents', name: 'httpclient', version: "$httpClientVersion" | ||
+ | testCompile 'junit:junit:4.12' | ||
+ | } | ||
+ | </code> | ||
===== Reference ===== | ===== Reference ===== |