Skip to content

Commit 81412df

Browse files
committed
Develop
1 parent 9adbf01 commit 81412df

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+2647
-93
lines changed

bin/run-fun-test.sh

+2-1
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,5 @@
33
set -x
44
set -e
55

6-
COMPOSE_PROJECT_NAME=mqdev docker-compose run --workdir="/mqdev" --rm dev ./bin/test "$@"
6+
#COMPOSE_PROJECT_NAME=mqdev
7+
docker-compose run --workdir="/mqdev" --rm dev ./bin/test "$@"

bin/test

+1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ waitForService redis 6379 50
3636
waitForService beanstalkd 11300 50
3737
waitForService gearmand 4730 50
3838
waitForService kafka 9092 50
39+
waitForService mongo 27017 50
3940

4041
php pkg/job-queue/Tests/Functional/app/console doctrine:database:create --if-not-exists
4142
php pkg/job-queue/Tests/Functional/app/console doctrine:schema:update --force

composer.json

+5
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
"enqueue/fs": "*@dev",
1717
"enqueue/null": "*@dev",
1818
"enqueue/dbal": "*@dev",
19+
"enqueue/mongodb": "*@dev",
1920
"enqueue/sqs": "*@dev",
2021
"enqueue/pheanstalk": "*@dev",
2122
"enqueue/gearman": "*@dev",
@@ -143,6 +144,10 @@
143144
{
144145
"type": "path",
145146
"url": "pkg/async-event-dispatcher"
147+
},
148+
{
149+
"type": "path",
150+
"url": "pkg/mongodb"
146151
}
147152
]
148153
}

docker-compose.yml

+8-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ services:
1313
- zookeeper
1414
- google-pubsub
1515
- rabbitmqssl
16+
- mongo
1617
volumes:
1718
- './:/mqdev'
1819
environment:
@@ -24,7 +25,7 @@ services:
2425
- RABBITMQ_PASSWORD=guest
2526
- RABBITMQ_VHOST=mqdev
2627
- RABBITMQ_AMQP__PORT=5672
27-
- RABBITMQ_STOMP_PORT=61613
28+
- RABBITMQ_STOMP_PORT=61613
2829
- DOCTRINE_DRIVER=pdo_mysql
2930
- DOCTRINE_HOST=mysql
3031
- DOCTRINE_PORT=3306
@@ -44,6 +45,7 @@ services:
4445
- RDKAFKA_PORT=9092
4546
- PUBSUB_EMULATOR_HOST=http://google-pubsub:8085
4647
- GCLOUD_PROJECT=mqdev
48+
- MONGO_DSN=mongodb://mongo
4749

4850
rabbitmq:
4951
image: 'enqueue/rabbitmq:latest'
@@ -102,6 +104,11 @@ services:
102104
image: 'google/cloud-sdk:latest'
103105
entrypoint: 'gcloud beta emulators pubsub start --host-port=0.0.0.0:8085'
104106

107+
mongo:
108+
image: mongo
109+
ports:
110+
- "27017:27017"
111+
105112
volumes:
106113
mysql-data:
107114
driver: local

pkg/dbal/.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@
44
/phpunit.xml
55
/vendor/
66
/.idea/
7+
/examples/

pkg/dbal/examples/consume.php

-46
This file was deleted.

pkg/dbal/examples/produce.php

-45
This file was deleted.

pkg/mongodb/.gitignore

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
*~
2+
/composer.lock
3+
/composer.phar
4+
/phpunit.xml
5+
/vendor/
6+
/.idea/

pkg/mongodb/Client/MongodbDriver.php

