Advanced cloning and aggregation with streaming

This article demonstrates the ability of the UltraESB to split messages and aggregate the responses. In particular, it highlights how aggregated chunks of a response can be streamed to the client as soon as they become available, while premature timeouts are handled so that the remote client never receives an incomplete, malformed response when a timeout occurs.

Note: This is an advanced example solving a complex scenario, and was driven by a user request for a proof-of-concept.

Scenario - Problem definition

The scenario simulates multiple concurrent users issuing JSON requests of the form given below, with a request to multiply two variables v1 and v2, along with a text message:

 { "message" : "hello world, I am client N!", "v1" : 100, "v2" : 300}

There are three backend service endpoints servicing such JSON requests, but each expects a slightly different message format. Thus each message from a user must be cloned and transformed into a format suitable for each destination backend system. In this example, the format expected by the backend services is:

{ "prefix_for_Server <n>" : "Hello Server <n> - hello world, I am client N!", "v1" : 100, "v2" : 300}
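As a rough illustration of this per-backend transformation, the sketch below assembles a backend request from fields that are assumed to have already been parsed out of the client request. BackendRequestSketch and its methods are hypothetical; the sample itself uses SampleJsonTransformation together with the Jackson library, and that code is not reproduced here.

```java
/**
 * Hypothetical sketch of the per-backend request transformation. It assumes
 * the client request has already been parsed into message, v1 and v2, and
 * only shows how the backend-specific JSON payload is assembled.
 */
final class BackendRequestSketch {

    private BackendRequestSketch() {}

    /** Quotes a string for JSON, escaping double quotes, backslashes and newlines. */
    static String jsonString(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                default:   sb.append(c);
            }
        }
        return sb.append('"').toString();
    }

    /** Builds { "prefix_for_<server>" : "Hello <server> - <message>", "v1" : .., "v2" : ..}. */
    static String transformRequest(String message, long v1, long v2, String server) {
        return "{ " + jsonString("prefix_for_" + server) + " : "
                + jsonString("Hello " + server + " - " + message)
                + ", \"v1\" : " + v1 + ", \"v2\" : " + v2 + "}";
    }
}
```

For example, transformRequest("hello world, I am client N!", 100, 300, "Server 1") yields the "Server 1" variant of the backend format shown above.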

Each backend JSON service responds to such a request after a random delay of up to 1200 ms, adding a "result" field computed as v1 * v2:

{ "prefix_for_Server <n>" : "Welcome to Server <n>", "v1" : 100, "v2" : 300, "result" : 30000}
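For local experiments, the behaviour of a backend service can be mimicked in plain Java. BackendServiceSketch below is hypothetical and not part of the sample (the real service is a servlet): it computes v1 * v2 and completes a CompletableFuture after a random delay below 1200 ms, using CompletableFuture.delayedExecutor (Java 9 or later). Server names are assumed not to contain characters that need JSON escaping.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical stand-in for one backend JSON service: it answers with the
 * product v1 * v2 after a random delay below 1200 ms.
 */
final class BackendServiceSketch {

    static final long MAX_DELAY_MILLIS = 1200;

    private BackendServiceSketch() {}

    /** Builds the response body for the given server name and operands. */
    static String respond(String server, long v1, long v2) {
        return "{ \"prefix_for_" + server + "\" : \"Welcome to " + server + "\", "
                + "\"v1\" : " + v1 + ", \"v2\" : " + v2 + ", \"result\" : " + (v1 * v2) + "}";
    }

    /** Completes with the response after a random delay in [0, 1200) ms. */
    static CompletableFuture<String> respondLater(String server, long v1, long v2) {
        long delay = ThreadLocalRandom.current().nextLong(MAX_DELAY_MILLIS);
        return CompletableFuture.supplyAsync(
                () -> respond(server, v1, v2),
                CompletableFuture.delayedExecutor(delay, TimeUnit.MILLISECONDS));
    }
}
```

Because the delay is drawn from [0, 1200) ms while the aggregation timeout used later is 1000 ms, a given backend response misses the deadline roughly one time in six.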

These responses must then be aggregated and sent back to the client as a single message, where each <part i> (i = 1, 2, 3) denotes one backend response as shown above:

{ "merged": [ <part 1>, <part 2>, <part 3> ]}

We first consider aggregation without a timeout, and then move on to the more advanced scenario, where we aggregate only the responses received within 1000 ms and handle premature timeouts correctly by returning partial, but valid and complete, JSON results.

Aggregation without timeouts

First, let us consider the aggregation of the responses without a timeout. Here all three services must respond before a complete response can be sent back to the client. The UltraESB sample configuration #211 demonstrates this, and the unit test case JsonLoadTest uses JUnitPerf to load test this configuration with multiple concurrent users.

Simple in-memory aggregation

When using simple in-memory aggregation, the responses are collected until all expected responses have been received. When the collection completes, a new response message is injected into the outSequence for processing, carrying each of the aggregated responses as an attachment.
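The core idea of completion-size aggregation can be sketched independently of the ESB. The class below is a hypothetical illustration, not the InMemoryMessageAggregator implementation: each response is assumed to carry the id of the original request as its group key, and plain strings stand in for messages.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical sketch of completion-size based in-memory aggregation:
 * responses are collected per aggregation group and released together
 * once the expected number has arrived.
 */
final class CompletionSizeAggregator<T> {

    private final int completionSize;
    // open aggregation groups, keyed by the id of the original request
    private final Map<String, List<T>> groups = new HashMap<>();

    CompletionSizeAggregator(int completionSize) {
        this.completionSize = completionSize;
    }

    /**
     * Adds one response to its group. Returns the complete group once
     * completionSize responses have been collected, otherwise empty.
     */
    synchronized Optional<List<T>> aggregate(String groupId, T response) {
        List<T> parts = groups.computeIfAbsent(groupId, id -> new ArrayList<>());
        parts.add(response);
        if (parts.size() < completionSize) {
            return Optional.empty();
        }
        groups.remove(groupId); // the group is complete, so stop tracking it
        return Optional.of(List.copyOf(parts));
    }
}
```

With a completion size of 3, the first two responses for a request return empty, and the third returns all three in arrival order, which is the point at which the merged message would be injected.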

Here is the definition of the proxy service that handles simple in-memory aggregation. Refer to the sample #211 for the complete and annotated configuration.

<u:proxy id="merge-and-aggregate-proxy">
    <u:transport id="http-8280"/>
    <u:target>
        <u:inSequence>
            <u:java import="samples.json.SampleJsonTransformation;"><![CDATA[
                Message clone = msg.cloneMessage();
                mediation.setPayloadFromString(clone, SampleJsonTransformation.transformRequest(msg, "Server 1"));
                mediation.sendToEndpoint(clone, "json1");

                clone = msg.cloneMessage();
                mediation.setPayloadFromString(clone, SampleJsonTransformation.transformRequest(msg, "Server 2"));
                mediation.sendToEndpoint(clone, "json2");

                clone = msg.cloneMessage();
                mediation.setPayloadFromString(clone, SampleJsonTransformation.transformRequest(msg, "Server 3"));
                mediation.sendToEndpoint(clone, "json3");
            ]]></u:java>
        </u:inSequence>
        <u:outSequence>
            <u:java import="samples.json.JsonMerge; org.adroitlogic.ultraesb.api.helper.MessageAggregator;"><![CDATA[
                // the InMemoryMessageAggregator marks the 'merged' message with a property 'MERGED'
                if (msg.getMessageProperty("MERGED") == null) {
                    MessageAggregator fxd = mediation.getSpringBean("fxd", MessageAggregator.class);
                    fxd.aggregate(msg);
                } else {
                    JsonMerge.aggregateAsJsonResult(msg);
                    mediation.sendResponse(msg, 200);
                }
            ]]></u:java>
        </u:outSequence>
    </u:target>
</u:proxy>

<bean id="fxd" class="org.adroitlogic.ultraesb.core.helper.InMemoryMessageAggregator">
    <constructor-arg ref="ultra-config"/>
    <property name="completionSize" value="3"/>
    <property name="timeoutDurationMillis" value="1000"/>
</bean>

The merge-and-aggregate-proxy sends each received JSON message to three endpoints (json1, json2 and json3, defined in the configuration) after cloning it and transforming each clone into a distinct payload compatible with its endpoint. The sample uses a simple custom Java transformation, SampleJsonTransformation.transformRequest(), which uses the Jackson JSON library to convert a request into the form expected by the backend service.

All responses are received at the outSequence, where they are handed to the InMemoryMessageAggregator bean. The InMemoryMessageAggregator is configured to aggregate 3 messages and, once the aggregation is complete, pushes the aggregated message into the outSequence again, marked with a special property 'MERGED'.

A merged message holds each individual response as an attachment. These attachments are then combined by another simple Java class, JsonMerge.aggregateAsJsonResult(), into a single JSON payload of the form { "merged": [ <part 1>, <part 2>, <part 3> ]}. The merged response is sent back to the original client as one single response.
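The final merge step amounts to wrapping the already-serialized parts in a JSON array. Assuming each part is a valid JSON object, a string-based sketch of the output that JsonMerge.aggregateAsJsonResult() is described as producing could look like this; JsonMergeSketch is a hypothetical name, and the real class works on message attachments rather than a list of strings.

```java
import java.util.List;

/** Hypothetical sketch of the merged JSON layout, built from serialized parts. */
final class JsonMergeSketch {

    private JsonMergeSketch() {}

    /** Wraps already-serialized JSON objects as { "merged": [p1,p2,...]}. */
    static String mergeParts(List<String> parts) {
        // parts are assumed to be valid JSON and are not re-parsed here
        return "{ \"merged\": [" + String.join(",", parts) + "]}";
    }
}
```

An empty list yields { "merged": []}, so the same helper also covers the case where no backend responded.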

Streaming aggregation

When using streaming aggregation, the responses are sent back to the client as "chunks" of a larger single response. HTTP streaming support allows the client to receive these chunks transparently as soon as they become available, without waiting until all three responses have been received and aggregated. Using a streaming parser such as the Jackson parser for JSON, a client could, for example, update a UI as more data becomes available. To simulate a real-world scenario, the backend JSON services are configured to respond after a random delay of less than 1200 ms.

Here is the definition of the proxy service that handles streaming aggregation. Refer to the sample #211 for the complete and annotated configuration.

<u:proxy id="stream-and-aggregate-proxy">
    <u:transport id="http-8280"/>
    <u:target>
        <u:inSequence>
            <u:java import="samples.json.SampleJsonTransformation; org.adroitlogic.ultraesb.api.helper.MessageAggregator;"><![CDATA[
                // start the aggregation timer for this request before the clones are sent out
                MessageAggregator agg = mediation.getSpringBean("agg", MessageAggregator.class);
                agg.beginTimer(msg);

                Message clone = msg.cloneMessage();
                mediation.setPayloadFromString(clone, SampleJsonTransformation.transformRequest(msg, "Server 1"));
                mediation.sendToEndpoint(clone, "json1");

                clone = msg.cloneMessage();
                mediation.setPayloadFromString(clone, SampleJsonTransformation.transformRequest(msg, "Server 2"));
                mediation.sendToEndpoint(clone, "json2");

                clone = msg.cloneMessage();
                mediation.setPayloadFromString(clone, SampleJsonTransformation.transformRequest(msg, "Server 3"));
                mediation.sendToEndpoint(clone, "json3");
            ]]></u:java>
        </u:inSequence>
        <u:outSequence>
            <u:java import="org.adroitlogic.ultraesb.api.helper.MessageAggregator; org.adroitlogic.ultraesb.core.helper.StreamingAggregator;"><![CDATA[
                if (msg.getMessageProperty(MessageAggregator.AggregationConstants.MERGED) == null) {
                    MessageAggregator agg = mediation.getSpringBean("agg", MessageAggregator.class);
                    agg.aggregate(msg);
                }

                if (msg.getMessageProperty(MessageAggregator.AggregationConstants.EXPIRED) != null) {
                    // an expired message arrives after the MessageAggregator times out; usually there is no use for it
                    return;
                }

                // StreamingAggregator marks messages as "part i of n" where i = 1 for the first, and i = n for the last.
                // Unboxing into int makes 'part == size' compare values rather than Integer references.
                int part = (Integer) msg.getMessageProperty(StreamingAggregator.PART);
                int size = (Integer) msg.getMessageProperty(StreamingAggregator.SIZE);

                if (size < 0) {
                    // size -1 indicates early termination, using this last payload for closure
                    if (part == 1) {
                        // nothing has been streamed yet: send a complete, empty result
                        mediation.setPayloadFromString(msg, "{ \"merged\" : []}");
                    } else {
                        // close the JSON array already opened by the earlier parts
                        mediation.setPayloadFromString(msg, " {} ]}");
                    }
                } else {
                    if (part == 1) {
                        // first part: open the JSON object and array
                        String result = "{ \"merged\": [" + mediation.readPayloadAsString(msg) + ",";
                        mediation.setPayloadFromString(msg, result);
                    } else if (part == size) {
                        // last part: close the array and the object
                        String result = mediation.readPayloadAsString(msg) + "]}";
                        mediation.setPayloadFromString(msg, result);
                    } else {
                        // intermediate part: append a separator only
                        String result = mediation.readPayloadAsString(msg) + ",";
                        mediation.setPayloadFromString(msg, result);
                    }
                }
                mediation.sendPart(msg, 200, part, size);
            ]]></u:java>
        </u:outSequence>
    </u:target>
</u:proxy>

<bean id="agg" class="org.adroitlogic.ultraesb.core.helper.StreamingAggregator">
    <constructor-arg ref="ultra-config"/>
    <property name="completionSize" value="3"/>
    <property name="timeoutDurationMillis" value="1000"/>
</bean>

The stream-and-aggregate-proxy sends each received JSON message to the three endpoints (json1, json2 and json3, defined in the configuration) after cloning it and transforming each clone into a distinct payload compatible with its endpoint. The sample uses the same custom Java transformation, SampleJsonTransformation.transformRequest(), based on the Jackson JSON library, to convert a request into the form expected by the backend service.

All responses are received at the outSequence, where each is handed to the StreamingAggregator bean. The StreamingAggregator is configured to aggregate 3 messages, and marks each new message as "part <i> of <n>", where <i> is the position of the current message among the total of <n> messages.

The streaming support of the UltraESB can now send each of these parts as soon as it becomes available, as a chunk of a larger aggregated response. Thus part 1 is streamed to the client as:

{ "merged": [ <part 1>,

and each intermediate part (in this example, only part 2) is streamed as

<part i>,

and the last part as

<part n> ]}

to form the complete JSON response to the client. Note that we inject custom strings around the message parts, so that together they form a valid, larger JSON document for the client reading the response. For an XML payload, the parts could be glued together in a similar way as required.
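The gluing rules can be isolated as a pure function, which makes it easy to check that the chunks always concatenate into valid JSON. ChunkGlue is a hypothetical helper written for illustration, mirroring the outSequence script; unlike that script, it also handles a group of size 1, where the first part is also the last.

```java
/**
 * Hypothetical helper: turns the payload of "part i of n" into the text
 * chunk streamed to the client. A negative size means the aggregation timed
 * out and this chunk must close the JSON document.
 */
final class ChunkGlue {

    private ChunkGlue() {}

    static String chunkFor(int part, int size, String payload) {
        if (size < 0) {
            // premature close: either nothing was sent yet, or an open array must be closed
            return part == 1 ? "{ \"merged\" : []}" : " {} ]}";
        }
        String prefix = part == 1 ? "{ \"merged\": [" : ""; // the first part opens the document
        String suffix = part == size ? "]}" : ",";           // the last part closes it
        return prefix + payload + suffix;
    }
}
```

Concatenating chunkFor(1, 3, p1), chunkFor(2, 3, p2) and chunkFor(3, 3, p3) reproduces { "merged": [p1,p2,p3]}, while chunkFor(1, 3, p1) followed by chunkFor(2, -1, null) gives { "merged": [p1, {} ]}, which is still valid JSON.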

Aggregation with timeouts

Next we consider the aggregation of the responses with a timeout of one second. Since the backend services respond after a random delay of less than 1200 ms, a load test will produce aggregated responses containing zero, one, two or three parts. The configurations respond either on completion or on the timeout, irrespective of the number of responses actually received from the backend services, and return an empty response { "merged" : []} if no responses have been received yet.

Simple in-memory aggregation

When using simple in-memory aggregation, the responses received are collected until all expected responses are received or a timeout occurs. When the collection completes or on timeout, a new response message is injected for processing, with each of the responses aggregated as its attachments.

Here is the definition of the proxy service that handles simple in-memory aggregation. Refer to the sample #212 for the complete and annotated configuration.

<u:proxy id="merge-and-aggregate-proxy">
    <u:transport id="http-8280"/>
    <u:target>
        <u:inSequence>
            <u:java import="samples.json.SampleJsonTransformation; org.adroitlogic.ultraesb.api.helper.MessageAggregator;"><![CDATA[
                MessageAggregator fxd = mediation.getSpringBean("fxd", MessageAggregator.class);
                fxd.beginTimer(msg);

                Message clone = msg.cloneMessage();
                mediation.setPayloadFromString(clone, SampleJsonTransformation.transformRequest(msg, "Server 1"));
                mediation.sendToEndpoint(clone, "json1");

                clone = msg.cloneMessage();
                mediation.setPayloadFromString(clone, SampleJsonTransformation.transformRequest(msg, "Server 2"));
                mediation.sendToEndpoint(clone, "json2");

                clone = msg.cloneMessage();
                mediation.setPayloadFromString(clone, SampleJsonTransformation.transformRequest(msg, "Server 3"));
                mediation.sendToEndpoint(clone, "json3");
            ]]></u:java>
        </u:inSequence>
        <u:outSequence>
            <u:java import="samples.json.JsonMerge; org.adroitlogic.ultraesb.api.helper.MessageAggregator;"><![CDATA[
                // the InMemoryMessageAggregator marks the 'merged' message with a property 'MERGED'
                if (msg.getMessageProperty("MERGED") == null) {
                    MessageAggregator fxd = mediation.getSpringBean("fxd", MessageAggregator.class);
                    fxd.aggregate(msg);
                } else {
                    JsonMerge.aggregateAsJsonResult(msg);
                    mediation.sendResponse(msg, 200);
                }
            ]]></u:java>
        </u:outSequence>
    </u:target>
</u:proxy>

<!--Defines an in-memory aggregator instance with a completion size and timeout -->
<bean id="fxd" class="org.adroitlogic.ultraesb.core.helper.InMemoryMessageAggregator">
    <constructor-arg ref="ultra-config"/>
    <property name="completionSize" value="3"/>
    <property name="timeoutDurationMillis" value="1000"/>
</bean>

To facilitate timeouts, the MessageAggregator.beginTimer() method must be invoked from the inSequence for the original incoming message, so that the aggregator can correctly trigger completion after the expected duration from that moment.
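The interplay between beginTimer() and completion can be sketched with a ScheduledExecutorService. TimedAggregationGroup is a hypothetical, single-group illustration rather than the UltraESB implementation: the group completes exactly once, either when completionSize responses have arrived or when the timer fires with whatever has been collected, and responses arriving afterwards are rejected, which corresponds to the 'EXPIRED' case.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
 * Hypothetical sketch of one aggregation group with a timeout.
 */
final class TimedAggregationGroup {

    private final int completionSize;
    private final Consumer<List<String>> onComplete;
    private final List<String> responses = new ArrayList<>();
    private boolean completed;
    private ScheduledFuture<?> timer;

    TimedAggregationGroup(int completionSize, Consumer<List<String>> onComplete) {
        this.completionSize = completionSize;
        this.onComplete = onComplete;
    }

    /** Call once, for the original request, before the clones are sent out. */
    synchronized void beginTimer(ScheduledExecutorService scheduler, long timeoutMillis) {
        timer = scheduler.schedule(this::expire, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** Returns false if the group already completed, i.e. the response has expired. */
    synchronized boolean aggregate(String response) {
        if (completed) {
            return false;
        }
        responses.add(response);
        if (responses.size() == completionSize) {
            complete();
        }
        return true;
    }

    /** Timer callback: releases whatever has been collected so far. */
    private synchronized void expire() {
        if (!completed) {
            complete();
        }
    }

    // Callers hold the lock; completion happens at most once.
    private void complete() {
        completed = true;
        if (timer != null) {
            timer.cancel(false);
        }
        onComplete.accept(List.copyOf(responses));
    }
}
```

Starting the timer from the request side, rather than on the first response, is what bounds the client's total wait: even if no backend answers, the group still completes after the timeout.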

On the outSequence, each response received is handed to the message aggregator. When all three responses have arrived, or when the timeout expires, the InMemoryMessageAggregator fires with whatever responses it has collected so far; the JsonMerge.aggregateAsJsonResult() method then merges them, and the result is sent back to the client.

Streaming aggregation

When using streaming aggregation with timeouts, the responses are sent back to the client as "chunks" of one single response, while the timeout is used to close a premature response so that the client still receives a valid and correct JSON document.

Here is the definition of the proxy service that handles streaming aggregation with a timeout. Refer to the sample #212 for the complete and annotated configuration.

<u:proxy id="stream-and-aggregate-proxy">
    <u:transport id="http-8280"/>
    <u:target>
        <u:inSequence>
            <u:java import="samples.json.SampleJsonTransformation; org.adroitlogic.ultraesb.api.helper.MessageAggregator;"><![CDATA[
                MessageAggregator agg = mediation.getSpringBean("agg", MessageAggregator.class);
                agg.beginTimer(msg);

                Message clone = msg.cloneMessage();
                mediation.setPayloadFromString(clone, SampleJsonTransformation.transformRequest(msg, "Server 1"));
                mediation.sendToEndpoint(clone, "json1");

                clone = msg.cloneMessage();
                mediation.setPayloadFromString(clone, SampleJsonTransformation.transformRequest(msg, "Server 2"));
                mediation.sendToEndpoint(clone, "json2");

                clone = msg.cloneMessage();
                mediation.setPayloadFromString(clone, SampleJsonTransformation.transformRequest(msg, "Server 3"));
                mediation.sendToEndpoint(clone, "json3");
            ]]></u:java>
        </u:inSequence>
        <u:outSequence>
            <u:java import="org.adroitlogic.ultraesb.api.helper.MessageAggregator;"><![CDATA[
                if (msg.getMessageProperty("MERGED") == null) {
                    MessageAggregator agg = mediation.getSpringBean("agg", MessageAggregator.class);
                    agg.aggregate(msg);
                }

                if (msg.getMessageProperty("EXPIRED") != null) {
                    // an expired message arrives after the MessageAggregator times out; usually there is no use for it
                    return;
                }

                // StreamingAggregator marks messages as "part i of n" where i = 1 for the first, and i = n for the last.
                // Unboxing into int makes 'part == size' compare values rather than Integer references.
                int part = (Integer) msg.getMessageProperty("part");
                int size = (Integer) msg.getMessageProperty("size");

                if (size < 0) {
                    // size -1 indicates early termination using this last payload for closure
                    if (part == 1) {
                        mediation.setPayloadFromString(msg, "{ \"merged\" : []}");
                    } else {
                        mediation.setPayloadFromString(msg, " {} ]}");
                    }

                } else {
                    if (part == 1) {
                        String result = "{ \"merged\": [" + mediation.readPayloadAsString(msg) + ",";
                        mediation.setPayloadFromString(msg, result);
                    } else if (part == size) {
                        String result = mediation.readPayloadAsString(msg) + "]}";
                        mediation.setPayloadFromString(msg, result);
                    } else {
                        String result = mediation.readPayloadAsString(msg) + ",";
                        mediation.setPayloadFromString(msg, result);
                    }
                }
                mediation.sendPart(msg, 200, part, size);
            ]]></u:java>
        </u:outSequence>
    </u:target>
</u:proxy>

To facilitate timeouts, the MessageAggregator.beginTimer() method must be invoked from the inSequence for the original incoming message, so that the aggregator can trigger completion once the configured duration has elapsed from that moment.

On the outSequence, each response received is handed to the message aggregator, which stamps it as 'EXPIRED' if the aggregation group to which it belongs has already expired. Such messages are dropped.

When the StreamingAggregator is configured with a timeout, it triggers an event once this time has elapsed since the beginTimer() call for each aggregation group. A premature completion thus causes the StreamingAggregator to inject a trigger message marked as "part <i> of <n>", where <n> is set to -1 to signal immediate closure of the response, and <i> indicates the position of the next part that was expected.

Thus a premature close at part 1 sends an empty response:

{ "merged" : []}

A premature close at part 2 or 3, on the other hand, sends:

{} ]}

to close the premature response as a valid JSON message when one or two parts, respectively, have already been sent as:

{ "merged" : [ <part 1>, 
{ "merged" : [ <part 1>, <part 2>,
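The part numbering described above can be modelled as a small state machine. StreamingPartCounter is hypothetical and only captures the bookkeeping, assuming parts are numbered in arrival order: each accepted response gets the next part number, a timeout yields "part (sent + 1) of -1", and anything arriving after closure is reported as expired by returning null.

```java
/**
 * Hypothetical sketch of the "part i of n" bookkeeping for one aggregation
 * group, including premature closure on timeout.
 */
final class StreamingPartCounter {

    /** Immutable "part i of n" label; n == -1 signals a premature close. */
    static final class Part {
        final int part;
        final int size;

        Part(int part, int size) {
            this.part = part;
            this.size = size;
        }

        @Override
        public String toString() {
            return "part " + part + " of " + size;
        }
    }

    private final int completionSize;
    private int sent;        // number of parts handed out so far
    private boolean closed;  // completed normally or timed out

    StreamingPartCounter(int completionSize) {
        this.completionSize = completionSize;
    }

    /** Labels a backend response, or returns null if the group is closed (expired). */
    synchronized Part onResponse() {
        if (closed) {
            return null;
        }
        sent++;
        if (sent == completionSize) {
            closed = true;
        }
        return new Part(sent, completionSize);
    }

    /** Called when the timer fires; returns the closing trigger, or null if already complete. */
    synchronized Part onTimeout() {
        if (closed) {
            return null;
        }
        closed = true;
        return new Part(sent + 1, -1);
    }
}
```

For instance, one response followed by a timeout produces "part 1 of 3" and then "part 2 of -1"; the latter is the trigger that the outSequence turns into the closing " {} ]}".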

Testing the samples in 5 minutes

The configuration defines three external endpoints (json1, json2 and json3) with which the proxy services interact; in this simple example, all three point at the same sample JSON service hosted in the test environment at http://localhost:9000/service/JsonServlet

To try the samples, you can use the AdroitLogic ToolBox, or the UltraESB unit test cases JsonLoadTest or JsonTimeoutLoadTest, which can be executed from your IDE.

Testing aggregation without timeouts

To test the sample 211 without timeouts, start the UltraESB as "./ultraesb.sh -sample 211" from the bin directory of your installation. Next start the ToolBox as "./toolbox.sh" from the bin directory of your installation. [Note: You may use the .bat variants on a Windows environment]

From the ToolBox GUI, select File->New->JettyServer and press "Start Jetty" to start the sample JSON services on port 9000. Now select File->New->Http/s Client and specify the URL http://localhost:9000/service/JsonServlet, the content type as "application/json" and the payload as follows to test the backend service.

{ "prefix_for_Server 1" : "Hello Server 1 - hello world, I am client N!", "v1" : 100, "v2" : 300}

You should see a response as shown below:

HTTP/1.1 200 OK
Content-Type: application/json
Connection: close
Server: Jetty(6.1.21)

{"v1":100,"result":30000,"v2":300,"prefix_for_Server 1":"Welcome to Server 1"}

Now send the following request payload to the proxy services as described below.

{ "message" : "hello world, I am client N!", "v1" : 100, "v2" : 300}

Let's first send the request to the merge-and-aggregate-proxy by setting the URL to http://localhost:8280/service/merge-and-aggregate-proxy. You will receive a response as follows:

HTTP/1.0 200 OK
Date: Mon, 26 Apr 2010 06:05:20 GMT
Server: UltraESB/1.0.0 (b1)
Content-Length: 251
Content-Type: text/plain; charset=ISO-8859-1
Connection: close

{ "merged": [{"v1":100,"result":30000,"prefix_for_Server 2":"Welcome to Server 2","v2":300},{"v1":100,"result":30000,"v2":300,"prefix_for_Server 1":"Welcome to Server 1"},{"v1":100,"result":30000,"v2":300,"prefix_for_Server 3":"Welcome to Server 3"}]}

Note that the response aggregates the responses from all three endpoints. Next, send a request to the stream-and-aggregate-proxy by setting the URL to http://localhost:8280/service/stream-and-aggregate-proxy. You will see a similar aggregated response.
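As an alternative to the ToolBox, the proxies can also be called from a small Java 11+ program using the standard java.net.http client. This is a sketch assuming UltraESB is running locally on port 8280 as configured in the samples; ProxyClientSketch is a hypothetical name. The body is read as a raw stream so that data can be printed as it arrives, although how individual reads map to the streamed parts depends on network timing.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

/** Hypothetical command-line client for the sample proxies. */
final class ProxyClientSketch {

    // the same client request used with the ToolBox
    static final String PAYLOAD =
            "{ \"message\" : \"hello world, I am client N!\", \"v1\" : 100, \"v2\" : 300}";

    private ProxyClientSketch() {}

    /** Builds a POST to http://localhost:8280/service/<proxyName> (assumed local UltraESB). */
    static HttpRequest buildRequest(String proxyName) {
        return HttpRequest.newBuilder(URI.create("http://localhost:8280/service/" + proxyName))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(PAYLOAD))
                .build();
    }

    /** Sends the request and prints whatever each read of the response body returns. */
    static void printStreamedResponse(String proxyName) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newBuilder()
                .version(HttpClient.Version.HTTP_1_1) // the proxies answer with plain HTTP/1.x
                .build();
        HttpResponse<InputStream> response =
                client.send(buildRequest(proxyName), HttpResponse.BodyHandlers.ofInputStream());
        System.out.println("HTTP " + response.statusCode());
        try (InputStream body = response.body()) {
            byte[] buf = new byte[1024];
            int n;
            while ((n = body.read(buf)) != -1) {
                // the sample payload is ASCII, so decoding each read separately is safe here
                System.out.println("read: " + new String(buf, 0, n, StandardCharsets.UTF_8));
            }
        }
    }
}
```

Calling printStreamedResponse("stream-and-aggregate-proxy") will typically show the parts arriving in separate reads, whereas "merge-and-aggregate-proxy" tends to deliver the whole body at once.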

To test the proxies with timeouts, start sample 212 with "./ultraesb.sh -sample 212" from the bin directory of your installation, and use the ToolBox as before.

Send the same request to the two proxy services at the same URLs. Depending on how many backend responses were collected before the 1000 ms timeout, you will sometimes receive a response with three parts, and sometimes with two, one or none, as shown below:

{ "merged" : []}
{ "merged": [{"v1":100,"result":30000,"prefix_for_Server 2":"Welcome to Server 2","v2":300}, {} ]}
{ "merged": [{"v1":100,"result":30000,"prefix_for_Server 2":"Welcome to Server 2","v2":300},{"v1":100,"result":30000,"v2":300,"prefix_for_Server 1":"Welcome to Server 1"}, {} ]}
{ "merged": [{"v1":100,"result":30000,"v2":300,"prefix_for_Server 3":"Welcome to Server 3"},{"v1":100,"result":30000,"v2":300,"prefix_for_Server 1":"Welcome to Server 1"},{"v1":100,"result":30000,"prefix_for_Server 2":"Welcome to Server 2","v2":300}]}


Using TCPDump to see the streaming in action

To verify the streaming of the response from the stream-and-aggregate-proxy on a Unix or Linux system, run tcpdump on port 8280. [Note: On a Windows system, Wireshark or a similar tool can capture such a dump.] The following ASCII tcpdump trace reveals the streaming in action:

HTTP/1.0 200 OK
Content-Type: application/json
Date: Mon, 26 Apr 2010 06:08:51 GMT
Server: UltraESB/1.0.0 (b1)
Connection: Close

11:38:51.034786 IP 127.0.0.1.13685 > 127.0.0.1.8280: . ack 137 win 265 <nop,nop,timestamp 2876208 2876208>
E..4..@. @../........5u X..w........    GY.....
.+.0.+.0
11:38:51.034954 IP 127.0.0.1.8280 > 127.0.0.1.13685: P 137:229(92) ack 328 win 265 <nop,nop,timestamp 2876208 2876208>
E....F@.@.& ........ X5u......w....    .......
.+.0.+.0{ "merged": [{"v1":100,"result":30000,"prefix_for_Server 2":"Welcome to Server 2","v2":300},
11:38:51.034972 IP 127.0.0.1.13685 > 127.0.0.1.8280: . ack 229 win 265 <nop,nop,timestamp 2876208 2876208>
E..4..@. @...........5u X..w....D...    F......
.+.0.+.0
11:38:52.009770 IP 127.0.0.1.8280 > 127.0.0.1.13685: P 229:308(79) ack 328 win 265 <nop,nop,timestamp 2876452 2876208>
E....G@.@.&,........ X5u...D..w....    .w.....
.+.$.+.0{"v1":100,"result":30000,"v2":300,"prefix_for_Server 3":"Welcome to Server 3"},
11:38:52.009801 IP 127.0.0.1.13685 > 127.0.0.1.8280: . ack 308 win 265 <nop,nop,timestamp 2876452 2876452>
E..4..@. @..-........5u X..w........    D......
.+.$.+.$
11:38:52.092732 IP 127.0.0.1.8280 > 127.0.0.1.13685: P 308:388(80) ack 328 win 265 <nop,nop,timestamp 2876472 2876452>
E....H@.@.&*........ X5u......w....    .x.....
.+.8.+.${"v1":100,"result":30000,"v2":300,"prefix_for_Server 1":"Welcome to Server 1"}]}

Note that the different parts were returned in separate TCP segments as they became available. In contrast, a trace of the merge-and-aggregate-proxy reveals the complete response being returned in a single TCP segment, as follows:

HTTP/1.0 200 OK
Date: Mon, 26 Apr 2010 06:10:53 GMT
Server: UltraESB/1.0.0 (b1)
Content-Length: 251
Content-Type: text/plain; charset=ISO-8859-1
Connection: close

11:40:53.438934 IP 127.0.0.1.13688 > 127.0.0.1.8280: . ack 172 win 265 <nop,nop,timestamp 2906809 2906809>
E..4..@. @...........5x XD..hD......    y(.....
.,Z..,Z.
11:40:53.439034 IP 127.0.0.1.8280 > 127.0.0.1.13688: P 172:423(251) ack 327 win 265 <nop,nop,timestamp 2906809 2906809>
E../..@.@.m......... X5xD...D..h...    .#.....
.,Z..,Z.{ "merged": [{"v1":100,"result":30000,"v2":300,"prefix_for_Server 3":"Welcome to Server 3"},{"v1":100,"result":30000,"v2":300,"prefix_for_Server 1":"Welcome to Server 1"},{"v1":100,"result":30000,"prefix_for_Server 2":"Welcome to Server 2","v2":300}]}