How to Use Message Queue in Magento 2

In this tutorial, I will explain how to use a message queue in Magento 2. Before that, let's look at what a message queue is.

What is Message Queue in Magento 2?

A message queue provides an asynchronous communication mechanism in which the sender and the receiver of a message do not contact each other directly, nor do they need to communicate with the message queue at the same time. When a sender places a message onto a queue, it is stored until the recipient retrieves it.

The queue stores messages temporarily until they are processed and deleted. Each message is processed only once, by a single consumer.

Message Queue Framework supports MySQL and RabbitMQ queue systems.
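
To make the idea concrete, here is a minimal sketch in plain PHP. It does not use Magento at all: SplQueue is only a hypothetical in-memory stand-in for the MySQL or RabbitMQ storage, and the publish() and consume() functions are illustrative names, not framework APIs.

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory stand-in for the real queue storage (MySQL or RabbitMQ).
$queue = new SplQueue();

// Publisher side: store the message and return immediately.
function publish(SplQueue $queue, string $topic, array $data): void
{
    $queue->enqueue(['topic' => $topic, 'body' => json_encode($data)]);
}

// Consumer side: runs later and takes each stored message exactly once.
function consume(SplQueue $queue): array
{
    $handled = [];
    while (!$queue->isEmpty()) {
        $message = $queue->dequeue(); // the message leaves the queue once taken
        $handled[] = json_decode($message['body'], true);
    }
    return $handled;
}

publish($queue, 'syncRhProductTopic', ['sku' => 'demo-1']);
publish($queue, 'syncRhProductTopic', ['sku' => 'demo-2']);

$handled = consume($queue);
echo count($handled) . " messages handled, " . count($queue) . " left in queue\n";
```

The publisher never calls the consumer directly; the only thing they share is the queue.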

How to Install RabbitMQ Server [For Linux / Ubuntu]

  • sudo apt-get install rabbitmq-server
  • sudo systemctl enable rabbitmq-server
  • sudo systemctl start rabbitmq-server
  • sudo rabbitmq-plugins enable rabbitmq_management
  • Open the RabbitMQ management UI at http://127.0.0.1:15672/#/
  • Log in with the default credentials:
    User: guest
    Pass: guest

How to Implement RabbitMQ in Magento 2

  • Go to the Magento root directory and open app/etc/env.php
  • Add the code below inside the array returned by env.php

'queue' => [
    'amqp' => [
        'host' => '127.0.0.1',
        'port' => '5672',
        'user' => 'guest',
        'password' => 'guest',
        'virtualhost' => '/',
        'ssl' => ''
    ]
],
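
If you want to double-check the settings before moving on, a small helper like the one below can list any missing keys. This is only a sketch: the function name missingAmqpKeys() is hypothetical, and the list of required keys is an assumption based on the snippet above, not an official Magento rule.

```php
<?php
declare(strict_types=1);

/**
 * Return the AMQP keys that are missing from an env.php-style array.
 * The required key list is an assumption taken from the snippet above.
 */
function missingAmqpKeys(array $env): array
{
    $required = ['host', 'port', 'user', 'password', 'virtualhost'];
    $amqp = $env['queue']['amqp'] ?? [];

    return array_values(array_diff($required, array_keys($amqp)));
}

// In a real installation you would load the file instead:
// $env = include 'app/etc/env.php';
$env = [
    'queue' => [
        'amqp' => [
            'host' => '127.0.0.1',
            'port' => '5672',
            'user' => 'guest',
            'password' => 'guest',
            'virtualhost' => '/',
            'ssl' => '',
        ],
    ],
];

$missing = missingAmqpKeys($env);
echo $missing === []
    ? "AMQP settings look complete\n"
    : 'Missing: ' . implode(', ', $missing) . "\n";
```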

Configure the Message Queue

1) communication.xml :

  • This file defines the aspects of the message queue system that all communication types have in common. It contains the list of topics and handlers.

2) queue_publisher.xml :

  • This file defines publisher classes, connection and exchange to use to publish messages for a specific topic.

3) queue_topology.xml :

  • This file defines the message routing rules and declares queues and exchanges.

4) queue_consumer.xml :

  • This file defines the relationship between an existing queue and its consumer.
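
The way these four files connect can be pictured as a chain: topic, then exchange, then queue, then handler. The sketch below uses plain PHP arrays as stand-ins for the XML files. The names come from the example module built later in this post, and resolveHandler() is a hypothetical helper for illustration, not how Magento resolves handlers internally.

```php
<?php
declare(strict_types=1);

// Plain arrays standing in for the four XML files.
$communication = ['syncRhProductTopic' => 'string'];                // topic => request type
$publisher     = ['syncRhProductTopic' => 'syncRhProductExchange']; // topic => exchange
$topology      = [                                                  // exchange => topic => queue
    'syncRhProductExchange' => ['syncRhProductTopic' => 'syncRhProductQueue'],
];
$consumer      = [                                                  // queue => handler
    'syncRhProductQueue' => 'RH\Helloworld\Model\MessageQueue\SyncRhProduct::process',
];

// Follow a topic through publisher, topology and consumer to its handler.
function resolveHandler(
    string $topic,
    array $communication,
    array $publisher,
    array $topology,
    array $consumer
): ?string {
    if (!isset($communication[$topic])) {
        return null; // topic was never declared
    }
    $exchange = $publisher[$topic] ?? null;
    $queue = $exchange !== null ? ($topology[$exchange][$topic] ?? null) : null;

    return $queue !== null ? ($consumer[$queue] ?? null) : null;
}

echo resolveHandler('syncRhProductTopic', $communication, $publisher, $topology, $consumer) . "\n";
```

If any link in the chain is missing or misspelled, the message has nowhere to go, which is why the names must match across all four files.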

Tables Details of Message Queue

Table Name             Description
---------------------  ------------------------------------------------------------
queue                  Contains a list of queue names and IDs.
queue_lock             Contains a list of jobs waiting to be executed.
queue_message          Contains JSON message data with its topic name.
queue_message_status   Contains the status of each queue_message with its queue_id
                       and number of trials.
queue_poison_pill      Contains a version that will be re-instantiated after an
                       update of the configuration.

Status codes used in queue_message_status and their meaning:

  • 2 = new
  • 3 = in progress
  • 4 = complete
  • 5 = retry required
  • 6 = error
  • 7 = to be deleted
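
When reading rows from queue_message_status, it can help to translate the numeric status into a label. Here is a minimal sketch: the constant and function names are hypothetical, and the values are simply the status codes listed above.

```php
<?php
declare(strict_types=1);

// Status codes as listed above; the constant name is hypothetical.
const QUEUE_MESSAGE_STATUS_LABELS = [
    2 => 'new',
    3 => 'in progress',
    4 => 'complete',
    5 => 'retry required',
    6 => 'error',
    7 => 'to be deleted',
];

// Translate a numeric status into a readable label.
function statusLabel(int $status): string
{
    return QUEUE_MESSAGE_STATUS_LABELS[$status] ?? 'unknown';
}

echo statusLabel(4) . "\n";
```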

Now, let's create a module that implements a custom message queue.

Steps to Create a Custom Message Queue

1) First of all, create a custom module using this link. The examples below use a module named RH_Helloworld.

2) After that, create a communication.xml file at app/code/RH/Helloworld/etc/ and paste the code below:

<?xml version="1.0"?>
<!--
/**
 * Copyright © Magento, Inc. All rights reserved.
 * See COPYING.txt for license details.
 */
-->
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
    <topic name="syncRhProductTopic" request="string" /> 
</config>

Here, name holds the topic name and request is the data type of the message: <topic name="TOPIC.NAME.HERE" request="string" />

3) Then, create a queue_publisher.xml file at app/code/RH/Helloworld/etc/ and paste the code below:

<?xml version="1.0"?>
<!--
/**
 * Copyright © Magento, Inc. All rights reserved.
 * See COPYING.txt for license details.
 */
-->
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/publisher.xsd">
    <publisher topic="syncRhProductTopic">
        <connection name="db" exchange="syncRhProductExchange" />
    </publisher>
</config>

Here, syncRhProductTopic = "TOPIC.NAME.HERE" and syncRhProductExchange = "EXCHANGE.NAME.HERE".

4) After that, create a queue_topology.xml file at app/code/RH/Helloworld/etc/ and paste the code below:

<?xml version="1.0"?>
<!--
/**
 * Copyright © Magento, Inc. All rights reserved.
 * See COPYING.txt for license details.
 */
-->
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
    <exchange name="syncRhProductExchange" type="topic" connection="db">
        <binding id="syncRhProductBinding" topic="syncRhProductTopic"
                 destinationType="queue" destination="syncRhProductQueue"/>
    </exchange>
</config>

Here, syncRhProductTopic = "TOPIC.NAME.HERE", syncRhProductExchange = "EXCHANGE.NAME.HERE", syncRhProductBinding = "BINDING.NAME.HERE" and syncRhProductQueue = "QUEUE.NAME.HERE".

5) Now, create a queue_consumer.xml file at app/code/RH/Helloworld/etc/ and paste the code below:

<?xml version="1.0"?>
<!--
/**
 * Copyright © Magento, Inc. All rights reserved.
 * See COPYING.txt for license details.
 */
-->
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
    <consumer name="syncRhProductConsumer" queue="syncRhProductQueue"
              connection="db" maxMessages="5000"
              handler="RH\Helloworld\Model\MessageQueue\SyncRhProduct::process"/>
</config>
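
Because the exchange and queue names are repeated across several files, a typo in one of them is easy to miss. The sketch below shows one way to cross-check them with SimpleXML. It is not part of Magento: findMismatches() is a hypothetical helper, and the XML strings are trimmed copies of the files above with the schema attributes left out for brevity.

```php
<?php
declare(strict_types=1);

// Trimmed copies of the files above (schema attributes omitted).
$publisherXml = '<config><publisher topic="syncRhProductTopic">'
    . '<connection name="db" exchange="syncRhProductExchange"/></publisher></config>';
$topologyXml = '<config><exchange name="syncRhProductExchange" type="topic" connection="db">'
    . '<binding id="syncRhProductBinding" topic="syncRhProductTopic"'
    . ' destinationType="queue" destination="syncRhProductQueue"/></exchange></config>';
$consumerXml = '<config><consumer name="syncRhProductConsumer"'
    . ' queue="syncRhProductQueue" connection="db"/></config>';

// Report exchanges and queues that are used but never declared in the topology.
function findMismatches(string $publisherXml, string $topologyXml, string $consumerXml): array
{
    $publisher = new SimpleXMLElement($publisherXml);
    $topology = new SimpleXMLElement($topologyXml);
    $consumer = new SimpleXMLElement($consumerXml);

    // Collect what the topology declares.
    $exchanges = [];
    $boundQueues = [];
    foreach ($topology->exchange as $exchange) {
        $exchanges[] = (string) $exchange['name'];
        foreach ($exchange->binding as $binding) {
            $boundQueues[] = (string) $binding['destination'];
        }
    }

    $problems = [];
    foreach ($publisher->publisher as $pub) {
        $exchange = (string) $pub->connection['exchange'];
        if (!in_array($exchange, $exchanges, true)) {
            $problems[] = "exchange '$exchange' is not declared in queue_topology.xml";
        }
    }
    foreach ($consumer->consumer as $con) {
        $queue = (string) $con['queue'];
        if (!in_array($queue, $boundQueues, true)) {
            $problems[] = "queue '$queue' has no binding in queue_topology.xml";
        }
    }

    return $problems;
}

$problems = findMismatches($publisherXml, $topologyXml, $consumerXml);
echo $problems === [] ? "Names are consistent\n" : implode("\n", $problems) . "\n";
```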

6) Create a cron job to run the custom consumer. For that, add the content below to the app/code/RH/Helloworld/etc/crontab.xml file:

<?xml version="1.0"?>
<!--
/**
 * Copyright © Magento, Inc. All rights reserved.
 * See COPYING.txt for license details.
 */
-->
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:noNamespaceSchemaLocation="urn:magento:module:Magento_Cron:etc/crontab.xsd">
    <group id="rh_cron_group">
        <job name="rh_consumer_cron" instance="RH\Helloworld\Cron\SyncRhConsumer" method="execute">
            <schedule>* * * * *</schedule>
        </job>
    </group>
</config>

7) Create a SyncRhConsumer.php file at app/code/RH/Helloworld/Cron/ and paste the code below to set the cron logic:

<?php
/**
 * Copyright © Magento, Inc. All rights reserved.
 * See COPYING.txt for license details.
 */
namespace RH\Helloworld\Cron;

use Magento\Framework\Json\Helper\Data;
use Magento\Framework\MessageQueue\ConsumerFactory;
use Magento\Framework\MessageQueue\PublisherInterface;
use Psr\Log\LoggerInterface;
use RH\Helloworld\Model\MessageQueue\SyncRhProduct;

class SyncRhConsumer
{
    /**
     * @var PublisherInterface
     */
    private $publisher;

    /**
     * @var Data
     */
    private $jsonHelper;

    /**
     * @var LoggerInterface
     */
    private $logger;

    /**
     * @var ConsumerFactory
     */
    private $consumerFactory;

    /**
     * @param PublisherInterface $publisher
     * @param Data $jsonHelper
     * @param LoggerInterface $logger
     * @param ConsumerFactory $consumerFactory
     */
    public function __construct(
        PublisherInterface $publisher,
        Data $jsonHelper,
        LoggerInterface $logger,
        ConsumerFactory $consumerFactory
    ) {
        $this->publisher = $publisher;
        $this->jsonHelper = $jsonHelper;
        $this->logger = $logger;
        $this->consumerFactory = $consumerFactory;
    }

    /**
     * @inheritdoc
     */
    public function execute()
    {
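        // Example payload; replace it with the data your consumer needs.
        $details = [
            ["any_informatic_index" => "value"],
        ];
        $batchSize = 500;
        // Maximum number of messages the consumer processes in this run.
        $noOfMessages = 1;

        $this->logger->info('Cron executed successfully');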

        $this->publisher->publish(
            SyncRhProduct::TOPIC_NAME,
            $this->jsonHelper->jsonEncode($details)
        );
        $consumer = $this->consumerFactory->get('syncRhProductConsumer', $batchSize);
        $consumer->process($noOfMessages);
    }
}

8) Finally, create a SyncRhProduct.php file at app/code/RH/Helloworld/Model/MessageQueue/ and paste the code below to set the consumer logic:

<?php
/**
 * Copyright © Magento, Inc. All rights reserved.
 * See COPYING.txt for license details.
 */
namespace RH\Helloworld\Model\MessageQueue;

use Magento\Framework\MessageQueue\ConsumerConfiguration;

class SyncRhProduct extends ConsumerConfiguration
{
    /**
     * Topic name defined for cron
     */
    public const TOPIC_NAME = "syncRhProductTopic";

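    /**
     * Handle one message taken from the queue.
     *
     * @param string $request JSON string published by the cron job
     */
    public function process($request)
    {
        // Write the raw message to var/log/rohan.log for inspection.
        $writer = new \Zend_Log_Writer_Stream(BP . '/var/log/rohan.log');
        $logger = new \Zend_Log();
        $logger->addWriter($writer);
        $logger->info(print_r($request, true));
    }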
}
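
The handler above only logs the raw string. Since the cron job publishes JSON, a real handler would usually decode the message first. Here is a minimal sketch in plain PHP, outside of Magento: decodeQueuePayload() is a hypothetical helper name, and the sample payload mirrors the $details array from the cron class.

```php
<?php
declare(strict_types=1);

/**
 * Decode the JSON string received by the consumer into a PHP array.
 * Throws when the message is not a JSON array or object.
 */
function decodeQueuePayload(string $request): array
{
    $data = json_decode($request, true);
    if (!is_array($data)) {
        throw new InvalidArgumentException('Queue message is not a JSON array or object');
    }

    return $data;
}

// Same shape as the payload published by the cron job.
$request = json_encode([['any_informatic_index' => 'value']]);

$rows = decodeQueuePayload($request);
echo $rows[0]['any_informatic_index'] . "\n";
```

Throwing on malformed input keeps a broken message from being treated as valid data; how the failure is then handled is left to your consumer logic.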

Now, just run the cron. You can then check the queue database tables (for example, queue_message and queue_message_status) to verify whether the message was processed successfully.

That’s it !!!

I hope this blog makes it easy to understand how to use a message queue in Magento 2. If I missed anything or you need more information, feel free to leave a comment on this blog and I'll get back to you with a proper solution.

Keep liking and sharing !!
