Skip to content

Commit 2376d18

Browse files
committed
[client] SpoolProducer
1 parent bec5bb9 commit 2376d18

File tree

13 files changed

+460
-4
lines changed

13 files changed

+460
-4
lines changed

composer.json

+2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
"symfony/monolog-bundle": "^2.8|^3",
2525
"symfony/browser-kit": "^2.8|^3",
2626
"symfony/expression-language": "^2.8|^3",
27+
"symfony/event-dispatcher": "^2.8|^3",
28+
"symfony/console": "^2.8|^3",
2729
"friendsofphp/php-cs-fixer": "^2",
2830
"empi89/php-amqp-stubs": "*@dev",
2931
"phpstan/phpstan": "^0.7.0"

pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php

+3-2
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ public function load(array $configs, ContainerBuilder $container)
6464

6565
if (isset($config['client'])) {
6666
$loader->load('client.yml');
67+
$loader->load('extensions/flush_spool_producer_extension.yml');
6768

6869
foreach ($config['transport'] as $name => $transportConfig) {
6970
$this->factories[$name]->createDriver($container, $transportConfig);
@@ -88,10 +89,10 @@ public function load(array $configs, ContainerBuilder $container)
8889
$container->setParameter('enqueue.client.default_queue_name', $config['client']['default_processor_queue']);
8990

9091
if (false == empty($config['client']['traceable_producer'])) {
91-
$producerId = 'enqueue.client.traceable_message_producer';
92+
$producerId = 'enqueue.client.traceable_producer';
9293
$container->register($producerId, TraceableProducer::class)
9394
->setDecoratedService('enqueue.client.producer')
94-
->addArgument(new Reference('enqueue.client.traceable_message_producer.inner'))
95+
->addArgument(new Reference('enqueue.client.traceable_producer.inner'))
9596
;
9697
}
9798

pkg/enqueue-bundle/Resources/config/client.yml

+15
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,11 @@ services:
99
- '@enqueue.client.driver'
1010
- '@enqueue.client.extensions'
1111

12+
enqueue.client.spool_producer:
13+
class: 'Enqueue\Client\SpoolProducer'
14+
arguments:
15+
- '@enqueue.client.producer'
16+
1217
enqueue.client.extensions:
1318
class: 'Enqueue\Client\ChainExtension'
1419
public: false
@@ -18,6 +23,9 @@ services:
1823
enqueue.producer:
1924
alias: 'enqueue.client.producer'
2025

26+
enqueue.spool_producer:
27+
alias: 'enqueue.client.spool_producer'
28+
2129
enqueue.client.rpc_client:
2230
class: 'Enqueue\Client\RpcClient'
2331
arguments:
@@ -123,3 +131,10 @@ services:
123131
name: 'data_collector'
124132
template: 'EnqueueBundle:Profiler:panel.html.twig'
125133
id: 'enqueue.message_queue'
134+
135+
enqueue.flush_spool_producer_listener:
136+
class: 'Enqueue\Symfony\Client\FlushSpoolProducerListener'
137+
arguments:
138+
- '@enqueue.client.spool_producer'
139+
tags:
140+
- { name: 'kernel.event_subscriber' }
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
services:
2+
enqueue.client.flush_spool_producer_extension:
3+
class: 'Enqueue\Client\ConsumptionExtension\FlushSpoolProducerExtension'
4+
public: false
5+
arguments:
6+
- '@enqueue.client.spool_producer'
7+
tags:
8+
- { name: 'enqueue.consumption.extension', priority: -100 }
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
<?php
2+
3+
namespace Enqueue\Bundle\Tests\Functional\Client;
4+
5+
use Enqueue\Bundle\Tests\Functional\WebTestCase;
6+
use Enqueue\Client\SpoolProducer;
7+
8+
/**
9+
* @group functional
10+
*/
11+
class SpoolProducerTest extends WebTestCase
12+
{
13+
public function testCouldBeGetFromContainerAsService()
14+
{
15+
$producer = $this->container->get('enqueue.client.spool_producer');
16+
17+
$this->assertInstanceOf(SpoolProducer::class, $producer);
18+
}
19+
20+
public function testCouldBeGetFromContainerAsShortenAlias()
21+
{
22+
$producer = $this->container->get('enqueue.client.spool_producer');
23+
$aliasProducer = $this->container->get('enqueue.spool_producer');
24+
25+
$this->assertSame($producer, $aliasProducer);
26+
}
27+
}

pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php

+2-2
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ public function testShouldUseTraceableMessageProducerIfTraceableProducerOptionSe
246246
],
247247
]], $container);
248248

