Skip to content

Commit 01787a0

Browse files
authoredJan 9, 2019
Merge pull request #716 from uro/feature/174-monitoring-datadog-support
[monitoring] Add support of Datadog
2 parents 8b114cb + da5dd53 commit 01787a0

11 files changed

+261
-16
lines changed
 

‎composer.json

+2-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@
3333
"predis/predis": "^1.1",
3434
"thruway/pawl-transport": "^0.5.0",
3535
"voryx/thruway": "^0.5.3",
36-
"influxdb/influxdb-php": "^1.14"
36+
"influxdb/influxdb-php": "^1.14",
37+
"datadog/php-datadogstatsd": "^1.3"
3738
},
3839
"require-dev": {
3940
"phpunit/phpunit": "^5.5",

‎docs/images/datadog_monitoring.png

320 KB
Loading

‎docs/monitoring.md

+51-1
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@ With it, you can control how many messages were sent, how many processed success
1414
How many consumers are working, their up time, processed messages stats, memory usage and system load.
1515
The tool could be integrated with virtually any analytics and monitoring platform.
1616
There are several integration:
17+
* [Datadog StatsD](https://datadoghq.com)
1718
* [InfluxDB](https://www.influxdata.com/) and [Grafana](https://grafana.com/)
1819
* [WAMP (Web Application Messaging Protocol)](https://wamp-proto.org/)
19-
2020
We are working on a JS\WAMP based real-time UI tool, for more information please [contact us](opensource@forma-pro.com).
2121

2222
![Grafana Monitoring](images/grafana_monitoring.jpg)
@@ -30,6 +30,7 @@ We are working on a JS\WAMP based real-time UI tool, for more information please
3030
* [Consumption extension](#consumption-extension)
3131
* [Enqueue Client Extension](#enqueue-client-extension)
3232
* [InfluxDB Storage](#influxdb-storage)
33+
* [Datadog Storage](#datadog-storage)
3334
* [WAMP (Web Socket Messaging Protocol) Storage](#wamp-(web-socket-messaging-protocol)-storage)
3435
* [Symfony App](#symfony-app)
3536

@@ -237,6 +238,50 @@ There are available options:
237238
* 'measurementConsumers' => 'consumers',
238239
```
239240

241+
## Datadog storage
242+
243+
Install additional packages:
244+
245+
```
246+
composer req datadog/php-datadogstatsd:^1.3
247+
```
248+
249+
```php
250+
<?php
251+
use Enqueue\Monitoring\GenericStatsStorageFactory;
252+
253+
$statsStorage = (new GenericStatsStorageFactory())->create('datadog://127.0.0.1:8125');
254+
```
255+
256+
For best experience please adjust units and types in metric summary.
257+
258+
Example dashboard:
259+
260+
![Datadog monitoring](images/datadog_monitoring.png)
261+
262+
263+
There are available options (and all available metrics):
264+
265+
```
266+
* 'host' => '127.0.0.1',
267+
* 'port' => '8125',
268+
* 'batched' => true, // performance boost
269+
* 'global_tags' => '', // should contain keys and values
270+
* 'metric.messages.sent' => 'enqueue.messages.sent',
271+
* 'metric.messages.consumed' => 'enqueue.messages.consumed',
272+
* 'metric.messages.redelivered' => 'enqueue.messages.redelivered',
273+
* 'metric.messages.failed' => 'enqueue.messages.failed',
274+
* 'metric.consumers.started' => 'enqueue.consumers.started',
275+
* 'metric.consumers.finished' => 'enqueue.consumers.finished',
276+
* 'metric.consumers.failed' => 'enqueue.consumers.failed',
277+
* 'metric.consumers.received' => 'enqueue.consumers.received',
278+
* 'metric.consumers.acknowledged' => 'enqueue.consumers.acknowledged',
279+
* 'metric.consumers.rejected' => 'enqueue.consumers.rejected',
280+
* 'metric.consumers.requeued' => 'enqueue.consumers.requeued',
281+
* 'metric.consumers.memoryUsage' => 'enqueue.consumers.memoryUsage',
282+
```
283+
284+
240285
## WAMP (Web Socket Messaging Protocol) Storage
241286

242287
Install additional packages:
@@ -280,6 +325,11 @@ enqueue:
280325
transport: 'amqp://guest:guest@foo:5672/%2f'
281326
monitoring: 'wamp://127.0.0.1:9090?topic=stats'
282327
client: ~
328+
329+
datadog:
330+
transport: 'amqp://guest:guest@foo:5672/%2f'
331+
monitoring: 'datadog://127.0.0.1:8125?batched=false'
332+
client: ~
283333
```
284334
285335
[back to index](index.md)

‎phpunit.xml.dist

+4
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,10 @@
108108
<testsuite name="wamp transport">
109109
<directory>pkg/wamp/Tests</directory>
110110
</testsuite>
111+
112+
<testsuite name="monitoring">
113+
<directory>pkg/monitoring/Tests</directory>
114+
</testsuite>
111115
</testsuites>
112116

113117
<php>

‎pkg/monitoring/DatadogStorage.php

+173
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Enqueue\Monitoring;
6+
7+
use DataDog\BatchedDogStatsd;
8+
use DataDog\DogStatsd;
9+
use Enqueue\Client\Config;
10+
use Enqueue\Dsn\Dsn;
11+
12+
class DatadogStorage implements StatsStorage
13+
{
14+
/**
15+
* @var array
16+
*/
17+
private $config;
18+
19+
/**
20+
* @var BatchedDogStatsd
21+
*/
22+
private $datadog;
23+
24+
public function __construct($config = 'datadog:')
25+
{
26+
if (false === class_exists(DogStatsd::class)) {
27+
throw new \LogicException('Seems client library is not installed. Please install "datadog/php-datadogstatsd"');
28+
}
29+
30+
$this->config = $this->prepareConfig($config);
31+
32+
if (null === $this->datadog) {
33+
if (true === filter_var($this->config['batched'], FILTER_VALIDATE_BOOLEAN)) {
34+
$this->datadog = new BatchedDogStatsd($this->config);
35+
} else {
36+
$this->datadog = new DogStatsd($this->config);
37+
}
38+
}
39+
}
40+
41+
public function pushConsumerStats(ConsumerStats $stats): void
42+
{
43+
$queues = $stats->getQueues();
44+
array_walk($queues, function (string $queue) use ($stats) {
45+
$tags = [
46+
'queue' => $queue,
47+
'consumerId' => $stats->getConsumerId(),
48+
];
49+
50+
if ($stats->getFinishedAtMs()) {
51+
$values['finishedAtMs'] = $stats->getFinishedAtMs();
52+
}
53+
54+
$this->datadog->gauge($this->config['metric.consumers.started'], (int) $stats->isStarted(), 1, $tags);
55+
$this->datadog->gauge($this->config['metric.consumers.finished'], (int) $stats->isFinished(), 1, $tags);
56+
$this->datadog->gauge($this->config['metric.consumers.failed'], (int) $stats->isFailed(), 1, $tags);
57+
$this->datadog->gauge($this->config['metric.consumers.received'], $stats->getReceived(), 1, $tags);
58+
$this->datadog->gauge($this->config['metric.consumers.acknowledged'], $stats->getAcknowledged(), 1, $tags);
59+
$this->datadog->gauge($this->config['metric.consumers.rejected'], $stats->getRejected(), 1, $tags);
60+
$this->datadog->gauge($this->config['metric.consumers.requeued'], $stats->getRejected(), 1, $tags);
61+
$this->datadog->gauge($this->config['metric.consumers.memoryUsage'], $stats->getMemoryUsage(), 1, $tags);
62+
});
63+
}
64+
65+
public function pushSentMessageStats(SentMessageStats $stats): void
66+
{
67+
$tags = [
68+
'destination' => $stats->getDestination(),
69+
];
70+
71+
$properties = $stats->getProperties();
72+
if (false === empty($properties[Config::TOPIC])) {
73+
$tags['topic'] = $properties[Config::TOPIC];
74+
}
75+
76+
if (false === empty($properties[Config::COMMAND])) {
77+
$tags['command'] = $properties[Config::COMMAND];
78+
}
79+
80+
$this->datadog->increment($this->config['metric.messages.sent'], 1, $tags);
81+
}
82+
83+
public function pushConsumedMessageStats(ConsumedMessageStats $stats): void
84+
{
85+
$tags = [
86+
'queue' => $stats->getQueue(),
87+
'status' => $stats->getStatus(),
88+
];
89+
90+
if (ConsumedMessageStats::STATUS_FAILED === $stats->getStatus()) {
91+
$this->datadog->increment($this->config['metric.messages.failed'], 1, $tags);
92+
}
93+
94+
if ($stats->isRedelivered()) {
95+
$this->datadog->increment($this->config['metric.messages.redelivered'], 1, $tags);
96+
}
97+
98+
$runtime = $stats->getTimestampMs() - $stats->getReceivedAtMs();
99+
$this->datadog->histogram($this->config['metric.messages.consumed'], $runtime, 1, $tags);
100+
}
101+
102+
private function parseDsn(string $dsn): array
103+
{
104+
$dsn = Dsn::parseFirst($dsn);
105+
106+
if ('datadog' !== $dsn->getSchemeProtocol()) {
107+
throw new \LogicException(sprintf(
108+
'The given scheme protocol "%s" is not supported. It must be "datadog"',
109+
$dsn->getSchemeProtocol()
110+
));
111+
}
112+
113+
return array_filter(array_replace($dsn->getQuery(), [
114+
'host' => $dsn->getHost(),
115+
'port' => $dsn->getPort(),
116+
'global_tags' => $dsn->getString('global_tags'),
117+
'batched' => $dsn->getString('batched'),
118+
'metric.messages.sent' => $dsn->getString('metric.messages.sent'),
119+
'metric.messages.consumed' => $dsn->getString('metric.messages.consumed'),
120+
'metric.messages.redelivered' => $dsn->getString('metric.messages.redelivered'),
121+
'metric.messages.failed' => $dsn->getString('metric.messages.failed'),
122+
'metric.consumers.started' => $dsn->getString('metric.consumers.started'),
123+
'metric.consumers.finished' => $dsn->getString('metric.consumers.finished'),
124+
'metric.consumers.failed' => $dsn->getString('metric.consumers.failed'),
125+
'metric.consumers.received' => $dsn->getString('metric.consumers.received'),
126+
'metric.consumers.acknowledged' => $dsn->getString('metric.consumers.acknowledged'),
127+
'metric.consumers.rejected' => $dsn->getString('metric.consumers.rejected'),
128+
'metric.consumers.requeued' => $dsn->getString('metric.consumers.requeued'),
129+
'metric.consumers.memoryUsage' => $dsn->getString('metric.consumers.memoryUsage'),
130+
]), function ($value) {
131+
return null !== $value;
132+
});
133+
}
134+
135+
/**
136+
* @param $config
137+
*
138+
* @return array
139+
*/
140+
private function prepareConfig($config): array
141+
{
142+
if (empty($config)) {
143+
$config = $this->parseDsn('datadog:');
144+
} elseif (\is_string($config)) {
145+
$config = $this->parseDsn($config);
146+
} elseif (\is_array($config)) {
147+
$config = empty($config['dsn']) ? $config : $this->parseDsn($config['dsn']);
148+
} elseif ($config instanceof DogStatsd) {
149+
$this->datadog = $config;
150+
$config = [];
151+
} else {
152+
throw new \LogicException('The config must be either an array of options, a DSN string or null');
153+
}
154+
155+
return array_replace([
156+
'host' => 'localhost',
157+
'port' => 8125,
158+
'batched' => true,
159+
'metric.messages.sent' => 'enqueue.messages.sent',
160+
'metric.messages.consumed' => 'enqueue.messages.consumed',
161+
'metric.messages.redelivered' => 'enqueue.messages.redelivered',
162+
'metric.messages.failed' => 'enqueue.messages.failed',
163+
'metric.consumers.started' => 'enqueue.consumers.started',
164+
'metric.consumers.finished' => 'enqueue.consumers.finished',
165+
'metric.consumers.failed' => 'enqueue.consumers.failed',
166+
'metric.consumers.received' => 'enqueue.consumers.received',
167+
'metric.consumers.acknowledged' => 'enqueue.consumers.acknowledged',
168+
'metric.consumers.rejected' => 'enqueue.consumers.rejected',
169+
'metric.consumers.requeued' => 'enqueue.consumers.requeued',
170+
'metric.consumers.memoryUsage' => 'enqueue.consumers.memoryUsage',
171+
], $config);
172+
}
173+
}

‎pkg/monitoring/GenericStatsStorageFactory.php

+6-6
Original file line numberDiff line numberDiff line change
@@ -10,22 +10,22 @@ class GenericStatsStorageFactory implements StatsStorageFactory
1010
{
1111
public function create($config): StatsStorage
1212
{
13-
if (is_string($config)) {
13+
if (\is_string($config)) {
1414
$config = ['dsn' => $config];
1515
}
1616

17-
if (false == is_array($config)) {
17+
if (false === \is_array($config)) {
1818
throw new \InvalidArgumentException('The config must be either array or DSN string.');
1919
}
2020

21-
if (false == array_key_exists('dsn', $config)) {
21+
if (false === array_key_exists('dsn', $config)) {
2222
throw new \InvalidArgumentException('The config must have dsn key set.');
2323
}
2424

2525
$dsn = Dsn::parseFirst($config['dsn']);
2626

2727
if ($storageClass = $this->findStorageClass($dsn, Resources::getKnownStorages())) {
28-
return new $storageClass(1 === count($config) ? $config['dsn'] : $config);
28+
return new $storageClass(1 === \count($config) ? $config['dsn'] : $config);
2929
}
3030

3131
throw new \LogicException(sprintf('A given scheme "%s" is not supported.', $dsn->getScheme()));
@@ -41,7 +41,7 @@ private function findStorageClass(Dsn $dsn, array $factories): ?string
4141
continue;
4242
}
4343

44-
if (false == in_array($protocol, $info['schemes'], true)) {
44+
if (false === \in_array($protocol, $info['schemes'], true)) {
4545
continue;
4646
}
4747

@@ -53,7 +53,7 @@ private function findStorageClass(Dsn $dsn, array $factories): ?string
5353
}
5454

5555
foreach ($factories as $storageClass => $info) {
56-
if (false == in_array($protocol, $info['schemes'], true)) {
56+
if (false === \in_array($protocol, $info['schemes'], true)) {
5757
continue;
5858
}
5959

‎pkg/monitoring/README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ Queue Monitoring tool. Track sent, consumed messages. Consumers performances.
1414
* Could be used with any message queue library.
1515
* Could be intergrated to any PHP framework
1616
* Could send stats to any analytical platform
17-
* Supports Grafana and WAMP out of the box.
17+
* Supports Datadog, InfluxDb, Grafana and WAMP out of the box.
1818
* Provides integration for Enqueue
1919

2020
[![Gitter](https://badges.gitter.im/php-enqueue/Lobby.svg)](https://gitter.im/php-enqueue/Lobby)

‎pkg/monitoring/Resources.php

+5
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,11 @@ public static function getKnownStorages(): array
4242
'supportedSchemeExtensions' => [],
4343
];
4444

45+
$map[DatadogStorage::class] = [
46+
'schemes' => ['datadog'],
47+
'supportedSchemeExtensions' => [],
48+
];
49+
4550
self::$knownStorages = $map;
4651
}
4752

‎pkg/monitoring/Symfony/DependencyInjection/MonitoringFactory.php

+2-2
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ public static function getConfiguration(string $name = 'monitoring'): ArrayNodeD
4242
->info(sprintf('The "%s" option could accept a string DSN, an array with DSN key, or null. It accept extra options. To find out what option you can set, look at stats storage constructor doc block.', $name))
4343
->beforeNormalization()
4444
->always(function ($v) {
45-
if (is_array($v)) {
46-
if (isset($v['storage_factory_class']) && isset($v['storage_factory_service'])) {
45+
if (\is_array($v)) {
46+
if (isset($v['storage_factory_class'], $v['storage_factory_service'])) {
4747
throw new \LogicException('Both options storage_factory_class and storage_factory_service are set. Please choose one.');
4848
}
4949

0 commit comments

Comments
 (0)
Please sign in to comment.