Skip to content

Latest commit

 

History

History
176 lines (123 loc) · 4.15 KB

mongodb.md

File metadata and controls

176 lines (123 loc) · 4.15 KB

Enqueue Mongodb message queue transport

Allows to use MongoDB as a message queue broker.

Installation

$ composer require enqueue/mongodb

Create context

<?php
use Enqueue\Mongodb\MongodbConnectionFactory;

// connects to localhost
$connectionFactory = new MongodbConnectionFactory();

// same as above
$factory = new MongodbConnectionFactory('mongodb:');

// same as above
$factory = new MongodbConnectionFactory([]);

$factory = new MongodbConnectionFactory([
    'dsn' => 'mongodb://localhost:27017/db_name',
    'dbname' => 'enqueue',
    'collection_name' => 'enqueue',
    'polling_interval' => '1000',
]);

$psrContext = $factory->createContext();

// if you have enqueue/enqueue library installed you can use a factory to build context from DSN 
$psrContext = (new \Enqueue\ConnectionFactoryFactory())->create('mongodb:')->createContext();

Send message to topic

<?php
/** @var \Enqueue\Mongodb\MongodbContext $psrContext */
/** @var \Enqueue\Mongodb\MongodbDestination $fooTopic */

$message = $psrContext->createMessage('Hello world!');

$psrContext->createProducer()->send($fooTopic, $message);

Send message to queue

<?php
/** @var \Enqueue\Mongodb\MongodbContext $psrContext */
/** @var \Enqueue\Mongodb\MongodbDestination $fooQueue */

$message = $psrContext->createMessage('Hello world!');

$psrContext->createProducer()->send($fooQueue, $message);

Send priority message

<?php
/** @var \Enqueue\Mongodb\MongodbContext $psrContext */

$fooQueue = $psrContext->createQueue('foo');

$message = $psrContext->createMessage('Hello world!');

$psrContext->createProducer()
    ->setPriority(5) // the higher priority the sooner a message gets to a consumer
    //    
    ->send($fooQueue, $message)
;

Send expiration message

<?php
/** @var \Enqueue\Mongodb\MongodbContext $psrContext */
/** @var \Enqueue\Mongodb\MongodbDestination $fooQueue */

$message = $psrContext->createMessage('Hello world!');

$psrContext->createProducer()
    ->setTimeToLive(60000) // 60 sec
    //    
    ->send($fooQueue, $message)
;

Send delayed message

<?php
use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy;

/** @var \Enqueue\Mongodb\MongodbContext $psrContext */
/** @var \Enqueue\Mongodb\MongodbDestination $fooQueue */

// make sure you run "composer require enqueue/amqp-tools".

$message = $psrContext->createMessage('Hello world!');

$psrContext->createProducer()
    ->setDeliveryDelay(5000) // 5 sec
    
    ->send($fooQueue, $message)
;

Consume message:

<?php
/** @var \Enqueue\Mongodb\MongodbContext $psrContext */
/** @var \Enqueue\Mongodb\MongodbDestination $fooQueue */

$consumer = $psrContext->createConsumer($fooQueue);

$message = $consumer->receive();

// process a message

$consumer->acknowledge($message);
// $consumer->reject($message);

Subscription consumer

<?php
use Interop\Queue\PsrMessage;
use Interop\Queue\PsrConsumer;

/** @var \Enqueue\Mongodb\MongodbContext $psrContext */
/** @var \Enqueue\Mongodb\MongodbDestination $fooQueue */
/** @var \Enqueue\Mongodb\MongodbDestination $barQueue */

$fooConsumer = $psrContext->createConsumer($fooQueue);
$barConsumer = $psrContext->createConsumer($barQueue);

$subscriptionConsumer = $psrContext->createSubscriptionConsumer();
$subscriptionConsumer->subscribe($fooConsumer, function(PsrMessage $message, PsrConsumer $consumer) {
    // process message
    
    $consumer->acknowledge($message);
    
    return true;
});
$subscriptionConsumer->subscribe($barConsumer, function(PsrMessage $message, PsrConsumer $consumer) {
    // process message
    
    $consumer->acknowledge($message);
    
    return true;
});

$subscriptionConsumer->consume(2000); // 2 sec

back to index