Reliable Messaging
- Messaging Queuing with JMS
- Message-driven Bean
- JMS as a SOAP Transport
- Reliable Messaging on the Internet
The EJB architecture integrates object-oriented programming with transaction processing in an elegant way. Its distributed object architecture relies on a client/server model where the client sends a request and waits for a response from a server synchronously. This synchronous invocation model fits into transaction processing model we have thoroughly reviewed. However, the synchronous model has several limitations that are crucial for EAI.
Mainly, the problems are related to communication between the client and server. With a synchronous model, when a communication failure occurs, the client receives an error message. Accordingly, the client must send the identical request to the server later. In some cases, the client may not want to retry but would prefer to have someone send the request when communication is restored.
This problem can be resolved by message queuing, where the client puts a message on a queue and the server gets a message from the queue. The message in the queue is often recorded in persistent storage; therefore, the message is sure to be sent to the server. Thus, message queuing is often called reliable messaging or guaranteed message delivery. Furthermore, message queuing is closely related to transaction processing because the queue can be considered a transaction resource.
Let's examine Java Message Service (JMS), which is a standard API for message queuing systems. First, we will update our purchase order example to include JMS. Then we will examine the EJB 2.0 Message-driven Bean, which is asynchronously invoked to handle the processing of incoming JMS messages as in JMS applications. Finally, we consider how to adopt JMS as a SOAP transport.
Message Queuing with JMS
We could make our purchase order program more functional by assuming that there is an order management system instead of an order database. Figure 1 illustrates this extension including a message queue front-ending the order management system. POProcess puts order information in the queue and proceeds to the next operation without blocking. On the other hand, the order management system asynchronously gets the information from the queue to record the order information.
Figure 1 Application integration with message queue.
Let's rewrite our purchase order example using JMS. Listing 1 is a modification of POProcessBean, namely POProcessBeanJMS.
Listing 1: POProcessBeanJMS Class
public class POProcessBeanJMS implements SessionBean { public Order order(String shipId, String billId, String sku, [ccc] int quantity) throws java.rmi.RemoteException, Exception { Exception ex = null; UserTransaction tran = mySessionContext.getUserTransaction(); try { tran.begin(); Product product = productHome.findByPrimaryKey(sku); if (quantity > product.getInStock()) { throw new XXXException("Stock is not enough"); } product.setInStock(product.getInStock() - quantity); OrderData order = new OrderData("" + [ccc] System.currentTimeMillis()); order.setBillTo(billId); order.setShipTo(shipId); order.setSKU(sku); order.setProductName(product.getName()); order.setQuantity(quantity); int total = quantity * product.getPrice(); order.setTotalPrice(total); queueConnectionFactory = (QueueConnectionFactory) jndiContext.lookup("QueueConnectionFactory"); queue = (Queue)jndiContext.lookup(queueName); queueConnection = queueConnectionFactory.createQueueConnection(); queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); queueSender = queueSession.createSender(queue); message = queueSession.createObjectMessage(); message.setObject(order); queueSender.send(message); if (total > MAX_TOTAL) { throw new Exception("Exceed the max charge (" + [ccc] MAX_TOTAL + ")"); } return order; } catch(Exception e) { ex = e; e.printStackTrace(); } finally { if (ex!=null) { tran.rollback(); throw ex; } else { tran.commit(); } } return null; } }
Note that the entity bean Order is replaced by a Java class OrderData. The typical way of using JMS, as shown in bold in the program, is as follows:
Look up QueueConnectionFactory and a queue via JNDI.
Create a connection and a session object to access a queue manager.
Create a queue sender object.
Create a message object and send it.
Because queues can be transaction resources, they adhere to the two-phase commitment protocol. More specifically, when tran.commit() is invoked in Listing 1, a prepare message is sent to the queue manager. Only when a commit message is sent to the queue manager is the message placed into the queue, where the server can get it.
Let's look at the server side, namely OrderManagementListener, which is the front end of the order management system. Listing 2 is an OrderManagementListener class that gets order messages from the queue.
Listing 2: OrderManagementListener Class
public class OrderManagementListener { public static void main(String[] args) { .................. try { jndiContext = new InitialContext(); queueConnectionFactory = (QueueConnectionFactory) jndiContext.lookup("QueueConnectionFactory"); queue = (Queue) jndiContext.lookup(queueName); queueConnection = queueConnectionFactory.createQueueConnection(); queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); queueReceiver = queueSession.createReceiver(queue); que ueConnection.start(); while (true) { Message m = queueReceiver.receive(1); if (m != null) { if (m instanceof ObjectMessage) { message = (ObjectMessage) m; OrderData order = [ccc] (OrderData)message.getObject(); /// Invoke order management system } else { // Do something } } } } catch (Exception e) { } finally { if (queueConnection != null) { try { queueConnection.close(); } catch (JMSException e) {} } } } }
Unlike on the client side, a queue receiver object is created to get messages from the queue. Because there is an OrderData object in a message, we just invoke the order management system with the order data. Again, the message is received only when a transaction is committed at the client side.
You can integrate applications in a loosely coupled and extensible manner with message queuing. First, the client does not have to know who receives the message. Second, even if the server is not available because of a server failure or communication problem, the client can still continue to send requests as long as the queue is available. In addition, load balancing is also possible by simply adding replications of the server.