Event bus in Symfony application

In the previous posts, we created a command bus and a query bus. Now is the time for the last of the most common buses, event bus. Of these three, this is the most complex. Contrary to the previous ones, the event bus we are going to create is the asynchronous bus and requires a place where events are stored before they are processed by event consumers. This is a simplified diagram.

In this example, we are going to use doctrine transport to store and restore our events. By default, The Messenger Component provides 3 types of transports: Doctrine, AMQP, and Redis. The starting code for this post is here.

Monolog configuration

In the event handler, we are going to write a message into a log file every time a user creates a new note. Of course, it’s not the most appropriate way to save application logs, but it’s the easiest way to show you how asynchronous event handling works. We just need to install the Monolog bundle. Since we use the Symfony framework, we don’t need to do anything else, all required configuration files will be created automatically.

composer require symfony/monolog-bundle

Event and event handler

// src/Event/NoteCreated.php

<?php
namespace App\Event;

use Ramsey\Uuid\Uuid;

class NoteCreated
{
    private $noteId;

    public function __construct(Uuid $noteId)
    {
        $this->noteId = $noteId;
    }

    public function getNoteId(): Uuid
    {
        return $this->noteId;
    }
}
// src/Event/Handler/NoteCreatedHandler.php

<?php
namespace App\Event\Handler;

use App\Event\NoteCreated;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

class NoteCreatedHandler implements MessageHandlerInterface
{
    private $logger;

    public function __construct(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }

    public function __invoke(NoteCreated $event)
    {
        $noteId = $event->getNoteId()->toString();
        $this->logger->debug("Note with id $noteId has been created");
    }
}

Bus configuration

Now let’s configure the event bus and Doctrine transport. Then we have to assign the event we created to be routed into this transport.

// config/packages/messenger.yaml

framework:
    messenger:
        default_bus: command.bus
        buses:
            // ...
            event.bus:
                default_middleware: allow_no_handlers

        transports:
            async: '%env(MESSENGER_TRANSPORT_DSN)%'

        routing:
            'App\Event\NoteCreated': async

Afterward, we have to add MESSENGER_TRANSPORT_DSN variable to the .env file (or .env.local in my case). For now, we are going to use default Doctrine transport.

MESSENGER_TRANSPORT_DSN=doctrine://default

In the end, we need to restrict access to event handlers for only the event bus.

// config/services.yaml

services:
    // ...
    App\Event\Handler\:
        autoconfigure: false
        resource: '../src/Event/Handler'
        tags: [{ name: messenger.message_handler, bus: event.bus }]

Now we can finally implement dispatching our newly created event in AddNoteHandler.

<?php
namespace App\Command\Handler;

use App\Command\AddNote;
use App\Entity\Note;
use App\Event\NoteCreated;
use App\Repository\NoteRepository;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DispatchAfterCurrentBusStamp;

class AddNoteHandler implements MessageHandlerInterface
{
    private $notes;
    private $em;
    private $eventBus;

    public function __construct(
        NoteRepository $noteRepository,
        EntityManagerInterface $em,
        MessageBusInterface $eventBus
    ) {
        $this->notes = $noteRepository;
        $this->em = $em;
        $this->eventBus = $eventBus;
    }

    public function __invoke(AddNote $command)
    {
        $noteId = $command->getId();
        $noteTitle = $command->getTitle();
        $noteDescription = $command->getDescription();

        if($this->notes->noteWithTitleExists($noteTitle)) {
            throw new \LogicException("Note title has to be unique");
        }

        $note = new Note($noteId);
        $note->setTitle($noteTitle);
        $note->setDescription($noteDescription);

        $this->em->persist($note);
        $this->em->flush();

        $event = new NoteCreated($noteId);
        $envelope = new Envelope($event);
        $this->eventBus->dispatch($envelope->with(new DispatchAfterCurrentBusStamp()));
    }
}

Now after creating a new note, NoteCreated event will be dispatched and stored in the database. In PHPMyAdmin it looks like this:

Consuming events

Since we have our event stored in the database we need to run our consumer to process the event. To achieve that we only need to execute one command in the console.

php bin/console messenger:consume

And now the information about the newly created note should show up in the log file.

If you run this command for the first time, a new table called messenger_messages will be created automatically in your database. Once run message consumer continues working until it’s terminated. To terminate all consumers you just need to execute the following command.

php bin/console messenger:stop-workers

It is going to send a signal to all working consumers to finish the currently processed message and then stop. Every time you update your code, you also have to restart your consumers! Otherwise, your changes won’t be applied.

Summary

This was the last post in the series about message buses in Symfony projects. In the future, I plan to create a similar series based on the Laravel framework. Here you can find the final code. If you missed previous posts you can find them below:

You May Also Like

Leave a Reply

Your email address will not be published. Required fields are marked *