Wednesday, December 27, 2017

Durable Subscription in WSO2 ESB with WSO2 MB


In this post, we are going to implement a JMS service in WSO2 ESB that holds a durable subscription to a WSO2 MB topic. A durable subscription keeps the subscriber from missing messages that are published while it is not listening on the topic. Once the subscriber comes back online, the broker delivers those messages to it.

When the subscription is durable, it is better to implement it as an inbound endpoint in WSO2 ESB than as a JMS proxy service. A durable subscription is registered with the message broker under the given subscription name, so another service cannot create a subscription with the same name. A JMS proxy service with a durable subscription name works on a single instance, but it causes problems in a clustered environment because the proxy gets deployed on every instance of the ESB cluster.
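
For comparison, here is a rough sketch of what a JMS proxy service holding a durable subscription might look like. The proxy name, topic name, subscriber name, and the connection factory name myTopicConnectionFactory are placeholders I have assumed for illustration; the connection factory would have to be defined under the JMS transport receiver in axis2.xml. Because this artifact is deployed on every node of a cluster, each node would try to subscribe with the same name and client ID.

```xml
<!-- Sketch only: names below are hypothetical placeholders -->
<proxy name="SampleDurableTopicProxy" startOnLoad="true" transports="jms"
       xmlns="http://ws.apache.org/ns/synapse">
  <target>
    <inSequence>
      <!-- One-way message: log it and stop mediation -->
      <property name="OUT_ONLY" value="true"/>
      <log level="full"/>
      <drop/>
    </inSequence>
  </target>
  <!-- Assumed to match a connection factory defined in axis2.xml -->
  <parameter name="transport.jms.ConnectionFactory">myTopicConnectionFactory</parameter>
  <parameter name="transport.jms.DestinationType">topic</parameter>
  <parameter name="transport.jms.Destination">SampleDurableTopic</parameter>
  <!-- Durable subscription settings: identical on every cluster node -->
  <parameter name="transport.jms.SubscriptionDurable">true</parameter>
  <parameter name="transport.jms.DurableSubscriberName">sample_proxy_sbscrb</parameter>
  <parameter name="transport.jms.DurableSubscriberClientID">sample_proxy_sbscrb</parameter>
</proxy>
```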

An inbound endpoint has coordination support in a cluster, so it runs on only a single instance at any given time and does not conflict with the durable subscription. If the node running the inbound endpoint goes down, the endpoint is automatically scheduled to run on another node in the cluster.

For this demonstration, WSO2 EI 6.1.1 is used, since it ships the Integrator (ESB) and the Broker (MB) as profiles. We can therefore implement the scenario above with a single product.

1) First, start the WSO2 Message Broker.
    Go to EI_HOME/bin and run broker.sh:
    sh broker.sh
This starts the broker profile.

You need to configure the Integrator to connect to the broker. If you have not configured it yet, please follow the post How to Integrate WSO2 ESB with WSO2 MB.

2) Start the Integrator.
      Go to EI_HOME/bin and run integrator.sh:
      sh integrator.sh

Now the Message Broker and the Integrator are configured, and both servers are up and running. Next, deploy the inbound endpoint on the Integrator to consume messages from the MB topic.

Below is the inbound endpoint configuration. Once it is deployed, it creates a subscription with MB and starts consuming messages from the topic defined in transport.jms.Destination.
The inbound endpoint injects each message it pulls from the topic into the sequence LogMsgSeq. If an error occurs while mediating the message, the message is injected into LogErrorSeq.

 <?xml version="1.0" encoding="UTF-8"?>  
 <inboundEndpoint name="DurableTopicInboundListener" onError="LogErrorSeq"   
 protocol="jms" sequence="LogMsgSeq" suspend="false" xmlns="http://ws.apache.org/ns/synapse">  
   <parameters>  
     <parameter name="interval">1000</parameter>  
     <parameter name="sequential">true</parameter>  
     <parameter name="coordination">true</parameter>  
     <parameter name="transport.jms.Destination">TestInboundTopicDurable</parameter>  
     <parameter name="transport.jms.CacheLevel">3</parameter>  
     <parameter name="transport.jms.ConnectionFactoryJNDIName">TopicConnectionFactory</parameter>  
     <parameter name="java.naming.factory.initial">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>  
     <parameter name="java.naming.provider.url">conf/jndi.properties</parameter>  
     <parameter name="transport.jms.SessionAcknowledgement">AUTO_ACKNOWLEDGE</parameter>  
     <parameter name="transport.jms.SessionTransacted">false</parameter>  
     <parameter name="transport.jms.SubscriptionDurable">true</parameter>  
     <parameter name="transport.jms.ConnectionFactoryType">topic</parameter>  
     <parameter name="transport.jms.DurableSubscriberClientID">test_inbound_sbscrb</parameter>  
     <parameter name="transport.jms.ContentType">application/json</parameter>  
     <parameter name="transport.jms.DurableSubscriberName">test_inbound_sbscrb</parameter>  
     <parameter name="transport.jms.SharedSubscription">false</parameter>  
   </parameters>  
 </inboundEndpoint>  

 <sequence name="LogMsgSeq" trace="disable" xmlns="http://ws.apache.org/ns/synapse">  
   <log description="msg" level="full"/>  
 </sequence>  

 <sequence name="LogErrorSeq" trace="disable" xmlns="http://ws.apache.org/ns/synapse">  
   <log description="Error" level="custom">  
     <property expression="$ctx:ERROR_MESSAGE" name="Error Message"/>  
   </log>  
 </sequence>  

In the configuration above, transport.jms.DurableSubscriberName and transport.jms.DurableSubscriberClientID are important for creating the durable subscription with MB. Without these parameters it won't be a durable subscription. Please make sure not to add any other inbound endpoint or proxy service with the same DurableSubscriberName and client ID. These values should be unique for each service.

Now you can test the inbound endpoint by publishing a message to the topic. If you log in to the management console of MB, you can see a topic named TestInboundTopicDurable. Publish a message to that topic, and the Integrator terminal logs the message content after pulling the message from the topic. Then stop the Integrator and publish more messages to the topic. Start the Integrator again and you can see that it consumes those messages as well, even though it was stopped when they were published.
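
If you prefer to publish test messages through the Integrator instead of the MB management console, a small HTTP proxy that forwards requests to the topic is one option. The following is a minimal sketch, not part of the original setup: the proxy name TopicPublisherProxy is hypothetical, and it assumes the JMS transport sender is enabled in axis2.xml and that conf/jndi.properties defines TopicConnectionFactory (depending on your setup, a topic entry for TestInboundTopicDurable in jndi.properties may be needed as well).

```xml
<!-- Sketch only: TopicPublisherProxy is a hypothetical name -->
<proxy name="TopicPublisherProxy" startOnLoad="true" transports="http"
       xmlns="http://ws.apache.org/ns/synapse">
  <target>
    <inSequence>
      <!-- One-way send: do not wait for a response from the broker -->
      <property name="OUT_ONLY" value="true"/>
      <!-- Return HTTP 202 to the caller once the message is handed over -->
      <property name="FORCE_SC_ACCEPTED" scope="axis2" value="true"/>
      <send>
        <endpoint>
          <!-- Same JNDI settings as the inbound endpoint, expressed as a jms:/ URI -->
          <address uri="jms:/TestInboundTopicDurable?transport.jms.ConnectionFactoryJNDIName=TopicConnectionFactory&amp;java.naming.factory.initial=org.wso2.andes.jndi.PropertiesFileInitialContextFactory&amp;java.naming.provider.url=conf/jndi.properties&amp;transport.jms.DestinationType=topic"/>
        </endpoint>
      </send>
    </inSequence>
  </target>
</proxy>
```

Note that this proxy runs inside the Integrator, so it cannot publish while the Integrator is stopped. For the offline part of the test, publish from the MB management console or another JMS client.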

