Integrating JMS

Messaging is a commonly used mechanism to communicate with various systems in an enterprise, e.g.:

With its JMS application facility, the Infinity Process Engine provides a means to integrate with any kind of message-oriented system.

For the remainder of this section, it is assumed that you are familiar with the JMS 1.1 specification.

The following sections describe JMS applications and their usage:

Sending and Receiving JMS Messages

Applications in process models are synchronous , procedural and - if possible - transactional operations. If a non-interactive application (e.g. a Session Bean method) is invoked, the workflow thread, in which the application is performed, is blocked until the application has been executed completely. Return values are retrieved from this application then.

Sending a message via JMS is an asynchronous operation. An execution thread sending a JMS message is only blocked until the messaging system confirms the reception of the message. It is the responsibility of the messaging system to deliver the message to the addresses processing the message.

Receiving a message via JMS requires the receiver to listen to a particular message queue.

Transactional Behavior

Sending a request message for a JMS application is controlled by the same transaction mechanisms as spawning a workflow thread for concurrent process topologies (see section Executing Process Instances in chapter Modeling Basics). The message is sent when the JTA transaction demarcated for the workflow thread is committed.

If the JMS application specifies a response message, the JTA transaction (Java Transaction API) is committed immediately when the request message has been sent. If no response message has been specified, the workflow thread continues to execute and the JTA transaction is committed if the thread terminates or is asynchronously forked (see Executing Process Instances in chapter Modeling Basics). If you intend to send the request message of a JMS application without a response message to be sent immediately, you need to set the Fork On Traversal flag for all outgoing transitions of the corresponding activity. Please refer to the section Setting Transition Conditions of the chapter Working with Transitions for detailed information on this flag.

Receiving the response message and reactivating the workflow thread of the corresponding activity is executed in a JTA transaction context by the Infinity Process Engine. The transactional behavior of this workflow thread is the same as for any workflow thread started with the startProcess() or complete() methods of the Infinity Process Platform Workflow Session.

JMS Applications

Infinity Process Platform allows you to specify JMS applications as a sequence of sending a request message and receiving a response message. Input parameters may be passed to the request message; output parameters are retrieved from the response message.

A workflow thread executing a JMS application will send the request message and then immediately return to the control flow of the calling workflow thread. This thread will be suspended until the response message is received. Starting the application indicates the "expectation" to receive a response message in a similar way as the activation of an interactive activity indicates the expectation of a subsequent completion or abortion by a user:

You may either omit sending the request message or receive the response message.

In the first case, the application will only send the request message and than immediately continue to execute the calling workflow thread:

In the latter case, the workflow thread executing the application will be immediately suspended until the response message is received:

Modeling JMS Applications

To define a JMS application in the Infinity Process Platform modeler:

  1. In the diagram toolbar palette click Applications and select JMS Application.

  2. Open the property page of the newly created application.
  3. Select Request or Response as type.

Please refer to the section Adding a JMS Request and the section Adding a JMS Response respectively for detailed information on the according properties.

Adding a JMS Request

To add a JMS request, select the Request entry below the JMS Application entry on the left side of the property page. The request specific property entries are displayed in the right pane.

JMS Application Properties
Figure: JMS Application Request Properties

Click the Add button to add an in access point for the JMS request.

JMS Application Properties
Figure: Request In Access Point

In the In Access Point section:

  1. Choose between Body or Header as location.
  2. Add a name and id.
  3. Browse to a class defining the type.
  4. Optionally enter a default value.

Click Add to create further in access points for the request.

Adding a JMS Response

To add a JMS response, select the Response entry below the JMS Application entry on the left side of the property page. The response specific property entries are displayed in the right pane.

JMS Application Properties
Figure: JMS Application Response Properties

Click the Add button to add an out access point for the JMS response.

JMS Application Properties
Figure: Response Out Access Point

In the Out Access Point section:

  1. Choose between Body or Header as location.
  2. Add a name and id.
  3. Browse to a class defining the type.

Click Add to create further out access points for the response.

Queues and Queue Connection Factories

For request messages you may specify any JMS Queue Connection Factory and corresponding JMS Queue from the resource environment of the WorkflowService performing your workflow model by providing their JNDI names in the JMS Application Properties dialog. The resource environment is specified in the EJB Deployment Descriptor of the Infinity Process Engine.