249-
$producer = $container->getDefinition('enqueue.client.traceable_message_producer');
249+
$producer = $container->getDefinition('enqueue.client.traceable_producer');
250250
self::assertEquals(TraceableProducer::class, $producer->getClass());
251251
self::assertEquals(
252252
['enqueue.client.producer', null, 0],
@@ -255,7 +255,7 @@ public function testShouldUseTraceableMessageProducerIfTraceableProducerOptionSe
255255

256256
self::assertInstanceOf(Reference::class, $producer->getArgument(0));
257257
self::assertEquals(
258-
'enqueue.client.traceable_message_producer.inner',
258+
'enqueue.client.traceable_producer.inner',
259259
(string) $producer->getArgument(0)
260260
);
261261
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
<?php
2+
3+
namespace Enqueue\Client\ConsumptionExtension;
4+
5+
use Enqueue\Client\SpoolProducer;
6+
use Enqueue\Consumption\Context;
7+
use Enqueue\Consumption\EmptyExtensionTrait;
8+
use Enqueue\Consumption\ExtensionInterface;
9+
10+
class FlushSpoolProducerExtension implements ExtensionInterface
11+
{
12+
use EmptyExtensionTrait;
13+
14+
/**
15+
* @var SpoolProducer
16+
*/
17+
private $producer;
18+
19+
/**
20+
* @param SpoolProducer $producer
21+
*/
22+
public function __construct(SpoolProducer $producer)
23+
{
24+
$this->producer = $producer;
25+
}
26+
27+
/**
28+
* {@inheritdoc}
29+
*/
30+
public function onPostReceived(Context $context)
31+
{
32+
$this->producer->flush();
33+
}
34+
35+
public function onInterrupted(Context $context)
36+
{
37+
$this->producer->flush();
38+
}
39+
}

pkg/enqueue/Client/SpoolProducer.php

+46
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
<?php
2+
3+
namespace Enqueue\Client;
4+
5+
class SpoolProducer implements ProducerInterface
6+
{
7+
/**
8+
* @var ProducerInterface
9+
*/
10+
private $realProducer;
11+
12+
/**
13+
* @var array
14+
*/
15+
private $queue;
16+
17+
/**
18+
* @param ProducerInterface $realProducer
19+
*/
20+
public function __construct(ProducerInterface $realProducer)
21+
{
22+
$this->realProducer = $realProducer;
23+
24+
$this->queue = new \SplQueue();
25+
}
26+
27+
/**
28+
* {@inheritdoc}
29+
*/
30+
public function send($topic, $message)
31+
{
32+
$this->queue->enqueue([$topic, $message]);
33+
}
34+
35+
/**
36+
* When it is called it sends all previously queued messages.
37+
*/
38+
public function flush()
39+
{
40+
while (false == $this->queue->isEmpty()) {
41+
list($topic, $message) = $this->queue->dequeue();
42+
43+
$this->realProducer->send($topic, $message);
44+
}
45+
}
46+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
<?php
2+
3+
namespace Enqueue\Symfony\Client;
4+
5+
use Enqueue\Client\SpoolProducer;
6+
use Symfony\Component\Console\ConsoleEvents;
7+
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
8+
use Symfony\Component\HttpKernel\KernelEvents;
9+
10+
class FlushSpoolProducerListener implements EventSubscriberInterface
11+
{
12+
/**
13+
* @var SpoolProducer
14+
*/
15+
private $producer;
16+
17+
/**
18+
* @param SpoolProducer $producer
19+
*/
20+
public function __construct(SpoolProducer $producer)
21+
{
22+
$this->producer = $producer;
23+
}
24+
25+
public function flushMessages()
26+
{
27+
$this->producer->flush();
28+
}
29+
30+
/**
31+
* {@inheritdoc}
32+
*/
33+
public static function getSubscribedEvents()
34+
{
35+
$events = [];
36+
37+
if (class_exists(KernelEvents::class)) {
38+
$events[KernelEvents::TERMINATE] = 'flushMessages';
39+
}
40+
41+
if (class_exists(ConsoleEvents::class)) {
42+
$events[ConsoleEvents::TERMINATE] = 'flushMessages';
43+
}
44+
45+
return $events;
46+
}
47+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
<?php
2+
3+
namespace Enqueue\Tests\Client\ConsumptionExtension;
4+
5+
use Enqueue\Client\ConsumptionExtension\FlushSpoolProducerExtension;
6+
use Enqueue\Client\SpoolProducer;
7+
use Enqueue\Consumption\Context;
8+
use Enqueue\Consumption\ExtensionInterface;
9+
use Enqueue\Test\ClassExtensionTrait;
10+
use PHPUnit\Framework\TestCase;
11+
12+
class FlushSpoolProducerExtensionTest extends TestCase
13+
{
14+
use ClassExtensionTrait;
15+
16+
public function testShouldImplementExtensionInterface()
17+
{
18+
$this->assertClassImplements(ExtensionInterface::class, FlushSpoolProducerExtension::class);
19+
}
20+
21+
public function testCouldBeConstructedWithSpoolProducerAsFirstArgument()
22+
{
23+
new FlushSpoolProducerExtension($this->createSpoolProducerMock());
24+
}
25+
26+
public function testShouldDoNothingOnStart()
27+
{
28+
$producer = $this->createSpoolProducerMock();
29+
$producer
30+
->expects(self::never())
31+
->method('flush')
32+
;
33+
34+
$extension = new FlushSpoolProducerExtension($producer);
35+
$extension->onStart($this->createContextMock());
36+
}
37+
38+
public function testShouldDoNothingOnBeforeReceive()
39+
{
40+
$producer = $this->createSpoolProducerMock();
41+
$producer
42+
->expects(self::never())
43+
->method('flush')
44+
;
45+
46+
$extension = new FlushSpoolProducerExtension($producer);
47+
$extension->onBeforeReceive($this->createContextMock());
48+
}
49+
50+
public function testShouldDoNothingOnPreReceived()
51+
{
52+
$producer = $this->createSpoolProducerMock();
53+
$producer
54+
->expects(self::never())
55+
->method('flush')
56+
;
57+
58+
$extension = new FlushSpoolProducerExtension($producer);
59+
$extension->onPreReceived($this->createContextMock());
60+
}
61+
62+
public function testShouldDoNothingOnResult()
63+
{
64+
$producer = $this->createSpoolProducerMock();
65+
$producer
66+
->expects(self::never())
67+
->method('flush')
68+
;
69+
70+
$extension = new FlushSpoolProducerExtension($producer);
71+
$extension->onResult($this->createContextMock());
72+
}
73+
74+
public function testShouldDoNothingOnIdle()
75+
{
76+
$producer = $this->createSpoolProducerMock();
77+
$producer
78+
->expects(self::never())
79+
->method('flush')
80+
;
81+
82+
$extension = new FlushSpoolProducerExtension($producer);
83+
$extension->onIdle($this->createContextMock());
84+
}
85+
86+
public function testShouldFlushSpoolProducerOnInterrupted()
87+
{
88+
$producer = $this->createSpoolProducerMock();
89+
$producer
90+
->expects(self::once())
91+
->method('flush')
92+
;
93+
94+
$extension = new FlushSpoolProducerExtension($producer);
95+
$extension->onInterrupted($this->createContextMock());
96+
}
97+
98+
public function testShouldFlushSpoolProducerOnPostReceived()
99+
{
100+
$producer = $this->createSpoolProducerMock();
101+
$producer
102+
->expects(self::once())
103+
->method('flush')
104+
;
105+
106+
$extension = new FlushSpoolProducerExtension($producer);
107+
$extension->onPostReceived($this->createContextMock());
108+
}
109+
110+
/**
111+
* @return \PHPUnit_Framework_MockObject_MockObject|Context
112+
*/
113+
private function createContextMock()
114+
{
115+
return $this->createMock(Context::class);
116+
}
117+
118+
/**
119+
* @return \PHPUnit_Framework_MockObject_MockObject|SpoolProducer
120+
*/
121+
private function createSpoolProducerMock()
122+
{
123+
return $this->createMock(SpoolProducer::class);
124+
}
125+
}

0 commit comments

Comments
 (0)