Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Can't stop snapshotter when it's running #26

Closed
basz opened this issue May 31, 2018 · 2 comments
Closed

Can't stop snapshotter when it's running #26

basz opened this issue May 31, 2018 · 2 comments

Comments

@basz
Copy link
Member

basz commented May 31, 2018

I'm running snapshot with the following script

<?php

declare(strict_types=1);

namespace HF\Api\Tools\Console\Command\EventStore;

use Ds\Set;
use HF\Api\Infrastructure\Repository\ARStreamDomainName;
use HF\Api\Infrastructure\Repository\ARStreamName;
use HF\Api\Tools\Console\Command\AbstractCommand;
use Prooph\EventSourcing\EventStoreIntegration\AggregateTranslator;
use Prooph\EventStore\EventStore;
use Prooph\EventStore\Pdo\Projection\PdoEventStoreProjector;
use Prooph\EventStore\Projection\ProjectionManager;
use Prooph\SnapshotStore\SnapshotStore;
use Prooph\Snapshotter\SnapshotReadModel;
use Prooph\Snapshotter\StreamSnapshotProjection;
use Psr\Container\ContainerInterface;
use Symfony\Component\Console\Input\InputDefinition;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;

class RunSnapshotterCommand extends AbstractCommand
{
    /**
     * @var ContainerInterface
     */
    private $container;

    /**
     * @var SnapshotStore
     */
    private $snapshotStore;

    /**
     * @var EventStore
     */
    private $eventStore;

    /**
     * @var ProjectionManager
     */
    private $projectionManager;

    /**
     * @var array
     */
    private $arRepositoryConfig;

    private $stopping = false;

    public function __construct(
        ContainerInterface $container,
        SnapshotStore $snapshotStore,
        EventStore $eventStore,
        ProjectionManager $projectionManager,
        array $arRepositoryConfig
    ) {
        $this->container = $container;
        $this->snapshotStore = $snapshotStore;
        $this->eventStore = $eventStore;
        $this->projectionManager = $projectionManager;
        $this->arRepositoryConfig = $arRepositoryConfig;

        parent::__construct();
    }

    protected function configure()
    {
        $this
            ->setName('event-store:run-snapshotter')
            ->setDescription('Runs a long living process to create snapshots of aggregate roots')
            ->setDefinition(
                new InputDefinition([
                    new InputOption('domain', 'd', InputOption::VALUE_OPTIONAL, 'Stream from a particalar domain'),
                    new InputOption('streamName', 's', InputOption::VALUE_OPTIONAL, 'Name of stream'),
                    new InputOption('oneRunner', 'o', InputOption::VALUE_OPTIONAL, 'use a single runner', true),
                ])
            );
    }

    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $options = $input->getArguments();

        $streamName = $input->getOption('streamName', false);
        $domain = $input->getOption('domain', false);
        $oneRunner = $input->getOption('oneRunner');

        $streamNames = new Set();

        if ($streamName) {
            try {
                $streamName = ARStreamName::byName(strtoupper($streamName));
                $streamNames->add($streamName);

                $streamNames = iterator_to_array(new \RecursiveIteratorIterator(new \RecursiveArrayIterator($domain->getValue())), false);
            } catch (\InvalidArgumentException $e) {
                $this->output->writeln(sprintf(
                    'Unknown stream name <error>`%s`</error>. Try one of <info>%s</info>',
                    $streamName,
                    implode(', ', array_map('strtolower', ARStreamName::getNames()))
                ));

                return self::EXIT_PROBLEM;
            }
        }

        if ($domain) {
            try {
                $domain = ARStreamDomainName::byName(strtoupper($domain));

                foreach (iterator_to_array(new \RecursiveIteratorIterator(new \RecursiveArrayIterator($domain->getValue())), false) as $domainName) {
                    $streamNames->add(ARStreamName::byName(strtoupper($domainName)));
                }
            } catch (\InvalidArgumentException $e) {
                $this->output->writeln(sprintf(
                    'Unknown domain <error>`%s`</error>. Try one of <info>%s</info>',
                    $domain,
                    implode(', ', array_map('strtolower', ARStreamDomainName::getNames()))
                ));

                return self::EXIT_PROBLEM;
            }
        }

        if (! $streamNames->count()) {
            $this->output->writeln('<error>No streams where selected!</error>');

            return self::EXIT_CANCELED;
        }

        // not for fs
        $streamNames->remove(ARStreamName::FILESYSTEM_FILE());

        $this->output->writeln(sprintf(
            'Starting snapshotter for <info>`%s`</info>',
            implode(', ', array_map('strtolower', $streamNames->toArray()))
        ));

        if (function_exists('pcntl_signal')) {
            pcntl_signal(SIGTERM, [$this, 'signalHandler']);
            pcntl_signal(SIGHUP, [$this, 'signalHandler']);
            pcntl_signal(SIGINT, [$this, 'signalHandler']);
        }

        $stopRunning = 0;
        while (! $stopRunning) {
            foreach ($streamNames as $streamName) {
                $stopRunning = $this->invokeProjection((string) $streamName);

                if ($stopRunning || $this->stopping) {
                    break;
                }
            }
            if (! $stopRunning && ! $this->stopping) {
                sleep(5);
            }

            if ($this->stopping) {
                break;
            }
        }

        $this->output->writeln(sprintf('Stopped...'));

        return self::EXIT_OK;
    }

    public function invokeProjection(string $streamName): int
    {
        $config = null;

        foreach ($this->arRepositoryConfig as $key => $config) {
            if (! isset($config['stream_name']) || $config['stream_name'] === $streamName) {
                break;
            }

            $config = null;
        }

        if (null === $config) {
            $this->output->writeln(sprintf('<error>Configuration problem for stream \'%s\'</error>', $streamName));

            return self::EXIT_PROBLEM;
        }

        $repositoryClass = class_implements($config['repository_class']);
        $repositoryClass = array_pop($repositoryClass);

        $snapshotReadModel = new SnapshotReadModel(
            $this->container->get($repositoryClass),
            new AggregateTranslator(),
            $this->snapshotStore,
            [$config['aggregate_type']]
        );

        $this->runningProjection = $readModelProjection = $this->projectionManager->createReadModelProjection(
            sprintf('snapshot:%s', $streamName),
            $snapshotReadModel,
            [
                PdoEventStoreProjector::OPTION_CACHE_SIZE => 1000, /* caches stream names from event store */
                PdoEventStoreProjector::OPTION_PERSIST_BLOCK_SIZE => 1000, /* size of handled events before persisting */
                PdoEventStoreProjector::OPTION_SLEEP => 0,
                PdoEventStoreProjector::OPTION_PCNTL_DISPATCH => function_exists('pcntl_signal'),
            ]
        );

        $projection = new StreamSnapshotProjection(
            $readModelProjection,
            $streamName
        );

        $projection(false);

        return self::EXIT_OK;
    }

    public function signalHandler(int $signal): void
    {
        $this->output->writeln(sprintf('Signal %s recieved, stopping...', $signal));

        $this->runningProjection->stop();

        $this->stopping = true;
    }
}

When the projection is running I cannot stop it with cntrl-c. Not even after the a persist block size is done. When it's idle (notice I'm running with $projection(false);) my stop handler kicks in.

Shouldn't PdoEventStoreProjector::OPTION_PCNTL_DISPATCH also be emitted...

@prolic
Copy link
Member

prolic commented Jun 1, 2018

Which version of pdo-event-store are you running?

@basz
Copy link
Member Author

basz commented Jun 1, 2018

Nvm 😬

@basz basz closed this as completed Jun 1, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants