Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update pubsub to use new client #98

Merged
merged 5 commits into from
Jul 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/config.php
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,9 @@
'subscriptions',
get('dapr.subscriptions')
),
Topic::class => autowire()->constructorParameter('logger', get('dapr.logger')),
Topic::class => autowire()
->constructorParameter('logger', get('dapr.logger'))
->constructorParameter('client', get(DaprClient::class)),
Tracing::class => autowire(),
Transaction::class => autowire(),
TransactionalState::class => autowire()->constructorParameter('logger', get('dapr.logger')),
Expand Down
116 changes: 55 additions & 61 deletions src/index.php
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
<?php

require_once __DIR__.'/../vendor/autoload.php';
require_once __DIR__.'/../tests/Fixtures/SimpleActor.php';
require_once __DIR__ . '/../vendor/autoload.php';
require_once __DIR__ . '/../tests/Fixtures/SimpleActor.php';

define('STORE', 'statestore');

use Dapr\Actors\ActorProxy;
use Dapr\Actors\ActorReference;
use Dapr\Actors\Generators\ProxyFactory;
use Dapr\Actors\IActor;
Expand All @@ -20,7 +19,6 @@
use Dapr\exceptions\StateAlreadyCommitted;
use Dapr\Formats;
use Dapr\PubSub\CloudEvent;
use Dapr\PubSub\Publish;
use Dapr\PubSub\Subscription;
use Dapr\PubSub\Topic;
use Dapr\State\Attributes\StateStore;
Expand Down Expand Up @@ -53,22 +51,22 @@ public function increment(int $amount = 1): void
new Subscription('pubsub', 'test', '/testsub'),
]
),
'dapr.actors' => [SimpleActor::class],
'dapr.actors' => [SimpleActor::class],
]
)
);

