Skip to content

Commit b38e40a

Browse files
authored
Merge pull request #672 from php-enqueue/sns
Amazon SNS transport. Publish-Subscribe support based on combination of SQS and SNQ
2 parents 532425f + 905c471 commit b38e40a

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

72 files changed

+3453
-5
lines changed

bin/subtree-split

+4
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ remote rdkafka [email protected]:php-enqueue/rdkafka.git
5858
remote dbal [email protected]:php-enqueue/dbal.git
5959
remote null [email protected]:php-enqueue/null.git
6060
remote sqs [email protected]:php-enqueue/sqs.git
61+
remote sns [email protected]:php-enqueue/sns.git
62+
remote snsqs [email protected]:php-enqueue/snsqs.git
6163
remote gps [email protected]:php-enqueue/gps.git
6264
remote enqueue-bundle [email protected]:php-enqueue/enqueue-bundle.git
6365
remote job-queue [email protected]:php-enqueue/job-queue.git
@@ -84,6 +86,8 @@ split 'pkg/redis' redis
8486
split 'pkg/dbal' dbal
8587
split 'pkg/null' null
8688
split 'pkg/sqs' sqs
89+
split 'pkg/sns' sns
90+
split 'pkg/snsqs' snsqs
8791
split 'pkg/gps' gps
8892
split 'pkg/enqueue-bundle' enqueue-bundle
8993
split 'pkg/job-queue' job-queue

composer.json

+3-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
"doctrine/orm": "~2.4",
2626
"mongodb/mongodb": "^1.2",
2727
"pda/pheanstalk": "^3",
28-
"aws/aws-sdk-php": "~3.26",
28+
"aws/aws-sdk-php": "^3.26",
2929
"stomp-php/stomp-php": "^4",
3030
"php-http/guzzle6-adapter": "^1.1",
3131
"php-http/client-common": "^1.7@dev",
@@ -76,6 +76,8 @@
7676
"Enqueue\\Redis\\": "pkg/redis/",
7777
"Enqueue\\SimpleClient\\": "pkg/simple-client/",
7878
"Enqueue\\Sqs\\": "pkg/sqs/",
79+
"Enqueue\\Sns\\": "pkg/sns/",
80+
"Enqueue\\SnsQs\\": "pkg/snsqs/",
7981
"Enqueue\\Stomp\\": "pkg/stomp/",
8082
"Enqueue\\Test\\": "pkg/test/",
8183
"Enqueue\\Dsn\\": "pkg/dsn/",

docker-compose.yml

+4-1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ services:
3434
- PHPREDIS_DSN=redis+phpredis://redis
3535
- GPS_DSN=gps:?projectId=mqdev&emulatorHost=http://google-pubsub:8085
3636
- SQS_DSN=sqs:?key=key&secret=secret&region=us-east-1&endpoint=http://localstack:4576&version=latest
37+
- SNS_DSN=sns:?key=key&secret=secret&region=us-east-1&endpoint=http://localstack:4575&version=latest
38+
- SNSQS_DSN=snsqs:?key=key&secret=secret&region=us-east-1&sns_endpoint=http://localstack:4575&sqs_endpoint=http://localstack:4576&version=latest
3739
- WAMP_DSN=wamp://thruway:9090
3840
- REDIS_HOST=redis
3941
- REDIS_PORT=6379
@@ -129,9 +131,10 @@ services:
129131
image: 'localstack/localstack:latest'
130132
ports:
131133
- '4576:4576'
134+
- '4575:4575'
132135
environment:
133136
HOSTNAME_EXTERNAL: 'localstack'
134-
SERVICES: 'sqs'
137+
SERVICES: 'sqs,sns'
135138

136139
influxdb:
137140
image: 'influxdb:latest'

docs/index.md

