UPDATE 23/01/2012

As Sebastien pointed out in the comments, Symfony 2.1's emails are sent after the page via a kernel.terminate event, which is cool! I don't know if there's any specific configuration required, but Fabien tweeted about it. Rather than going forward with what I propose in this blog post, you could use the kernel.terminate event, to improve perceived performance for your users. Things would take place in process still, so taking up web server resources, but it's a good first step.

I love Symfony2's Event Dispatcher and find it's a really nice way of decoupling code, though admittedly it can decouple things a little too much. One thing I constantly find, is that the listeners I bind to events tend to be for sending notifications or other sorts of logging, which in an ideal world, would be handled asynchronously. That is, why should this user wait for their page response, just so your system can email the 10 people who want to know about what they did? Most people taking the leap to move these functions offline would create message queues for each of those individual listeners, or setup a new separate service bus, but I've had a look at attaching a message queue to the existing symfony architecture, by creating a simple decorator for the event dispatcher.

Example

You can get the full example on github, I'll just show snippets here as a walkthrough. I'm using Silex for the example, but the same principles apply to a full blown symfony application, or any app that's using the Event Dispatcher.

Here some code for a simple JSON API to add a comment to a blog post. Notice that the email is sent inline:

/**
 * Add a comment
 */
$app->post('/blog/{post}/comments', function(Application $app, array $post) {

    $id = !empty($post['comments']) ? max(array_keys($post['comments'])) + 1 : 1;
    $comment = json_decode($app['request']->getContent(), true);
    $comment['id'] = $id;
    $post['comments'][$id] = $comment;

    /**
     * Send an email to the author
     */
    mail($app['author'], 'New comment on ' . $post['title'], $comment['comment']);

    return new Response(
        json_encode($comment),
        201,
        array(
            'Content-type' => 'application/json',
            'Location' => "/blog/{$post['id']}/comment/{$id}",
        )
    );
})->convert('post', $postLookup);

Using Events

Our first improvement is to send that email via an event listener. First we add a listener:

$app['dispatcher']->addListener('blog.comments.new', function(Event $event) use ($app) {
    mail($app['author'], 'New comment on ' . $event->post['title'], $event->comment['comment']);
});

Then we alter the controller to fire an event rather than firing an email

/**
 * Fire and event rather than sending an email
 */
$event = new Event;
$event->post = $post;
$event->comment = $comment;
$app['dispatcher']->dispatch('blog.comments.new', $event);

This is a nice little improvement, we've decoupled sending the email from the main purpose of the endpoint which is adding the comment. The problem is, the sending of the email, which can be slow, still happens in the same synchronous process, which we'll hope to overcome in the next few steps.

Firehose Event Dispatcher

@glenathan pointed out that storing and switching on the async prefix wasn't necessary, so that's been removed, see the full changeset for details.

I implemented this as a decorator (I think, I'm rubbish with patterns), and it basically allows you to register listeners that will receive all events dispatched through a given event dispatcher. Checkout the source on github. In this example, we're going to use the firehose to refire the event with a different event name:

$app['base_dispatcher'] = $app['dispatcher'];
$app['dispatcher'] = $app->share(function($c) {
    $firehose = new FireHose($c['base_dispatcher']);
    return $firehose;
});

$app['dispatcher']->addFireHoseListener(function($eventName, Event $event) use ($app) {
    /**
     * Disable the firehose, then fire the event again but with the async
     * prefix
     */
    $app['dispatcher']->oneTimeDisableFireHose();
    $app['dispatcher']->dispatch('async.' .  $eventName, $event);
});

Now we change the name of the event for our email sending listener

$app['dispatcher']->addListener('async.blog.comments.new', function(Event $event) use ($app)
    mail($app['author'], 'New comment on ' . $event->post['title'], $event->comment['comment']);
});

At this point, we're not doing anything asynchronously, but we've given our app the ability to make a choice. Listeners can choose to listen on the regular event name, or choose the asynchronous event name, by which they should expect to be notified asynchronously.

Fire hosing on to a message queue

The next step is to change the firehose listener to one that will push the event on to a message/job queue or service bus of some description. In this example, I've used 0MQ in a pub/sub type setup, but you can insert anything you like in here. If you're interested in 0MQ, checkout Ian Barber's Slides.

$app['dispatcher']->addFireHoseListener(function($eventName, Event $event) use ($app) {
    /**
     * Dont bother with the silex or kernel events, in real life you'd probably 
     * want to switch this on class type of the event
     */ 
    if (0 !== strpos($eventName, 'kernel.') && 0 !== strpos($eventName, 'silex.')) { 

        $eventName = 'async.' . $eventName;

        if ($app['dispatcher.async']) {
            $app['dispatcher.queue.pub']->send(serialize(array($eventName, $event)));
        } else {
            // might be useful to have in process for dev
            $app['dispatcher']->oneTimeDisableFireHose();
            $app['dispatcher']->dispatch($eventName, $event);
        }
    }
});

$app['dispatcher.queue.pub.dsn'] = 'tcp://localhost:5567';
$app['dispatcher.queue.sub.dsn'] = 'tcp://localhost:5566';
$app['dispatcher.queue.pub'] = $app->share(function($c) {
    $ctx = new ZMQContext();
    $send = $ctx->getSocket(ZMQ::SOCKET_PUSH);
    $send->connect($c['dispatcher.queue.pub.dsn']);
    return $send;
});

source on github

There's an important point here, note the exclusion of the kernel and silex events, these events dont serialize. If you want to pop your Events on a queue and have them handled elsewhere, they need to be in a portable format. Events that implement the serializable interface would be a good start, but you might want to consider what other systems may want to get to the data and use a format like JSON.

Now to create a little daemon to pull things off the queue and refire the event like we did before:

$app = include __DIR__.'/app.php';
$ctx = new ZMQContext();
$sub = $ctx->getSocket(ZMQ::SOCKET_SUB);
$sub->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, '');
$sub->connect($app['dispatcher.queue.sub.dsn']);
$poll = new ZMQPoll();
$poll->add($sub, ZMQ::POLL_IN);
$read = $wri = array();
while(true) {
    $ev = $poll->poll($read, $wri, 5000000);
    if ($ev > 0) {
        list($eventName, $event) = unserialize($sub->recv());
        echo "Refiring $eventName\n";
        $app['dispatcher']->oneTimeDisableFireHose();
        $app['dispatcher']->dispatch($eventName, $event);
    }
}

source on github

Last but not least, we need a 0MQ server to sit in the middle:

$ctx = new ZMQContext();
$pub = $ctx->getSocket(ZMQ::SOCKET_PUB);
$pub->bind('tcp://*:5566');
$pull = $ctx->getSocket(ZMQ::SOCKET_PULL);
$pull->bind('tcp://*:5567');

while(true) {
    $message = $pull->recv();
    $pub->send($message);
}

source on github

Putting it all together

I know it's a rather crude example, but what you end up with is a pretty simple way of turning synchronous events into asynchronous ones.

Example Output

It would be nice to hear other people's thoughts on this, hit me up with some comments. I know it seems a bit messy, but I'm betting there's quite a few symfony sites wired up with events that could benefit from having a few taken offline?