$app->get(
'/test/actors',
function (ProxyFactory $proxyFactory, DaprClient $client, LoggerInterface $logger) {
$id = uniqid(prefix: 'actor_');
$id = uniqid(prefix: 'actor_');
$reference = new ActorReference($id, 'SimpleActor');

/**
* @var ISimpleActor|IActor $actor
*/
$actor = $reference->bind(ISimpleActor::class, $proxyFactory);
$body = [];
$body = [];

$logger->critical('Created actor proxy');
$body = assert_equals($body, 0, $actor->get_count(), 'Empty actor should have no data');
Expand All @@ -78,7 +76,7 @@ function (ProxyFactory $proxyFactory, DaprClient $client, LoggerInterface $logge

// get the actor proxy again
$reference = ActorReference::get($actor);
$actor = $reference->bind(ISimpleActor::class, $proxyFactory);
$actor = $reference->bind(ISimpleActor::class, $proxyFactory);

$reminder = new Reminder(
name: 'increment',
Expand All @@ -89,7 +87,7 @@ function (ProxyFactory $proxyFactory, DaprClient $client, LoggerInterface $logge
$actor->create_reminder($reminder, $client);
$logger->critical('Created reminder');
sleep(2);
$body = assert_equals($body, 3, $actor->get_count(), 'Reminder should increment');
$body = assert_equals($body, 3, $actor->get_count(), 'Reminder should increment');
$read_reminder = $actor->get_reminder('increment', $client);
$logger->critical('Got reminder');
$body = assert_equals(
Expand Down Expand Up @@ -117,13 +115,13 @@ function (ProxyFactory $proxyFactory, DaprClient $client, LoggerInterface $logge
$actor->delete_timer('nope', $client);
$logger->critical('Cleaned up');

$object = new SimpleObject();
$object = new SimpleObject();
$object->bar = ['hello', 'world'];
$object->foo = "hello world";
$actor->set_object($object);
$saved_object = $actor->get_object();
$body = assert_equals($body, $object->bar, $saved_object->bar, "[object] saved array should match");
$body = assert_equals($body, $object->foo, $saved_object->foo, "[object] saved string should match");
$body = assert_equals($body, $object->bar, $saved_object->bar, "[object] saved array should match");
$body = assert_equals($body, $object->foo, $saved_object->foo, "[object] saved string should match");

$body = assert_equals($body, true, $actor->a_function(), 'actor can return a simple value');

Expand All @@ -134,7 +132,7 @@ function (ProxyFactory $proxyFactory, DaprClient $client, LoggerInterface $logge
$app->get(
'/test/state',
function (StateManager $stateManager) {
$body = [];
$body = [];
$state = new SimpleState();
$stateManager->save_object($state);
$body = assert_equals($body, null, $state->data, 'state is empty');
Expand All @@ -149,17 +147,17 @@ function (StateManager $stateManager) {
$body = assert_equals($body, 'data', $state->data, 'properly loaded saved state');

$prefix = uniqid();
$state = new SimpleState();
$state = new SimpleState();
$stateManager->load_object($state, $prefix);
$body = assert_not_equals($body, 'data', $state->data, 'prefix should work');

$random_key = uniqid();
$state = $stateManager->load_state('statestore', $random_key, 'hello');
$body = assert_equals($body, 'hello', $state->value, 'single key read with default');
$state = $stateManager->load_state('statestore', $random_key, 'hello');
$body = assert_equals($body, 'hello', $state->value, 'single key read with default');

$stateManager->save_state('statestore', $state);
$state2 = $stateManager->load_state('statestore', $random_key, 'world');
$body = assert_equals($body, 'hello', $state2->value, 'single key write');
$body = assert_equals($body, 'hello', $state2->value, 'single key write');

return $body;
}
Expand All @@ -168,19 +166,19 @@ function (StateManager $stateManager) {
$app->get(
'/test/state/concurrency',
function (StateManager $stateManager) {
$last = new #[StateStore(STORE, StrongLastWrite::class)] class extends SimpleState {
$last = new #[StateStore(STORE, StrongLastWrite::class)] class extends SimpleState {
};
$first = new #[StateStore(STORE, StrongFirstWrite::class)] class extends SimpleState {
};
$body = [];
$body = assert_equals($body, 0, $last->counter, 'initial value correct');
$body = [];
$body = assert_equals($body, 0, $last->counter, 'initial value correct');
$stateManager->save_object($last);
$stateManager->load_object($last);
$stateManager->load_object($first);
$body = assert_equals($body, 0, $last->counter, 'Starting from 0');

$first->counter = 1;
$last->counter = 2;
$last->counter = 2;
$stateManager->save_object($last);
$stateManager->load_object($last);
$body = assert_equals($body, 2, $last->counter, 'last-write update succeeds');
Expand All @@ -203,10 +201,10 @@ function (StateManager $stateManager, \DI\Container $container) {
$reset_state = $container->make(TState::class);
$stateManager->save_object($reset_state);
($transaction = $container->make(TState::class))->begin();
$body = [];
$body = assert_equals($body, 0, $transaction->counter, 'initial count = 0');
$body = [];
$body = assert_equals($body, 0, $transaction->counter, 'initial count = 0');
$transaction->counter += 1;
$body = assert_equals(
$body = assert_equals(
$body,
1,
$transaction->counter,
Expand Down Expand Up @@ -271,12 +269,8 @@ function () use ($one) {
$app->get(
'/test/pubsub',
function (FactoryInterface $container) {
$publisher = $container->make(Publish::class, ['pubsub' => 'pubsub']);
/**
* @var Topic $topic
*/
$topic = $publisher->topic(topic: 'test');
$body = [];
$topic = new Topic('pubsub', 'test', \Dapr\Client\DaprClient::clientBuilder()->build());
$body = [];

$topic->publish(['test_event']);
sleep(5);
Expand All @@ -298,19 +292,19 @@ function (FactoryInterface $container) {
);

$return = ['simple-test' => $body];
$body = [];
$body = [];

$event = new CloudEvent();
$event->id = "123";
$event->source = "http://example.com";
$event->type = "com.example.test";
$event = new CloudEvent();
$event->id = "123";
$event->source = "http://example.com";
$event->type = "com.example.test";
$event->data_content_type = 'application/json';
$event->subject = 'yolo';
$event->time = new DateTime();
$event->data = ['yolo'];
$event->subject = 'yolo';
$event->time = new DateTime();
$event->data = ['yolo'];
$topic->publish($event);
sleep(5);
$body = assert_equals(
$body = assert_equals(
$body,
true,
file_exists('/tmp/sub-received'),
Expand All @@ -319,20 +313,20 @@ function (FactoryInterface $container) {
$body["Received this raw data"] = json_decode($raw_event = file_get_contents('/tmp/sub-received'));
unlink('/tmp/sub-received');
//unset($event->time);
$event->topic = "test";
$event->pubsub_name = "pubsub";
$event->topic = "test";
$event->pubsub_name = "pubsub";
$body["Expecting this data"] = json_decode($event->to_json());
$received = CloudEvent::parse($raw_event);
$received = CloudEvent::parse($raw_event);
unset($received->trace_id);
$body['Received this decoded data'] = json_decode($received->to_json());
$body = assert_equals(
$body['Received this decoded data'] = json_decode($received->to_json());
$body = assert_equals(
$body,
$event->to_json(),
$received->to_json(),
'Event should be the same event we sent, minus the trace id.'
);
$return['Testing custom cloud event'] = $body;
$body = [];
$body = [];

$topic->publish(
json_decode(
Expand All @@ -353,7 +347,7 @@ function (FactoryInterface $container) {
)
);
sleep(2);
$body = assert_equals(
$body = assert_equals(
$body,
true,
file_exists('/tmp/sub-received'),
Expand Down Expand Up @@ -381,14 +375,14 @@ function (FactoryInterface $container) {
$app->get(
'/test/invoke',
function (DaprClient $client) {
$body = [];
$body = [];
$result = $client->post("/invoke/dev/method/say_something", "My Message");
$body = assert_equals($body, 200, $result->code, 'Should receive a 200 response');
$body = assert_equals($body, 200, $result->code, 'Should receive a 200 response');

$json = '{"ok": true}';

$result = $client->post('/invoke/dev/method/test_json', $json);
$body = assert_equals($body, 200, $result->code, 'Static function should receive json string');
$body = assert_equals($body, 200, $result->code, 'Static function should receive json string');

return $body;
}
Expand All @@ -408,8 +402,8 @@ function (#[FromBody] string $body) {
$app->get(
'/test/binding',
function () {
$body = [];
$cron_file = sys_get_temp_dir().'/cron';
$body = [];
$cron_file = sys_get_temp_dir() . '/cron';
//Binding::invoke_output('cron', 'delete');
$body = assert_equals($body, true, file_exists($cron_file), 'we should have received at least one cron');
// see https://github.com/dapr/components-contrib/issues/639
Expand All @@ -421,7 +415,7 @@ function () {
return $body;
}
);
$app->post('/cron', fn() => touch(sys_get_temp_dir().'/cron'));
$app->post('/cron', fn() => touch(sys_get_temp_dir() . '/cron'));
$app->post(
'/testsub',
function (
Expand All @@ -442,20 +436,20 @@ function (
'/do_tests',
function (DaprClient $client) {
$test_results = [
'/test/actors' => null,
'/test/binding' => null,
'/test/invoke' => null,
'/test/pubsub' => null,
'/test/actors' => null,
'/test/binding' => null,
'/test/invoke' => null,
'/test/pubsub' => null,
'/test/state/concurrency' => null,
'/test/state' => null,
'/test/state' => null,
];

foreach (array_keys($test_results) as $suite) {
$result = $client->get('/invoke/dev/method'.$suite);
$body = [];
$body = assert_equals($body, 200, $result->code, 'test completed successfully');
$result = $client->get('/invoke/dev/method' . $suite);
$body = [];
$body = assert_equals($body, 200, $result->code, 'test completed successfully');
$test_results[$suite] = [
'status' => $body,
'status' => $body,
'results' => $result->data,
];
}
Expand Down
14 changes: 10 additions & 4 deletions src/lib/Client/DaprClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Dapr\Serialization\SerializationConfig;
use GuzzleHttp\Promise\PromiseInterface;
use Psr\Http\Message\ResponseInterface;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;

/**
Expand All @@ -17,8 +18,11 @@
*/
abstract class DaprClient
{
public function __construct(public IDeserializer $deserializer, public ISerializer $serializer)
{
public function __construct(
public IDeserializer $deserializer,
public ISerializer $serializer,
public LoggerInterface $logger
) {
}

public static function clientBuilder(): DaprClientBuilder
Expand Down Expand Up @@ -83,7 +87,8 @@ abstract public function publishEvent(
string $pubsubName,
string $topicName,
mixed $data,
array $metadata = []
array $metadata = [],
string $contentType = 'application/json'
): void;

/**
Expand All @@ -98,7 +103,8 @@ abstract public function publishEventAsync(
string $pubsubName,
string $topicName,
mixed $data,
array $metadata = []
array $metadata = [],
string $contentType = 'application/json'
): PromiseInterface;

/**
Expand Down
3 changes: 2 additions & 1 deletion src/lib/Client/DaprClientBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ public function build(): DaprClient
return new DaprHttpClient(
$this->defaultHttpHost,
new Deserializer($this->deserializationConfig, $this->logger),
new Serializer($this->serializationConfig, $this->logger)
new Serializer($this->serializationConfig, $this->logger),
$this->logger
);
}
}
11 changes: 8 additions & 3 deletions src/lib/Client/DaprHttpClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use Dapr\Deserialization\IDeserializer;
use Dapr\Serialization\ISerializer;
use GuzzleHttp\Client;
use Psr\Log\LoggerInterface;

/**
* Class DaprHttpClient
Expand All @@ -20,9 +21,13 @@ class DaprHttpClient extends DaprClient

private Client $httpClient;

public function __construct(private string $baseHttpUri, IDeserializer $deserializer, ISerializer $serializer)
{
parent::__construct($deserializer, $serializer);
public function __construct(
private string $baseHttpUri,
IDeserializer $deserializer,
ISerializer $serializer,
LoggerInterface $logger
) {
parent::__construct($deserializer, $serializer, $logger);
if (str_ends_with($this->baseHttpUri, '/')) {
$this->baseHttpUri = rtrim($this->baseHttpUri, '/');
}
Expand Down
Loading