
JMS Publish-Subscribe Model

Figure 12.25 depicts the basic JMS architecture elements that support publish-subscribe messaging. The publish-subscribe messaging architecture is an extension of the core JMS architecture with features specialized to suit a publish-subscribe messaging model. Connection factories, connections, sessions, message producers, message consumers, and endpoint destinations are all extended with publish-subscribe message model interfaces. As we illustrate later in this chapter, JMS v1.1 also allows the more generic JMS interfaces to be used to implement publish-subscribe messaging, with little need for the messaging-domain–specific APIs illustrated in Figure 12.25.

Figure 12.25. JMS publish-subscribe messaging.


JMS clients use JNDI to obtain an initial reference to a named TopicConnectionFactory object. The TopicConnectionFactory.createTopicConnection() methods are used to create an instance of a TopicConnection object. The createTopicConnection() method can be called with a username and password or by using the parameterless version of the method with a default user identity assumed.

The TopicConnection interface is a type of Connection interface that represents a connection to a JMS publish-subscribe messaging service. The createConnectionConsumer() and createDurableConnectionConsumer() methods are not used by regular JMS clients and are primarily used by a messaging server to manage publish-subscribe message service connections. The createTopicSession() method is called by JMS clients to create a TopicSession instance. Session transactions and acknowledgment mode are also established during the creation of a TopicSession.

TopicSession.createTopic() creates an instance of a Topic object given a provider-specific name for a topic. As with creating queues using QueueSession, most service-provider implementations provide other means for creating named topics. The Topic interface encapsulates a topic destination to which publishers publish messages and from which subscribers receive messages. Different service providers implement a hierarchy of topic names differently, but Topic.getTopicName() can be used to obtain the String representation of the topic name. The TopicSession.createTemporaryTopic() method creates a TemporaryTopic object. The TemporaryTopic is deleted when its TopicConnection is closed.

The TopicSession.createPublisher() method creates a TopicPublisher message producer that is used to publish messages to a particular Topic. Messages can be published to a Topic using the various TopicPublisher.publish() methods. Variations of these methods enable the publishing of messages to a TopicPublisher object's associated Topic or to a newly specified Topic object passed to the publish() method. Delivery modes, priorities, and time to live for a message can also be specified during calls to TopicPublisher.publish(). Messages to publish to a Topic can be created using the various message-creation methods defined on the Session interface from which TopicSession extends.

Two types of TopicSubscriber creation methods exist on TopicSession. The TopicSession.createSubscriber() methods create nondurable TopicSubscriber instances. Nondurable subscribers receive only those messages that are published to their topic while the subscriber is active. Durable subscribers can also receive messages that were published while they were temporarily unavailable; they are created using the TopicSession.createDurableSubscriber() calls. Each durable subscription has a name that identifies the published messages stored for its deferred delivery. Versions of TopicSession.createSubscriber() and TopicSession.createDurableSubscriber() also accept a message selector filter and a boolean noLocal flag indicating that messages published over the subscriber's own connection should be ignored.
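The difference between durable and nondurable subscriptions can be seen in a small in-memory model. The sketch below is not the JMS API; ToyTopic, ToyTopicDemo, and their methods are hypothetical names introduced only for illustration, and a real provider would persist the backlog rather than hold it in a map.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical in-memory model of a topic; this is NOT the JMS API.
class ToyTopic {
    // Listeners that are currently connected, keyed by subscriber name.
    private final Map<String, Consumer<String>> active = new HashMap<>();
    // Stored messages for each named durable subscription.
    private final Map<String, Queue<String>> durableBacklog = new HashMap<>();

    void subscribe(String name, boolean durable, Consumer<String> listener) {
        if (durable) {
            Queue<String> backlog =
                durableBacklog.computeIfAbsent(name, k -> new ArrayDeque<>());
            // Deliver anything that was published while this subscriber was away.
            while (!backlog.isEmpty()) {
                listener.accept(backlog.poll());
            }
        }
        active.put(name, listener);
    }

    void disconnect(String name) {
        active.remove(name);
    }

    void publish(String message) {
        // Every active subscriber receives the message immediately.
        for (Consumer<String> listener : active.values()) {
            listener.accept(message);
        }
        // Inactive durable subscriptions have the message stored for later.
        for (Map.Entry<String, Queue<String>> entry : durableBacklog.entrySet()) {
            if (!active.containsKey(entry.getKey())) {
                entry.getValue().add(message);
            }
        }
    }
}

class ToyTopicDemo {
    // Returns the messages seen by one subscriber that disconnects briefly.
    static List<String> run(boolean durable) {
        ToyTopic topic = new ToyTopic();
        List<String> received = new ArrayList<>();
        topic.subscribe("vendor", durable, received::add);
        topic.publish("order-1");
        topic.disconnect("vendor");
        topic.publish("order-2"); // published while the subscriber is inactive
        topic.subscribe("vendor", durable, received::add);
        topic.publish("order-3");
        return received;
    }
}
```

In this model, ToyTopicDemo.run(true) yields all three orders, whereas ToyTopicDemo.run(false) misses order-2 because nothing stored it while the subscriber was disconnected.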

A TopicRequestor helper class can also be used to simplify the use of the JMS publish-subscribe model for request and reply behavior. A TopicRequestor object is first created with a handle to a TopicSession and Topic object. The request() method may then be used to publish a Message to a topic destination and wait for a response in the form of a Message. To release and close any open distributed resources associated with the TopicRequestor and the underlying TopicSession, the JMS client invokes the close() method on the TopicRequestor.
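The request-and-reply pattern that TopicRequestor wraps can be modeled with plain JDK queues. The following is a sketch under stated assumptions, not the JMS API: ToyRequest, ToyRequestor, and ToyRequestorDemo are hypothetical names, the shared queue stands in for the topic, and the per-request reply queue plays the role of a temporary reply destination.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical request carrying its own reply channel (NOT a JMS Message).
class ToyRequest {
    final String body;
    final BlockingQueue<String> replyTo;

    ToyRequest(String body, BlockingQueue<String> replyTo) {
        this.body = body;
        this.replyTo = replyTo;
    }
}

// Hypothetical requestor: publishes a request, then blocks for the reply.
class ToyRequestor {
    private final BlockingQueue<ToyRequest> requests;

    ToyRequestor(BlockingQueue<ToyRequest> requests) {
        this.requests = requests;
    }

    String request(String body) {
        // A fresh reply channel per request keeps replies from being mixed up.
        BlockingQueue<String> replyChannel = new LinkedBlockingQueue<>();
        try {
            requests.put(new ToyRequest(body, replyChannel));
            return replyChannel.take(); // wait for the responder
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for reply", e);
        }
    }
}

class ToyRequestorDemo {
    static String run() {
        BlockingQueue<ToyRequest> requests = new LinkedBlockingQueue<>();
        // Responder thread: takes one request and answers on its reply channel.
        Thread responder = new Thread(() -> {
            try {
                ToyRequest request = requests.take();
                request.replyTo.put("ack:" + request.body);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        responder.setDaemon(true);
        responder.start();
        return new ToyRequestor(requests).request("order-1");
    }
}
```

Because request() blocks until a reply arrives, the caller should be sure that some responder is listening before it issues the request.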

Finally, as depicted in Figure 12.26, a set of interfaces exists to encapsulate transactional support for publish-subscribe messaging. As mentioned before, the JMS client should not be coded to talk directly to such interfaces. Rather, the messaging server will use such interfaces to imbue a publish-subscribe message connection factory, connection, and session with JTA XA-compliant transaction management functionality. The interfaces in Figure 12.26 simply extend the interfaces presented in Figure 12.9 and provide method signatures specific to the publish-subscribe message domain model.

Figure 12.26. JMS transactional publish-subscribe messaging.


Publish-Subscribe Sample Source Code

We present a brief example here to illustrate the use of JMS publish-subscribe messaging. Because this example is very similar to the queue example presented earlier, we highlight only the core differences. The example implements a TopicSupplier that publishes OrderItem objects to a topic named UnleashedTopic. A TopicConsumer mimics a simple topic subscriber that subscribes to receive OrderItem messages.

NOTE

The sample code strewn throughout this section leaves out some exception handling and other nonessential features in the interest of simplifying the description. The TopicManager, TopicSupplier, TopicConsumer, and MessageHandler classes implement the core of this example, and Props, OrderItem, and OrderManager classes are also used.

As with the previous example, Ant can be used to execute the build.xml compilation script associated with this example. In fact, if you've already executed the Ant script for the last example, the code and execution scripts for this example will already have been generated. After running the Ant script, the generated runtopic-supplier and runtopic-consumer script files can be used for executing the example.


TopicManager

The TopicManager used with these examples serves as the base class for both the TopicConsumer and TopicSupplier. The constructor of the TopicManager performs all the necessary initialization steps for obtaining a Topic handle as illustrated here:


package com.wls8unleashed.jms;
 ...
public class TopicManager {

 protected TopicConnectionFactory topicConnectionFactory;
 protected TopicConnection topicConnection;
 protected TopicSession topicSession;
 protected Topic topic;

 public TopicManager(Context context){
     ...
   // Get JMS factory JNDI name
   String jmsFactoryName = Props.get("jms.factory.for.topic");

   // Create topic connection factory
   System.out.println("Looking up factory name: " + jmsFactoryName);
   topicConnectionFactory =
    (TopicConnectionFactory) context.lookup(jmsFactoryName);

   // Create topic connection to the factory
   System.out.println("Creating topic connection...");
   topicConnection = topicConnectionFactory.createTopicConnection();

   // Create session to the connection
   System.out.println("Creating topic session...");
   topicSession = topicConnection.createTopicSession(
                   false, Session.AUTO_ACKNOWLEDGE);

   // Get topic name
   String topicName = Props.get("topic.name");

   // Lookup handle to the Topic
   try {
    System.out.println("Looking up topic name: " + topicName);
    topic = (Topic) context.lookup(topicName);
   } catch (NamingException namingException) {
    // If not created, create new topic, and bind topic to name
    System.out.println("Didn't find the topic...so creating: " + topicName);
    topic = topicSession.createTopic(topicName);
    System.out.println("Binding topic: " + topicName);
    context.bind(topicName, topic);
   }
  }
  ...
 }
}

The topic connection factory JNDI name and the topic JNDI name are read from the following properties in the build.properties file:


# JMS Connection Factory for Topics
jms.factory.for.topic=UnleashedJMSFactory

# JMS Topic Name
topic.name=UnleashedTopic
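The Props helper that reads these values is not shown in this section. As a minimal sketch of how such a lookup could work, assuming a plain java.util.Properties file format, the hypothetical PropsSketch class below loads key-value pairs and fails loudly when a key is missing. The class name and its fromString() factory are assumptions made for illustration.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical stand-in for the chapter's Props helper (not the book's code).
class PropsSketch {
    private final Properties properties = new Properties();

    PropsSketch(Reader source) throws IOException {
        // Lines starting with '#' are treated as comments by Properties.load().
        properties.load(source);
    }

    // Convenience factory so the sketch can be tried without a file on disk.
    static PropsSketch fromString(String text) {
        try {
            return new PropsSketch(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns the value for the key, or fails if the key is not defined.
    String get(String key) {
        String value = properties.getProperty(key);
        if (value == null) {
            throw new IllegalArgumentException("Missing property: " + key);
        }
        return value;
    }
}
```

Failing on a missing key, rather than returning null, surfaces a misconfigured build.properties before a JNDI lookup is attempted with a null name.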

TopicConsumer

The sample TopicConsumer that extends the TopicManager represents a vendor that subscribes to receive orders from the UnleashedTopic topic. The TopicConsumer.main() method first creates a JNDI context with a call to Props.getInitialContext(). The TopicConsumer is then constructed with the JNDI context as an input parameter. The TopicConsumer receives messages until the user exits the program. Finally, TopicConsumer.close() ensures that all resources are cleaned up along with the JNDI context. The TopicConsumer.main() method is shown here:


public static void main(String[] args){
 try {
  // Create JNDI context
  Context context = Props.getInitialContext();

  // Create new TopicConsumer
  System.out.println("Creating new TopicConsumer...");
  TopicConsumer topicConsumer = new TopicConsumer(context);

  System.out.println(" TopicConsumer is ready to receive messages.");

  // Receive messages until the user quits the program or the quit flag is set.
  synchronized (topicConsumer) {
   while (!topicConsumer.quitFromReceiving) {
    try {
    // Wait for messages
    topicConsumer.wait();
    } catch (InterruptedException interruptedException) {
     System.out.println("Interruption!");
     interruptedException.printStackTrace();
    }
   }
  }

  // Close up resources when done and exit
  topicConsumer.close();
  context.close();
  System.exit(0);
  ...
}
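The wait loop in main() only ends if some other thread sets quitFromReceiving and notifies the waiting thread while holding the same monitor. The listing above does not show that half of the handshake, so the following is a sketch of one way it could look. QuitSignal, requestQuit(), awaitQuit(), and QuitSignalDemo are hypothetical names; only the quitFromReceiving flag comes from the example.

```java
// Hypothetical stand-in for the consumer's quit handshake.
class QuitSignal {
    boolean quitFromReceiving = false;

    // Called from another thread, for example from a listener callback.
    synchronized void requestQuit() {
        quitFromReceiving = true;
        notifyAll(); // wake any thread blocked in awaitQuit()
    }

    // Blocks the caller until requestQuit() has been called.
    synchronized void awaitQuit() throws InterruptedException {
        // Re-check the flag in a loop to guard against spurious wakeups.
        while (!quitFromReceiving) {
            wait();
        }
    }
}

class QuitSignalDemo {
    static boolean run() {
        QuitSignal signal = new QuitSignal();
        Thread listener = new Thread(signal::requestQuit);
        listener.start();
        try {
            signal.awaitQuit();
            listener.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting to quit", e);
        }
        return signal.quitFromReceiving;
    }
}
```

Setting the flag and calling notifyAll() inside the same synchronized method means the waiting thread cannot miss the signal, even if requestQuit() runs before awaitQuit() is entered.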

The TopicConsumer constructor first invokes its TopicManager superclass constructor, which looks up the TopicConnectionFactory, creates a TopicConnection, creates a TopicSession, and looks up or creates a Topic. The TopicConsumer then creates a TopicSubscriber and registers itself as a MessageListener with the TopicSubscriber. The TopicConnection is then started so that published messages begin to be delivered. The TopicConsumer constructor initialization is shown here:


public TopicConsumer(Context context)
 throws NamingException, JMSException {

 // Call superclass constructor
 super(context);

 // Create subscriber
 System.out.println("Creating topic subscriber...");
 topicSubscriber = topicSession.createSubscriber(topic);

 // Register subscriber as a MessageListener
 System.out.println("Setting message listener...");
 topicSubscriber.setMessageListener(this);

 // Start the connection thread
 System.out.println("Starting connection thread...");
 topicConnection.start();
}

Because the TopicConsumer implements the MessageListener interface, it must implement the onMessage(Message) method. The JMS service provider calls TopicConsumer.onMessage() when it delivers a message carrying an OrderItem published by the TopicSupplier. The onMessage() method simply delegates its call to the MessageHandler implementation to display the information packed into a JMS message as illustrated here:


public void onMessage(Message message) {
 MessageHandler handler = new MessageHandler();
 handler.onMessage(message);
}

Finally, all TopicSubscriber, TopicSession, and TopicConnection resources are closed in the close() method:


public void close() throws JMSException {
 topicSubscriber.close();
 this.topicSession.close();
 this.topicConnection.close();
}

TopicSupplier

The TopicSupplier class also extends the TopicManager class for basic topic resource initialization functionality. The TopicSupplier.main() method first retrieves the Collection of orders from the OrderManager.getOrders() call. For each OrderItem in the Collection, a call to sendOrder() is made on a TopicSupplier with an extracted OrderItem:


public static void main(String[] args) throws Exception {
 try {
  // Get some orders from the OrderManager class
  System.out.println("Getting a bunch of orders to send...");
  Collection orders = OrderManager.getOrders();

  // For each order, get order request and send to consumer
  Iterator it = orders.iterator();
  while (it.hasNext()) {
   // Get an OrderItem object to send
   OrderItem item = (OrderItem) it.next();

   // Create JNDI context
   Context context = Props.getInitialContext();

   // Create new TopicSupplier
   System.out.println("Creating new TopicSupplier...");
   TopicSupplier topicSupplier = new TopicSupplier(context);

   // Send the order
   System.out.println("Initiating order to send...");
   topicSupplier.sendOrder(item);

   // Close TopicSupplier
   System.out.println("Closing TopicSupplier...");
   topicSupplier.close();
  }
  ...
}

Inside the TopicSupplier constructor, a call is made to the TopicManager superclass constructor for topic resource initialization. A TopicPublisher is then created. The TopicSupplier constructor is shown here:


public TopicSupplier(Context context){

 // Call superclass TopicManager constructor
 super(context);
  ...
 // Create topic publisher
 System.out.println("Creating topic publisher...");
 topicPublisher = topicSession.createPublisher(topic);
  ...
}

The TopicSupplier.sendOrder() method publishes a message containing an OrderItem to a topic as shown here:


public void sendOrder(OrderItem message) throws JMSException {
 System.out.println("Publishing order message:" + message);

 // Create empty ObjectMessage on session
 System.out.println("Creating empty object message...");
 ObjectMessage sendingMessage = topicSession.createObjectMessage();

 // Start the topic connection (needed only to receive messages; a producer can publish without it)
 System.out.println("Starting topic connection...");
 topicConnection.start();

 // Set the order object onto the message carrier
 System.out.println("Setting order item into message...");
 sendingMessage.setObject((Serializable) message);

 // Send the message
 System.out.println("Publishing the order message...");
 topicPublisher.publish(sendingMessage);
}

Finally, the TopicSupplier.close() method closes its TopicPublisher, TopicSession, and TopicConnection resources:


public void close() throws JMSException {
 topicPublisher.close();
 topicSession.close();
 topicConnection.close();
}

WebLogic JMS Topic Configuration

We've already seen earlier in this chapter how to configure a JMS server and connection factory inside of a WebLogic Server. We've also configured a JMS queue inside of the server. Configuring a JMS topic is almost identical to configuring a JMS queue. Clicking the Destinations link in the left browser pane beneath a particular JMS server name in the JMSServers list brings up the screen displayed in Figure 12.27.

Figure 12.27. Viewing JMS server destinations.


For this example, we want to click on Create a New JMS Topic in the center pane of the screen shown in Figure 12.27. The screen shown in Figure 12.28 is then used to perform the basic configuration for a topic. The topic name known to the JMS server and the JNDI name for the topic must be established along with other basic configuration information such as the cluster-ability of the topic's JNDI name and the persistence of the topic. We use the JNDI name UnleashedTopic in Figure 12.28 to be consistent with the name we assume for our topic sample code as read in via the build.properties file:


# JMS Topic Name
topic.name=UnleashedTopic

Figure 12.28. Configuring JMS topics.


As with JMS queue configuration, additional configuration screens also exist as tabs beneath the topic's configuration tab in the center pane. Configuration screens exist to define thresholds and quotas for message storage, override values for message priorities and delivery modes, message expiration policies, and message redelivery policies.

JMS Topic Sample Execution

As with the queue example, a set of execution scripts will be generated when you run the Ant script for this chapter's code by typing ant at the command line in the directory where you have deposited the source code. Ant will compile all the source code examples and generate two scripts named runtopic-consumer and runtopic-supplier. The scripts will have either a .bat or .sh extension, depending on whether you are running the examples on a Windows or Unix platform, respectively.

In one command-line window, execute the runtopic-consumer script to start the sample TopicConsumer. In a separate window, execute the runtopic-supplier script to start the TopicSupplier. The TopicSupplier publishes messages to the JMS topic that we configured. The WebLogic JMS server then delivers the messages asynchronously to the TopicConsumer.
