|
| 1 | +--- |
| 2 | +title: Storm JMS Integration |
| 3 | +layout: documentation |
| 4 | +documentation: true |
| 5 | +--- |
| 6 | +## Example Storm JMS Topology |
| 7 | + |
| 8 | +The storm-jms source code contains an example project (in the "examples" directory) |
| 9 | +builds a multi-bolt/multi-spout topology (depicted below) that uses the JMS Spout and JMS Bolt components. |
| 10 | + |
| 11 | + |
| 12 | + |
| 13 | +The green components represent instances of the storm-jms components. White components represent |
| 14 | +"standard" Storm bolts (in the example these bolts are instances of `GenericBolt` which simply logs |
| 15 | +information about the tuples it receives and emits). |
| 16 | + |
| 17 | +Grey arrows represent JMS messages, while black arrows represent the flow of Storm tuple objects. |
| 18 | + |
| 19 | +### JMS Transactions and Gauranteed Processing |
| 20 | +The example is set up to be "transactional," meaning the JMS Spout will use Storm's guranteed |
| 21 | +processing capabilities to determine if a JMS Message should be acknowledged. Each bolt in the |
| 22 | +topology will anchor to each tuple it receives. If every bolt successfully processes and acks |
| 23 | +each tuple in the chain, the original JMS Message will be acknowledged, and the underlying |
| 24 | +JMS implementation will not attempt re-delivery of the message. If a bolt fails to process/ack |
| 25 | +a tuple, the JMS message will not be acknowledged, and the JMS implementation will queue the |
| 26 | +message for redelivery. |
| 27 | + |
| 28 | +### Data Flow |
| 29 | +The topology contains two chains: One originating from a JMS Spout connected to a Queue, and |
| 30 | +another originating from a JMS Spout connected to a Topic. |
| 31 | + |
| 32 | +**Chain #1** |
| 33 | + |
| 34 | +1. The "JMS Queue Spout" receives a JMS Message object from the queue, and emits a |
| 35 | +tuple to the "Intermediate Bolt" |
| 36 | +2. The "Intermediate Bolt" emits a tuple to the "Final Bolt" and the "JMS Topic Bolt", and acks |
| 37 | +the tuple it recieved. |
| 38 | +3. The "Final Bolt" receives the tuple and simply acks it, it does not emit anything. |
| 39 | +4. The "JMS Topic Bolt" receives a tuple, constructs a JMS Message from the tuple's values, |
| 40 | +and publishes the message to a JMS Topic. |
| 41 | +5. If the "JMS Topic Bolt" successfully publishes the JMS message, it will ack the tuple. |
| 42 | +6. The "JMS Queue Spout" will recieve notification if all bolts in the chain have acked |
| 43 | +and acknowledge the original JMS Message. If one or more bolts in the chain fail to ack a tuple, the |
| 44 | +"JMS Queue Spout" will not acknowledge the JMS message. |
| 45 | + |
| 46 | +**Chain #2** |
| 47 | + |
| 48 | +1. The "JMS Topic Spout" receives a JMS message from the topic and emits a tuple to "Another Bolt." |
| 49 | +2. The "Another Bolt" receives and acks the tuple. |
| 50 | +3. The "JMS Topic Spout" acknowledges the JMS message. |
| 51 | + |
| 52 | + |
| 53 | +### Building the Example Topology |
| 54 | + |
| 55 | + $ cd storm-jms |
| 56 | + $ mvn clean install |
| 57 | + |
| 58 | + |
| 59 | + |
| 60 | +### Running the Example Topology Locally |
| 61 | + |
| 62 | +The example uses ApacheMQ 5.4.0 for JMS. Download and install it from |
| 63 | +[Apache ActiveMQ Downloads](http://activemq.apache.org/download.html). |
| 64 | + |
| 65 | +There is no specific configuration required, simply start ActiveMQ: |
| 66 | + |
| 67 | + $ [ACTIVEMQ_HOME]/bin/activemq |
| 68 | + |
| 69 | +Run the example topology from the `examples` directory: |
| 70 | + |
| 71 | + $ mvn exec:java |
| 72 | + |
| 73 | +When the topology runs, it will connect to ActiveMQ and the following JMS Destinations will be created: |
| 74 | + |
| 75 | + backtype.storm.contrib.example.queue |
| 76 | + backtype.storm.contrib.example.topic |
| 77 | + |
| 78 | +To publish a message to the `backtype.storm.contrib.example.queue` queue: |
| 79 | + |
| 80 | +1. Open the ActiveMQ Queue admin console: http://localhost:8161/admin/queues.jsp |
| 81 | +2. Click the [Send To](http://localhost:8161/admin/send.jsp?JMSDestination=backtype.storm.contrib.example.queue&JMSDestinationType=queue) |
| 82 | +link for the `backtupe.storm.example.queue` queue entry. |
| 83 | +3. On the "Send a JMS Message" form, select the "Persistent Delivery" checkbox, enter |
| 84 | +some text for the message body, and click "Send". |
| 85 | + |
| 86 | + |
| 87 | +In the terminal you should see the following among the output: |
| 88 | + |
| 89 | + DEBUG (backtype.storm.contrib.jms.bolt.JmsBolt:183) - Connecting JMS.. |
| 90 | + DEBUG (backtype.storm.contrib.jms.spout.JmsSpout:213) - sending tuple: ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:budreau.home-51286-1321074044423-2:4:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:budreau.home-51286-1321074044423-2:4:1:1, destination = queue://backtype.storm.contrib.example.queue, transactionId = null, expiration = 0, timestamp = 1321735055910, arrival = 0, brokerInTime = 1321735055910, brokerOutTime = 1321735055921, correlationId = , replyTo = null, persistent = true, type = , priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = org.apache.activemq.util.ByteSequence@6c27ca12, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {secret=880412b7-de71-45dd-8a80-8132589ccd22}, readOnlyProperties = true, readOnlyBody = true, droppable = false, text = Hello storm-jms!} |
| 91 | + DEBUG (backtype.storm.contrib.jms.spout.JmsSpout:219) - Requested deliveryMode: CLIENT_ACKNOWLEDGE |
| 92 | + DEBUG (backtype.storm.contrib.jms.spout.JmsSpout:220) - Our deliveryMode: CLIENT_ACKNOWLEDGE |
| 93 | + DEBUG (backtype.storm.contrib.jms.spout.JmsSpout:224) - Requesting acks. |
| 94 | + DEBUG (backtype.storm.contrib.jms.example.GenericBolt:60) - [INTERMEDIATE_BOLT] Received message: source: 1:10, stream: 1, id: {-7100026097570233628=-7100026097570233628}, [Hello storm-jms!] |
| 95 | + DEBUG (backtype.storm.contrib.jms.example.GenericBolt:66) - [INTERMEDIATE_BOLT] emitting: source: 1:10, stream: 1, id: {-7100026097570233628=-7100026097570233628}, [Hello storm-jms!] |
| 96 | + DEBUG (backtype.storm.contrib.jms.example.GenericBolt:75) - [INTERMEDIATE_BOLT] ACKing tuple: source: 1:10, stream: 1, id: {-7100026097570233628=-7100026097570233628}, [Hello storm-jms!] |
| 97 | + DEBUG (backtype.storm.contrib.jms.bolt.JmsBolt:136) - Tuple received. Sending JMS message. |
| 98 | + DEBUG (backtype.storm.contrib.jms.example.GenericBolt:60) - [FINAL_BOLT] Received message: source: 2:2, stream: 1, id: {-7100026097570233628=-5393763013502927792}, [Hello storm-jms!] |
| 99 | + DEBUG (backtype.storm.contrib.jms.example.GenericBolt:75) - [FINAL_BOLT] ACKing tuple: source: 2:2, stream: 1, id: {-7100026097570233628=-5393763013502927792}, [Hello storm-jms!] |
| 100 | + DEBUG (backtype.storm.contrib.jms.bolt.JmsBolt:144) - ACKing tuple: source: 2:2, stream: 1, id: {-7100026097570233628=-9118586029611278300}, [Hello storm-jms!] |
| 101 | + DEBUG (backtype.storm.contrib.jms.spout.JmsSpout:251) - JMS Message acked: ID:budreau.home-51286-1321074044423-2:4:1:1:1 |
| 102 | + DEBUG (backtype.storm.contrib.jms.spout.JmsSpout:213) - sending tuple: ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:budreau.home-60117-1321735025796-0:0:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:budreau.home-60117-1321735025796-0:0:1:1, destination = topic://backtype.storm.contrib.example.topic, transactionId = null, expiration = 0, timestamp = 1321735056258, arrival = 0, brokerInTime = 1321735056260, brokerOutTime = 1321735056260, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, text = source: 2:2, stream: 1, id: {-710002609757023... storm-jms!]} |
| 103 | + DEBUG (backtype.storm.contrib.jms.spout.JmsSpout:219) - Requested deliveryMode: CLIENT_ACKNOWLEDGE |
| 104 | + DEBUG (backtype.storm.contrib.jms.spout.JmsSpout:220) - Our deliveryMode: CLIENT_ACKNOWLEDGE |
| 105 | + DEBUG (backtype.storm.contrib.jms.spout.JmsSpout:224) - Requesting acks. |
| 106 | + DEBUG (backtype.storm.contrib.jms.example.GenericBolt:60) - [ANOTHER_BOLT] Received message: source: 5:9, stream: 1, id: {-5117078009445186058=-5117078009445186058}, [source: 2:2, stream: 1, id: {-7100026097570233628=-9118586029611278300}, [Hello storm-jms!]] |
| 107 | + DEBUG (backtype.storm.contrib.jms.example.GenericBolt:75) - [ANOTHER_BOLT] ACKing tuple: source: 5:9, stream: 1, id: {-5117078009445186058=-5117078009445186058}, [source: 2:2, stream: 1, id: {-7100026097570233628=-9118586029611278300}, [Hello storm-jms!]] |
| 108 | + DEBUG (backtype.storm.contrib.jms.spout.JmsSpout:251) - JMS Message acked: ID:budreau.home-60117-1321735025796-0:0:1:1:1 |
| 109 | + |
| 110 | +The topology will run for 2 minutes, then gracefully shut down. |
| 111 | + |
0 commit comments