In my previous blog post I described parallel processing; in this post I continue that topic and add a new flavor of parallel processing in MuleSoft.
The Splitter flow control splits a message into separate fragments and then sends these fragments in parallel to the next message processor in the flow. Fragments are identified based on an expression parameter, usually written in Mule Expression Language (MEL), although other formats can also be used. There are three ways to split a MuleSoft message:
Splitter – The Splitter can split all types of data, such as Object, XML, JSON, and plain payloads, based on a MEL (Mule Expression Language) expression, and processes each fragment in an individual thread.
Collection Splitter – If the input is a collection, the Collection Splitter splits it into its elements and processes each element in an individual thread.
Chunk Splitter – The Chunk Splitter splits the message into chunks of bytes, based on a user-supplied chunk size, and processes each chunk in an individual thread.
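As a rough sketch, the three splitters above correspond to the following Mule 3 configuration elements (the expression, chunk size, and doc:name values here are illustrative, not from the example below):

```xml
<!-- Splitter: splits on any MEL expression, e.g. a selected node list -->
<splitter expression="#[payload.items]" doc:name="Expression Splitter"/>

<!-- Collection Splitter: splits a Java Collection into one message per element -->
<collection-splitter doc:name="Collection Splitter"/>

<!-- Chunk Splitter: splits the payload into fixed-size byte chunks -->
<message-chunk-splitter messageSize="512" doc:name="Chunk Splitter"/>
```

Each fragment produced by a splitter then travels through the rest of the flow as its own Mule message.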
Because the Collection Splitter is one of the most commonly used splitters in MuleSoft, the example below uses it. In this example a HashMap arrives as the payload. The Collection Splitter splits the HashMap entries across separate threads (capped at 50) and processes them in parallel.
Mulesoft code for parallel processing (Splitter)
<flow name="Vanrish-processFlow" processingStrategy="allow50Threads">
    <logger message="*** Starting Vanrish-processFlow ***" category="edi-vanrish-process" level="INFO" doc:name="Flow Start Log"/>
    <set-payload value="#[map-payload:processing]" doc:name="Payload Processing"/>
    <set-variable variableName="numberOfMessages" value="#[payload.entrySet().size()]" doc:name="Variable"/>
    <logger message="Processing #[flowVars['numberOfMessages']] entities" level="INFO" doc:name="logger-status go to database"/>
    <splitter doc:name="Collection Splitter" expression="#[payload.entrySet()]"/>
    <vm:outbound-endpoint exchange-pattern="one-way" path="VanrishVM" doc:name="VM"/>
    <logger message="*** Ending Vanrish-processFlow ***" category="edi-Vanrish-process" level="INFO" doc:name="Flow End Log"/>
</flow>

<flow name="Vanrish_Splitter_Demo" processingStrategy="allow50Threads">
    <vm:inbound-endpoint exchange-pattern="one-way" path="VanrishVM" doc:name="VM"/>
    <logger message="Company Canonical Start Time -> #[server.dateTime]" level="INFO" doc:name="Company Logger"/>
    <flow-ref name="vanrishMsgPrivateFlow" doc:name="companyMsgPrivateFlow"/>
</flow>
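The flow-ref above points to vanrishMsgPrivateFlow, which is not shown in this post. Assuming each split message carries a single Map.Entry from the original HashMap, a minimal hypothetical sketch of that private flow might look like this:

```xml
<!-- Hypothetical: the real vanrishMsgPrivateFlow is not included in the post -->
<flow name="vanrishMsgPrivateFlow" processingStrategy="synchronous">
    <!-- payload here is one Map.Entry produced by the Collection Splitter -->
    <logger message="Processing entry #[payload.getKey()] -> #[payload.getValue()]" level="INFO" doc:name="Entry Logger"/>
    <!-- actual per-entry processing (transforms, outbound calls, etc.) would go here -->
</flow>
```

Because the VM outbound endpoint is one-way, each split entry is handed off asynchronously, so these per-entry flows run concurrently up to the thread limit of the processing strategy.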