Recent Posts

High Performance through Asynch operations using Symfony Background Processing

High Performance through Asynch operations using Symfony Background Processing

The Problem aka Opportunity:

Web Users expect high performance, who knew?

While standard performance monitoring/tweaking on pages can make improve performance, some requests are bound to take a while due to some slow, blocking operation that is out of your control. Often these operations don’t impact the response going back to the client – think sending email – and thus can be cut from the request all together. Let’s analyze this scenario with an example.

The Solution – Procrastination:

The answer is doing only the bare minimum needed to return a response, and doing the rest of the processing out-of-band or asynchronously. This can be broken-down into three steps:

  1. Instead of calling a slow task, the request will simply persist an event to a queue in the database
  2. A background process watching the event queue will claim the event and dispatch it to any listeners
  3. The listeners preform any slow, blocking tasks

The idea is not new: asynch calls, batch processing, message queuing, etc have existed for a while. In this post we’re taking a look at creating a simple and generic background processing system using Symfony 2, its Event and Dependency Injection tools, and Doctrine 2. Reading the associated documentation/manuals may assist in understanding the code samples below.

So say you’re running a blog and you have a feature that allows users to subscribe to a specific post. Whenever someone adds a comment to the post it needs to email all subscribers. Your controller method for adding comments may look something like this:


    public function createComment($id)
    {
        $em = $this->getDoctrine()->getManager();
        // get the current comment
        $comment = $em->getRepository('ACMEBundle:Comment')->find($id);
    
        // bind the form request
        $form = $this->createForm(new Form(), $comment);
        $form->bind($request);
    
        // if form is valid then save the comment and send any subscriber emails
        if ($form->isValid()) {
            $em->persist($comment);
            $em->flush();
            $this->get('email.subscribers')->send($comment)   
        }
    }

We’re not going to get into the implementation of get(’email.subscribers’)->send(), but suffice to say it’s a service that does two things: gets a list of all subscribers to the comment’s post; then, it emails each one details of the comment. By doing the bare minimum and keeping the rest for later, we can see that the response time can be improved by delaying things that this user doesn’t care about.

How do we do that? One possible way is to create an email queue and a complimentary cli command that processes it. This solution has some draw backs however; while email sending is a frequent choice for something to push off to the background, it’s likely not the only operation that you’ll want to get out of the user’s request (web service calls, heavy database operations, etc). A much better solution is to create an event firing system that any “after the fact” logic can tap into.

Why do now what you can put off for tomorrow’s cron job?

The implementation is actually pretty simple. You’ll need a service for saving the events, an entity for persisting them, a command/dispatcher for firing saved events, and listeners to handle whatever logic is necessary for that event. Let’s take a look at the service and entity first – starting with the service.


namespace ACMEEventBundleService;
    
    use SymfonyComponentDependencyInjectionContainer;
    use ACMEEventBundleEntityEvent;
    
    class EventService
    {
        /**
         *
         * @var Container
         */
        protected $container;
    
    
        public function __construct(Container $container)
        {
            // we have to use the container due to "circular reference" exceptions
            $this->container = $container;
        }
    
    
        public function save($name, array $data=array())
        {
            $event = new Event();
            $event->setName($name);
            $event->setData($data);
            $event->setCreated(new DateTime());
            $event->setProcessed(0);
            $event->setClaimedBy('');
            $event->setError('');
    
            $em = $this->container->get('doctrine.orm.entity_manager');
            $em->persist($event);
            $em->flush($event);
        }
    }

And the Entity…


    namespace ACMEEventBundleEntity;
    use DoctrineORMMapping as ORM;
    
    /**
     * @ORMTable(name="event")
     * @ORMEntity(repositoryClass="ACMEEventBundleEntityEventRepository")
     */
    class Event
    {
        /**
         * @ORMColumn(name="id", type="integer")
         * @ORMId
         * @ORMGeneratedValue(strategy="AUTO")
         */
        private $id;
    
        /**
         * @ORMColumn(name="name", type="string", length=80)
         */
        private $name;
    
        /**
         * @ORMColumn(name="data", type="array")
         */
        private $data;
    
        /**
         * @ORMColumn(name="created", type="datetime")
         */
        private $created;
    
        /**
         * @ORMColumn(name="processed", type="integer")
         */
        private $processed;
    
        /**
         * @ORMColumn(name="claimedBy", type="string", length=80)
         */
        private $claimedBy;
    
        /**
         * @ORMColumn(name="error", type="string")
         */
        private $error;
    
        // getters and setters here...
    }

The idea here is that the controller will call the service’s save() method instead of sending the emails. Of course, you can have the controller save the event directly, but using a service helps trim down your controllers and prevent duplication of code. Let’s see what the updated controller looks like:


    public function createComment($id)
    {
        $em = $this->getDoctrine()->getManager();
        // get the current comment
        $comment = $em->getRepository('ACMEBundle:Comment')->find($id);
    
        // bind the form request
        $form = $this->createForm(new Form(), $comment);
        $form->bind($request);
    
        // if form is valid then save the comment and send any subscriber emails
        if ($form->isValid()) {
            $em->persist($comment);
            $em->flush();
            $this->get('event.service')->save('comment.created', array(
                'commentId' => $comment->getId()
            ));
        }
    }

