Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add queue interop based driver. Supports AMQP, Amazon SQS, Kafka, Google PubSub, Redis, STOMP, Gearman, Beanstalk #20148

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
},
"require-dev": {
"aws/aws-sdk-php": "~3.0",
"queue-interop/queue-interop": "^0.5",
"doctrine/dbal": "~2.5",
"mockery/mockery": "~0.9.4",
"pda/pheanstalk": "~3.0",
Expand Down Expand Up @@ -104,6 +105,7 @@
},
"suggest": {
"aws/aws-sdk-php": "Required to use the SQS queue driver and SES mail driver (~3.0).",
"queue-interop/queue-interop": "Required to use the queue interop driver compatible transports",
"doctrine/dbal": "Required to rename columns and drop SQLite columns (~2.5).",
"fzaninotto/faker": "Required to use the eloquent factory builder (~1.4).",
"guzzlehttp/guzzle": "Required to use the Mailgun and Mandrill mail drivers and the ping methods on schedules (~6.0).",
Expand Down
41 changes: 41 additions & 0 deletions src/Illuminate/Queue/Connectors/InteropConnector.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<?php

namespace Illuminate\Queue\Connectors;

use Illuminate\Queue\InteropQueue;
use Interop\Queue\PsrConnectionFactory;

class InteropConnector implements ConnectorInterface
{
/**
* {@inheritdoc}
*/
public function connect(array $config)
{
$config = array_replace([
'connection_factory_class' => null,
'dsn' => null,
'queue' => 'default',
'time_to_run' => 0,
], $config);

if (empty($config['connection_factory_class'])) {
throw new \LogicException('The "connection_factory_class" option is required');
}

$factoryClass = $config['connection_factory_class'];
if (false == class_exists($factoryClass)) {
throw new \LogicException(sprintf('The "connection_factory_class" option "%s" is not a class', $factoryClass));
}

$rc = new \ReflectionClass($factoryClass);
if (false == $rc->implementsInterface(PsrConnectionFactory::class)) {
throw new \LogicException(sprintf('The "connection_factory_class" option must contain a class that implements "%s" but it is not', PsrConnectionFactory::class));
}

/** @var PsrConnectionFactory $factory */
$factory = new $factoryClass($config['dsn'] ? $config['dsn'] : $config);

return new InteropQueue($factory->createContext(), $config['queue'], $config['time_to_run']);
}
}
132 changes: 132 additions & 0 deletions src/Illuminate/Queue/InteropQueue.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
<?php

namespace Illuminate\Queue;

use Illuminate\Contracts\Queue\Queue as QueueContract;
use Illuminate\Queue\Jobs\InteropJob;
use Interop\Queue\PsrContext;

class InteropQueue extends Queue implements QueueContract
{
/**
* @var string
*/
protected $queueName;

/**
* @var int
*/
protected $timeToRun;
/**
* @var PsrContext
*/
private $psrContext;

/**
* @param PsrContext $psrContext
* @param string $queueName
* @param int $timeToRun
*/
public function __construct(PsrContext $psrContext, $queueName, $timeToRun)
{
$this->psrContext = $psrContext;
$this->queueName = $queueName;
$this->timeToRun = $timeToRun;
}

/**
* {@inheritdoc}
*/
public function size($queue = null)
{
return 0;
}

/**
* {@inheritdoc}
*/
public function push($job, $data = '', $queue = null)
{
return $this->pushRaw($this->createPayload($job, $data), $queue);
}

/**
* Push a new job onto the queue.
*
* @param string $queue
* @param string $job
* @param mixed $data
*
* @return mixed
*/
public function pushOn($queue, $job, $data = '')
{
new \LogicException('to be implemented');
}

/**
* {@inheritdoc}
*/
public function pushRaw($payload, $queue = null, array $options = [])
{
return $this->psrContext->createProducer()->send(
$this->getQueue($queue),
$this->psrContext->createMessage($payload)
);
}

/**
* {@inheritdoc}
*/
public function later($delay, $job, $data = '', $queue = null)
{
new \LogicException('to be implemented');
}

/**
* {@inheritdoc}
*/
public function pop($queue = null)
{
$queue = $this->getQueue($queue);

$psrConsumer = $this->psrContext->createConsumer($queue);
if ($psrMessage = $psrConsumer->receive(1000)) { // 1 sec
return new InteropJob(
$this->container,
$this->psrContext,
$psrConsumer,
$psrMessage,
$this->connectionName
);
}
}

/**
* Get the queue or return the default.
*
* @param string|null $queue
*
* @return \Interop\Queue\PsrQueue
*/
public function getQueue($queue = null)
{
return $this->psrContext->createQueue($queue ?: $this->queueName);
}

/**
* @return PsrContext
*/
public function getPsrContext()
{
return $this->psrContext;
}

/**
* @return int
*/
public function getTimeToRun()
{
return $this->timeToRun;
}
}
94 changes: 94 additions & 0 deletions src/Illuminate/Queue/Jobs/InteropJob.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
<?php

namespace Illuminate\Queue\Jobs;

use Illuminate\Container\Container;
use Illuminate\Contracts\Queue\Job as JobContract;
use Interop\Queue\PsrConsumer;
use Interop\Queue\PsrContext;
use Interop\Queue\PsrMessage;

class InteropJob extends Job implements JobContract
{
/**
* @var PsrContext
*/
private $psrContext;

/**
* @var PsrConsumer
*/
private $psrConsumer;

/**
* @var PsrMessage
*/
private $psrMessage;

/**
* @param Container $container
* @param PsrContext $psrContext
* @param PsrConsumer $psrConsumer
* @param PsrMessage $psrMessage
* @param string $connectionName
*/
public function __construct(Container $container, PsrContext $psrContext, PsrConsumer $psrConsumer, PsrMessage $psrMessage, $connectionName)
{
$this->container = $container;
$this->psrContext = $psrContext;
$this->psrConsumer = $psrConsumer;
$this->psrMessage = $psrMessage;
$this->connectionName = $connectionName;
}

/**
* {@inheritdoc}
*/
public function delete()
{
parent::delete();

$this->psrConsumer->acknowledge($this->psrMessage);
}

/**
* {@inheritdoc}
*/
public function release($delay = 0)
{
if ($delay) {
throw new \LogicException('To be implemented');
}

$requeueMessage = clone $this->psrMessage;
$requeueMessage->setProperty('x-attempts', $this->attempts() + 1);

$this->psrContext->createProducer()->send($this->psrConsumer->getQueue(), $requeueMessage);

$this->psrConsumer->acknowledge($this->psrMessage);
}

/**
* {@inheritdoc}
*/
public function getQueue()
{
return $this->psrConsumer->getQueue()->getQueueName();
}

/**
* {@inheritdoc}
*/
public function attempts()
{
return $this->psrMessage->getProperty('x-attempts', 1);
}

/**
* {@inheritdoc}
*/
public function getRawBody()
{
return $this->psrMessage->getBody();
}
}
16 changes: 15 additions & 1 deletion src/Illuminate/Queue/QueueServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Illuminate\Queue;

use Illuminate\Queue\Connectors\InteropConnector;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Connectors\SqsConnector;
use Illuminate\Queue\Connectors\NullConnector;
Expand Down Expand Up @@ -77,7 +78,7 @@ protected function registerConnection()
*/
public function registerConnectors($manager)
{
foreach (['Null', 'Sync', 'Database', 'Redis', 'Beanstalkd', 'Sqs'] as $connector) {
foreach (['Null', 'Sync', 'Database', 'Redis', 'Beanstalkd', 'Sqs', 'Interop'] as $connector) {
$this->{"register{$connector}Connector"}($manager);
}
}
Expand Down Expand Up @@ -160,6 +161,19 @@ protected function registerSqsConnector($manager)
});
}

/**
* Register the interop queue connector.
*
* @param \Illuminate\Queue\QueueManager $manager
* @return void
*/
protected function registerInteropConnector($manager)
{
$manager->addConnector('interop', function () {
return new InteropConnector();
});
}

/**
* Register the queue worker.
*
Expand Down