Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor actors to use new client #104

Merged
merged 6 commits into from
Jul 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/config.php
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@
->constructorParameter('drain_timeout', get('dapr.actors.drain_timeout'))
->constructorParameter('drain_enabled', get('dapr.actors.drain_enabled')),
ActorRuntime::class => autowire()
->constructorParameter('logger', get('dapr.logger'))
->constructorParameter('deserializer', get('dapr.internal.deserializer')),
->constructorParameter('client', get(\Dapr\Client\DaprClient::class)),
ActorState::class => autowire()->constructorParameter('logger', get('dapr.logger')),
ActorProxy::class => autowire()->constructorParameter('logger', get('dapr.logger')),
ApplicationJson::class => autowire(),
Expand Down Expand Up @@ -143,6 +142,7 @@
TransactionalState::class => autowire()->constructorParameter('logger', get('dapr.logger')),

// default application settings
\Dapr\Actors\Internal\Caches\CacheInterface::class => autowire(\Dapr\Actors\Internal\Caches\MemoryCache::class),
'dapr.pubsub.default' => 'pubsub',
'dapr.actors.proxy.generation' => ProxyFactory::GENERATED,
'dapr.subscriptions' => [],
Expand Down
14 changes: 7 additions & 7 deletions src/index.php
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,11 @@ function (ProxyFactory $proxyFactory, DaprClient $client, LoggerInterface $logge
data: ['amount' => 2],
period: new DateInterval('PT10M')
);
$actor->create_reminder($reminder, $client);
$actor->create_reminder($reminder);
$logger->critical('Created reminder');
sleep(2);
$body = assert_equals($body, 3, $actor->get_count(), 'Reminder should increment');
$read_reminder = $actor->get_reminder('increment', $client);
$read_reminder = $actor->get_reminder('increment');
$logger->critical('Got reminder');
$body = assert_equals(
$body,
Expand All @@ -106,15 +106,15 @@ function (ProxyFactory $proxyFactory, DaprClient $client, LoggerInterface $logge
callback: 'increment',
data: 2
);
$actor->create_timer($timer, $client);
$actor->create_timer($timer);
$logger->critical('Created timer');
sleep(2);
$body = assert_equals($body, 5, $actor->get_count(), 'Timer should increment');

$actor->delete_timer('increment', $client);
$actor->delete_reminder('increment', $client);
$actor->delete_reminder('nope', $client);
$actor->delete_timer('nope', $client);
$actor->delete_timer('increment');
$actor->delete_reminder('increment');
$actor->delete_reminder('nope');
$actor->delete_timer('nope');
$logger->critical('Cleaned up');

$object = new SimpleObject();
Expand Down
114 changes: 62 additions & 52 deletions src/lib/Actors/ActorRuntime.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,17 @@

namespace Dapr\Actors;

use Dapr\Actors\Internal\Caches\CacheInterface;
use Dapr\Actors\Internal\Caches\FileCache;
use Dapr\Actors\Internal\Caches\NoCache;
use Dapr\Deserialization\IDeserializer;
use Dapr\Client\DaprClient;
use Dapr\exceptions\DaprException;
use Dapr\exceptions\Http\NotFound;
use Dapr\exceptions\SaveStateFailure;
use DI\Container;
use DI\DependencyException;
use DI\FactoryInterface;
use DI\NotFoundException;
use Exception;
use Psr\Log\LoggerInterface;
use Psr\Container\ContainerInterface;
use ReflectionClass;
use ReflectionException;
use ReflectionNamedType;
Expand All @@ -28,11 +27,10 @@
class ActorRuntime
{
public function __construct(
protected LoggerInterface $logger,
protected ActorConfig $actor_config,
protected FactoryInterface $factory,
protected Container $container,
protected IDeserializer $deserializer,
protected ContainerInterface $container,
protected DaprClient $client
) {
}

Expand All @@ -49,7 +47,7 @@ public function __construct(
public function do_method(IActor $actor, string $method, mixed $arg): mixed
{
$reflection = new ReflectionClass($actor);
$method = $reflection->getMethod($method);
$method = $reflection->getMethod($method);
if (empty($arg)) {
return $method->invoke($actor);
}
Expand All @@ -61,18 +59,18 @@ public function do_method(IActor $actor, string $method, mixed $arg): mixed

return $method->invokeArgs(
$actor,
[$parameter->getName() => $this->deserializer->detect_from_parameter($parameter, $arg)]
[$parameter->getName() => $this->client->deserializer->detect_from_parameter($parameter, $arg)]
);
}

public function deactivate_actor(IActor $actor, string $dapr_type): void
{
$id = $actor->get_id();
$activation_tracker = hash('sha256', $dapr_type.$id);
$id = $actor->get_id();
$activation_tracker = hash('sha256', $dapr_type . $id);
$activation_tracker = rtrim(
sys_get_temp_dir(),
DIRECTORY_SEPARATOR
).DIRECTORY_SEPARATOR.'dapr_'.$activation_tracker;
sys_get_temp_dir(),
DIRECTORY_SEPARATOR
) . DIRECTORY_SEPARATOR . 'dapr_' . $activation_tracker;

$is_activated = file_exists($activation_tracker);

Expand All @@ -86,21 +84,20 @@ public function deactivate_actor(IActor $actor, string $dapr_type): void
/**
* Resolve an actor, then calls your callback with the actor before committing any state changes
*
* @param string $dapr_type The dapr type to resolve
* @param string $id The id of the actor to resolve
* @param ActorReference $reference
* @param callable $loan A callback that takes an IActor
*
* @return mixed The result of you callback
* @throws NotFound
* @throws SaveStateFailure
*/
public function resolve_actor(string $dapr_type, string $id, callable $loan): mixed
public function resolve_actor(ActorReference $reference, callable $loan): mixed
{
try {
$reflection = $this->locate_actor($dapr_type);
$reflection = $this->locate_actor($reference);
$this->validate_actor($reflection);
$states = $this->get_states($reflection, $dapr_type, $id);
$actor = $this->get_actor($reflection, $dapr_type, $id, $states);
$states = $this->get_states($reflection, $reference, $this->client);
$actor = $this->get_actor($reflection, $reference, $states);
// @codeCoverageIgnoreStart
} catch (Exception $exception) {
throw new NotFound('Actor could not be located', previous: $exception);
Expand All @@ -123,18 +120,17 @@ public function resolve_actor(string $dapr_type, string $id, callable $loan): mi
/**
* Locates an actor implementation
*
* @param string $dapr_type The dapr type to locate
*
* @param ActorReference $reference The actor reference
* @return ReflectionClass
* @throws NotFound
* @throws ReflectionException
*/
protected function locate_actor(string $dapr_type): ReflectionClass
protected function locate_actor(ActorReference $reference): ReflectionClass
{
$type = $this->actor_config->get_actor_type_from_dapr_type($dapr_type);
if ( ! class_exists($type)) {
$type = $this->actor_config->get_actor_type_from_dapr_type($reference->get_actor_type());
if (!class_exists($type)) {
// @codeCoverageIgnoreStart
$this->logger->critical('Unable to locate an actor for {t}', ['t' => $type]);
$this->client->logger->critical('Unable to locate an actor for {t}', ['t' => $type]);
throw new NotFound();
// @codeCoverageIgnoreEnd
}
Expand All @@ -156,7 +152,7 @@ protected function validate_actor(ReflectionClass $reflection): bool
}

// @codeCoverageIgnoreStart
$this->logger->critical('Actor does not implement the IActor interface');
$this->client->logger->critical('Actor does not implement the IActor interface');
throw new NotFound();
// @codeCoverageIgnoreEnd
}
Expand All @@ -165,18 +161,19 @@ protected function validate_actor(ReflectionClass $reflection): bool
* Retrieves an array of states for a located actor
*
* @param ReflectionClass $reflection The class we're loading states for
* @param string $dapr_type The dapr type
* @param string $id The id
*
* @param ActorReference $reference
* @return array
* @throws ReflectionException
* @throws DependencyException
* @throws NotFoundException
* @throws ReflectionException
*/
protected function get_states(ReflectionClass $reflection, string $dapr_type, string $id): array
{
protected function get_states(
ReflectionClass $reflection,
ActorReference $reference,
DaprClient $client
): array {
$constructor = $reflection->getMethod('__construct');
$states = [];
$states = [];
foreach ($constructor->getParameters() as $parameter) {
$type = $parameter->getType();
if ($type instanceof ReflectionNamedType) {
Expand All @@ -185,10 +182,16 @@ protected function get_states(ReflectionClass $reflection, string $dapr_type, st
$reflected_type = new ReflectionClass($type_name);
if ($reflected_type->isSubclassOf(ActorState::class)) {
$state = $this->container->make($type_name);
$this->begin_transaction($state, $reflected_type, $dapr_type, $id);
$this->begin_transaction(
$state,
$reflected_type,
$reference,
$client,
$this->get_cache_for_actor($reference, $type_name)
);

$states[$parameter->name] = $state;
$this->logger?->debug('Found state {t}', ['t' => $type_name]);
$this->client->logger?->debug('Found state {t}', ['t' => $type_name]);
}
}
}
Expand All @@ -211,24 +214,31 @@ protected function get_states(ReflectionClass $reflection, string $dapr_type, st
protected function begin_transaction(
ActorState $state,
ReflectionClass $reflected_type,
string $dapr_type,
string $actor_id,
ActorReference $reference,
DaprClient $client,
CacheInterface $cache,
?ReflectionClass $original = null
): void {
if ($reflected_type->name !== ActorState::class) {
$this->begin_transaction(
$state,
$reflected_type->getParentClass(),
$dapr_type,
$actor_id,
$reference,
$client,
$cache,
$original ?? $reflected_type
);

return;
}
$begin_transaction = $reflected_type->getMethod('begin_transaction');
$begin_transaction->setAccessible(true);
$begin_transaction->invoke($state, $dapr_type, $actor_id);
$begin_transaction->invoke($state, $reference, $client, $cache);
}

protected function get_cache_for_actor(ActorReference $reference, string $type_name): CacheInterface
{
return $this->factory->make(CacheInterface::class, ['reference' => $reference, 'state_name' => $type_name]);
}

/**
Expand All @@ -243,23 +253,23 @@ protected function begin_transaction(
* @throws DependencyException
* @throws NotFoundException
*/
protected function get_actor(ReflectionClass $reflection, string $dapr_type, string $id, array $states): IActor
protected function get_actor(ReflectionClass $reflection, ActorReference $reference, array $states): IActor
{
$states['id'] = $id;
$this->container->set(ActorReference::class, new ActorReference($id, $dapr_type));
$actor = $this->factory->make($reflection->getName(), $states);
$activation_tracker = hash('sha256', $dapr_type.$id);
$states['id'] = $reference->get_actor_id();
$this->container->set(ActorReference::class, $reference);
$actor = $this->factory->make($reflection->getName(), $states);
$activation_tracker = hash('sha256', $reference->get_actor_type() . $reference->get_actor_id());
$activation_tracker = rtrim(
sys_get_temp_dir(),
DIRECTORY_SEPARATOR
).DIRECTORY_SEPARATOR.'dapr_'.$activation_tracker;
sys_get_temp_dir(),
DIRECTORY_SEPARATOR
) . DIRECTORY_SEPARATOR . 'dapr_' . $activation_tracker;

$is_activated = file_exists($activation_tracker);

if ( ! $is_activated) {
$this->logger?->info(
if (!$is_activated) {
$this->client->logger?->info(
'Activating {type}||{id}',
['type' => $dapr_type, 'id' => $id]
['type' => $reference->get_actor_type(), 'id' => $reference->get_actor_id()]
);
touch($activation_tracker);
$actor->on_activation();
Expand Down
Loading