Wednesday, December 27, 2017

How to Integrate WSO2 ESB with WSO2 MB as a Message Broker

This post describes how to integrate WSO2 ESB with WSO2 Message Broker (MB) and covers the configuration the ESB needs to connect to the broker.

I am using WSO2 EI 6.1.1, since it ships both the Integrator and the Broker as profiles of a single product.


First we need to start the message broker. The message broker profile can be found under the EI_HOME/wso2/broker directory.

1) Start the message broker by executing broker.sh in EI_HOME/bin
     sh broker.sh
As an alternative, you can start the broker by executing wso2server.sh under the EI_HOME/wso2/broker/bin directory. Both scripts start the same message broker.


Now the message broker is up and running. Next we need to configure the Integrator (ESB) with the broker details.

2) Add the configuration below to the jndi.properties file under the EI_HOME/conf directory if the entries do not already exist. It specifies the host name and port where the broker is running.

 connectionfactory.QueueConnectionFactory = amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5675'  
 connectionfactory.TopicConnectionFactory = amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5675'  
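
To confirm which broker address the Integrator will pick up, the brokerlist value can be read back out of jndi.properties. The snippet below is only a minimal sketch: broker_address is a hypothetical helper name, and it assumes the single-broker brokerlist='tcp://host:port' form used in the entries above.

```shell
#!/bin/sh
# Print the host:port of the broker configured for QueueConnectionFactory.
# broker_address is a hypothetical helper, not part of WSO2 EI. It assumes
# the brokerlist='tcp://host:port' form shown in this post.
broker_address() {
    # $1: path to a jndi.properties file
    grep '^[[:space:]]*connectionfactory\.QueueConnectionFactory' "$1" \
        | sed -n "s|.*brokerlist='tcp://\([^'?]*\).*|\1|p" \
        | head -n 1
}
```

For the entries above, running broker_address EI_HOME/conf/jndi.properties should print localhost:5675.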
 
3) Enable the JMS transport listener and sender configuration in the Integrator's axis2.xml, which can be found at EI_HOME/conf/axis2/axis2.xml.
  It is commented out by default, so it has to be enabled as shown below.
  The java.naming.provider.url parameter points to conf/jndi.properties, so the Integrator
  connects to the broker according to the jndi.properties entries configured in step 2.

 <transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener">  
     <parameter name="myTopicConnectionFactory" locked="false">  
       <parameter name="java.naming.factory.initial" locked="false">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>  
       <parameter name="java.naming.provider.url" locked="false">conf/jndi.properties</parameter>  
       <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">TopicConnectionFactory</parameter>  
       <parameter name="transport.jms.ConnectionFactoryType" locked="false">topic</parameter>  
     </parameter>  
     <parameter name="myQueueConnectionFactory" locked="false">  
       <parameter name="java.naming.factory.initial" locked="false">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>  
       <parameter name="java.naming.provider.url" locked="false">conf/jndi.properties</parameter>  
       <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>  
       <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>  
     </parameter>  
     <parameter name="default" locked="false">  
       <parameter name="java.naming.factory.initial" locked="false">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>  
       <parameter name="java.naming.provider.url" locked="false">conf/jndi.properties</parameter>  
       <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>  
       <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>  
     </parameter>  
   </transportReceiver>  

  <transportSender name="jms" class="org.apache.axis2.transport.jms.JMSSender"/>  

Now all the required configuration is in place in the Integrator, so it can be started.

4) Start the Integrator.
      Go to EI_HOME/bin and run integrator.sh
      sh integrator.sh

The Integrator is now ready to consume messages from WSO2 MB and publish messages to it. You can deploy JMS services such as a JMS proxy service or an inbound endpoint in the ESB, and they will consume messages from the Message Broker.

Durable Subscription in WSO2 ESB with WSO2 MB


In this post, we are going to implement a JMS service in WSO2 ESB that holds a durable subscription to a WSO2 MB topic. A durable subscription keeps the subscriber from missing messages that are published while it is not listening on the topic. Once the subscriber comes back online, those messages are delivered to it.

When the subscription is durable, it is better to implement an inbound endpoint in WSO2 ESB than a JMS proxy service. A durable subscription is registered with the message broker under the given subscription name, so another service cannot create a subscription with the same name. A JMS proxy service with a durable subscription works on a single instance, but it causes problems in a clustered environment because it gets deployed on every instance of the ESB cluster.

An inbound endpoint has coordination support in the cluster, so it runs on only a single instance at a given time and there is no conflict over the durable subscription. If the node running the inbound endpoint goes down, the endpoint is automatically scheduled to run on another node in the cluster.

For this demonstration, WSO2 EI 6.1.1 is used, since it has the Integrator (ESB) and the Broker (MB) as profiles. The whole scenario can therefore be implemented with one product.

1) First we need to start WSO2 Message Broker.
    Go to EI_HOME/bin and run broker.sh
    sh broker.sh
This starts the broker profile.

You need to configure the Integrator to connect to the broker. If you have not done so already, follow the post How to Integrate WSO2 ESB with WSO2 MB as a Message Broker.

2) Start the Integrator.
      Go to EI_HOME/bin and run integrator.sh
      sh integrator.sh

Now the message broker and the Integrator are configured, and both servers are up and running. Next we need to deploy the inbound endpoint on the Integrator to consume messages from the MB topic.

Below is the inbound endpoint configuration. Once deployed, it creates a subscription with MB and starts consuming messages from the topic defined by transport.jms.Destination.
The inbound endpoint injects each message it pulls from the topic into the sequence LogMsgSeq. If an error occurs while mediating the message, the message is injected into LogErrorSeq.

 <?xml version="1.0" encoding="UTF-8"?>  
 <inboundEndpoint name="DurableTopicInboundListener" onError="LogErrorSeq"   
 protocol="jms" sequence="LogMsgSeq" suspend="false" xmlns="http://ws.apache.org/ns/synapse">  
   <parameters>  
     <parameter name="interval">1000</parameter>  
     <parameter name="sequential">true</parameter>  
     <parameter name="coordination">true</parameter>  
     <parameter name="transport.jms.Destination">TestInboundTopicDurable</parameter>  
     <parameter name="transport.jms.CacheLevel">3</parameter>  
     <parameter name="transport.jms.ConnectionFactoryJNDIName">TopicConnectionFactory</parameter>  
     <parameter name="java.naming.factory.initial">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>  
     <parameter name="java.naming.provider.url">conf/jndi.properties</parameter>  
     <parameter name="transport.jms.SessionAcknowledgement">AUTO_ACKNOWLEDGE</parameter>  
     <parameter name="transport.jms.SessionTransacted">false</parameter>  
     <parameter name="transport.jms.SubscriptionDurable">true</parameter>  
     <parameter name="transport.jms.ConnectionFactoryType">topic</parameter>  
     <parameter name="transport.jms.DurableSubscriberClientID">test_inbound_sbscrb</parameter>  
     <parameter name="transport.jms.ContentType">application/json</parameter>  
     <parameter name="transport.jms.DurableSubscriberName">test_inbound_sbscrb</parameter>  
     <parameter name="transport.jms.SharedSubscription">false</parameter>  
   </parameters>  
 </inboundEndpoint>  

 <sequence name="LogMsgSeq" trace="disable" xmlns="http://ws.apache.org/ns/synapse">  
   <log description="msg" level="full"/>  
 </sequence>  

 <sequence name="LogErrorSeq" trace="disable" xmlns="http://ws.apache.org/ns/synapse">  
   <log description="Error" level="custom">  
     <property expression="$ctx:ERROR_MESSAGE" name="Error Message"/>  
   </log>  
 </sequence>  

In the above configuration, transport.jms.DurableSubscriberName and transport.jms.DurableSubscriberClientID are important when creating a durable subscription with MB. Without these parameters it won't be a durable subscription. Also make sure not to add any other inbound endpoint or proxy service with the same DurableSubscriberName and client ID. These parameters must be unique from service to service.
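
To catch a reused subscriber name before deployment, the configuration files can be scanned for duplicates. This is a minimal sketch, assuming the inbound endpoint and proxy service files are collected in one directory and that each parameter is written on a single line as in the configuration above. duplicate_subscriber_names is a hypothetical helper, not a WSO2 tool.

```shell
#!/bin/sh
# Print every DurableSubscriberName value that appears more than once
# across the *.xml files in a directory.
# duplicate_subscriber_names is a hypothetical helper. It assumes each
# <parameter> element sits on one line.
duplicate_subscriber_names() {
    # $1: directory holding inbound endpoint / proxy service XML files
    cat "$1"/*.xml 2>/dev/null \
        | sed -n 's|.*name="transport\.jms\.DurableSubscriberName"[^>]*>\([^<]*\)<.*|\1|p' \
        | sort \
        | uniq -d
}
```

Run it as duplicate_subscriber_names /path/to/configs. No output means every DurableSubscriberName found in that directory is unique.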

Now you can test the inbound endpoint by publishing a message to the topic. If you log in to the management console of MB, you can see a topic named TestInboundTopicDurable. Publish a message to that topic, and the Integrator terminal will log the message content after pulling the message from the topic. Then stop the Integrator and publish more messages to the topic. Start the Integrator again and you will see that it consumes those messages as well, even though it was stopped when they were published.


Tuesday, December 26, 2017

Java Keytool Commands

Java Keytool is a useful tool for managing certificates. It allows users to manage their own public/private key pairs and certificates. The commands below are commonly needed when working with keytool.

Create a JKS file with a private key and its certificate.
 keytool -genkey -alias nuwanw -keyalg RSA -keysize 1024 -keypass mypassword \
 -keystore sample.jks -storepass mypassword

List the aliases in a given keystore.
 keytool -list -v -keystore sample.jks -storepass mypassword  

Export the public certificate of the alias nuwanw in PEM format.
 keytool -export -rfc -alias nuwanw -keystore sample.jks -storepass mypassword \
 -file publicCert.pem

Import a public certificate into a trust store.
 keytool -import -alias nuwanw -file serviceCert.pem -keystore sample-trust-store.jks \
 -storepass mypassword
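
The commands above can be chained into one round trip: generate a key pair, export its certificate, import it into a trust store, and list the result. The following is a sketch rather than a recommended procedure. keystore_round_trip is a hypothetical helper name, and the -dname value and the -noprompt flag are additions (not in the commands above) so that keytool runs without asking questions. It works in a scratch directory that is removed afterwards.

```shell
#!/bin/sh
# Generate a key pair, export its certificate, import it into a trust
# store, and list the trust store, all inside a temporary directory.
# keystore_round_trip is a hypothetical helper. -dname "CN=localhost" and
# -noprompt are assumptions added to keep keytool non-interactive.
keystore_round_trip() {
    workdir=$(mktemp -d) || return 1
    (
        cd "$workdir" || exit 1
        keytool -genkey -alias nuwanw -keyalg RSA -keysize 1024 \
            -dname "CN=localhost" -keypass mypassword \
            -keystore sample.jks -storepass mypassword &&
        keytool -export -alias nuwanw -keystore sample.jks \
            -storepass mypassword -file publicCert.pem &&
        keytool -import -noprompt -alias nuwanw -file publicCert.pem \
            -keystore sample-trust-store.jks -storepass mypassword &&
        keytool -list -keystore sample-trust-store.jks -storepass mypassword
    )
    status=$?
    # Clean up the scratch keystores whether or not the chain succeeded.
    rm -rf "$workdir"
    return $status
}
```

Calling keystore_round_trip should end with a listing of the trust store that contains the alias nuwanw as a trusted certificate entry. Newer JDKs may print warnings about the key size on the way.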