Message Provider and Message Acceptor Classes

In some cases, you might desire to perform your own assembling and disassembling of message headers. Although Infinity Process Platform currently only provides a default implementation, realizing the described message types, in a future release users may implement custom message provider and message acceptor classes.

These classes have to be selected in the combo boxes Message Provider Type Class and Message Acceptor Type Class in the Application Properties dialog for the JMS application.

Messages Types and Parameters

Infinity Process Platform supports the following types of JMS messages:

Depending on the type of message, different parameter sets can be specified for the body of the JMS Application in the Infinity Process Workbench:

Message Type Parameter Set
Map Message an arbitrary list of named Java primitive types
Object Message a single, unnamed serializable Java object
Text Message a single, unnamed object of type java.lang.String
Stream Message multiple, unnamed serializable Java objects, identified by order

These parameter sets may serve as application access points to define data mappings for corresponding activities.

Message Headers

In addition to the data provided with the message body, it is possible to provide data as properties in the JMS message header of the request and the response message by specifying the Location option in the request or response parameters table.

In addition to header properties specified by the modeler Infinity Process Platform generates a tuple of message header properties for the request message and expects the response message to contain a set of specific message header properties by default.

These properties are intended to specify the process context (e.g. process instance OID, activity ID) in which a request message is sent and a response message is intended to be received. They are not intended to be used for passing semantic business data accessed in the process invoking the corresponding JMS applications.

The header of the request message contains the properties:

The header of the response message is expected to contain the following properties:

or

or

In case the response message contains the latter tuple of properties, the value has to match exactly the type of the requested data. This tuple is intended to be used in scenarios where a JMS application only specifies a response message and the process context is to be retrieved via business data. An external system might, for example, send a response message with the header:

to Infinity Process Platform . This information is sufficient to identify the process instance, in which context the message should be received as well as the activity which has to be completed by receiving this message.

This way, external systems can communicate with the Infinity Process Platform Process Engine without any knowledge about internal identifiers, regarding the corresponding business context.

Time-out Scenario for Response Messages

If a response message is specified for a JMS application and a corresponding message is not received after a certain time-out, error handling can be implemented with the same time-out mechanism used for interactive activities.

Creating a Message Receiver

The following example demonstrates how to write a JMS client to receive JMS messages sent from a Infinity Process Engine. A code fragment like this might be used to test your JMS application before you integrate with the target, non-JMS messaging system.

Connecting To The Message Queue

To receive messages sent by JMS applications in the Infinity Process Platform Process Engine you need to connect to the message queues defined for these applications, e.g.:

...
Context messaging = new InitialContext();
QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory)
messaging.lookup("Name of factory mapped to JNDI used in model"); Queue queue = (Queue) messaging.lookup("Name of queue mapped to JNDI used in model");
QueueConnection queueConnection = queueConnectionFactory.createQueueConnection(); queueConnection.start(); QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
QueueReceiver receiver = queueSession.createReceiver(queue); ...

Synchronous Mode

If you want to receive message in synchronous mode, you may simply call:

Message message = receiver.receive();

Asynchronous Mode

Alternatively, if you want to receive message in asynchronous mode, you can create a message listener class, create an instance of this class and register it with the consumer:

public class MyListener implements MessageListener
{
public void onMessage(Message message)
{
// Unpack and handle the messages received
....
}
} MyListener myListener = new MyListener();
// Receiver is MessageConsumer object created above
receiver.setMessageListener(myListener);

A Message Driven Bean can be created in similar fashion as the listener.

Unpacking the Message Header

Unpacking the message header does not require prior knowledge of the message type, you can simply get all parameter names and values for the header as below:

Enumeration enum = message.getParameterNames();
while (enum.hasMoreElements())
{
Object header1 = message.getObjectProperty (enum.nextElement());
}

Unpacking Infinity Process Platform header message information as mentioned above can be performed via:

long processInstanceOID = message.getLongProperty("processInstanceOID");
long activityInstanceOID = message.getLongProperty("activityInstanceOID");

Unpacking the Message Data

Unpacking the message data requires prior knowledge of the message and the JMS application that has been modeled. Depending on the type of message you have modeled for your application, you can cast the received message to the appropriate message specialization and obtain the data provided with message header and body.

Unpacking a text message can be done with:

String data = ((TextMessage)message).getText();

Unpacking an object message can be done with:

Object data = ((ObjectMessage)message).getObject();

Unpacking a map message can be done with:

MapMessage message = (MapMessage)message;

Object object1 = message.readObject("Parameter1");

Object object2 = message.readObject("Parameter2");

assuming, that you have specified the parameter names Parameter1 and Parameter2 for the request message of your JMS application.

Unpacking a stream message can be done as follows:

StreamMessage message = (Stream)message;

Object object1 = message.readObject();

Object object2 = message.readObject();

Note that in case of a stream message you will receive the objects in the order you have defined in the application of your Infinity Process Platform workflow model.

Instead of readObject(), you may also call readString() and other methods, if the object type is known in advance.

Writing a JMS Sender

The following example demonstrates how to write a JMS sender to send JMS messages to a Infinity Process Engine. A code fragment like this might be used to test your JMS application before you integrate with the target, non-JMS messaging system.

Creating a Message Sender

Message senders can be created in similar fashion as message consumers. However, you need to connect to the message queues defined in the EJB Deployment Descriptor of your Infinity Process Engine, e.g.:

...
Context messaging = new InitialContext();
QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory) 
messaging.lookup("QueueConnectionFactory1"); Queue queue = (Queue) messaging.lookup("Queue1");
QueueConnection queueConnection = queueConnectionFactory.createQueueConnection(); queueConnection.start();  QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
QueueSender sender = queueSession.createSender(queue); ...

to complete activities. The code fragment above assumes that you have specified the connection factory QueueConnectionFactory1 and the queue Queue1 in your EJB Deployment Descriptor for the Infinity Process Engine.

The response message header can have different contents as described in the following subsections.

Populating the Message Header with an Activity Instance OID

The activity instance is retrieved directly by the OID. The header has just one property with the name activityInstanceOID.

message.setLongProperty("activityInstanceOID", 4711);

Populating the Message Header with the Process Instance OID and the Activity ID

The activity instance is retrieved using the process instance and the activity ID defined in the model. The header has two properties with the name processInstanceOID and activityID.

message.setLongProperty("processInstanceOID", 1234);
message.setStringProperty("activityID", "Activity1");

Populating the Message Header with Process ID, Activity ID, Data ID, CarnotPartition ID and Data Value

The activity instance is retrieved using the model elements (process ID, activity ID, data ID and carnotPartition ID) and an existing data value. The header has five properties processID, activityID, dataID, carnotPartitionID and serializedDataValue. The first four determine the IDs of the model elements. The last property is the value of the data in order to determine the correct activity instance, which is using this data.

message.setStringProperty("processID", "ProcessDefinition1");
message.setStringProperty("activityID", "Activity1");
message.setStringProperty("dataID", "Data1");
message.setStringProperty("carnotPartitionID", "Partition1");

This value has to be provided via:

message.setStringProperty("serializedDataValue", dataValue);

for all serializable data.

Sending Messages

The message created above is sent by simply calling:

sender.sendMessage(message);

Starting Processes using JMS Triggers

If you want to automatically start a process by sending a JMS message, you do it similar as with a JMS application. However, you should send the message to the queue to which the Message-Driven Beans ResponseHandler is bound. Additionally, specify the qualified ID of the process to be started and the ID of the partition the process belongs to, by giving the message the string properties processID and carnotPartitionID respectively:

message.setStringProperty("processID", "{ModelX}ProcessDefinitionX"); 
message.setStringProperty("carnotPartitionID", "PartitionX"); 

In case no partition ID is specified, the default partition configured by the engine is used.

Refer to chapter Using JMS Application and JMS Trigger of the Tutorial guide for a demonstration on how to start processes via a JMS trigger.

Determining the Trigger Thread Mode

As described above, JMS processes can be either started in a synchronous or asynchronous manner. You can specify this behavior via properties in your server-side carnot.properties file. The following property determines the setting for all defined JMS triggers:

JMS.ProcessTrigger.ThreadMode = synchronous / asynchronous

To set this value per specific trigger, use:

ProcessEngine.TRIGGER_ID.ThreadMode = true / false

In case this property is set, it specifies if processes for a trigger with id TRIGGER_ID should be started in synchronous (true) or asynchronous (false) manner. If this property is not set, the model setting is used.