Oracle8i Application Developer's Guide - Advanced Queuing Release 2 (8.1.6) Part Number A76938-01 |
|
Creating Applications Using JMS , 7 of 8
A JMS application can receive messages by creating a message consumer. Messages can be received synchronously using the receive
call or an synchronously via a Message Listener.
There are three modes of receive,
public BolOrder get_new_order1(QueueSession jms_session) { Queue queue; QueueReceiver qrec; ObjectMessage obj_message; BolCustomer customer; BolOrder new_order = null; String state; try { /* get a handle to the new_orders queue */ queue = ((AQjmsSession) jms_session).getQueue("OE", "OE_neworders_que"); qrec = jms_session.createReceiver(queue); /* wait for a message to show up in the queue */ obj_message = (ObjectMessage)qrec.receive(); new_order = (BolOrder)obj_message.getObject(); customer = new_order.getCustomer(); state = customer.getState(); System.out.println("Order: for customer " + customer.getName()); } catch (JMSException ex) { System.out.println("Exception: " + ex); } return new_order; }
public BolOrder get_new_order2(QueueSession jms_session) { Queue queue; QueueReceiver qrec; ObjectMessage obj_message; BolCustomer customer; BolOrder new_order = null; String state; try { /* get a handle to the new_orders queue */ queue = ((AQjmsSession) jms_session).getQueue("OE", "OE_neworders_que"); qrec = jms_session.createReceiver(queue); /* wait for 60 seconds for a message to show up in the queue */ obj_message = (ObjectMessage)qrec.receive(60000); new_order = (BolOrder)obj_message.getObject(); customer = new_order.getCustomer(); state = customer.getState(); System.out.println("Order: for customer " + customer.getName()); } catch (JMSException ex) { System.out.println("Exception: " + ex); } return new_order; }
public BolOrder poll_new_order3(QueueSession jms_session) { Queue queue; QueueReceiver qrec; ObjectMessage obj_message; BolCustomer customer; BolOrder new_order = null; String state; try { /* get a handle to the new_orders queue */ queue = ((AQjmsSession) jms_session).getQueue("OE", "OE_neworders_que"); qrec = jms_session.createReceiver(queue); /* check for a message to show in the queue */ obj_message = (ObjectMessage)qrec.receiveNoWait(); new_order = (BolOrder)obj_message.getObject(); customer = new_order.getCustomer(); state = customer.getState(); System.out.println("Order: for customer " + customer.getName()); } catch (JMSException ex) { System.out.println("Exception: " + ex); } return new_order; }
When a consumer does the first receive in its session, its gets the first message in the queue or topic. Subsequent receives get the next message, and so on. The default behavior works well for FIFO queues and topics but not for priority ordered queues. If a high priority message arrives for the consumer, this client program will not receive the message until it has cleared the messages that were already there for it.
To provide the consumer a better control in navigating the queue for its messages, the AQ navigation modes are made available to it as JMS extensions. These modes can be set at the TopicSubscriber
, QueueReceiver
or the TopicReceiver
.
FIRST_MESSAGE
resets the consumer's position to the beginning of the queue. This is a useful mode for priority ordered queues as it allows the consumer to remove the message on the top of the queue.
NEXT_MESSAGE
get the message after the established position of the consumer. For example, a NEXT_MESSAGE
issued after the position is at the fourth message, will get the second message in the queue. This is the default behavior.
For transaction grouping
FIRST_MESSAGE
resets the consumer's position to the beginning of the queue
NEXT_MESSAGE
sets the position to the next message in the same transaction.
NEXT_TRANSACTION
sets the position to the first message in the next transaction.
Note that the transaction grouping property may be negated if messages are received in the following ways:
Receive
specifying a correlation identifier in the selector,
Receive
by specifying a message identifier in the selector,
ommitting
before all the messages of a transaction group have been received.
If in navigating through the queue, the program reaches the end of the queue while using the NEXT
MESSAGE
or NEXT
TRANSACTION
option, and you have specified a blocking receive, then the navigating position is automatically changed to the beginning of the queue.
By default, a QueueReceiver
, Topic Receiver
, or TopicSubscriber
uses FIRST_MESSAGE for the first receive
call, and NEXT_MESSAGE for the subsequent receive
calls.
The get_new_orders()
procedure retrieves orders from the OE_neworders_que
. Each transaction refers to an order, and each message corresponds to an individual book in that order. The get_orders()
procedure loops through the messages to retrieve the book orders. It resets the position to the beginning of the queue using the FIRST
MESSAGE
option before the first receive. It then uses the next message navigation option to retrieve the next book (message) of an order (transaction). If it gets an exception indicating all message in the current group/transaction have been fetched, it changes the navigation option to next transaction and get the first book of the next order. It then changes the navigation option back to next message for fetching subsequent messages in the same transaction. This is repeated until all orders (transactions) have been fetched.
public void get_new_orders(QueueSession jms_session) { Queue queue; QueueReceiver qrec; ObjectMessage obj_message; BolCustomer customer; BolOrder new_order; String state; int new_orders = 1; try { /* get a handle to the new_orders queue */ queue = ((AQjmsSession) jms_session).getQueue("OE","OE_neworders_que"); qrec = jms_session.createReceiver(queue); /* set navigation to first message */ ((AQjmsTopicSubscriber)qrec).setNavigationMode(AQjmsConstants.NAVIGATION_FIRST_ MESSAGE); while(new_orders != 0) { try{ /* wait for a message to show up in the topic */ obj_message = (ObjectMessage)qrec.receiveNoWait(); if (obj_message != null) /* no more orders in the queue */ { System.out.println(" No more orders "); new_orders = 0; } new_order = (BolOrder)obj_message.getObject(); customer = new_order.getCustomer(); state = customer.getState(); System.out.println("Order: for customer " + customer.getName()); /* Now get the next message */ ((AQjmsTopicSubscriber)qrec).setNavigationMode(AQjmsConstants.NAVIGATION_NEXT_ MESSAGE); }catch(AQjmsException ex) { if (ex.getErrorNumber() == 25235) { System.out.println("End of transaction group"); ((AQjmsTopicSubscriber)qrec).setNavigationMode(AQjmsConstants.NAVIGATION_NEXT_ TRANSACTION); } else throw ex; } } }catch (JMSException ex) { System.out.println("Exception: " + ex); } }
Aside from the normal receive
, which allows the dequeuing client to delete the message from the queue, JMS provides an interface that allows the JMS client to Browse its messages in the queue. A QueueBrowser
can be created via the createBrowser
method from QueueSession
.
If a message is browsed, it remains available for further processing. Note that after a message has been browsed there is no guarantee that the message will be available to the JMS session again as a receive
call from a concurrent session might remove the message.
To prevent a viewed message from being removed by a concurrent JMS client, you can view the message in the locked mode. To do this, you need to create a QueueBrowser
with the locked mode using the AQ extension to the JMS interface.The lock on the message with a browser with locked mode is released when the session performs a commit or a rollback.
To remove the message viewed by a QueueBrowser
, the session must create a QueueReceiver
and use the JMSmesssageID
as the selector.
Refer to the QueueBrowser
Example in Point to Point features
The MessageConsumer
can remove the message from the queue or topic without retrieving the message using the receiveNoData
call. This is useful when the application has already examined the message, perhaps using the QueueBrowser
. This mode allows the JMS client to avoid the overhead of retrieving the payload from the database, which can be substantial for a large message.
In the following scenario from the BooksOnLine example, international orders destined to Mexico and Canada are to be processed separately due to trade policies and carrier discounts. Hence, a message is viewed in the locked mode (so no other concurrent user removes the message) via the QueueBrowser
and the customer country (message payload) is checked. If the customer country is Mexico or Canada the message be deleted from the queue using the remove with no data (since the payload is already known) mode. Alternatively, the lock on the message is released by the commit
call. Note that the receive call uses the message identifier obtained from the locked mode browse
.
public void process_international_orders(QueueSession jms_session) { QueueBrowser browser; Queue queue; ObjectMessage obj_message; BolOrder new_order; Enumeration messages; String customer_name; String customer_country; QueueReceiver qrec; String msg_sel; try { /* get a handle to the new_orders queue */ queue = ((AQjmsSession) jms_session).getQueue("OE", "OE_neworders_que"); /* create a Browser to look at RUSH orders */ browser = ((AQjmsSession)jms_session).createBrowser(queue, null, true); for (messages = browser.getEnumeration() ; messages.hasMoreElements() ;) { obj_message = (ObjectMessage)messages.nextElement(); new_order = (BolOrder)obj_message.getObject(); customer_name = new_order.getCustomer().getName(); customer_country = new_order.getCustomer().getCountry(); if (customer_country equals ("Canada") || customer_country equals ( "Mexico")) { System.out.println("Order for Canada or Mexico"); msg_sel = "JMSMessageID = '" + obj_message. getJMSMessageID()+ "'"; qrec = jms_session.createReceiver(queue, msg_sel); ((AQjmsQueueReceiver)qrec).receiveNoData(); } } }catch (JMSException ex) { System.out.println("Exception " + ex); } }
If the transaction receiving the message from a queue/topic fails, it is regarded as an unsuccessful attempt to remove the message. AQ records the number of failed attempts to remove the message in the message history.
In addition, it also allows the application to specify at the queue/topic level, the maximum number of retries supported on messages. If the number of failed attempts to remove a message exceed this number, the message is moved to the exception queue and is no longer available to applications.
If the transaction receiving a message aborted, this could be because of a 'bad' condition, for example, an order that could not be fulfilled because there were insufficient books in stock. Since inventory updates are made every 12 hours, it makes sense to retry
after that time. If an order was not filled after 4 attempts, this could indicates there is a problem.
AQ allows users to specify a retry_delay
along with max_retries
. This means that a message that has undergone a failed attempt at retrieving will remain visible in the queue for dequeue after 'retry_delay
' interval. Until then it will be in the 'WAITING
' state. The AQ background process, the time manager enforces the retry delay property.
The maximum retries and retry delay are properties of the queue/topic which can be set when the queue/topic is created or via the alter method on the queue/topic. The default value for MAX_RETRIES is 5.
If an order cannot be filled because of insufficient inventory, the transaction processing the order is aborted. The booked_orders
topic is set up with max_retries
= 4 and retry_delay
= 12 hours.Thus, if an order is not filled up in two days, it is moved to an exception queue.
public BolOrder process_booked_order(TopicSession jms_session) { Topic topic; TopicSubscriber tsubs; ObjectMessage obj_message; BolCustomer customer; BolOrder booked_order = null; String country; int i = 0; try { /* get a handle to the OE_bookedorders_topic */ topic = ((AQjmsSession)jms_session).getTopic("WS", "WS_bookedorders_topic"); /* Create local subscriber - to track messages for Western Region */ tsubs = jms_session.createDurableSubscriber(topic, "SUBS1", "Region = 'Western' ", false); /* wait for a message to show up in the topic */ obj_message = (ObjectMessage)tsubs.receive(10); booked_order = (BolOrder)obj_message.getObject(); customer = booked_order.getCustomer(); country = customer.getCountry(); if (country == "US") { jms_session.commit(); } else { jms_session.rollback(); booked_order = null; } }catch (JMSException ex) { System.out.println("Exception " + ex) ;} return booked_order; }
The JMS client can receive messages asynchronously by setting the MessageListener
using the setMessageListener
method available with the Consumer.
When a message arrives for the message consumer, the onMessage
method of the message listener is invoked with the message. The message listener can commit or abort the receipt of the message. The message listener will not receive messages if the JMS Connection
has been stopped. The receive
call must not be used to receive messages once the message listener has been set for the consumer.
The application processing the new orders queue can be set up for asynchronously receiving messages from the queue.
public class OrderListener implements MessageListener { QueueSession the_sess; /* constructor */ OrderListener(QueueSession my_sess) { the_sess = my_sess; } /* message listener interface */ public void onMessage(Message m) { ObjectMessage obj_msg; BolCustomer customer; BolOrder new_order = null; try { /* cast to JMS Object Message */ obj_msg = (ObjectMessage)m; /* Print some useful information */ new_order = (BolOrder)obj_msg.getObject(); customer = new_order.getCustomer(); System.out.println("Order: for customer " + customer.getName()); /* call the process order method * NOTE: we are assuming it is defined elsewhere * / process_order(new_order); /* commit the asynchronous receipt of the message */ the_sess.commit(); }catch (JMSException ex) { System.out.println("Exception " + ex) ;} } } public void setListener1(QueueSession jms_session) { Queue queue; QueueReceiver qrec; MessageListener ourListener; try { /* get a handle to the new_orders queue */ queue = ((AQjmsSession) jms_session).getQueue("OE", "OE_neworders_que"); /* create a queue receiver */ qrec = jms_session.createReceiver(queue); /* create the message listener */ ourListener = new OrderListener(jms_session); /* set the message listener for the receiver */ qrec.setMessageListener(ourListener); } catch (JMSException ex) { System.out.println("Exception: " + ex); } }
The JMS client can receive messages asynchronously for all the consumers of the session by setting the MessageListener
at the session.
When a message arrives for any of the message consumers of the session, the onMessage
method of the message listener is invoked with the message. The message listener can commit or abort the receipt of the message. The message listener will not receive messages if the JMS connection has been stopped. No other mode for receiving messages must be used in the session once the message listener has been set.
In the customer service component of the BooksOnLine example, messages from different databases arrive at the customer service topics, indicating the state of the order. The customer service application monitors the topics and whenever there is a message about a customer order, it updates the order status in the order_status_table
. The application uses the session listener to monitor the different topics. Whenever there is a message in any of the topics, the onMessage
method of the session MessageListener
is invoked.
/* define our message listener class */ public class CustomerListener implements MessageListener { TopicSession the_sess; /* constructor */ CustomerListener(TopicSession my_sess) { the_sess = my_sess; } /* message listener interface */ public void onMessage(Message m) { ObjectMessage obj_msg; BolCustomer customer; BolOrder new_order = null; try { /* cast to JMS Object Message */ obj_msg = (ObjectMessage)m; /* Print some useful information */ new_order = (BolOrder)obj_msg.getObject(); customer = new_order.getCustomer(); System.out.println("Order: for customer " + customer.getName()); /* call the update status method * NOTE: we are assuming it is defined elsewhere * / update_status(new_order, new_order.getStatus()); /* commit the asynchronous receipt of the message */ the_sess.commit(); }catch (JMSException ex) { System.out.println("Exception: " + ex); } } } public void monitor_status_topics(TopicSession jms_session) { Topic[] topic = new Topic[4]; TopicSubscriber[] tsubs= new TopicSubscriber[4]; try { /* get a handle to the OE_bookedorders_topic */ topic[0] = ((AQjmsSession)jms_session).getTopic("CS", "CS_bookedorders_topic"); tsubs[0] = jms_session.createDurableSubscriber(topic[0], "BOOKED_ORDER"); topic[1] = ((AQjmsSession)jms_session).getTopic("CS", "CS_billedorders_topic"); tsubs[1] = jms_session.createDurableSubscriber(topic[1], "BILLED_ORDER"); topic[2] = ((AQjmsSession)jms_session).getTopic("CS", "CS_backdorders_topic"); tsubs[2] = jms_session.createDurableSubscriber(topic[2], "BACKED_ORDER"); topic[3] = ((AQjmsSession)jms_session).getTopic("CS", "CS_shippedorders_topic"); tsubs[3] = jms_session.createDurableSubscriber(topic[3], "SHIPPED_ORDER"); MessageListener mL = new CustomerListener(jms_session); /* set the session's message listener */ jms_session.setMessageListener(mL); }catch(JMSException ex) { System.out.println("Exception: " + ex); } }
AQ provides four integrated mechanisms to support exception handling in applications: EXCEPTION_QUEUES
, EXPIRATION
, MAX_RETRIES
and RETRY_DELAY
.
An exception_queue
is a repository for all expired or unserviceable messages. Applications cannot directly enqueue into exception queues. However, an application that intends to handle these expired or unserviceable messages can receive/remove them from the exception queue.
To retrieve messages from exception queues, the JMS client must use the point to point interface.The exception queue for messages intended for a topic must be created in a queue table with multiple consumers enabled. Like any other queue, the exception queue must be enabled for receiving messages using the start method in the AQOracleQueue
class. You will get an exception if you try to enable it for enqueue.
The exception queue is a provider (Oracle) specific message property called "JMS_OracleExcpQ"
that can be set with the message before sending/publishing it. If an exception queue is not specified, the default exception queue is used. If the queue/topic is created in a queue table, say QTAB, the default exception queue will be called AQ$_QTAB_E
. The default exception queue is automatically created when the queue table is created.
Messages are moved to the exception queues by AQ under the following conditions:
timeToLive
. For messages intended for more than one subscriber, the message will be moved to the exception queue if one or more of the intended recipients is not able to dequeue the message within the specified timeToLive
. If the timeToLive
was not specified for the message, (either in the publish
or send
call, or as the publisher or sender), it will never expire.
receive
. The message is returned to the queue/topic and will be available for any applications that are waiting to receive messages. Since this was a failed attempt to receive the message, its retry count is updated.
If the retry count of the message exceeds the maximum value specified for the queue/topic where it resides, it is moved to the exception queue. When a message has multiple subscribers, then the message is moved to the exception queue only when all the recipients of the message have exceeded the retry limit.
A receive is considered rolled back or undone if the application aborts the entire transaction, or if it rolls back to a savepoint that was taken before the receive.
The section retry with delay interval has an example with MAX_RETRIES
. In the BooksOnLine application, the business rule for each shipping region is that an order will be placed in a back order queue if the order cannot be filled immediately. The back order application will try to fill the order once a day. If the order cannot be filled within 7 days, it is placed in an exception queue for special processing. We implement this using the Time-to-Live property of messages in conjunction with exception queues.
public void create_excp_que(TopicSession jms_session) { AQQueueTable q_table; Queue excpq; try { /* create the exception queue in the queue table with multiple * consumer flag true */ q_table = ((AQjmsSession)jms_session).getQueueTable("WS", "WS_orders_ mqtab"); AQjmsDestinationProperty dest_prop = new AQjmsDestinationProperty(); dest_prop.setQueueType(AQjmsDestinationProperty.EXCEPTION_QUEUE); excpq = ((AQjmsSession)jms_session).createQueue(q_table, "WS_back_orders_excp_que", dest_prop); /* start the exception queue for receiving (dequeuing) messages only */ ((AQjmsDestination)excpq).start(jms_session, false, true); } catch (JMSException ex) { System.out.println("Exception " + ex); } }
public static void requeue_back_order(TopicSession jms_session, String sale_region, BolOrder back_order) { Topic back_order_topic; ObjectMessage obj_message; TopicPublisher tpub; long timetolive; try { back_order_topic = ((AQjmsSession)jms_session).getTopic("WS", "WS_backorders_topic"); obj_message = jms_session.createObjectMessage(); obj_message.setObject(back_order); /* set exception queue */ obj_message.setStringProperty("JMS_OracleExcpQ", "WS.WS_back_orders_ excp_que"); tpub = jms_session.createPublisher(null); /* Set message expiration to 7 days: */ timetolive = 7*60*60*24*1000; // specified in milliseconds /* Publish the message */ tpub.publish(back_order_topic, obj_message, DeliveryMode.PERSISTENT, 1, timetolive); jms_session.commit(); } catch (Exception ex) { System.out.println("Exception :" + ex); } }
public BolOrder get_expired_order(QueueSession jms_session) { Queue queue; QueueReceiver qrec; ObjectMessage obj_message; BolCustomer customer; BolOrder exp_order = null; try { /* get a handle to the exception queue */ queue = ((AQjmsSession) jms_session).getQueue("WS", "WS_back_orders_excp_ que"); qrec = jms_session.createReceiver(queue); /* wait for a message to show up in the queue */ obj_message = (ObjectMessage)qrec.receive(); exp_order = (BolOrder)obj_message.getObject(); customer = exp_order.getCustomer(); System.out.println("Expired Order: for customer " + customer.getName()); } catch (JMSException ex) { System.out.println("Exception: " + ex); } return exp_order; }
|
![]() Copyright © 1996-2000, Oracle Corporation. All Rights Reserved. |
|