I would like to run some code before each Consumer::process().

Usage: using ORM with IdentityMap leads to working over cached data, not freshly fetched from storage/db. In this callback, I would force ORM release all this cached indentity map. This will force consumer behaviour as it would be freshly started, not like long running php script.

How do you solve this? Of course there is possibility to make parent of Consumer, call parent::process() and put the logic there, but I don't like inheritance for such things.

Generally, working with DB's is problematic in workers. Worker may be waiting long hours before a message hit's it and it starts working. You mainly have to handle valid reconnection to the database.

In my applications, I'm explicitly cleaning memory after myself, when a message is processed.

public function process(AMQPMessage $message)
{
    if (!$request = Json::decode($message->body, Json::FORCE_ARRAY)) {
        return self::MSG_REJECT;
    }

    $db = $this->em->getConnection();
    if ($db->ping() === FALSE) {
        $db->close();
        $db->connect();
    }

    /** @var StockRequest $order */
    if ($order = $this->em->getDao(StockRequest::class)->find($request['id'])) {
        try {
            // some work

            $this->logger->addDebug(sprintf('Stock order %s was accepted, requested by user %s', $request['id'], $request['user']));

        } catch (InvalidPriceVsAmountException $e) {
            $this->logger->addWarning(sprintf('%s: %s', get_class($e), $e->getMessage()));
            return self::MSG_REJECT;
        }

    } else {
        $this->logger->addWarning(sprintf('Order %d was not found', $request['id']));
    }

    $this->em->clear();

    return self::MSG_ACK;
}

It seems like a better way than before callback, because you're freeing the memory immediately (some our workers work with hundreds of MB of memory).

You should also be able to hook onto an event onConsume, using Kdyby/Events.

Yeah! onConsume is the thing I was looking for. What do you thing about puting this into the event:

$db = $this->em->getConnection();
 if ($db->ping() === FALSE) {
     $db->close();
     $db->connect();
 }
  • Comments:

  • Sure, why not :) But not all my consumers are working with database. So it doesn't make sense for me to have it there.

    by Filip Procházka at 28.1.2015 0:02

You must first log in to participate in this discussion