+1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ Enqueue is an MIT-licensed open source project with its ongoing development made
1212
* [Quick tour](quick_tour.md)
1313
* [Transports](#transports)
1414
- Amqp based on [the ext](transport/amqp.md), [bunny](transport/amqp_bunny.md), [the lib](transport/amqp_lib.md)
15+
- [Amazon SNS-SQS](transport/snsqs.md)
1516
- [Amazon SQS](transport/sqs.md)
1617
- [Google PubSub](transport/gps.md)
1718
- [Beanstalk (Pheanstalk)](transport/pheanstalk.md)

docs/transport/snsqs.md

+182
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
<h2 align="center">Supporting Enqueue</h2>
2+
3+
Enqueue is an MIT-licensed open source project with its ongoing development made possible entirely by the support of community and our customers. If you'd like to join them, please consider:
4+
5+
- [Become a sponsor](https://www.patreon.com/makasim)
6+
- [Become our client](http://forma-pro.com/)
7+
8+
---
9+
10+
# Amazon SNS-SQS transport
11+
12+
Utilize two Amazon services [SNS-SQS](https://docs.aws.amazon.com/sns/latest/dg/sns-sqs-as-subscriber.html) to
13+
implement [Publish-Subscribe](https://www.enterpriseintegrationpatterns.com/patterns/messaging/PublishSubscribeChannel.html)
14+
enterprise integration pattern. As opposed to single SQS transport this adds ability to use [MessageBus](https://www.enterpriseintegrationpatterns.com/patterns/messaging/MessageBus.html)
15+
with enqueue.
16+
17+
A transport for [Amazon SQS](https://aws.amazon.com/sqs/) broker.
18+
It uses internally official [aws sdk library](https://packagist.org/packages/aws/aws-sdk-php)
19+
20+
* [Installation](#installation)
21+
* [Create context](#create-context)
22+
* [Declare topic, queue and bind them together](#declare-topic-queue-and-bind-them-together)
23+
* [Send message to topic](#send-message-to-topic)
24+
* [Send message to queue](#send-message-to-queue)
25+
* [Consume message](#consume-message)
26+
* [Purge queue messages](#purge-queue-messages)
27+
* [Queue from another AWS account](#queue-from-another-aws-account)
28+
29+
## Installation
30+
31+
```bash
32+
$ composer require enqueue/sqs
33+
```
34+
35+
## Create context
36+
37+
```php
38+
<?php
39+
use Enqueue\SnsQs\SnsQsConnectionFactory;
40+
41+
$factory = new SnsQsConnectionFactory([
42+
'key' => 'aKey',
43+
'secret' => 'aSecret',
44+
'region' => 'aRegion',
45+
46+
// or you can segregate options using prefixes "sns_", "sqs_"
47+
'key' => 'aKey', // common option for both SNS and SQS
48+
'sns_region' => 'aSnsRegion', // SNS transport option
49+
'sqs_region' => 'aSqsRegion', // SQS transport option
50+
]);
51+
52+
// same as above but given as DSN string. You may need to url encode secret if it contains special char (like +)
53+
$factory = new SnsQsConnectionFactory('snsqs:?key=aKey&secret=aSecret&region=aRegion');
54+
55+
$context = $factory->createContext();
56+
57+
// if you have enqueue/enqueue library installed you can use a factory to build context from DSN
58+
$context = (new \Enqueue\ConnectionFactoryFactory())->create('snsqs:')->createContext();
59+
```
60+
61+
## Declare topic, queue and bind them together
62+
63+
Declare topic, queue operation creates a topic, queue on a broker side.
64+
Bind creates connection between topic and queue. You publish message to
65+
the topic and topic sends message to each queue connected to the topic.
66+
67+
68+
```php
69+
<?php
70+
/** @var \Enqueue\SnsQs\SnsQsContext $context */
71+
72+
$inTopic = $context->createTopic('in');
73+
$context->declareTopic($inTopic);
74+
75+
$out1Queue = $context->createQueue('out1');
76+
$context->declareQueue($out1Queue);
77+
78+
$out2Queue = $context->createQueue('out2');
79+
$context->declareQueue($out2Queue);
80+
81+
$context->bind($inTopic, $out1Queue);
82+
$context->bind($inTopic, $out2Queue);
83+
84+
// to remove topic/queue use deleteTopic/deleteQueue method
85+
//$context->deleteTopic($inTopic);
86+
//$context->deleteQueue($out1Queue);
87+
//$context->unbind(inTopic, $out1Queue);
88+
```
89+
90+
## Send message to topic
91+
92+
```php
93+
<?php
94+
/** @var \Enqueue\SnsQs\SnsQsContext $context */
95+
96+
$inTopic = $context->createTopic('in');
97+
$message = $context->createMessage('Hello world!');
98+
99+
$context->createProducer()->send($inTopic, $message);
100+
```
101+
102+
## Send message to queue
103+
104+
You can bypass topic and publish message directly to the queue
105+
106+
```php
107+
<?php
108+
/** @var \Enqueue\SnsQs\SnsQsContext $context */
109+
110+
$fooQueue = $context->createQueue('foo');
111+
$message = $context->createMessage('Hello world!');
112+
113+
$context->createProducer()->send($fooQueue, $message);
114+
```
115+
116+
117+
## Consume message:
118+
119+
```php
120+
<?php
121+
/** @var \Enqueue\SnsQs\SnsQsContext $context */
122+
123+
$out1Queue = $context->createQueue('out1');
124+
$consumer = $context->createConsumer($out1Queue);
125+
126+
$message = $consumer->receive();
127+
128+
// process a message
129+
130+
$consumer->acknowledge($message);
131+
// $consumer->reject($message);
132+
```
133+
134+
## Purge queue messages:
135+
136+
```php
137+
<?php
138+
/** @var \Enqueue\SnsQs\SnsQsContext $context */
139+
140+
$fooQueue = $context->createQueue('foo');
141+
142+
$context->purgeQueue($fooQueue);
143+
```
144+
145+
## Queue from another AWS account
146+
147+
SQS allows to use queues from another account. You could set it globally for all queues via option `queue_owner_aws_account_id` or
148+
per queue using `SnsQsQueue::setQueueOwnerAWSAccountId` method.
149+
150+
```php
151+
<?php
152+
use Enqueue\SnsQs\SnsQsConnectionFactory;
153+
154+
// globally for all queues
155+
$factory = new SnsQsConnectionFactory('snsqs:?sqs_queue_owner_aws_account_id=awsAccountId');
156+
157+
$context = (new SnsQsConnectionFactory('snsqs:'))->createContext();
158+
159+
// per queue.
160+
$queue = $context->createQueue('foo');
161+
$queue->setQueueOwnerAWSAccountId('awsAccountId');
162+
```
163+
164+
## Multi region examples
165+
166+
Enqueue SNSQS provides a generic multi-region support. This enables users to specify which AWS Region to send a command to by setting region on SnsQsQueue.
167+
If not specified the default region is used.
168+
169+
```php
170+
<?php
171+
use Enqueue\SnsQs\SnsQsConnectionFactory;
172+
173+
$context = (new SnsQsConnectionFactory('snsqs:?region=eu-west-2'))->createContext();
174+
175+
$queue = $context->createQueue('foo');
176+
$queue->setRegion('us-west-2');
177+
178+
// the request goes to US West (Oregon) Region
179+
$context->declareQueue($queue);
180+
```
181+
182+
[back to index](../index.md)

phpunit.xml.dist

+8
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,14 @@
6161
<directory>pkg/sqs/Tests</directory>
6262
</testsuite>
6363

64+
<testsuite name="sns transport">
65+
<directory>pkg/sns/Tests</directory>
66+
</testsuite>
67+
68+
<testsuite name="snsqs transport">
69+
<directory>pkg/snsqs/Tests</directory>
70+
</testsuite>
71+
6472
<testsuite name="pheanstalk transport">
6573
<directory>pkg/pheanstalk/Tests</directory>
6674
</testsuite>

pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php

+8
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,14 @@ public function provideEnqueueConfigs()
143143
],
144144
]];
145145

146+
yield 'snsqs' => [[
147+
'default' => [
148+
'transport' => [
149+
'dsn' => getenv('SNSQS_DSN'),
150+
],
151+
],
152+
]];
153+
146154
//
147155
// yield 'gps' => [[
148156
// 'transport' => [
+90
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
<?php
2+
3+
namespace Enqueue\Client\Driver;
4+
5+
use Enqueue\SnsQs\SnsQsContext;
6+
use Enqueue\SnsQs\SnsQsTopic;
7+
use Interop\Queue\Destination;
8+
use Psr\Log\LoggerInterface;
9+
use Psr\Log\NullLogger;
10+
11+
/**
12+
* @method SnsQsContext getContext()
13+
* @method SnsQsTopic createRouterTopic()
14+
*/
15+
class SnsQsDriver extends GenericDriver
16+
{
17+
public function __construct(SnsQsContext $context, ...$args)
18+
{
19+
parent::__construct($context, ...$args);
20+
}
21+
22+
public function setupBroker(LoggerInterface $logger = null): void
23+
{
24+
$logger = $logger ?: new NullLogger();
25+
$log = function ($text, ...$args) use ($logger) {
26+
$logger->debug(sprintf('[SqsQsDriver] '.$text, ...$args));
27+
};
28+
29+
// setup router
30+
$routerTopic = $this->createRouterTopic();
31+
$log('Declare router topic: %s', $routerTopic->getTopicName());
32+
$this->getContext()->declareTopic($routerTopic);
33+
34+
$routerQueue = $this->createQueue($this->getConfig()->getRouterQueue());
35+
$log('Declare router queue: %s', $routerQueue->getQueueName());
36+
$this->getContext()->declareQueue($routerQueue);
37+
38+
$log('Bind router queue to topic: %s -> %s', $routerQueue->getQueueName(), $routerTopic->getTopicName());
39+
$this->getContext()->bind($routerTopic, $routerQueue);
40+
41+
// setup queues
42+
$declaredQueues = [];
43+
$declaredTopics = [];
44+
foreach ($this->getRouteCollection()->all() as $route) {
45+
$queue = $this->createRouteQueue($route);
46+
if (false === array_key_exists($queue->getQueueName(), $declaredQueues)) {
47+
$log('Declare processor queue: %s', $queue->getQueueName());
48+
$this->getContext()->declareQueue($queue);
49+
50+
$declaredQueues[$queue->getQueueName()] = true;
51+
}
52+
53+
if ($route->isCommand()) {
54+
continue;
55+
}
56+
57+
$topic = $this->doCreateTopic($this->createTransportQueueName($route->getSource(), true));
58+
if (false === array_key_exists($topic->getTopicName(), $declaredTopics)) {
59+
$log('Declare processor topic: %s', $topic->getTopicName());
60+
$this->getContext()->declareTopic($topic);
61+
62+
$declaredTopics[$topic->getTopicName()] = true;
63+
}
64+
65+
$log('Bind processor queue to topic: %s -> %s', $queue->getQueueName(), $topic->getTopicName());
66+
$this->getContext()->bind($topic, $queue);
67+
}
68+
}
69+
70+
protected function createRouterTopic(): Destination
71+
{
72+
return $this->doCreateTopic(
73+
$this->createTransportRouterTopicName($this->getConfig()->getRouterTopic(), true)
74+
);
75+
}
76+
77+
protected function createTransportRouterTopicName(string $name, bool $prefix): string
78+
{
79+
$name = parent::createTransportRouterTopicName($name, $prefix);
80+
81+
return str_replace('.', '_dot_', $name);
82+
}
83+
84+
protected function createTransportQueueName(string $name, bool $prefix): string
85+
{
86+
$name = parent::createTransportQueueName($name, $prefix);
87+
88+
return str_replace('.', '_dot_', $name);
89+
}
90+
}

pkg/enqueue/Client/Resources.php

+7
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
use Enqueue\Client\Driver\RabbitMqStompDriver;
1313
use Enqueue\Client\Driver\RdKafkaDriver;
1414
use Enqueue\Client\Driver\RedisDriver;
15+
use Enqueue\Client\Driver\SnsQsDriver;
1516
use Enqueue\Client\Driver\SqsDriver;
1617
use Enqueue\Client\Driver\StompDriver;
1718

@@ -92,6 +93,12 @@ public static function getKnownDrivers(): array
9293
'requiredSchemeExtensions' => [],
9394
'packages' => ['enqueue/enqueue', 'enqueue/sqs'],
9495
];
96+
$map[] = [
97+
'schemes' => ['snsqs'],
98+
'driverClass' => SnsQsDriver::class,
99+
'requiredSchemeExtensions' => [],
100+
'packages' => ['enqueue/enqueue', 'enqueue/sqs', 'enqueue/sns', 'enqueue/snsqs'],
101+
];
95102
$map[] = [
96103
'schemes' => ['stomp'],
97104
'driverClass' => StompDriver::class,

0 commit comments

Comments
 (0)