+186
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
<?php
2+
3+
namespace Enqueue\Mongodb\Client;
4+
5+
use Enqueue\Client\Config;
6+
use Enqueue\Client\DriverInterface;
7+
use Enqueue\Client\Message;
8+
use Enqueue\Client\MessagePriority;
9+
use Enqueue\Client\Meta\QueueMetaRegistry;
10+
use Enqueue\Mongodb\MongodbContext;
11+
use Enqueue\Mongodb\MongodbMessage;
12+
use Interop\Queue\PsrMessage;
13+
use Psr\Log\LoggerInterface;
14+
use Psr\Log\NullLogger;
15+
16+
class MongodbDriver implements DriverInterface
17+
{
18+
/**
19+
* @var MongodbContext
20+
*/
21+
private $context;
22+
23+
/**
24+
* @var Config
25+
*/
26+
private $config;
27+
28+
/**
29+
* @var QueueMetaRegistry
30+
*/
31+
private $queueMetaRegistry;
32+
33+
/**
34+
* @var array
35+
*/
36+
private static $priorityMap = [
37+
MessagePriority::VERY_LOW => 0,
38+
MessagePriority::LOW => 1,
39+
MessagePriority::NORMAL => 2,
40+
MessagePriority::HIGH => 3,
41+
MessagePriority::VERY_HIGH => 4,
42+
];
43+
44+
/**
45+
* @param MongodbContext $context
46+
* @param Config $config
47+
* @param QueueMetaRegistry $queueMetaRegistry
48+
*/
49+
public function __construct(MongodbContext $context, Config $config, QueueMetaRegistry $queueMetaRegistry)
50+
{
51+
$this->context = $context;
52+
$this->config = $config;
53+
$this->queueMetaRegistry = $queueMetaRegistry;
54+
}
55+
56+
/**
57+
* {@inheritdoc}
58+
*
59+
* @return MongodbMessage
60+
*/
61+
public function createTransportMessage(Message $message)
62+
{
63+
$properties = $message->getProperties();
64+
65+
$headers = $message->getHeaders();
66+
$headers['content_type'] = $message->getContentType();
67+
68+
$transportMessage = $this->context->createMessage();
69+
$transportMessage->setBody($message->getBody());
70+
$transportMessage->setHeaders($headers);
71+
$transportMessage->setProperties($properties);
72+
$transportMessage->setMessageId($message->getMessageId());
73+
$transportMessage->setTimestamp($message->getTimestamp());
74+
$transportMessage->setDeliveryDelay($message->getDelay());
75+
$transportMessage->setReplyTo($message->getReplyTo());
76+
$transportMessage->setCorrelationId($message->getCorrelationId());
77+
if (array_key_exists($message->getPriority(), self::$priorityMap)) {
78+
$transportMessage->setPriority(self::$priorityMap[$message->getPriority()]);
79+
}
80+
81+
return $transportMessage;
82+
}
83+
84+
/**
85+
* @param MongodbMessage $message
86+
*
87+
* {@inheritdoc}
88+
*/
89+
public function createClientMessage(PsrMessage $message)
90+
{
91+
$clientMessage = new Message();
92+
93+
$clientMessage->setBody($message->getBody());
94+
$clientMessage->setHeaders($message->getHeaders());
95+
$clientMessage->setProperties($message->getProperties());
96+
97+
$clientMessage->setContentType($message->getHeader('content_type'));
98+
$clientMessage->setMessageId($message->getMessageId());
99+
$clientMessage->setTimestamp($message->getTimestamp());
100+
$clientMessage->setDelay($message->getDeliveryDelay());
101+
$clientMessage->setReplyTo($message->getReplyTo());
102+
$clientMessage->setCorrelationId($message->getCorrelationId());
103+
104+
$priorityMap = array_flip(self::$priorityMap);
105+
$priority = array_key_exists($message->getPriority(), $priorityMap) ?
106+
$priorityMap[$message->getPriority()] :
107+
MessagePriority::NORMAL;
108+
$clientMessage->setPriority($priority);
109+
110+
return $clientMessage;
111+
}
112+
113+
/**
114+
* {@inheritdoc}
115+
*/
116+
public function sendToRouter(Message $message)
117+
{
118+
if (false == $message->getProperty(Config::PARAMETER_TOPIC_NAME)) {
119+
throw new \LogicException('Topic name parameter is required but is not set');
120+
}
121+
122+
$queue = $this->createQueue($this->config->getRouterQueueName());
123+
$transportMessage = $this->createTransportMessage($message);
124+
125+
$this->context->createProducer()->send($queue, $transportMessage);
126+
}
127+
128+
/**
129+
* {@inheritdoc}
130+
*/
131+
public function sendToProcessor(Message $message)
132+
{
133+
if (false == $message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) {
134+
throw new \LogicException('Processor name parameter is required but is not set');
135+
}
136+
137+
if (false == $queueName = $message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME)) {
138+
throw new \LogicException('Queue name parameter is required but is not set');
139+
}
140+
141+
$transportMessage = $this->createTransportMessage($message);
142+
$destination = $this->createQueue($queueName);
143+
144+
$this->context->createProducer()->send($destination, $transportMessage);
145+
}
146+
147+
/**
148+
* {@inheritdoc}
149+
*/
150+
public function createQueue($queueName)
151+
{
152+
$transportName = $this->queueMetaRegistry->getQueueMeta($queueName)->getTransportName();
153+
154+
return $this->context->createQueue($transportName);
155+
}
156+
157+
/**
158+
* {@inheritdoc}
159+
*/
160+
public function setupBroker(LoggerInterface $logger = null)
161+
{
162+
$logger = $logger ?: new NullLogger();
163+
$log = function ($text, ...$args) use ($logger) {
164+
$logger->debug(sprintf('[MongodbDriver] '.$text, ...$args));
165+
};
166+
$config = $this->context->getConfig();
167+
$log('Creating database: "%s"', $config['dbname']);
168+
$this->context->createDataBaseTable();
169+
}
170+
171+
/**
172+
* {@inheritdoc}
173+
*/
174+
public function getConfig()
175+
{
176+
return $this->config;
177+
}
178+
179+
/**
180+
* @return array
181+
*/
182+
public static function getPriorityMap()
183+
{
184+
return self::$priorityMap;
185+
}
186+
}

0 commit comments

Comments
 (0)