DataWeave: A New Era in Mulesoft

Dataweave is a data mapping tool that was introduced with the Mule 3.7 runtime. Before Mule 3.7, Datamapper handled data mapping. Dataweave inherits some of Datamapper's functionality, but Datamapper's restrictions on complex mappings led to Dataweave replacing it in the Mule 3.7 runtime.
A Dataweave transformation has three sections:
1) Input
2) Output
3) Data transformation language

The data transformation language is JSON-like. If you are familiar with JSON, it is very easy to write and maintain Dataweave transformation logic.

Here are some tips to write and maintain Dataweave transformation logic.

1) By default, the output of a Dataweave transformation is set as the payload. You can easily change this from the dropdown and set the output as a flow variable or session variable instead.
[Image: dataweave]

2) You can easily define the output format of the transformed data in Dataweave. To produce XML, declare "%output application/xml" without touching the underlying transformation logic. In the same way, to produce JSON or any other format, you only change the directive, for example "%output application/json", "%output application/java" or "%output application/csv".

%dw 1.0
%output application/xml
%namespace ns0 http://schemas.vanrish.com/2010/01/ldaa

3) Dataweave can skip null fields during data transformation. This is purely declarative: add skipNullOn="everywhere" to the output directive to skip null fields everywhere in the transformation.

%dw 1.0
%output application/xml skipNullOn="everywhere"
%namespace ns0 http://schemas.vanrish.com/2010/01/ldaa


4) A Dataweave transformation can read flowVars and sessionVars directly in a transformation field.

orderParameter:{
         name:"MINOR_LI_ID",
         value:flowVars.payloadVar.minorLiId
}

5) A Dataweave transformation can read values directly from a properties file. You access a property in Dataweave the same way you access it elsewhere in the flow.

teamName:"$($.teameName)${teamNameSuffice}",
  teamNameSuffice is defined in the properties file.

6) Dataweave lets you implement conditional logic for each field, which makes it easy to transform your data based on conditions. Here is an example of conditional logic for the field partnershipType.

partnershipType:"NEW" when $.partnership-type=="NEW"
                            otherwise "USED" when $.partnership-type=="USED"
                            otherwise "EM" when $.partnership-type=="EM"
                            otherwise "PU" when $.partnership-type=="PU"
                            otherwise "PN" when $.partnership-type=="PN"
                            otherwise $.partnership-type,
Here is another example:

creativeType:"GRAPHIC" when ($.creative-type == "GRAPHIC" or $.creative-type == null)
                         otherwise "TEMPLATED_AD" when $.creative-type == "TEMP_AD"
                         otherwise "AD_TAG" when $.creative-type == "AD_TAG"
                         otherwise "",

7) Within Dataweave transformation logic you can call a global function and transform a field based on its output. This is one way to call Java from a Dataweave transformation.

Here is an example.
global-functions is the tag in the mule-config file where you define functions that are available throughout the Mule application.

<global-functions>

def toUUID() {
return java.util.UUID.randomUUID().toString()
}

def getImageType(imageName) {
return imageName.substring(imageName.lastIndexOf('.')+1)
}

</global-functions>

Now you can call this function inside Dataweave transformation logic.

 imageType:getImageType($.origin-name as :string) when $.origin-name != null otherwise "",
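
The behavior of getImageType is easy to check outside Mule. The following is a minimal plain-Java sketch of the same expression; the class name ImageTypeDemo and the sample file names are assumptions made for illustration only.

```java
// Plain-Java sketch of the getImageType global function shown above.
// ImageTypeDemo is a hypothetical class name, not part of the Mule application.
class ImageTypeDemo {

    // Returns the text after the last '.' in the file name.
    // If the name contains no '.', lastIndexOf returns -1, so substring(0)
    // returns the whole name unchanged.
    static String getImageType(String imageName) {
        return imageName.substring(imageName.lastIndexOf('.') + 1);
    }

    public static void main(String[] args) {
        System.out.println(getImageType("banner.large.png")); // png
        System.out.println(getImageType("README"));           // README
    }
}
```

Note that a name without an extension comes back unchanged, so the Dataweave caller may want an extra condition if an empty image type is expected in that case.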

Mulesoft: Flow Execution Time

A Mulesoft application is built from flows, and every flow has its own execution time. There are a couple of ways to measure it, but the easiest one Mulesoft provides is an interceptor. The timer interceptor (<timer-interceptor/>) is a Mule interceptor that measures flow execution time.

Here is the flow diagram for timer-interceptor

[Image: muleTimerInterceptorFlow]

Here is the code to use timer-interceptor in your application


<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"       xmlns:spring="http://www.springframework.org/schema/beans"       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd">

<http:listener-config name="HTTP_Listener_Configuration" host="0.0.0.0" port="8081" basePath="/demo" doc:name="HTTP Listener Configuration"/>

<flow name="muleTimerInterceptorFlow">
<http:listener config-ref="HTTP_Listener_Configuration" path="/" doc:name="HTTP"/>
<timer-interceptor/>
<set-payload doc:name="Set Payload" value="Hello World"/>
<logger message="#[payload]" level="INFO" doc:name="Logger"/>
</flow>
</mule>

The <timer-interceptor/> tag reports the execution time in milliseconds.

You can customize how flow execution time is reported by replacing <timer-interceptor/> with <custom-interceptor>.
In the custom interceptor you specify your own interceptor Java class.

<custom-interceptor class="com.vanrish.interceptor.TimerInterceptor" />

Here is the flow diagram for the custom timer-interceptor

[Image: muleTimerCustomInterceptorFlow]

Here is the mule-config.xml code for the custom timer-interceptor


<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"       xmlns:spring="http://www.springframework.org/schema/beans"       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd">

<http:listener-config name="HTTP_Listener_Configuration" host="0.0.0.0" port="8081" basePath="/demo" doc:name="HTTP Listener Configuration"/>

<flow name="muleTimerInterceptorFlow">

<http:listener config-ref="HTTP_Listener_Configuration" path="/" doc:name="HTTP"/>

 <custom-interceptor class="com.vanrish.interceptor.TimerInterceptor" />

<set-payload doc:name="Set Payload" value="Hello World"/>
<logger message="#[payload]" level="INFO" doc:name="Logger"/>
</flow>
</mule>

Java TimerInterceptor code for the custom-interceptor tag


package com.vanrish.interceptor;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.mule.api.MuleEvent;
import org.mule.api.MuleException;
import org.mule.api.interceptor.Interceptor;
import org.mule.processor.AbstractInterceptingMessageProcessor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * <code>TimerInterceptor</code> simply times and displays the time taken to
 * process an event.
 */
public class TimerInterceptor extends AbstractInterceptingMessageProcessor
        implements Interceptor {

    /** logger used by this class */
    private static Log logger = LogFactory.getLog(TimerInterceptor.class);

    public MuleEvent process(MuleEvent event) throws MuleException {
        // Record the start time before handing the event to the rest of the flow
        long startTime = System.currentTimeMillis();
        DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
        Date stdate = new Date();
        String start = dateFormat.format(stdate);
        System.out.println(start);

        // Invoke the next message processor in the chain
        MuleEvent resultEvent = processNext(event);

        Date enddate = new Date();
        String end = dateFormat.format(enddate);

        if (logger.isInfoEnabled()) {
            long executionTime = System.currentTimeMillis() - startTime;
            logger.info("Custom Timer : " + resultEvent.getFlowConstruct().getName()
                    + " Start at " + start + " and end at " + end
                    + " it took " + executionTime + "ms to process event ["
                    + resultEvent.getId() + "]");
        }
        return resultEvent;
    }
}
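
The wrap-and-time pattern used by the interceptor can also be tried without any Mule dependency. The sketch below is only an illustration of that pattern: TimedCall, Timed and time are hypothetical names, a java.util.function.Supplier stands in for the rest of the flow (processNext), and System.nanoTime() is swapped in for System.currentTimeMillis() because it is monotonic.

```java
import java.util.function.Supplier;

// Dependency-free sketch of the "record start, call next, report elapsed" pattern.
// All names here are hypothetical and not part of the Mule API.
class TimedCall {

    // Holds the wrapped call's result together with the elapsed time.
    static final class Timed<T> {
        final T result;
        final long elapsedMillis;

        Timed(T result, long elapsedMillis) {
            this.result = result;
            this.elapsedMillis = elapsedMillis;
        }
    }

    // Runs the supplied work and measures how long it took.
    static <T> Timed<T> time(Supplier<T> next) {
        long start = System.nanoTime();
        T result = next.get(); // stands in for processNext(event)
        long elapsedMillis = (System.nanoTime() - start) / 1_000_000;
        return new Timed<>(result, elapsedMillis);
    }

    public static void main(String[] args) {
        Timed<String> timed = time(() -> "Hello World");
        System.out.println(timed.result + " took " + timed.elapsedMillis + "ms");
    }
}
```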

Mulesoft Polling: A New Perspective

Polling is one of the most important features in Mulesoft. If you are dealing with a large set of data or an asynchronous workflow, a polling architecture is one of the best options for managing the scenario.
There are two options for configuring Mulesoft poll scheduling.

  1. Fixed frequency scheduler – you define the polling interval with a frequency, a start delay and a time unit. This is one of the simplest ways to define your polling.
  2. Cron scheduler – lets you use a cron expression to manage complex polling schedules.

By default there is no relationship between two polls. My challenge was to establish one so that I could manage my data more efficiently.

Mulesoft gives a couple of options for setting up a relationship between two polls.

Watermark – When polling, there is always the challenge of processing only newly created data while persisting a pointer to the data already processed, so that nothing is processed twice. Mulesoft lets us persist this pointer in an object store as a watermark. Mule sets the watermark to a default value the first time the flow runs, then uses it as needed when running a query or making an outbound request. Depending on how the flow runs, Mule either updates the watermark or keeps its original value.
Here is a simple flow that shows how to implement a watermark for a poll

Poll Watermark Flow Diagram
[Image: poll-watermark]

Here is the code for this flow


<?xml version="1.0" encoding="UTF-8"?>
  <mule xmlns:schedulers="http://www.mulesoft.org/schema/mule/schedulers" xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking" xmlns:db="http://www.mulesoft.org/schema/mule/db"
   xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns:mulexml="http://www.mulesoft.org/schema/mule/xml" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
   xmlns:spring="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://www.mulesoft.org/schema/mule/db http://www.mulesoft.org/schema/mule/db/current/mule-db.xsd
   http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
   http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
   http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
   http://www.mulesoft.org/schema/mule/xml http://www.mulesoft.org/schema/mule/xml/current/mule-xml.xsd
   http://www.mulesoft.org/schema/mule/ee/tracking http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd
   http://www.mulesoft.org/schema/mule/schedulers http://www.mulesoft.org/schema/mule/schedulers/current/mule-schedulers.xsd">
  <spring:beans>
     <spring:bean id="dataSourceBean" name="dataSource_Bean" class="org.apache.commons.dbcp.BasicDataSource">
       <spring:property name="driverClassName" value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>
       <spring:property name="username" value="********"/>
       <spring:property name="password" value="********* "/>
       <spring:property name="url" value="jdbc:jtds:sqlserver://localhost:60520;Instance=CRM;DatabaseName=vanrish;domain=man;integrated security=false"/>
    </spring:bean>
 </spring:beans>
  <db:generic-config name="Generic_Database_Configuration" dataSource-ref="dataSourceBean" doc:name="Generic Database Configuration" >
     <reconnect-forever frequency="30000"/>
  </db:generic-config>
<flow name="poll-watermarking" processingStrategy="synchronous">
  <poll doc:name="Poll">
    <schedulers:cron-scheduler expression="0 22 12 * * ?"/>
    <watermark variable="serialNumber" default-expression="0" selector="LAST" selector-expression="#[payload.serialNumber]"/>
    <db:select config-ref="Generic_Database_Configuration" doc:name="Select Database">
       <db:dynamic-query><![CDATA[SELECT
MessageId, MessageType,SerialNumber,CreatedOn  FROM Message where SerialNumber  > #[Integer.parseInt(flowVars['serialNumber'])] order by SerialNumber asc]]></db:dynamic-query>
    </db:select>
 </poll>
   <logger message="#[flowVars['serialNumber']] == Hello this is Loggin Message == #[payload]" level="INFO" doc:name="Logger"/>
  </flow>
</mule>
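
The bookkeeping behind the watermark in the flow above can be sketched in plain Java. This is only an illustration under stated assumptions: WatermarkDemo and its methods are hypothetical names, the watermark lives in memory rather than in a Mule object store, and a list of integers stands in for the SerialNumber column returned by the query.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// In-memory sketch of watermark bookkeeping (hypothetical names, no Mule API).
class WatermarkDemo {

    // Mirrors default-expression="0": the value used on the very first poll.
    private int watermark = 0;

    // Returns only the serial numbers above the current watermark, then moves
    // the watermark to the last returned value (like selector="LAST").
    // The input is assumed to be sorted ascending, as in the SQL "order by".
    List<Integer> poll(List<Integer> sortedSerialNumbers) {
        List<Integer> fresh = new ArrayList<>();
        for (int serial : sortedSerialNumbers) {
            if (serial > watermark) {
                fresh.add(serial);
            }
        }
        if (!fresh.isEmpty()) {
            watermark = fresh.get(fresh.size() - 1);
        }
        return fresh;
    }

    int getWatermark() {
        return watermark;
    }

    public static void main(String[] args) {
        WatermarkDemo demo = new WatermarkDemo();
        System.out.println(demo.poll(Arrays.asList(1, 2, 3)));       // [1, 2, 3]
        System.out.println(demo.poll(Arrays.asList(1, 2, 3, 4, 5))); // [4, 5]
    }
}
```

The second poll sees all five rows but only returns the two that are newer than the stored watermark, which is the duplicate-avoidance effect described above.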

Idempotent Filter – The idempotent filter is another way in Mule to keep track of state between two polls. It ensures that only unique messages are received by a service by checking the unique ID of each incoming message. The filter also stores the unique ID/pointer in a Mule object store.
Here is a simple flow that uses the idempotent filter

Poll Idempotent Filter Flow Diagram
[Image: poll-idempotent]

[Image: idempotent-objectStore]

Here is the code

<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:schedulers="http://www.mulesoft.org/schema/mule/schedulers" xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking" xmlns:db="http://www.mulesoft.org/schema/mule/db"
  xmlns:cassandradb="http://www.mulesoft.org/schema/mule/cassandradb" xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns:json="http://www.mulesoft.org/schema/mule/json" xmlns:mulexml="http://www.mulesoft.org/schema/mule/xml" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
  xmlns:spring="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.mulesoft.org/schema/mule/db http://www.mulesoft.org/schema/mule/db/current/mule-db.xsd
  http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
  http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
  http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
  http://www.mulesoft.org/schema/mule/xml http://www.mulesoft.org/schema/mule/xml/current/mule-xml.xsd
  http://www.mulesoft.org/schema/mule/ee/tracking http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd
  http://www.mulesoft.org/schema/mule/schedulers http://www.mulesoft.org/schema/mule/schedulers/current/mule-schedulers.xsd">
    <spring:beans>
      <spring:bean id="dataSourceBean" name="dataSource_Bean" class="org.apache.commons.dbcp.BasicDataSource">
        <spring:property name="driverClassName" value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>
        <spring:property name="username" value="*******"/>
        <spring:property name="password" value="********"/>
        <spring:property name="url" value="jdbc:jtds:sqlserver://localhost:60520;Instance=CRM;DatabaseName=vanrish;domain=man;integrated security=false"/>
      </spring:bean>
    </spring:beans>
    <db:generic-config name="Generic_Database_Configuration" dataSource-ref="dataSourceBean" doc:name="Generic Database Configuration" >
      <reconnect-forever frequency="30000"/>
    </db:generic-config>
    <flow name="poll-idempotent" processingStrategy="synchronous">
      <poll doc:name="Poll">
        <fixed-frequency-scheduler frequency="10000"/>
        <db:select config-ref="Generic_Database_Configuration" doc:name="Select Database">
          <db:dynamic-query><![CDATA[SELECT  MessageId, MessageType,SerialNumber,CreatedOn  FROM Message order by SerialNumber asc]]></db:dynamic-query>
        </db:select>
      </poll>
      <foreach doc:name="For Each">
        <idempotent-message-filter idExpression="#[payload.SerialNumber]" doc:name="Idempotent Message">
          <in-memory-store entryTTL="120000" expirationInterval="1800000"/>
        </idempotent-message-filter>
        <logger message="#[payload.SerialNumber] == Hello this is Loggin Message == #[payload]" level="INFO" doc:name="Logger"/>
      </foreach>
    </flow>
  </mule>
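
The core idea of the idempotent filter, remembering which IDs have already been seen, can be sketched in a few lines of plain Java. IdempotentFilterDemo and accept are hypothetical names, and unlike the in-memory-store in the flow above this sketch never expires entries (there is no equivalent of entryTTL).

```java
import java.util.HashSet;
import java.util.Set;

// Minimal sketch of idempotent filtering (hypothetical names, no Mule API).
class IdempotentFilterDemo {

    // IDs of messages that have already been let through.
    private final Set<Integer> seenIds = new HashSet<>();

    // Returns true the first time an ID is seen and false for every repeat.
    // Set.add returns false when the element is already present.
    boolean accept(int id) {
        return seenIds.add(id);
    }

    public static void main(String[] args) {
        IdempotentFilterDemo filter = new IdempotentFilterDemo();
        System.out.println(filter.accept(101)); // true
        System.out.println(filter.accept(101)); // false
        System.out.println(filter.accept(102)); // true
    }
}
```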

MuleSoft Security – Encryption

Security is about protecting your assets. These assets could be anything in a company. Please refer to my previous blog about what-is-security.

Mulesoft provides a security suite to protect company assets. The suite offers various methods for applying security to Mule Service-Oriented Architecture (SOA) implementations and web services. It is available in the enterprise version of Mulesoft.

In this blog I show how to use encryption and decryption from the Mulesoft security suite. Mule can encrypt an entire payload or selected fields within a message. Encryption prevents unauthorized access to data such as passwords, SSNs and credit card numbers, and lets this data move between systems securely.

The Mule Message Encryption processor changes the payload or message so that it becomes unreadable by unauthorized entities. It encrypts the payload using one of the following three encryption strategies

1) JCE Encrypter — encrypts stream, byte[] or string
2) XML Encrypter — encrypts string, encrypts individual fields using xpath expressions.
3) PGP Encrypter — encrypts stream, byte[] or string, applies tighter security (relative to JCE and XML), increases processing load (relative to JCE and XML)

Encryption-Decryption Flow Diagram
[Image: securitydemoproject]
Encryption Connector Configuration
In my example I am using the JCE Encrypter and setting values for key and keyPassword.
[Image: encryption_connector]
Here is the full code of this implementation


<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:encryption="http://www.mulesoft.org/schema/mule/encryption" 
xmlns:http="http://www.mulesoft.org/schema/mule/http" 
xmlns="http://www.mulesoft.org/schema/mule/core" 
xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"       
xmlns:spring="http://www.springframework.org/schema/beans" version="EE-3.7.0"       
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"       
xsi:schemaLocation="http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/encryption http://www.mulesoft.org/schema/mule/encryption/current/mule-encryption.xsd">

  <http:listener-config name="HTTP_Listener_Configuration" host="0.0.0.0" port="8081" basePath="/demo" doc:name="HTTP Listener Configuration"/>
  <encryption:config name="Encryption" doc:name="Encryption">
    <encryption:jce-encrypter-config key="8aVrj8x8IevyeaD=" keyPassword="0Zb+smauaT8v6hRiFGJDnakwlS/YC2u="/>
  </encryption:config>

  <flow name="securitydemoprojectFlow">

    <http:listener config-ref="HTTP_Listener_Configuration" path="/" doc:name="HTTP"/>

    <set-payload value="Hello World" doc:name="Set Payload"/>

    <encryption:encrypt config-ref="Encryption" doc:name="Encryption" using="JCE_ENCRYPTER">

      <encryption:jce-encrypter key="8aVrj8x8IevyeaD=" algorithm="AES" encryptionMode="CBC" keyPassword="0Zb+smauaT8v6hRiFGJDnakwlS/YC2u="/>

    </encryption:encrypt>

    <logger message=" Encrypted Message ==#[payload]" level="INFO" doc:name="Logger"/>

    <encryption:decrypt config-ref="Encryption" doc:name="Decryption" using="JCE_ENCRYPTER">

      <encryption:jce-encrypter key="8aVrj8x8IevyeaD=" keyPassword="0Zb+smauaT8v6hRiFGJDnakwlS/YC2u=" algorithm="AES" encryptionMode="CBC"/>

    </encryption:decrypt>
  </flow>
</mule>
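
For readers who want to see what AES in CBC mode looks like at the JCE level, here is a minimal round-trip sketch using only the standard javax.crypto classes. It is not the Mule encryption module's implementation: the class name JceAesCbcDemo, the randomly generated 16-byte key and IV, and the PKCS5Padding choice are all assumptions made for illustration, and how Mule derives its key material from the key and keyPassword attributes is not covered here.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import javax.crypto.Cipher;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.SecretKeySpec;

// Standalone JCE sketch of AES/CBC encryption and decryption (hypothetical class).
class JceAesCbcDemo {

    // Padding scheme is an assumption; the Mule configuration above does not state one.
    private static final String TRANSFORMATION = "AES/CBC/PKCS5Padding";

    // Encrypts UTF-8 text with a 16-byte AES key and a 16-byte IV.
    static byte[] encrypt(byte[] key, byte[] iv, String plainText) {
        try {
            Cipher cipher = Cipher.getInstance(TRANSFORMATION);
            cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(key, "AES"), new IvParameterSpec(iv));
            return cipher.doFinal(plainText.getBytes(StandardCharsets.UTF_8));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("Encryption failed", e);
        }
    }

    // Decrypts bytes produced by encrypt() with the same key and IV.
    static String decrypt(byte[] key, byte[] iv, byte[] cipherText) {
        try {
            Cipher cipher = Cipher.getInstance(TRANSFORMATION);
            cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(key, "AES"), new IvParameterSpec(iv));
            return new String(cipher.doFinal(cipherText), StandardCharsets.UTF_8);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("Decryption failed", e);
        }
    }

    public static void main(String[] args) {
        SecureRandom random = new SecureRandom();
        byte[] key = new byte[16]; // 128-bit AES key, generated for the demo only
        byte[] iv = new byte[16];  // CBC needs an IV the size of one AES block
        random.nextBytes(key);
        random.nextBytes(iv);

        byte[] encrypted = encrypt(key, iv, "Hello World");
        System.out.println(decrypt(key, iv, encrypted)); // Hello World
    }
}
```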

Mulesoft Parallel processing-2 (Splitter, Collection Splitter, Chunk Splitter)

In my previous blog I described parallel processing. This blog continues that topic and adds a new flavor of parallel processing in Mulesoft.
The Splitter flow control splits a message into separate fragments and then sends these fragments concurrently to the next message processor in the flow. Fragments are identified by an expression parameter, usually written in Mule Expression Language (MEL), although other formats can also be used. There are three ways to split a Mulesoft message.

Splitter – Splits all types of data, such as Object, XML, JSON and payload, based on a MEL expression and processes each fragment in its own thread.

Collection Splitter – If the input is a collection, the collection splitter splits it into its elements and processes each element in its own thread.

Chunk Splitter – Splits the message into chunks of bytes of a user-defined size and processes each chunk in its own thread.

The Collection Splitter is one of the most commonly used splitters in Mulesoft, so my example uses it.
In this example a HashMap arrives as the payload. The Collection Splitter splits the HashMap into entries that are handled on separate threads (limited to a maximum of 50) and processed in parallel.

Flow diagram for parallel processing with the Mulesoft Collection Splitter
[Image: splitter]

Mulesoft code for parallel processing (Splitter)

<flow name="Vanrish-processFlow" processingStrategy="allow50Threads">

    <logger message="*** Starting Vanrish-processFlow ***" category="edi-vanrish-process" level="INFO" doc:name="Flow Start Log"/>

    <set-payload value="#[map-payload:processing]" doc:name="Payload Processing"/>
    <set-variable variableName="numberOfMessages" value="#[payload.entrySet().size()]" doc:name="Variable"/>
    <logger message="Processing #[flowVars['numberOfMessages']] entities" level="INFO" doc:name="logger-status go to database"/>
    <splitter doc:name="Collection Splitter" expression="#[payload.entrySet()]"/>
    <vm:outbound-endpoint exchange-pattern="one-way" path="VanrishVM" doc:name="VM"/>

    <logger message="*** Ending Vanrish-processFlow ***" category="edi-Vanrish-process" level="INFO" doc:name="Flow End Log"/>
</flow>

<flow name="Vanrish_Splitter_Demo" processingStrategy="allow50Threads">
    <vm:inbound-endpoint exchange-pattern="one-way" path="VanrishVM" doc:name="VM"/>
    <logger message="Company Canonical Start Time -&gt; #[server.dateTime]" level="INFO" doc:name="Company Logger"/>
    <flow-ref name="vanrishMsgPrivateFlow" doc:name="companyMsgPrivateFlow"/>
</flow>
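
Outside Mule, the same split-and-process idea can be sketched with a standard Java thread pool. This is an illustration only: MapSplitterDemo and processInParallel are hypothetical names, a fixed pool of 50 threads stands in for the allow50Threads processing strategy (whose definition is not shown above), and the per-entry work is a placeholder.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Plain-Java sketch of splitting a map into entries and processing them in parallel.
class MapSplitterDemo {

    // Upper bound on worker threads, mirroring the 50-thread limit described above.
    static final int MAX_THREADS = 50;

    // Submits one task per map entry and collects the results in submission order.
    static List<String> processInParallel(Map<String, String> payload) {
        ExecutorService pool = Executors.newFixedThreadPool(MAX_THREADS);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (Map.Entry<String, String> entry : payload.entrySet()) {
                // Placeholder work: format the entry; a real flow would do more here.
                Callable<String> task = () -> entry.getKey() + "=" + entry.getValue();
                futures.add(pool.submit(task));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                results.add(future.get()); // waits for that entry's task to finish
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for results", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Entry processing failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Map<String, String> payload = new LinkedHashMap<>();
        payload.put("a", "1");
        payload.put("b", "2");
        System.out.println(processInParallel(payload)); // [a=1, b=2]
    }
}
```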

The Java profiler shows how the Mulesoft Splitter splits a collection message across multiple threads and processes the data
[Image: splitter-profiler]