MuleSoft Parallel Processing (Scatter-Gather)

An application that reads from or writes to multiple sources often hits a bottleneck: when each source is handled one after another, the total processing time is the sum of all the individual calls.
Parallel processing techniques can reduce that processing time and make the application faster.

So what is parallel processing?
   Parallel processing is a form of computation in which many processes are carried out simultaneously, operating on the principle that large problems can often be divided into smaller ones, which are then solved at the same time.
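
To make the definition concrete, here is a minimal sketch in plain Java (not Mule code). It assumes a hypothetical helper, parallelSum, that divides the range 1..n into chunks, sums each chunk on its own thread, and then adds the partial sums together. The class and method names are only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ParallelSumSketch {

    // Hypothetical helper: splits [1, n] into `parts` chunks, sums each chunk
    // on its own thread, then combines the partial sums.
    static long parallelSum(long n, int parts) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(parts);
        try {
            List<Callable<Long>> tasks = new ArrayList<>();
            long chunk = n / parts;
            for (int i = 0; i < parts; i++) {
                final long from = i * chunk + 1;
                // The last chunk absorbs any remainder of the division.
                final long to = (i == parts - 1) ? n : (i + 1) * chunk;
                tasks.add(() -> {
                    long sum = 0;
                    for (long v = from; v <= to; v++) {
                        sum += v;
                    }
                    return sum;
                });
            }
            long total = 0;
            // invokeAll blocks until every chunk has been summed.
            for (Future<Long> partial : pool.invokeAll(tasks)) {
                total += partial.get();
            }
            return total;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(parallelSum(1_000_000L, 4)); // prints 500000500000
    }
}
```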

Mule provides the Scatter-Gather processor to implement parallel processing. In Mule 3.5, Scatter-Gather replaced the <all> message processor and added more features. The Scatter-Gather router sends a message for concurrent processing to all configured routes, using a thread pool to execute the routes concurrently. This means that the total time the caller thread spends waiting for the routes to respond is no longer the sum of all the routes' times, but just the time of the slowest route. If there are no failures, Mule aggregates the results from each of the routes into a message collection. If any route fails during Scatter-Gather processing, Mule throws a CompositeRoutingException, which maps each exception to its corresponding route.
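
The same idea can be sketched in plain Java with a thread pool. This is not Mule's implementation or API, only an illustration under assumptions: the routes are modelled as Callables, the results are gathered into a list in route order, and CompositeException is a hypothetical stand-in for Mule's CompositeRoutingException that maps a route index to the exception that route threw.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ScatterGatherSketch {

    // Hypothetical stand-in for CompositeRoutingException:
    // records which route index produced which exception.
    static class CompositeException extends Exception {
        final Map<Integer, Throwable> failuresByRoute;

        CompositeException(Map<Integer, Throwable> failuresByRoute) {
            super(failuresByRoute.size() + " route(s) failed");
            this.failuresByRoute = failuresByRoute;
        }
    }

    // Runs every route concurrently and gathers the results in route order.
    // Assumes at least one route (a fixed pool needs a positive size).
    static <T> List<T> scatterGather(List<Callable<T>> routes)
            throws CompositeException, InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(routes.size());
        try {
            // Scatter: invokeAll blocks until all routes have completed.
            List<Future<T>> futures = pool.invokeAll(routes);
            List<T> results = new ArrayList<>();
            Map<Integer, Throwable> failures = new LinkedHashMap<>();
            // Gather: collect each result, or remember the failure per route.
            for (int i = 0; i < futures.size(); i++) {
                try {
                    results.add(futures.get(i).get());
                } catch (ExecutionException e) {
                    failures.put(i, e.getCause());
                }
            }
            if (!failures.isEmpty()) {
                throw new CompositeException(failures);
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    // Hypothetical route that simulates work by sleeping, then returns its name.
    static Callable<String> route(String name, long millis) {
        return () -> {
            Thread.sleep(millis);
            return name;
        };
    }

    public static void main(String[] args) throws Exception {
        long start = System.nanoTime();
        List<String> out = scatterGather(
                Arrays.asList(route("COM", 300), route("CON", 100), route("CBA", 200)));
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        // Elapsed time should be close to the slowest route (300 ms),
        // not the 600 ms sum of all three.
        System.out.println(out + " in about " + elapsedMs + " ms");
    }
}
```

In this sketch the caller waits only as long as the slowest route, and a failing route does not hide the others: every failure is reported together, keyed by route index.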

Flow diagram for implementing parallel processing in a Mule application:

[Image: scatter-gather]

Mule code for parallel processing (Scatter-Gather)

<custom-transformer class="com.vanrish.transformer.GenerateMessageTransformer" doc:name="Java Transformer"/>
  <scatter-gather doc:name="Scatter-Gather">
    <processor-chain>
      <expression-filter expression="#[flowVars.COM != null]" doc:name="Expression Filter"/>
      <set-payload value="#[flowVars.COM]" doc:name="Set Payload"/>
      <logger message="Msg =&gt; #[payload.message]" level="DEBUG" doc:name="Msg Log"/>
      <custom-transformer class="com.vanrish.transformer.ComTransformer" doc:name="Java Transformer"/>
    </processor-chain>
    <processor-chain>
      <expression-filter expression="#[flowVars.CON != null]" doc:name="CON Expression Filter"/>
      <set-payload value="#[flowVars.CON]" doc:name="Set Payload"/>
      <logger message="Msg =&gt; #[payload.message]" level="DEBUG" doc:name="Msg Log"/>
      <custom-transformer class="com.vanrish.transformer.ConTransformer" doc:name="Java Transformer"/>
    </processor-chain>
    <processor-chain>
      <expression-filter expression="#[flowVars.CBA != null]" doc:name="CBA Expression Filter"/>
      <set-payload value="#[flowVars.CBA]" doc:name="Set Payload"/>
      <logger message="Msg =&gt; #[payload.message]" level="DEBUG" doc:name="Msg Log"/>
      <custom-transformer class="com.vanrish.transformer.CbaTransformer" doc:name="Java Transformer"/>
    </processor-chain>
    <processor-chain>
      <expression-filter expression="#[flowVars.IDF != null]" doc:name="IDF Expression Filter"/>
      <set-payload value="#[flowVars.IDF]" doc:name="Set Payload"/>
      <logger message="IDF =&gt; #[payload.message]" level="DEBUG" doc:name="Msg Log"/>
      <custom-transformer class="com.vanrish.transformer.IdfTransformer" doc:name="Java Transformer"/>
    </processor-chain>
    <processor-chain>
      <expression-filter expression="#[flowVars.CCI != null]" doc:name="Expression Filter"/>
      <set-payload value="#[flowVars.CCI]" doc:name="Set Payload"/>
      <logger message="#[payload.message]" level="DEBUG" doc:name="Msg Log"/>
      <custom-transformer class="com.vanrish.transformer.CciTransformer" doc:name="Java Transformer"/>
    </processor-chain>
  </scatter-gather>
  <choice doc:name="Choice">
    <when expression="#[payload is List]">
      <logger level="INFO" message="i am list" doc:name="Logger"/>
      <expression-component doc:name="Company Msg Exp"><![CDATA[payload=app.dserializeObjectToXML(message.payload[0]);]]></expression-component>
    </when>
    <otherwise>
      <logger message="CompanyMsg class" level="INFO" doc:name="Logger"/>
      <expression-component doc:name="Company Msg Exp"><![CDATA[payload=app.dserializeObjectToXML(message.payload);]]></expression-component>
    </otherwise>
  </choice>
  <set-variable variableName="messageType" value="Company" doc:name="Variable"/>
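
The choice router at the end of the flow handles two payload shapes: a List, from which the first element is used, and a single object, which is used as-is. A minimal plain-Java sketch of that branching is shown here. The method name selectCompanyMessage is hypothetical, and the serialization step from the flow is left out because its implementation is not shown in this post.

```java
import java.util.Arrays;
import java.util.List;

class PayloadChoiceSketch {

    // Mirrors the two branches of the choice router: a List payload
    // contributes its first element, any other payload is returned unchanged.
    // Like payload[0] in the flow, this assumes the list is not empty.
    static Object selectCompanyMessage(Object payload) {
        if (payload instanceof List) {
            return ((List<?>) payload).get(0);
        }
        return payload;
    }

    public static void main(String[] args) {
        System.out.println(selectCompanyMessage(Arrays.asList("first", "second"))); // prints first
        System.out.println(selectCompanyMessage("single")); // prints single
    }
}
```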

5 Replies to “Mulesoft Parallel processing (Scatter-Gather)”

  1. The article is good and describes the Scatter-Gather router precisely. However, could you please list the practical scenarios for using this router for parallel processing? Also, in this situation (this Mule flow), if some route takes too long to return its result or gets choked, what would be the impact on the overall result? What are the advantages and disadvantages?

    Also, in the flow you've used a choice router after the Scatter-Gather. Since Scatter-Gather aggregates the results into some specific data format such as a Map, LinkedList, or ArrayList, how does the choice router fit into the picture? I need to pass conditions (if cases) to a choice router for further processing. Maybe I have not fully understood the scenario of the flow, so could you please explain the scenario you have used here?

  2. Garuav,
    In the Scatter-Gather process, the longest-running thread determines the total time to process all the data. If one thread finishes early and a second thread takes more time, the first thread waits for all the other threads to finish.

    As for your second question: in my example I may get either a single instance or multiple instances of the object, so I process the data based on the number of instances.

    Thanks

  3. Charanteja,
    This is a complete project. Before this flow, I get the data from the database as objects and process it through Scatter-Gather.

    Thanks

  4. Hi,

    Can anyone explain what strategy it uses to merge the data? For example, if I get 5 fields from one database and 6 fields from a second database, my Scatter-Gather will give me data with 11 fields in total. Can you explain how it maps the data (the merge strategy)?
