Mulesoft Parallel processing (Scatter-Gather)

There is always a bottleneck of application when application is being read or written to multiple sources. Application always takes more time to process.
Parallel processing techniques can help reduce the time it takes to process a solution and make application fast.

So what is parallel processing?
   Parallel processing is a form of process in which many process are carried out simultaneously, operating on the principle that large problems can often be divided into smaller ones, which are then solved at the same time

Mule introduces Scatter-Gather processor to implement parallel processing.  <All> message processor replaced by scatter-Gather in Mule 3.5 version with more feature. Scatter-Gather router sends a message for concurrent processing to all configured routes. It uses a thread pool to concurrently execute all routes. This means that the total time the caller thread needs to be waiting for routes to respond is no longer the sum of all route’s time, but just the longest of them. If there are no failures, Mule aggregates the results from each of the routes into a message collection. If any exception comes during Scatter-Gather process it throws a CompositeRoutingException, which maps each exception to its corresponding route.

 Flow diagrame to implement parallel processing for mule application

scatter-gather

Mule code for parallel processing (scatter-Gather)

<custom-transformer class="com.vanrish.transformer.GenerateMessageTransformer" doc:name="Java Transformer"/>
  <scatter-gather doc:name="Scatter-Gather">
    <processor-chain>
      <expression-filter expression="#[flowVars.COM != null]" doc:name="Expression Filter"/>
      <set-payload value="#[flowVars.COM]" doc:name="Set Payload"/>
      <logger message="Msg =&gt; #[payload.message]" level="DEBUG" doc:name="Msg Log"/>
      <custom-transformer class="com. vanrish.transformer.ComTransformer" doc:name="Java Transformer"/>    
    </processor-chain>
    <processor-chain>
      <expression-filter expression="#[flowVars.CON != null]" doc:name="CON Expression Filter"/>
      <set-payload value="#[flowVars.CON]" doc:name="Set Payload"/>
      <logger message="Msg =&gt; #[payload.message]" level="DEBUG" doc:name="Msg Log"/>
      <custom-transformer class="com.vanrish. transformer.ConTransformer" doc:name="Java Transformer"/>    
    </processor-chain>
    <processor-chain>
      <expression-filter expression="#[flowVars.CBA != null]" doc:name="CBA Expression Filter"/>
      <set-payload value="#[flowVars.CBA]" doc:name="Set Payload"/>
      <logger message="Msg =&gt; #[payload.message]" level="DEBUG" doc:name="Msg Log"/>
      <custom-transformer class="com. vanrish.transformer.CbaTransformer" doc:name="Java Transformer"/>    
    </processor-chain>
    <processor-chain&gt
      <expression-filter expression="#[flowVars.IDF != null]" doc:name="IDF Expression Filter"/>
      <set-payload value="#[flowVars.IDF]" doc:name="Set Payload"/>
      <logger message="IDF =&gt; #[payload.message]" level="DEBUG" doc:name="Msg Log"/>
      <custom-transformer class="com. vanrish.transformer.IdfTransformer" doc:name="Java Transformer"/>
    </processor-chain>
    <processor-chain>
      <expression-filter expression="#[flowVars.CCI != null]" doc:name="Expression Filter"/>
      <set-payload value="#[flowVars.CCI]" doc:name="Set Payload"/>
      <logger message="#[payload.message]" level="DEBUG" doc:name="Msg Log"/>
      <custom-transformer class="com. vanrish.transformer.CciTransformer" doc:name="Java Transformer"/>    
    </processor-chain>
  </scatter-gather>
  <choice doc:name="Choice">
    <when expression="#[payload is List]">
      <logger level="INFO" message="i am list" doc:name="Logger"/>
      <expression-component doc:name="Company Msg Exp"><![CDATA[payload=app.dserializeObjectToXML(message.payload[0]));]]></expression-component>
    </when>
    <otherwise>
      <logger message="CompanyMsg class" level="INFO" doc:name="Logger"/>
      <expression-component doc:name="Company Msg Exp"><![CDATA[payload=app.dserializeObjectToXML(message.payload));]]></expression-component>
    </otherwise>
   </choice>
  <set-variable variableName="messageType" value="Company" doc:name="Variable"/>

Mulesoft integration with MongoDB (NOSQL)

Mulesoft with MongoDB is one of the best combinations for big data processing.  MongoDB is leading open source NoSQL database and Mule Soft is leading open source ESB.  This blog is dedicated to integration of Mulesoft with MongoDB .

Installation and configuration of MongoDB

Install MongoDB in your system. I am using window installation of mongoDB. I installed mongoDB 3.0 in my C:\ MongoDB folder. I created data folder to store MongoDB data in C:\data.
start MongoDB server with command

> \MongoDB\Server\3.0\bin\mongod.exe" --dbpath  \ data

MongoDB server started in port –27017 and userId—admin

Now download test data from MongoDB website
 https://raw.githubusercontent.com/mongodb/docs-assets/primer-dataset/dataset.json 
save this to a file named primer-dataset.json.

Import this data into MongoDB with test instance
>  mongoimport --db test --collection restaurants --drop --file \temp\primer-dataset.json
Start mongo editor to test loaded data  
 > MongoDB\Server\3.0\bin\mongo.exe
Run this command to test your database is configured
 > db.restaurants.find().count() 
This will return result.

Configuration of MongoDB connector in Mulesoft

Now configure MongoDB connection in Mule. I am using Mule 3.7
I created small Mule flow to work with MongoDB integration with Mule. This flow getting http request to get data from MongoDB and sending those data in json object.

managoDBFlow

MongoDB connection configuration screenshot
mangoConfig

Now we have restaurants collection in MongoDB , So I use collection  as restaurants.To execute condition query I am using operation – Find object using query map

In Query Attributes I am using  — Create Object manuallybuilder

Here is full code of this implementation

       <?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:json="http://www.mulesoft.org/schema/mule/json" xmlns:mongo="http://www.mulesoft.org/schema/mule/mongo" xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
	xmlns:spring="http://www.springframework.org/schema/beans" version="EE-3.7.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/mongo http://www.mulesoft.org/schema/mule/mongo/current/mule-mongo.xsd
http://www.mulesoft.org/schema/mule/json http://www.mulesoft.org/schema/mule/json/current/mule-json.xsd">
    <http:listener-config name="HTTP_Listener_Configuration" host="0.0.0.0" port="8081" basePath="/mongo" doc:name="HTTP Listener Configuration"/>
    <mongo:config name="Mongo_DB" database="test" doc:name="Mongo DB" username="admin"/>
    <flow name="mongodbprojectFlow">
        <http:listener config-ref="HTTP_Listener_Configuration" path="/mongoproject" doc:name="HTTP"/>
        <logger message="This is Http Request and Response" level="INFO" doc:name="Logger"/>
        <mongo:find-objects-using-query-map config-ref="Mongo_DB" doc:name="Mongo DB" collection="restaurants" >
        	<mongo:query-attributes>                
            	<mongo:query-attribute key="restaurant_id">40360045</mongo:query-attribute>
        	</mongo:query-attributes>
        	<mongo:fields>
            	<mongo:field>name</mongo:field>
            	<mongo:field>cuisine</mongo:field>
            	<mongo:field>borough</mongo:field>
        </mongo:fields>
        </mongo:find-objects-using-query-map>
        <mongo:mongo-collection-to-json doc:name="Mongo DB"/>
        <json:object-to-json-transformer doc:name="Object to JSON"/>
    </flow>
</mule>