The controller can now simply insert a record instead of sending 30 emails. This will have a very positive impact on the end user – assuming that the emails actually get sent. So let’s talk about that. Here’s a subset of the command that gets executed by a cron job (or whatever you plan on using).


    while(true) {
        # attempts to claim a single event, returns it if successful. may return multiple events.
        // note: $processId is some unique id to this process, helps prevent race conditions (see below)
        $events = $this->em->getRepository('ACMEEventBundle:Event')->claimEvent($processId);
    
        # no events to process, so break out of loop.
        if (count($events) === 0) {
            break;
        }
    
        # iterate over each event to be processed, typically just 1.
        foreach($events as $eventEntity) {
            $output->write("Processing id: {$eventEntity->getId()}" . PHP_EOL);
    
            # create the event...
            $event = new Event($eventEntity);
    
            try {
                # dispatch the event!
                $this->dispatcher->dispatch($eventEntity->getName(), $event);
                # if we made it here we were successful, mark as processed
                $eventEntity->setProcessed(1);
    
            } catch (Exception $e) {
                $eventEntity->setError((string)$e);
            }
    
            $this->em->persist($eventEntity);
            $this->em->flush();
        }
    }

The command is “claiming” a single event, dispatching it with Symfony’s standard eventing system, marking it as processed, then moving to the next event in line. If an exception gets thrown we record for later troubleshooting (or when someone asks for it). You would probably have a few jobs that monitor the queue for anything fishy: records with error columns that != ”, records that have not been processed for a long time, etc.

We’re not showing the actual Event class, nor the Listener that sends the email. The former is out of brevity (check out Symfony’s docs on the Event class and you’ll be tip-top) and the latter is because that code is app specific. If you want an example check out – you guessed it – Symfony docs.

Before we move on, let’s take a look at that claimEvent() method that gets called in the ProcessCommand class above.


    ...
        protected function processEvent(InputInterface $input, OutputInterface $output)
        {
            # attempts to claim a single event, returns it if successful. may return multiple events.
            $events = $this->em->getRepository('ACMEEventBundle:Event')->claimEvent($this->processId);
            $eventCount = count($events);
    ...

If you’re curious about that repository method – and you should be – here it is


    public function claimEvent($processId) {
    
        $query = 'UPDATE event SET claimedBy = :processId '.
             'WHERE claimedBy IS NULL ORDER BY created ASC LIMIT 1';
    
        $this->getEntityManager()->getConnection()->executeUpdate($query, array(
            'processId' => $processId
        ));
    
        return $this->createQueryBuilder('aeq')
            ->where('aeq.claimedBy = :processId')
            ->andWhere('aeq.processed = 0')
            ->orderBy('aeq.created')
            ->setParameter('processId', $processId)
            ->getQuery()->getResult();
    }

Let’s take a closer look at that first query…


    $query = 'UPDATE event SET claimedBy = :processId '.
        "WHERE claimedBy = '' ORDER BY created ASC LIMIT 1";

We want to grab an event to process, but we have to make sure that we don’t get an event that has already been claimed. Likewise, we want to ensure that nobody else claims an event that we are about to process. This is a classic race condition, and naturally, good procrastinators will avoid that. We do that by setting the claimedBy column to the current processId, but only if the record currently has an empty value for that column. This should prevent the event from being claimed by multiple processes. We also order by the created column so the queue operates in FIFO (First In First Out), but you can switch to “ORDER BY created DESC” if you prefer FILO (First In Last Out). Note: we use a raw MySQL query here because Doctrine doesn’t have support for – and this is probably a good thing – a LIMIT clause in an UPDATE statement.

Next let’s look at the SELECT query.


    return $this->createQueryBuilder('e')
        ->where('e.claimedBy = :processId')
        ->andWhere('e.processed = 0')
        ->andWhere("e.error = ''")
        ->orderBy('e.created')
        ->setParameter('processId', $processId)
        ->getQuery()->getResult();

This query will return any events that are claimed by this command, but have not yet been processed. In theory this going to be the record we just updated in the previous statement, but we do add support for multiple events being returned just in the off chance that the scenario occurs.

In Conclusion:

Before wrapping it up I feel obliged to give the “when you got a hammer, everything’s a nail” warning. Just because you CAN throw some logic in a background process, doesn’t mean that you always should. In fact I say that things should never be pushed to a background process unless you have no other choice. When you move logic into the background it adds “stuff” around it that should be avoided as much as possible (less efficient, more things to break, can be harder to troubleshoot), so if you have some logic that’s a bit slow, as an example, try to rework it before moving it to the background. Else you’ve taken The Procrastination Principle too far.

Finally, if you’re interested in the above concepts but insist on bringing a sword to a knife fight then checkout Gearman and Supervisord.

Thanks for reading, if you have questions please comment below.

5

Comments
  • Rajiv Menon25 March 2013
    Reply

    I am sure there are workarounds for this but what if the background /long running process had something that DID concern the user? E.g. the emails expected to be sent out didn’t get out. Would there be an offline notification that goes back to the user? Any standard patterns to reflect a failure or partial success to the user (who is now possibly offline)?

    • sebastiaan hilbers21 February 2014
      Reply

      Then you need a bidirectional connection to your enduser (because your async stack (the command you run) loses context to your sync (http) stack). You could work with Ratchet and use websockets for notifications about success or failure…

      Chris, what is your opinion about this?

      • rmenon21 February 2014
        Reply

        Approve.

  • PuffingDev28 March 2014
    Reply

    Hi there, you might want to check out : http://puffingdev.com/async-eventdispatcher-in-symfony/

    Its pretty much the same logic, but without having to create an entity, without even having to use your entitymanager at all!

  • Maksim22 May 2017
    Reply

    I am working on an alternative solution https://github.com/php-enqueue/enqueue-dev/blob/master/docs/bundle/async_events.md
    Super easy to use. It works on different transports like AMQP, STOMP, Doctrine DBAL, Amazon SQS and so on.

    Could you please look at. The feedback would be much appreciated.

Leave a Comment