The Salesforce Streaming API is a simple way to push relevant data to users in real time, instead of requiring them to refresh the screen to get new information. It lets the server push notifications to the client based on criteria that we define.
The sequence of events when using Streaming API is as follows:
1. Create a PushTopic based on a SOQL query. This defines the channel.
2. A record is created, updated, or deleted. The changes to that record are evaluated.
3. If the record changes match the criteria of the PushTopic query, the server generates a notification and the subscribed clients receive it.
Before going deep into the Streaming API, here is a brief introduction to the following technologies:
1. Push Technology
2. Bayeux Protocol, CometD, and Long Polling
Push Technology
HTTP server push is a method of asynchronous communication between a web server and a web browser. With this approach, the server pushes information to the client without the client requesting it. The server-client connection remains open, so that when another event occurs, the data is sent to the client immediately.
Bayeux Protocol
Bayeux is a JSON-based protocol for transporting asynchronous messages with low latency, designed to be flexible and scalable. Messages are routed via named channels, and server-push technology is used to deliver them asynchronously from server to client.
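As a rough illustration of what "JSON-based" and "named channels" mean here, the sketch below builds two Bayeux-style messages in Python: a handshake and a subscription request. The client ID and the topic name /topic/ExampleTopic are hypothetical placeholders, and a real client would obtain its clientId from the server's handshake response rather than hard-coding it.

```python
import json


def handshake_message():
    """Build a Bayeux handshake request (sent on the /meta/handshake channel)."""
    return {
        "channel": "/meta/handshake",
        "version": "1.0",
        "supportedConnectionTypes": ["long-polling"],
    }


def subscribe_message(client_id, topic):
    """Build a Bayeux subscribe request for a named channel."""
    return {
        "channel": "/meta/subscribe",
        "clientId": client_id,
        "subscription": topic,
    }


# Bayeux messages travel as a JSON array in the HTTP request body.
# "hypothetical-client-id" and "/topic/ExampleTopic" are illustrative values only.
wire = json.dumps([subscribe_message("hypothetical-client-id", "/topic/ExampleTopic")])
print(wire)
```

Every message names the channel it is addressed to, which is how the server knows whether it is a protocol-level request (the /meta/ channels) or an event on a topic.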
CometD
CometD is a scalable HTTP-based event routing bus that uses an AJAX push technology pattern. The combination of the server push technique with an AJAX web application is known as Comet. Comet is an umbrella term for multiple techniques for achieving this client-server interaction. What these techniques have in common is that they rely on browser-native technologies such as JavaScript rather than on proprietary plugins.
Long Polling
Long polling is a technique in which the client makes an Ajax request to the server, and it is kept open until the server has new data to send to the browser. Upon receiving the server response, the browser initiates a new long polling request in order to obtain the next data set. Long polling can be achieved using either Ajax or script tag techniques.
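The request-wait-respond-reconnect cycle described above can be simulated without a network. In the sketch below, an in-memory queue stands in for the server's pending notifications, and the function names, timeout values, and event names are all illustrative assumptions rather than part of any real client library.

```python
import queue
import threading

events = queue.Queue()  # stands in for the server's pending notifications


def long_poll(timeout=1.0):
    """Simulated long-poll request: block until an event arrives or the timeout expires."""
    try:
        return events.get(timeout=timeout)
    except queue.Empty:
        return None  # no data yet; a real client would simply reconnect


def client_loop(max_events):
    """Issue a new request after every response, as a long-polling client does."""
    received = []
    while len(received) < max_events:
        event = long_poll()
        if event is not None:
            received.append(event)
    return received


def publish():
    """The "server" side: make two events available to waiting clients."""
    for name in ("created", "updated"):
        events.put(name)


# Publish shortly after the client has started waiting.
threading.Timer(0.1, publish).start()
print(client_loop(max_events=2))  # prints ['created', 'updated']
```

The client never asks "is there anything new?" on a fixed schedule. It holds a request open, and the response arrives as soon as the server has something to say.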
To integrate Salesforce and Mule using the Streaming API, the following steps should be implemented:
Prerequisites (Salesforce)
1. The “Streaming API” permission must be enabled under Your Name > Setup > Customize > User Interface.
2. The logged-in user must have “Read” permission on the PushTopic standard object to receive notifications.
3. The logged-in user must have “Create” permission on the PushTopic standard object to create and manage PushTopic records.
4. The logged-in user must have the “Author Apex” permission to create a PushTopic by using the Developer Console.
Now, let us go over the concepts in more detail by creating a sample project.
Salesforce
Step 1: Create an Object – In our case, we will use the Account object to explain the concept.
Step 2: Create a PushTopic – Use either the Developer Console or Workbench to create the PushTopic record. These records can be updated later if needed.
Your Name > Developer Console > Debug > Open Execute Anonymous window
Now, copy and paste the code below and press the Execute button.
PushTopic pushTopic = new PushTopic();
pushTopic.Name = 'RefreshAccounts';
pushTopic.Query = 'SELECT Id, Name FROM Account';
pushTopic.ApiVersion = 30.0;
insert pushTopic;
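To make the link between the PushTopic query and the resulting notifications concrete, here is a deliberately simplified Python model of the evaluation step. It assumes that an update only produces a notification when one of the fields in the SELECT list changes, and that creates always notify; the record values and the Account Id are hypothetical, and the real server-side rules depend on PushTopic settings that this sketch does not cover.

```python
WATCHED_FIELDS = {"Id", "Name"}      # fields in the PushTopic SELECT list
CHANNEL = "/topic/RefreshAccounts"   # channel derived from the PushTopic name


def notification_for(event_type, old, new):
    """Return a notification dict if the change is relevant, else None.

    Simplified assumption: creates always notify; updates notify only
    when at least one watched field differs between old and new.
    """
    if event_type == "updated":
        changed = {f for f in WATCHED_FIELDS if old.get(f) != new.get(f)}
        if not changed:
            return None
    return {
        "channel": CHANNEL,
        "data": {
            "event": {"type": event_type},
            "sobject": {f: new.get(f) for f in sorted(WATCHED_FIELDS)},
        },
    }


# Hypothetical record values, for illustration only.
before = {"Id": "001xx0000000001", "Name": "Acme", "Phone": "111"}
renamed = dict(before, Name="Acme Corp")   # a watched field changes
rephoned = dict(before, Phone="222")       # only an unwatched field changes

print(notification_for("updated", before, renamed))
print(notification_for("updated", before, rephoned))  # prints None
```

Only the fields named in the query appear in the notification body, which is why the query should select everything a subscriber needs.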
Mule Studio
Mule supports the Salesforce Streaming API. In the flow below, we connect to Salesforce from Mule and track events for a particular object. Once an event occurs, the Mule flow is initiated automatically; it stores the event result in the RabbitMQ server, fetches it back, and displays it in the Mule console with the help of the Logger component.
Global Element Setup
The Salesforce connector and the AMQP connector should be defined globally. The configuration of these components is shown below in the form of screenshots.
Configuration Components
The configuration of the components used in Mule is explained below:
Salesforce (Streaming): The Salesforce connector allows Mule to connect to the Salesforce application using a regular username and password via the SOAP API, as well as through an OAuth connection.
<sfdc:subscribe-topic config-ref="config-user1" topic="/AccountUpdateNew1" doc:name="Salesforce (Streaming)"/>
Idempotent Filter: The idempotent message filter ensures that only unique messages are received by a service, by checking the unique message ID of the incoming message. The ID can be generated from the message using an expression defined in the idExpression attribute. By default, the expression used is #[message:id].
<idempotent-message-filter storePrefix="Idempotent_Message" idExpression="#[message:id]" doc:name="Idempotent Message"/>
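The idea behind the filter can be sketched in a few lines of Python: remember every message ID seen so far and drop anything that repeats. The class name and the in-memory set are illustrative assumptions; the Mule filter keeps its IDs in an object store instead.

```python
class IdempotentFilter:
    """Pass each message ID once and drop repeats (in-memory stand-in for an object store)."""

    def __init__(self):
        self._seen = set()

    def accept(self, message_id):
        """Return True the first time an ID is seen, False on every repeat."""
        if message_id in self._seen:
            return False
        self._seen.add(message_id)
        return True


flt = IdempotentFilter()
incoming = ["m-1", "m-2", "m-1", "m-3", "m-2"]  # hypothetical message IDs with duplicates
unique = [m for m in incoming if flt.accept(m)]
print(unique)  # prints ['m-1', 'm-2', 'm-3']
```

This matters for streaming subscriptions because a notification can be delivered more than once, for example after a reconnect, and downstream steps should process it only once.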
Set Payload: The set-payload transformer is used to modify the payload of the incoming message. Here it replaces the payload with the record Id as a byte array.
<set-payload value="#[payload['Id'].getBytes()]" doc:name="Set Payload"/>
AMQP: The Advanced Message Queuing Protocol (AMQP) is an open standard for passing business messages between applications. The AMQP connector publishes messages to exchanges, whether or not queues are bound to them, which decouples the producer from the final destination of its messages. The inbound endpoint below reads messages from the queue newqueue1.
<amqp:inbound-endpoint queueName="newqueue1" responseTimeout="10000" connector-ref="AMQP_Connector" doc:name="AMQP"/>
Byte Array to String: Since the message received from RabbitMQ is in the form of bytes, this transformer is used to convert the byte array to a string.
<byte-array-to-string-transformer doc:name="Byte Array to String"/>
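The Set Payload and Byte Array to String steps mirror each other: the first turns the record Id into bytes before it goes to the broker, and the second turns the bytes back into text after they are read. The Python sketch below shows that round trip, with an in-memory queue standing in for RabbitMQ. The function names, the UTF-8 encoding, and the sample Id are assumptions made for illustration.

```python
import queue

broker_queue = queue.Queue()  # in-memory stand-in for the RabbitMQ queue


def publish_id(notification):
    """Mirror Set Payload: take the record Id and send it as bytes (UTF-8 assumed)."""
    broker_queue.put(notification["Id"].encode("utf-8"))


def consume_id():
    """Mirror Byte Array to String: decode the bytes read from the queue."""
    body = broker_queue.get_nowait()
    return body.decode("utf-8")


# Hypothetical notification payload, for illustration only.
publish_id({"Id": "001xx0000000001", "Name": "Acme"})
record_id = consume_id()
print(record_id)  # prints 001xx0000000001
```

Without the decoding step, the logger would print a byte array reference rather than the readable Id.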
Logger: It is used to display the output in the console.
<logger level="INFO" doc:name="Logger" message="#[payload]"/>
Conclusion
The Salesforce Streaming API is an effective technology for pushing your Salesforce organization’s data to clients in real time when the data meets criteria defined in Salesforce.