Skip to content

Commit f6c5c85

Browse files
committed
multi transport simple client
1 parent b3bc044 commit f6c5c85

15 files changed

+295
-95
lines changed

composer.json

+5
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
"enqueue/sqs": "*@dev",
1616
"enqueue/enqueue-bundle": "*@dev",
1717
"enqueue/job-queue": "*@dev",
18+
"enqueue/simple-client": "*@dev",
1819
"enqueue/test": "*@dev",
1920

2021
"phpunit/phpunit": "^5",
@@ -77,6 +78,10 @@
7778
{
7879
"type": "path",
7980
"url": "pkg/sqs"
81+
},
82+
{
83+
"type": "path",
84+
"url": "pkg/simple-client"
8085
}
8186
]
8287
}

phpunit.xml.dist

+4
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@
5656
<testsuite name="job-queue">
5757
<directory>pkg/job-queue/Tests</directory>
5858
</testsuite>
59+
60+
<testsuite name="simple-client">
61+
<directory>pkg/simple-client/Tests</directory>
62+
</testsuite>
5963
</testsuites>
6064

6165
<filter>

pkg/enqueue/Tests/Functional/Client/RpcClientTest.php

+16-3
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
use Enqueue\AmqpExt\AmqpContext;
66
use Enqueue\Client\RpcClient;
7-
use Enqueue\Client\SimpleClient;
7+
use Enqueue\SimpleClient\SimpleClient;
88
use Enqueue\Consumption\ChainExtension;
99
use Enqueue\Consumption\Extension\LimitConsumedMessagesExtension;
1010
use Enqueue\Consumption\Extension\LimitConsumptionTimeExtension;
@@ -39,14 +39,27 @@ public function setUp()
3939
$this->context = $this->buildAmqpContext();
4040
$this->replyContext = $this->buildAmqpContext();
4141

42-
$this->removeQueue('default');
42+
$this->removeQueue('enqueue.app.default');
4343
}
4444

