Thursday, 24 April 2014

How to implement asynchronous queue in CQ5

Scenario1:
You have an application where you are calling an external web service. But you don't want to call the web service and get response synchronously.
1) You want if there are some other processes running and they have highr priority they get completed first and so you want to add your tasks to a queue and process them on basis of their respective priority.
2) You also want to retry a task if there is some exception or your web service returns an error reponse.

Solution:
Sling defines an internal queue. So, instead of calling the webservice directly you can add the task to sling queue by creating a job manager and posting your payload to the queue via Event admin.

EventAdmin manages event and exposes 2 methods:
  • postEvent : Initiate asynchronous delivery of an event. This method returns to the caller before delivery of the event is completed. 
  • sendEvent: Initiate synchronous delivery of an event. This method does not return to the caller until delivery of the event is completed. 
JobManager class:
It is used to manage the sling job queues

For e.g.:
Find below implementation Job Manager which posts events to the queue:

@Component
@Service
 public class JobManager  {

    @Reference
    private JobManager jobManager;

    @Reference
    private EventAdmin eventAdmin;
  
  @Override
    public void startJob(String topicName, Map<String,Object> payloadMap)
    {
     eventAdmin.postEvent(event);
      }
}

In order to set certain properties in the job you are posting so that they can be retrieved in the consumer,  you need to create a job payload map of type <String,Object>.

For e.g.: This code defines job priority, number of retries and retry delay.
jobPayload.put(JobUtil.PROPERTY_JOB_PRIORITY, "NORM");
jobPayload.put(JobUtil.PROPERTY_JOB_RETRIES ,5);
jobPayload.put(JobUtil.PROPERTY_JOB_RETRY_DELAY, 3000);

You can then call startJob method of your JobManager class to add your task to the queue.
e.g.:jobManager.startJob("topic/test1", jobPayload);

The last and final step is creation of job consumer service which will read the job queue and process the job:
.
@Component(immediate = true)
@Property(label = "Event Topic For Test", value = {
        "topic/test1"}, description = "Event Topics this event handler will to respond to", name = EventConstants.EVENT_TOPIC)
@Service
public JobConsumer implements JobProcessor, EventHandler {

    @Override
    public void handleEvent(Event event) {
       if (EventUtil.isLocal(event)) {
            JobUtil.processJob(event, this);

        }
    }

    @Override
    public boolean process(Event job) {

        /* Extract all the job parameters here and call the required method or continue with your actual 
             processing*/

           String sample1 = (String) job.getProperty("test1");

}


No comments:

Post a Comment