For many message processing scenarios, especially FIX processing, it is important to correlate different messages. Messages may be received in the wrong order: a message received first may depend on a later one for its processing, or a later message may even make the processing of the previously received message redundant, for example in the case of an order cancellation. The earlier in the processing chain this sequencing of messages is performed, the less it affects downstream resources (systems and individuals).
A related aspect is the requirement to group certain messages and process them as a whole.
In both scenarios the receiving system needs to wait and cache a set of received messages until either all messages of the group have arrived or a timeout expires.
FIX engines take care of messages being processed in order. However, unless both endpoints, sender and receiver, are FIX engines, the ordering of messages has to be handled by the messaging platform in between.
There are two ways to ensure that messages are processed in sequential order.
In distributed multi-threaded environments, it is difficult to guarantee that messages that have to be processed in sequence are actually processed in sequence. Depending on the application, a complete serialization of message processing is not always possible or desirable. To avoid moving the logic that guarantees in-order processing into the application itself, the underlying layer should be able to handle it.
Unit of Order processing can be applied at the Process Definition layer. Each message triggering a certain process instance is routed through a messaging provider, utilizing a group ID in the message header. The group ID can be any value from the original message that is suitable for grouping messages. If it is not the default indicator (groupID in the header), an additional pre-processor needs to be defined on the process trigger that prepares the message header accordingly. All messages with the same group ID are processed by only one thread at a time; therefore exclusive access by that thread to any kind of resource (data structures) is guaranteed. Another header field should contain a sequence number. Messages that arrive out of order are not processed until all predecessor messages have been processed correctly. Pending messages are rejected after a certain time (a few milliseconds) and are stored in a distributed in-memory cache for failover reasons.
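The buffering behavior described above can be sketched in plain Java. The Resequencer class and its accept method below are hypothetical illustrations, not part of the platform; a real implementation would additionally handle the timeout and the distributed failover cache mentioned above:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Sketch of unit-of-order resequencing: per group ID, messages are
 * released only when all predecessors (by sequence number) have been seen.
 */
class Resequencer {
    // next expected sequence number per group ID
    private final Map<String, Integer> nextSeq = new HashMap<>();
    // out-of-order messages parked per group, sorted by sequence number
    private final Map<String, TreeMap<Integer, String>> pending = new HashMap<>();

    /** Accept one message; return the messages that may now be processed, in order. */
    List<String> accept(String groupId, int seq, String body) {
        List<String> ready = new ArrayList<>();
        int expected = nextSeq.getOrDefault(groupId, 1);
        TreeMap<Integer, String> parked =
                pending.computeIfAbsent(groupId, g -> new TreeMap<>());
        parked.put(seq, body);
        // drain the head of the buffer while it is contiguous
        while (!parked.isEmpty() && parked.firstKey() == expected) {
            ready.add(parked.pollFirstEntry().getValue());
            expected++;
        }
        nextSeq.put(groupId, expected);
        return ready;
    }
}
```

For example, if the cancellation (sequence 2) of an order arrives before the order itself (sequence 1), the first call returns an empty list and the second call releases both messages in the correct order.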
The goal of the Message Preprocessing step is to normalize the incoming message and to populate the header fields (group ID and sequence number) used for sequencing.
At the end of the preprocessing, the normalized message is sent to a JMS message queue via a Camel application with the following Route Definition:
Sequencing can be defined, for example, as follows:
<from uri="activemq:queue:sequencerQueue?concurrentConsumers=10&amp;maxConcurrentConsumers=50"/>
<process ref="messageSequencePreprocessor"/>
<split streaming="true">
  <to uri="ipp:direct"/>
</split>
with Additional Bean Definitions set to
<bean id="messageSequencePreprocessor" class="com.infinity.integration.message.MessageSequenceProcessor"/>
The Apache Camel aggregator allows a number of messages to be combined into a single message.
The example below shows how to aggregate all incoming messages. Once 4 messages with the same id header have been aggregated in the same correlation group, the aggregation completes.
<route>
  <from uri="direct:start"/>
  <aggregate strategyRef="aggregatorStrategy" completionSize="4">
    <correlationExpression>
      <simple>header.id</simple>
    </correlationExpression>
    <to uri="..."/>
  </aggregate>
</route>

<bean id="aggregatorStrategy" class="org.apache.camel.processor.BodyInAggregatingStrategy"/>
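For illustration only, the completion-size behavior of the aggregator can be sketched in plain Java. The SizeAggregator class below is a hypothetical stand-in for what Camel's aggregator and its AggregationStrategy do internally:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Sketch of completion-size aggregation: collect messages per correlation
 * key and emit the combined batch once the group reaches the target size.
 */
class SizeAggregator {
    private final int completionSize;
    private final Map<String, List<String>> groups = new HashMap<>();

    SizeAggregator(int completionSize) {
        this.completionSize = completionSize;
    }

    /** Add one message; return the aggregated group when complete, otherwise null. */
    List<String> aggregate(String correlationId, String body) {
        List<String> group = groups.computeIfAbsent(correlationId, k -> new ArrayList<>());
        group.add(body);
        if (group.size() >= completionSize) {
            return groups.remove(correlationId); // group complete: emit and reset
        }
        return null; // still waiting for more messages in this group
    }
}
```

With completionSize set to 4, the first three calls for a given correlation id return null, and the fourth call emits the complete group of four messages.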