4545
public function testProduceAndConsumeOneMessage()
4646
{
47+
$config = [
48+
'transport' => [
49+
'rabbitmq_amqp' => [
50+
'host' => getenv('SYMFONY__RABBITMQ__HOST'),
51+
'port' => getenv('SYMFONY__RABBITMQ__AMQP__PORT'),
52+
'login' => getenv('SYMFONY__RABBITMQ__USER'),
53+
'password' => getenv('SYMFONY__RABBITMQ__PASSWORD'),
54+
'vhost' => getenv('SYMFONY__RABBITMQ__VHOST'),
55+
],
56+
],
57+
];
58+
4759
$requestMessage = null;
4860

49-
$client = new SimpleClient($this->context);
61+
$client = new SimpleClient($config);
62+
$client->setupBroker();
5063
$client->bind('foo_topic', 'foo_processor', function (PsrMessage $message, PsrContext $context) use (&$requestMessage) {
5164
$requestMessage = $message;
5265

pkg/enqueue/Tests/Functional/Client/SimpleClientTest.php

-83
This file was deleted.

pkg/enqueue/composer.json

+4-2
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@
2121
"symfony/dependency-injection": "^2.8|^3",
2222
"symfony/config": "^2.8|^3",
2323
"enqueue/null": "^0.3",
24-
"enqueue/test": "^0.3"
24+
"enqueue/test": "^0.3",
25+
"enqueue/simple-client": "^0.3"
2526
},
2627
"suggest": {
2728
"symfony/console": "^2.8|^3 If you want to use li commands",
@@ -31,7 +32,8 @@
3132
"enqueue/stomp": "STOMP transport",
3233
"enqueue/fs": "Filesystem transport",
3334
"enqueue/redis": "Redis transport",
34-
"enqueue/dbal": "Doctrine DBAL transport"
35+
"enqueue/dbal": "Doctrine DBAL transport",
36+
"enqueue/sqs": "Amazon AWS SQS transport"
3537
},
3638
"autoload": {
3739
"psr-4": { "Enqueue\\": "" },

pkg/simple-client/.gitignore

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
*~
2+
/composer.lock
3+
/composer.phar
4+
/phpunit.xml
5+
/vendor/
6+
/.idea/

pkg/simple-client/.travis.yml

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
sudo: false
2+
3+
git:
4+
depth: 1
5+
6+
language: php
7+
8+
php:
9+
- '5.6'
10+
- '7.0'
11+
12+
cache:
13+
directories:
14+
- $HOME/.composer/cache
15+
16+
install:
17+
- composer self-update
18+
- composer install --prefer-source
19+
20+
script:
21+
- vendor/bin/phpunit --exclude-group=functional
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
<?php
2+
namespace Enqueue\SimpleClient\Tests\Functional;
3+
4+
use Enqueue\SimpleClient\SimpleClient;
5+
use Enqueue\Consumption\ChainExtension;
6+
use Enqueue\Consumption\Extension\LimitConsumedMessagesExtension;
7+
use Enqueue\Consumption\Extension\LimitConsumptionTimeExtension;
8+
use Enqueue\Consumption\Result;
9+
use Enqueue\Psr\PsrMessage;
10+
use Enqueue\Test\RabbitmqAmqpExtension;
11+
use Enqueue\Test\RabbitmqManagmentExtensionTrait;
12+
use PHPUnit\Framework\TestCase;
13+
14+
/**
15+
* @group functional
16+
*/
17+
class SimpleClientTest extends TestCase
18+
{
19+
use RabbitmqAmqpExtension;
20+
use RabbitmqManagmentExtensionTrait;
21+
22+
public function setUp()
23+
{
24+
if (false == getenv('SYMFONY__RABBITMQ__HOST')) {
25+
throw new \PHPUnit_Framework_SkippedTestError('Functional tests are not allowed in this environment');
26+
}
27+
28+
$this->removeQueue('enqueue.app.default');
29+
}
30+
31+
public function transportConfigDataProvider()
32+
{
33+
$amqp = [
34+
'transport' => [
35+
'amqp' => [
36+
'host' => getenv('SYMFONY__RABBITMQ__HOST'),
37+
'port' => getenv('SYMFONY__RABBITMQ__AMQP__PORT'),
38+
'login' => getenv('SYMFONY__RABBITMQ__USER'),
39+
'password' => getenv('SYMFONY__RABBITMQ__PASSWORD'),
40+
'vhost' => getenv('SYMFONY__RABBITMQ__VHOST'),
41+
],
42+
],
43+
];
44+
45+
$rabbitmqAmqp = [
46+
'transport' => [
47+
'rabbitmq_amqp' => [
48+
'host' => getenv('SYMFONY__RABBITMQ__HOST'),
49+
'port' => getenv('SYMFONY__RABBITMQ__AMQP__PORT'),
50+
'login' => getenv('SYMFONY__RABBITMQ__USER'),
51+
'password' => getenv('SYMFONY__RABBITMQ__PASSWORD'),
52+
'vhost' => getenv('SYMFONY__RABBITMQ__VHOST'),
53+
],
54+
],
55+
];
56+
57+
return [[$amqp, $rabbitmqAmqp]];
58+
}
59+
60+
/**
61+
* @dataProvider transportConfigDataProvider
62+
*/
63+
public function testProduceAndConsumeOneMessage($config)
64+
{
65+
$actualMessage = null;
66+
67+
$client = new SimpleClient($config);
68+
$client->bind('foo_topic', 'foo_processor', function (PsrMessage $message) use (&$actualMessage) {
69+
$actualMessage = $message;
70+
71+
return Result::ACK;
72+
});
73+
74+
$client->send('foo_topic', 'Hello there!', true);
75+
76+
$client->consume(new ChainExtension([
77+
new LimitConsumptionTimeExtension(new \DateTime('+5sec')),
78+
new LimitConsumedMessagesExtension(2),
79+
]));
80+
81+
$this->assertInstanceOf(PsrMessage::class, $actualMessage);
82+
$this->assertSame('Hello there!', $actualMessage->getBody());
83+
}
84+
85+
/**
86+
* @dataProvider transportConfigDataProvider
87+
*/
88+
public function testProduceAndRouteToTwoConsumes($config)
89+
{
90+
$received = 0;
91+
92+
$client = new SimpleClient($config);
93+
$client->bind('foo_topic', 'foo_processor1', function () use (&$received) {
94+
++$received;
95+
96+
return Result::ACK;
97+
});
98+
$client->bind('foo_topic', 'foo_processor2', function () use (&$received) {
99+
++$received;
100+
101+
return Result::ACK;
102+
});
103+
104+
$client->send('foo_topic', 'Hello there!', true);
105+
106+
$client->consume(new ChainExtension([
107+
new LimitConsumptionTimeExtension(new \DateTime('+5sec')),
108+
new LimitConsumedMessagesExtension(3),
109+
]));
110+
111+
$this->assertSame(2, $received);
112+
}
113+
}

pkg/simple-client/LICENSE

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
Copyright (c) 2016 Kotliar Maksym
2+
3+
Permission is hereby granted, free of charge, to any person obtaining a copy
4+
of this software and associated documentation files (the "Software"), to deal
5+
in the Software without restriction, including without limitation the rights
6+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7+
copies of the Software, and to permit persons to whom the Software is furnished
8+
to do so, subject to the following conditions:
9+
10+
The above copyright notice and this permission notice shall be included in all
11+
copies or substantial portions of the Software.
12+
13+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19+
THE SOFTWARE.

pkg/simple-client/README.md

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# Message Queue Simple Client.

pkg/enqueue/Client/SimpleClient.php pkg/simple-client/SimpleClient.php

+20-2
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,31 @@
11
<?php
2-
namespace Enqueue\Client;
2+
namespace Enqueue\SimpleClient;
33

44
use Enqueue\AmqpExt\Symfony\AmqpTransportFactory;
55
use Enqueue\AmqpExt\Symfony\RabbitMqAmqpTransportFactory;
6+
use Enqueue\Client\ArrayProcessorRegistry;
7+
use Enqueue\Client\Config;
8+
use Enqueue\Client\DelegateProcessor;
9+
use Enqueue\Client\DriverInterface;
610
use Enqueue\Client\Meta\QueueMetaRegistry;
711
use Enqueue\Client\Meta\TopicMetaRegistry;
12+
use Enqueue\Client\ProducerInterface;
13+
use Enqueue\Client\RouterProcessor;
814
use Enqueue\Consumption\CallbackProcessor;
915
use Enqueue\Consumption\ExtensionInterface;
1016
use Enqueue\Consumption\QueueConsumer;
17+
use Enqueue\Dbal\Symfony\DbalTransportFactory;
18+
use Enqueue\Fs\Symfony\FsTransportFactory;
1119
use Enqueue\Psr\PsrContext;
20+
use Enqueue\Redis\Symfony\RedisTransportFactory;
21+
use Enqueue\Sqs\Symfony\SqsTransportFactory;
22+
use Enqueue\Stomp\Symfony\RabbitMqStompTransportFactory;
23+
use Enqueue\Stomp\Symfony\StompTransportFactory;
1224
use Enqueue\Symfony\DefaultTransportFactory;
1325
use Symfony\Component\DependencyInjection\ContainerBuilder;
1426

1527
/**
16-
* Experimental class. Use it speedup setup process and learning but consider to switch to custom solution (build your own client).
28+
* Use it speedup setup process and learning but consider to switch to custom solution (build your own client).
1729
*/
1830
final class SimpleClient
1931
{
@@ -80,6 +92,12 @@ private function buildContainerExtension($config)
8092
'default' => DefaultTransportFactory::class,
8193
'amqp' => AmqpTransportFactory::class,
8294
'rabbitmq_amqp' => RabbitMqAmqpTransportFactory::class,
95+
'dbal' => DbalTransportFactory::class,
96+
'fs' => FsTransportFactory::class,
97+
'redis' => RedisTransportFactory::class,
98+
'stomp' => StompTransportFactory::class,
99+
'rabbitmq_stomp' => RabbitMqStompTransportFactory::class,
100+
'sqs' => SqsTransportFactory::class,
83101
];
84102

85103
$extension = new SimpleClientContainerExtension();

0 commit comments

Comments
 (0)