From c85c351b580bfb0c1833f66bb178c20caf381be4 Mon Sep 17 00:00:00 2001 From: Marc Neudert Date: Tue, 20 Aug 2024 20:59:42 +0200 Subject: [PATCH 01/10] Allow signal handling in console commands --- core/Plugin/ConsoleCommand.php | 40 +++++++++++++++++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/core/Plugin/ConsoleCommand.php b/core/Plugin/ConsoleCommand.php index a98d52f9973..74cc4ce1424 100644 --- a/core/Plugin/ConsoleCommand.php +++ b/core/Plugin/ConsoleCommand.php @@ -10,6 +10,7 @@ namespace Piwik\Plugin; use Symfony\Component\Console\Command\Command as SymfonyCommand; +use Symfony\Component\Console\Command\SignalableCommandInterface; use Symfony\Component\Console\Exception\LogicException; use Symfony\Component\Console\Helper\ProgressBar; use Symfony\Component\Console\Helper\QuestionHelper; @@ -28,7 +29,7 @@ * * @api */ -class ConsoleCommand extends SymfonyCommand +class ConsoleCommand extends SymfonyCommand implements SignalableCommandInterface { /** * @var ProgressBar|null @@ -153,6 +154,43 @@ final public function run(InputInterface $input, OutputInterface $output): int return parent::run($input, $output); } + /** + * Method is final to make it impossible to overwrite it in plugin commands + * use getSystemSignalsToHandle() instead. + * + * @return array + */ + final public function getSubscribedSignals(): array + { + return $this->getSystemSignalsToHandle(); + } + + /** + * Method is final to make it impossible to overwrite it in plugin commands + * use handleSystemSignal() instead. + */ + final public function handleSignal(int $signal): void + { + $this->handleSystemSignal($signal); + } + + /** + * Returns the list of system signals to subscribe. + * + * @return array + */ + public function getSystemSignalsToHandle(): array + { + return []; + } + + /** + * The method will be called when the application is signaled. + */ + public function handleSystemSignal(int $signal): void + { + } + /** * Adds a negatable option (e.g. --ansi / --no-ansi) * From ab978b32444de140d50ec4db902bc770e35ba0a8 Mon Sep 17 00:00:00 2001 From: Marc Neudert Date: Tue, 20 Aug 2024 21:18:01 +0200 Subject: [PATCH 02/10] Stop archiving after receiving SIGINT/SIGTERM --- core/CliMulti.php | 32 ++++++- core/CronArchive.php | 96 +++++++++++++++++-- core/Scheduler/Scheduler.php | 15 +++ plugins/CoreConsole/Commands/CoreArchiver.php | 24 ++++- 4 files changed, 156 insertions(+), 11 deletions(-) diff --git a/core/CliMulti.php b/core/CliMulti.php index 67c3c02449f..187b56076be 100644 --- a/core/CliMulti.php +++ b/core/CliMulti.php @@ -95,6 +95,11 @@ class CliMulti */ private $logger; + /** + * @var int|null + */ + private $signal = null; + public function __construct(?LoggerInterface $logger = null) { $this->supportsAsync = $this->supportsAsync(); @@ -103,6 +108,11 @@ public function __construct(?LoggerInterface $logger = null) $this->logger = $logger ?: new NullLogger(); } + public function handleSignal(int $signal): void + { + $this->signal = $signal; + } + /** * It will request all given URLs in parallel (async) using the CLI and wait until all requests are finished. * If multi cli is not supported (eg windows) it will initiate an HTTP request instead (not async). @@ -124,13 +134,21 @@ public function request(array $piwikUrls) } } - $chunks = array($piwikUrls); + $chunks = [$piwikUrls]; + if ($this->concurrentProcessesLimit) { $chunks = array_chunk($piwikUrls, $this->concurrentProcessesLimit); } - $results = array(); + $results = []; + foreach ($chunks as $urlsChunk) { + if (null !== $this->signal) { + $this->logSkippedRequests($urlsChunk); + + continue; + } + $results = array_merge($results, $this->requestUrls($urlsChunk)); } @@ -616,6 +634,16 @@ private function requestUrls(array $piwikUrls) return $results; } + private function logSkippedRequests(array $urls): void + { + foreach ($urls as $url) { + $this->logger->debug( + 'Skipped climulti:request after abort signal received: {url}', + ['url' => $url] + ); + } + } + private static function getSuperUserTokenAuth() { return Piwik::requestTemporarySystemAuthToken('CliMultiNonAsyncArchive', 36); diff --git a/core/CronArchive.php b/core/CronArchive.php index 64820e31137..20113176f90 100644 --- a/core/CronArchive.php +++ b/core/CronArchive.php @@ -36,6 +36,7 @@ use Piwik\Plugins\UsersManager\API as APIUsersManager; use Piwik\Plugins\UsersManager\UserPreferences; use Piwik\Log\LoggerInterface; +use Piwik\Scheduler\Scheduler; /** * ./console core:archive runs as a cron and is a useful tool for general maintenance, @@ -230,6 +231,28 @@ class CronArchive */ private $supportsAsync; + /** + * @var null|int + */ + private $signal = null; + + /** + * @var CliMulti|null + */ + private $cliMultiHandler = null; + + /** + * @var Scheduler|null + */ + private $scheduler = null; + + private $step = 0; + + private const STEP_INIT = 1; + private const STEP_ARCHIVING = 2; + private const STEP_SCHEDULED_TASKS = 3; + private const STEP_FINISH = 4; + /** * Constructor. * @@ -272,9 +295,16 @@ public function main() $self = $this; Access::doAsSuperUser(function () use ($self) { try { + $this->step = self::STEP_INIT; $self->init(); + + $this->step = self::STEP_ARCHIVING; $self->run(); + + $this->step = self::STEP_SCHEDULED_TASKS; $self->runScheduledTasks(); + + $this->step = self::STEP_FINISH; $self->end(); } catch (StopArchiverException $e) { $this->logger->info("Archiving stopped by stop archiver exception" . $e->getMessage()); @@ -282,6 +312,33 @@ public function main() }); } + public function handleSignal(int $signal): void + { + $this->logger->info('Received system signal to stop archiving: ' . $signal); + + $this->signal = $signal; + + // initialisation and finishing can be stopped directly. + if (in_array($this->step, [self::STEP_INIT, self::STEP_FINISH])) { + $this->logger->info('Archiving stopped'); + exit; + } + + // stop archiving + if (!empty($this->cliMultiHandler)) { + $this->logger->info('Trying to stop running cli processes...'); + $this->cliMultiHandler->handleSignal($signal); + } + + // stop scheduled tasks + if (!empty($this->scheduler)) { + $this->logger->info('Trying to stop running tasks...'); + $this->scheduler->handleSignal($signal); + } + + // Note: finishing the archiving process will be handled in `run()` + } + public function init() { $this->segmentArchiving = StaticContainer::get(SegmentArchiving::class); @@ -386,6 +443,11 @@ public function run() $queueConsumer->setMaxSitesToProcess($this->maxSitesToProcess); while (true) { + if (null !== $this->signal) { + $this->logger->info("Archiving will stop now because signal to abort received"); + return; + } + if ($this->isMaintenanceModeEnabled()) { $this->logger->info("Archiving will stop now because maintenance mode is enabled"); return; @@ -498,18 +560,29 @@ private function launchArchivingFor($archives, QueueConsumer $queueConsumer) return 0; // all URLs had no visits and were using the tracker } - $cliMulti = $this->makeCliMulti(); - $cliMulti->timeRequests(); + $this->cliMultiHandler = $this->makeCliMulti(); + $this->cliMultiHandler->timeRequests(); - $responses = $cliMulti->request($urls); + $responses = $this->cliMultiHandler->request($urls); $this->disconnectDb(); - $timers = $cliMulti->getTimers(); + $timers = $this->cliMultiHandler->getTimers(); $successCount = 0; foreach ($urls as $index => $url) { $content = array_key_exists($index, $responses) ? $responses[$index] : null; + + if (null !== $this->signal && empty($content)) { + // processes killed by system + $idinvalidation = $archivesBeingQueried[$index]['idinvalidation']; + + $this->model->releaseInProgressInvalidation($idinvalidation); + $this->logger->info('Archiving process killed, reset invalidation with id ' . $idinvalidation); + + continue; + } + $checkInvalid = $this->checkResponse($content, $url); $stats = json_decode($content, $assoc = true); @@ -527,7 +600,6 @@ private function launchArchivingFor($archives, QueueConsumer $queueConsumer) $visitsForPeriod = $this->getVisitsFromApiResponse($stats); - $this->logArchiveJobFinished( $url, $timers[$index], @@ -537,7 +609,6 @@ private function launchArchivingFor($archives, QueueConsumer $queueConsumer) !$checkInvalid ); - $this->deleteInvalidatedArchives($archivesBeingQueried[$index]); $this->repairInvalidationsIfNeeded($archivesBeingQueried[$index]); @@ -618,6 +689,11 @@ public function getErrors() */ public function end() { + if (null !== $this->signal) { + // Skip if abort signal has been received + return; + } + /** * This event is triggered after archiving. * @@ -650,6 +726,11 @@ public function logFatalError($m) public function runScheduledTasks() { + if (null !== $this->signal) { + // Skip running scheduled task if abort signal has been received + return; + } + $this->logSection("SCHEDULED TASKS"); if ($this->disableScheduledTasks) { @@ -673,7 +754,8 @@ public function runScheduledTasks() // enable/disable the task Rules::$disablePureOutdatedArchive = true; - CoreAdminHomeAPI::getInstance()->runScheduledTasks(); + $this->scheduler = StaticContainer::get(Scheduler::class); + $this->scheduler->run(); $this->logSection(""); } diff --git a/core/Scheduler/Scheduler.php b/core/Scheduler/Scheduler.php index 24daa998410..0defd9dec5e 100644 --- a/core/Scheduler/Scheduler.php +++ b/core/Scheduler/Scheduler.php @@ -81,6 +81,11 @@ class Scheduler */ private $lock; + /** + * @var int|null + */ + private $signal = null; + public function __construct(TaskLoader $loader, LoggerInterface $logger, ScheduledTaskLock $lock) { $this->timetable = new Timetable(); @@ -89,6 +94,11 @@ public function __construct(TaskLoader $loader, LoggerInterface $logger, Schedul $this->lock = $lock; } + public function handleSignal(int $signal): void + { + $this->signal = $signal; + } + /** * Executes tasks that are scheduled to run, then reschedules them. * @@ -121,6 +131,11 @@ public function run() // loop through each task foreach ($tasks as $task) { + if (in_array($this->signal, [\SIGINT, \SIGTERM], true)) { + $this->logger->info("Scheduler: Aborting due to received signal"); + return $executionResults; + } + // if the task does not have the current priority level, don't execute it yet if ($task->getPriority() != $priority) { continue; diff --git a/plugins/CoreConsole/Commands/CoreArchiver.php b/plugins/CoreConsole/Commands/CoreArchiver.php index bb793130557..6c14bea7f8a 100644 --- a/plugins/CoreConsole/Commands/CoreArchiver.php +++ b/plugins/CoreConsole/Commands/CoreArchiver.php @@ -15,6 +15,26 @@ class CoreArchiver extends ConsoleCommand { + /** + * @var CronArchive|null + */ + private $archiver = null; + + public function getSystemSignalsToHandle(): array + { + return [\SIGINT, \SIGTERM]; + } + + public function handleSystemSignal(int $signal): void + { + if (null === $this->archiver) { + // archiving has not yet started, stop immediately + exit; + } + + $this->archiver->handleSignal($signal); + } + protected function configure() { $this->configureArchiveCommand($this); @@ -30,8 +50,8 @@ protected function doExecute(): int $output->writeln('' . $message . ''); } - $archiver = $this->makeArchiver($input->getOption('url')); - $archiver->main(); + $this->archiver = $this->makeArchiver($input->getOption('url')); + $this->archiver->main(); return self::SUCCESS; } From 3ac8e2226d9acb58b1c88f4503e6a694281709ca Mon Sep 17 00:00:00 2001 From: Marc Neudert Date: Tue, 13 Aug 2024 22:59:09 +0200 Subject: [PATCH 03/10] Add tests for basic core:archive signal handling --- .../Fixtures/CoreArchiverProcessSignal.php | 158 +++++++++ .../CoreArchiverProcessSignal/StepControl.php | 131 +++++++ .../CoreArchiverProcessSignalTest.php | 319 ++++++++++++++++++ 3 files changed, 608 insertions(+) create mode 100644 plugins/CoreAdminHome/tests/Fixtures/CoreArchiverProcessSignal.php create mode 100644 plugins/CoreAdminHome/tests/Fixtures/CoreArchiverProcessSignal/StepControl.php create mode 100644 plugins/CoreAdminHome/tests/Integration/Commands/CoreArchiverProcessSignalTest.php diff --git a/plugins/CoreAdminHome/tests/Fixtures/CoreArchiverProcessSignal.php b/plugins/CoreAdminHome/tests/Fixtures/CoreArchiverProcessSignal.php new file mode 100644 index 00000000000..aade969bf04 --- /dev/null +++ b/plugins/CoreAdminHome/tests/Fixtures/CoreArchiverProcessSignal.php @@ -0,0 +1,158 @@ +inTestEnv = (bool) getenv(self::ENV_TRIGGER); + $this->inTestRequest = Request::fromRequest()->getBoolParameter(self::ENV_TRIGGER, false); + $this->today = Date::today()->toString(); + + $this->stepControl = new StepControl(); + } + + public function setUp(): void + { + Rules::setBrowserTriggerArchiving(false); + Config::getInstance()->General['process_new_segments_from'] = 'segment_creation_time'; + Fixture::createSuperUser(); + + $this->setUpWebsites(); + $this->setUpSegments(); + $this->trackVisits(); + } + + public function tearDown(): void + { + // empty + } + + public function provideContainerConfig(): array + { + if (!$this->inTestEnv && !$this->inTestRequest) { + return []; + } + + if ($this->inTestRequest) { + return [ + 'observers.global' => DI::add([ + [ + 'API.CoreAdminHome.archiveReports', + DI::value(Closure::fromCallable([$this->stepControl, 'handleAPIArchiveReports'])), + ], + ]), + ]; + } + + return [ + 'ini.tests.enable_logging' => 1, + 'log.handlers' => static function (Container $c) { + return [$c->get(EchoHandler::class)]; + }, + 'observers.global' => DI::add([ + [ + 'CronArchive.alterArchivingRequestUrl', + DI::value(static function (&$url) { + $url .= '&' . self::ENV_TRIGGER . '=1'; + }), + ], + [ + 'CronArchive.init.finish', + DI::value(Closure::fromCallable([$this->stepControl, 'handleCronArchiveStart'])), + ], + ]), + ]; + } + + private function setUpSegments(): void + { + APISegmentEditor::getInstance()->add( + self::TEST_SEGMENT_CH, + self::TEST_SEGMENT_CH, + $this->idSite, + $autoArchive = true, + $enabledAllUsers = true + ); + + APISegmentEditor::getInstance()->add( + self::TEST_SEGMENT_FF, + self::TEST_SEGMENT_FF, + $this->idSite, + $autoArchive = true, + $enabledAllUsers = true + ); + } + + private function setUpWebsites(): void + { + if (!self::siteCreated($this->idSite)) { + self::createWebsite('2021-01-01'); + } + } + + private function trackVisits(): void + { + $t = self::getTracker($this->idSite, $this->today, $defaultInit = true); + $t->setUrl('http://example.org/index.htm'); + + self::checkResponse($t->doTrackPageView('0')); + } +} diff --git a/plugins/CoreAdminHome/tests/Fixtures/CoreArchiverProcessSignal/StepControl.php b/plugins/CoreAdminHome/tests/Fixtures/CoreArchiverProcessSignal/StepControl.php new file mode 100644 index 00000000000..4151bc45ec3 --- /dev/null +++ b/plugins/CoreAdminHome/tests/Fixtures/CoreArchiverProcessSignal/StepControl.php @@ -0,0 +1,131 @@ +waitForSuccess(static function () use ($parameters): bool { + // force reading from database + Option::clearCachedOption(self::OPTION_ARCHIVE_REPORTS_BLOCKED); + + $option = Option::get(self::OPTION_ARCHIVE_REPORTS_BLOCKED) ?: ''; + $block = json_decode($option, true); + + if (!is_array($block)) { + return true; + } + + return ( + $block['segment'] !== urldecode($parameters['segment'] ?: '') + || $block['period'] !== $parameters['period'] + || $block['date'] !== $parameters['date'] + ); + }); + + if (!$continue) { + throw new RuntimeException('Waiting for ArchiveReports option took too long!'); + } + } + + /** + * DI hook intercepting the "CronArchive.init.finish" event. + */ + public function handleCronArchiveStart(): void + { + $continue = $this->waitForSuccess(static function (): bool { + // force reading from database + Option::clearCachedOption(self::OPTION_CRON_ARCHIVE_BLOCKED); + + return false === Option::get(self::OPTION_CRON_ARCHIVE_BLOCKED); + }); + + if (!$continue) { + throw new RuntimeException('Waiting for CronArchive option took too long!'); + } + } + + /** + * Remove all internal blocks. + */ + public function reset(): void + { + Option::deleteLike(self::OPTION_PREFIX . '%'); + } + + /** + * Allow proceeding past the "API.CoreAdminHome.archiveReports" event. + */ + public function unblockAPIArchiveReports(): void + { + Option::delete(self::OPTION_ARCHIVE_REPORTS_BLOCKED); + } + + /** + * Allow proceeding past the "CronArchive.init.start" event. + */ + public function unblockCronArchiveStart(): void + { + Option::delete(self::OPTION_CRON_ARCHIVE_BLOCKED); + } + + /** + * Wait until a callable returns true or a timeout is reached. + */ + public function waitForSuccess(callable $check, int $timeoutInSeconds = 10): bool + { + $start = time(); + + do { + $now = time(); + + if ($check()) { + return true; + } + + // 250 millisecond sleep + usleep(250 * 1000); + } while ($timeoutInSeconds > $now - $start); + + return false; + } +} diff --git a/plugins/CoreAdminHome/tests/Integration/Commands/CoreArchiverProcessSignalTest.php b/plugins/CoreAdminHome/tests/Integration/Commands/CoreArchiverProcessSignalTest.php new file mode 100644 index 00000000000..2ef922e1783 --- /dev/null +++ b/plugins/CoreAdminHome/tests/Integration/Commands/CoreArchiverProcessSignalTest.php @@ -0,0 +1,319 @@ +markTestSkipped('signal test cannot run without ext-pcntl'); + } + + parent::setUp(); + + self::$fixture->stepControl->reset(); + + $this->dataAccessModel = new Model(); + } + + /** + * @dataProvider getArchivingWithoutSignalData + */ + public function testArchivingWithoutSignalWorks(string $method): void + { + $this->setUpArchivingMethod($method); + + // let archiving run completely + $process = $this->startCoreArchiver(); + $process->setTimeout(60); + $process->wait(); + + self::assertFalse($process->isRunning()); + + $this->assertArchiveInvalidationCount($inProgress = 0, $total = 0); + + $processOutput = $process->getOutput(); + + self::assertStringContainsString('Starting archiving for', $processOutput); + self::assertStringContainsString('Archived website id 1, period = day', $processOutput); + self::assertStringContainsString('Archived website id 1, period = week', $processOutput); + self::assertStringContainsString('Archived website id 1, period = month', $processOutput); + self::assertStringContainsString('Archived website id 1, period = year', $processOutput); + self::assertStringContainsString('Done archiving!', $processOutput); + self::assertStringContainsString('Starting Scheduled tasks...', $processOutput); + } + + public function getArchivingWithoutSignalData(): iterable + { + yield 'symfony process' => [self::METHOD_ASYNC_CLI_SYMFONY]; + } + + /** + * @dataProvider getSigintDuringArchivingData + * + * @param array{segment: string, period: string, date: string} $blockSpec + */ + public function testSigintDuringArchiving(string $method, array $blockSpec): void + { + self::$fixture->stepControl->blockCronArchiveStart(); + self::$fixture->stepControl->blockAPIArchiveReports($blockSpec); + + $this->setUpArchivingMethod($method); + + $process = $this->startCoreArchiver(); + + self::$fixture->stepControl->unblockCronArchiveStart(); + + $this->waitForArchivingToStart($process, $blockSpec); + $this->assertArchiveInvalidationCount($inProgress = 1, $total = 12); + $this->sendSignalToProcess($process, \SIGINT); + + self::$fixture->stepControl->unblockAPIArchiveReports(); + + $this->waitForProcessToStop($process); + $this->assertArchivingOutput($process, $method, \SIGINT, $blockSpec); + $this->assertArchiveInvalidationCount($inProgress = 0, $total = 11); + } + + public function getSigintDuringArchivingData(): iterable + { + yield 'symfony process' => [ + 'method' => self::METHOD_ASYNC_CLI_SYMFONY, + 'blockSpec' => ['segment' => '', 'period' => 'day', 'date' => self::$fixture->today], + ]; + } + + /** + * @dataProvider getSigtermDuringArchivingData + * + * @param array{segment: string, period: string, date: string} $blockSpec + */ + public function testSigtermDuringArchiving(string $method, array $blockSpec): void + { + self::$fixture->stepControl->blockCronArchiveStart(); + self::$fixture->stepControl->blockAPIArchiveReports($blockSpec); + + $this->setUpArchivingMethod($method); + + $process = $this->startCoreArchiver(); + + self::$fixture->stepControl->unblockCronArchiveStart(); + + $this->waitForArchivingToStart($process, $blockSpec); + $this->assertArchiveInvalidationCount($inProgress = 1, $total = 12); + $this->sendSignalToProcess($process, \SIGTERM); + + self::$fixture->stepControl->unblockAPIArchiveReports(); + + $this->waitForProcessToStop($process); + $this->assertArchivingOutput($process, $method, \SIGTERM, $blockSpec); + + // SIGTERM currently behaves like SIGINT + // this should become a "4" when full handling is available + $this->assertArchiveInvalidationCount($inProgress = 0, $total = 11); + } + + public function getSigtermDuringArchivingData(): iterable + { + yield 'symfony process' => [ + 'method' => self::METHOD_ASYNC_CLI_SYMFONY, + 'blockSpec' => ['segment' => '', 'period' => 'day', 'date' => self::$fixture->today], + ]; + } + + private function assertArchiveInvalidationCount( + int $expectedInProgress, + int $expectedTotal + ): void { + $actualInProgress = $this->dataAccessModel->getInvalidationsInProgress(self::$fixture->idSite); + $actualTotal = (int) Db::fetchOne( + 'SELECT COUNT(*) FROM ' . Common::prefixTable('archive_invalidations') . ' WHERE idsite = ?', + [self::$fixture->idSite] + ); + + self::assertSame($expectedTotal, $actualTotal); + self::assertCount($expectedInProgress, $actualInProgress); + } + + /** + * @param array{segment: string, period: string, date: string} $blockSpec + */ + private function assertArchivingOutput( + ProcessSymfony $process, + string $method, + int $signal, + array $blockSpec + ): void { + $idSite = self::$fixture->idSite; + $processOutput = $process->getOutput(); + + self::assertRegExp('/Running command.*\[method = ' . $method . ']/', $processOutput); + + self::assertStringContainsString('Starting archiving for', $processOutput); + self::assertStringContainsString('Trying to stop running cli processes...', $processOutput); + self::assertStringContainsString('Archiving will stop now because signal to abort received', $processOutput); + + if (\SIGINT === $signal) { + self::assertStringContainsString(sprintf( + "Archived website id %u, period = %s, date = %s, segment = '%s'", + $idSite, + $blockSpec['period'], + $blockSpec['date'], + $blockSpec['segment'] + ), $processOutput); + } + } + + private function sendSignalToProcess(ProcessSymfony $process, int $signal): void + { + $process->signal($signal); + + $result = self::$fixture->stepControl->waitForSuccess( + static function () use ($process, $signal): bool { + return false !== strpos( + $process->getOutput(), + 'Received system signal to stop archiving: ' . $signal + ); + } + ); + + self::assertTrue($result, 'Process did not acknowledge signal'); + } + + private function setUpArchivingMethod(string $method): void + { + if (self::METHOD_ASYNC_CLI_SYMFONY === $method) { + $featureFlag = new CliMultiProcessSymfony(); + + $environment = self::$fixture->getTestEnvironment(); + $environment->overrideConfig( + 'FeatureFlags', + $featureFlag->getName() . '_feature', + 'enabled' + ); + + $environment->save(); + } + } + + private function startCoreArchiver(): ProcessSymfony + { + // exec is mandatory to send signals to the process + // not using array notation because "Fixture::getCliCommandBase" contains parameters + $process = ProcessSymfony::fromShellCommandline(sprintf( + 'exec %s core:archive -vvv', + Fixture::getCliCommandBase() + )); + + $process->setEnv([CoreArchiverProcessSignalFixture::ENV_TRIGGER => '1']); + $process->setTimeout(null); + $process->start(); + + self::assertTrue($process->isRunning()); + self::assertNotNull($process->getPid()); + + return $process; + } + + /** + * @param array{segment: string, period: string, date: string} $blockSpec + */ + private function waitForArchivingToStart(ProcessSymfony $process, array $blockSpec): void + { + $segment = new Segment($blockSpec['segment'], [self::$fixture->idSite]); + $doneFlag = Rules::getDoneFlagArchiveContainsAllPlugins($segment); + + $result = self::$fixture->stepControl->waitForSuccess(function () use ($doneFlag, $blockSpec): bool { + $invalidations = $this->dataAccessModel->getInvalidationsInProgress(self::$fixture->idSite); + + foreach ($invalidations as $invalidation) { + if ( + $invalidation['name'] === $doneFlag + && $invalidation['period'] == Piwik::$idPeriods[$blockSpec['period']] + && $invalidation['date1'] == $blockSpec['date'] + ) { + return true; + } + } + + return false; + }); + + self::assertTrue($result, 'Invalidation did not start for: ' . json_encode($blockSpec)); + + $result = self::$fixture->stepControl->waitForSuccess( + static function () use ($process, $blockSpec): bool { + $processOutput = $process->getOutput(); + + $needles = [ + 'Running command', + 'date=' . $blockSpec['date'], + 'period=' . $blockSpec['period'] + ]; + + if ('' !== $blockSpec['segment']) { + $needles[] = 'segment=' . urlencode($blockSpec['segment']); + } + + foreach ($needles as $needle) { + if (false === strpos($processOutput, $needle)) { + return false; + } + } + + return true; + } + ); + + self::assertTrue($result, 'Archiving did not start for: ' . json_encode($blockSpec)); + } + + private function waitForProcessToStop(ProcessSymfony $process): void + { + $result = self::$fixture->stepControl->waitForSuccess(static function () use ($process): bool { + return !$process->isRunning(); + }); + + self::assertTrue($result, 'Archiving process did not stop'); + self::assertSame(0, $process->getExitCode()); + } +} + +CoreArchiverProcessSignalTest::$fixture = new CoreArchiverProcessSignalFixture(); From 723f0b3752c3edaf7e8a9539cdaa74baea59a02c Mon Sep 17 00:00:00 2001 From: Marc Neudert Date: Tue, 20 Aug 2024 22:28:46 +0200 Subject: [PATCH 04/10] Abort symfony archiving requests for SIGTERM --- core/CliMulti.php | 15 +++++++++++++++ .../Commands/CoreArchiverProcessSignalTest.php | 12 ++++++------ 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/core/CliMulti.php b/core/CliMulti.php index 187b56076be..f3776edaa25 100644 --- a/core/CliMulti.php +++ b/core/CliMulti.php @@ -111,6 +111,21 @@ public function __construct(?LoggerInterface $logger = null) public function handleSignal(int $signal): void { $this->signal = $signal; + + if (\SIGTERM !== $signal) { + return; + } + + foreach ($this->processes as $process) { + if ($process instanceof ProcessSymfony) { + $this->logger->debug( + 'Aborting command: {command} [method = asyncCliSymfony]', + ['command' => $process->getCommandLine()] + ); + + $process->stop(0); + } + } } /** diff --git a/plugins/CoreAdminHome/tests/Integration/Commands/CoreArchiverProcessSignalTest.php b/plugins/CoreAdminHome/tests/Integration/Commands/CoreArchiverProcessSignalTest.php index 2ef922e1783..85ddd256264 100644 --- a/plugins/CoreAdminHome/tests/Integration/Commands/CoreArchiverProcessSignalTest.php +++ b/plugins/CoreAdminHome/tests/Integration/Commands/CoreArchiverProcessSignalTest.php @@ -140,14 +140,9 @@ public function testSigtermDuringArchiving(string $method, array $blockSpec): vo $this->assertArchiveInvalidationCount($inProgress = 1, $total = 12); $this->sendSignalToProcess($process, \SIGTERM); - self::$fixture->stepControl->unblockAPIArchiveReports(); - $this->waitForProcessToStop($process); $this->assertArchivingOutput($process, $method, \SIGTERM, $blockSpec); - - // SIGTERM currently behaves like SIGINT - // this should become a "4" when full handling is available - $this->assertArchiveInvalidationCount($inProgress = 0, $total = 11); + $this->assertArchiveInvalidationCount($inProgress = 0, $total = 12); } public function getSigtermDuringArchivingData(): iterable @@ -199,6 +194,11 @@ private function assertArchivingOutput( $blockSpec['segment'] ), $processOutput); } + + if (\SIGTERM === $signal) { + self::assertRegExp('/Aborting command.*\[method = ' . $method . ']/', $processOutput); + self::assertStringContainsString('Archiving process killed, reset invalidation', $processOutput); + } } private function sendSignalToProcess(ProcessSymfony $process, int $signal): void From c250c650f6bb510966b6c722458380f5a8005240 Mon Sep 17 00:00:00 2001 From: Marc Neudert Date: Wed, 21 Aug 2024 08:43:51 +0200 Subject: [PATCH 05/10] Add tests for non-symfony core:archive climulti methods --- .../CoreArchiverProcessSignalTest.php | 163 +++++++++++++----- 1 file changed, 122 insertions(+), 41 deletions(-) diff --git a/plugins/CoreAdminHome/tests/Integration/Commands/CoreArchiverProcessSignalTest.php b/plugins/CoreAdminHome/tests/Integration/Commands/CoreArchiverProcessSignalTest.php index 85ddd256264..f39f5745644 100644 --- a/plugins/CoreAdminHome/tests/Integration/Commands/CoreArchiverProcessSignalTest.php +++ b/plugins/CoreAdminHome/tests/Integration/Commands/CoreArchiverProcessSignalTest.php @@ -28,7 +28,10 @@ */ class CoreArchiverProcessSignalTest extends IntegrationTestCase { + private const METHOD_ASYNC_CLI = 'asyncCli'; private const METHOD_ASYNC_CLI_SYMFONY = 'asyncCliSymfony'; + private const METHOD_CURL = 'curl'; + private const METHOD_SYNC_CLI = 'syncCli'; /** * @var CoreArchiverProcessSignalFixture @@ -61,13 +64,13 @@ public function testArchivingWithoutSignalWorks(string $method): void $this->setUpArchivingMethod($method); // let archiving run completely - $process = $this->startCoreArchiver(); + $process = $this->startCoreArchiver($method); $process->setTimeout(60); $process->wait(); self::assertFalse($process->isRunning()); - $this->assertArchiveInvalidationCount($inProgress = 0, $total = 0); + $this->assertArchiveInvalidationCount(['inProgress' => 0, 'total' => 0]); $processOutput = $process->getOutput(); @@ -78,45 +81,91 @@ public function testArchivingWithoutSignalWorks(string $method): void self::assertStringContainsString('Archived website id 1, period = year', $processOutput); self::assertStringContainsString('Done archiving!', $processOutput); self::assertStringContainsString('Starting Scheduled tasks...', $processOutput); + + if (self::METHOD_CURL === $method) { + self::assertStringContainsString('Execute HTTP API request:', $processOutput); + } else { + self::assertRegExp('/Running command.*\[method = ' . $method . ']/', $processOutput); + } } public function getArchivingWithoutSignalData(): iterable { yield 'symfony process' => [self::METHOD_ASYNC_CLI_SYMFONY]; + yield 'default process (single process)' => [self::METHOD_SYNC_CLI]; + yield 'default process (multi process)' => [self::METHOD_ASYNC_CLI]; + yield 'curl' => [self::METHOD_CURL]; } /** * @dataProvider getSigintDuringArchivingData * * @param array{segment: string, period: string, date: string} $blockSpec + * @param array{inProgress: int, total: int} $invalidationCountIntermediate + * @param array{inProgress: int, total: int} $invalidationCountFinal */ - public function testSigintDuringArchiving(string $method, array $blockSpec): void - { + public function testSigintDuringArchiving( + string $method, + array $blockSpec, + array $invalidationCountIntermediate, + array $invalidationCountFinal + ): void { self::$fixture->stepControl->blockCronArchiveStart(); self::$fixture->stepControl->blockAPIArchiveReports($blockSpec); $this->setUpArchivingMethod($method); - $process = $this->startCoreArchiver(); + $process = $this->startCoreArchiver($method); self::$fixture->stepControl->unblockCronArchiveStart(); - $this->waitForArchivingToStart($process, $blockSpec); - $this->assertArchiveInvalidationCount($inProgress = 1, $total = 12); - $this->sendSignalToProcess($process, \SIGINT); + $this->waitForArchivingToStart($process, $method, $blockSpec); + $this->assertArchiveInvalidationCount($invalidationCountIntermediate); + $this->sendSignalToProcess($process, \SIGINT, $method); self::$fixture->stepControl->unblockAPIArchiveReports(); $this->waitForProcessToStop($process); $this->assertArchivingOutput($process, $method, \SIGINT, $blockSpec); - $this->assertArchiveInvalidationCount($inProgress = 0, $total = 11); + $this->assertArchiveInvalidationCount($invalidationCountFinal); } public function getSigintDuringArchivingData(): iterable { + $specToday = ['segment' => '', 'period' => 'day', 'date' => self::$fixture->today]; + yield 'symfony process' => [ 'method' => self::METHOD_ASYNC_CLI_SYMFONY, - 'blockSpec' => ['segment' => '', 'period' => 'day', 'date' => self::$fixture->today], + 'blockSpec' => $specToday, + 'invalidationCountIntermediate' => ['inProgress' => 1, 'total' => 12], + 'invalidationCountFinal' => ['inProgress' => 0, 'total' => 11], + ]; + + yield 'default process (single process)' => [ + 'method' => self::METHOD_SYNC_CLI, + 'blockSpec' => $specToday, + 'invalidationCountIntermediate' => ['inProgress' => 1, 'total' => 12], + 'invalidationCountFinal' => ['inProgress' => 0, 'total' => 11], + ]; + + // empty day segment will always run as a single process + // so we use a non-empty segment for testing asyncCli + yield 'default process (multi process)' => [ + 'method' => self::METHOD_ASYNC_CLI, + 'blockSpec' => [ + 'segment' => CoreArchiverProcessSignalFixture::TEST_SEGMENT_CH, + 'period' => 'day', + 'date' => self::$fixture->today, + ], + 'invalidationCountIntermediate' => ['inProgress' => 2, 'total' => 11], + 'invalidationCountFinal' => ['inProgress' => 0, 'total' => 9], + ]; + + yield 'curl' => [ + 'method' => self::METHOD_CURL, + 'blockSpec' => $specToday, + 'invalidationCountIntermediate' => ['inProgress' => 1, 'total' => 12], + 'invalidationCountFinal' => ['inProgress' => 0, 'total' => 11], ]; } @@ -132,17 +181,17 @@ public function testSigtermDuringArchiving(string $method, array $blockSpec): vo $this->setUpArchivingMethod($method); - $process = $this->startCoreArchiver(); + $process = $this->startCoreArchiver($method); self::$fixture->stepControl->unblockCronArchiveStart(); - $this->waitForArchivingToStart($process, $blockSpec); - $this->assertArchiveInvalidationCount($inProgress = 1, $total = 12); - $this->sendSignalToProcess($process, \SIGTERM); + $this->waitForArchivingToStart($process, $method, $blockSpec); + $this->assertArchiveInvalidationCount(['inProgress' => 1, 'total' => 12]); + $this->sendSignalToProcess($process, \SIGTERM, $method); $this->waitForProcessToStop($process); $this->assertArchivingOutput($process, $method, \SIGTERM, $blockSpec); - $this->assertArchiveInvalidationCount($inProgress = 0, $total = 12); + $this->assertArchiveInvalidationCount(['inProgress' => 0, 'total' => 12]); } public function getSigtermDuringArchivingData(): iterable @@ -153,18 +202,19 @@ public function getSigtermDuringArchivingData(): iterable ]; } - private function assertArchiveInvalidationCount( - int $expectedInProgress, - int $expectedTotal - ): void { + /** + * @param array{inProgress: int, total: int} $expectedCounts + */ + private function assertArchiveInvalidationCount(array $expectedCounts): void + { $actualInProgress = $this->dataAccessModel->getInvalidationsInProgress(self::$fixture->idSite); $actualTotal = (int) Db::fetchOne( 'SELECT COUNT(*) FROM ' . Common::prefixTable('archive_invalidations') . ' WHERE idsite = ?', [self::$fixture->idSite] ); - self::assertSame($expectedTotal, $actualTotal); - self::assertCount($expectedInProgress, $actualInProgress); + self::assertSame($expectedCounts['total'], $actualTotal); + self::assertCount($expectedCounts['inProgress'], $actualInProgress); } /** @@ -179,12 +229,22 @@ private function assertArchivingOutput( $idSite = self::$fixture->idSite; $processOutput = $process->getOutput(); - self::assertRegExp('/Running command.*\[method = ' . $method . ']/', $processOutput); + if (self::METHOD_CURL === $method) { + self::assertStringContainsString('Execute HTTP API request:', $processOutput); + } else { + self::assertRegExp('/Running command.*\[method = ' . $method . ']/', $processOutput); + } self::assertStringContainsString('Starting archiving for', $processOutput); - self::assertStringContainsString('Trying to stop running cli processes...', $processOutput); self::assertStringContainsString('Archiving will stop now because signal to abort received', $processOutput); + if (self::METHOD_CURL !== $method) { + // curl handling does not acknowledge signals properly + // so only check this output was posted for all other methods + self::assertStringContainsString('Received system signal to stop archiving: ' . $signal, $processOutput); + self::assertStringContainsString('Trying to stop running cli processes...', $processOutput); + } + if (\SIGINT === $signal) { self::assertStringContainsString(sprintf( "Archived website id %u, period = %s, date = %s, segment = '%s'", @@ -201,10 +261,20 @@ private function assertArchivingOutput( } } - private function sendSignalToProcess(ProcessSymfony $process, int $signal): void - { + private function sendSignalToProcess( + ProcessSymfony $process, + int $signal, + string $method + ): void { $process->signal($signal); + if (in_array($method, [self::METHOD_CURL, self::METHOD_SYNC_CLI], true)) { + // not all methods are able to acknowledge the signal at this point + // wait for 250 milliseconds and rely on final result assertions + usleep(250 * 1000); + return; + } + $result = self::$fixture->stepControl->waitForSuccess( static function () use ($process, $signal): bool { return false !== strpos( @@ -219,27 +289,30 @@ static function () use ($process, $signal): bool { private function setUpArchivingMethod(string $method): void { - if (self::METHOD_ASYNC_CLI_SYMFONY === $method) { - $featureFlag = new CliMultiProcessSymfony(); + $environment = self::$fixture->getTestEnvironment(); - $environment = self::$fixture->getTestEnvironment(); - $environment->overrideConfig( - 'FeatureFlags', - $featureFlag->getName() . '_feature', - 'enabled' - ); + $featureFlag = new CliMultiProcessSymfony(); + $featureFlagConfigName = $featureFlag->getName() . '_feature'; - $environment->save(); + if (self::METHOD_ASYNC_CLI_SYMFONY === $method) { + $environment->overrideConfig('FeatureFlags', $featureFlagConfigName, 'enabled'); + } else { + $environment->removeOverriddenConfig('FeatureFlags', $featureFlagConfigName); } + + $environment->forceCliMultiViaCurl = (int) (self::METHOD_CURL === $method); + + $environment->save(); } - private function startCoreArchiver(): ProcessSymfony + private function startCoreArchiver(string $method): ProcessSymfony { // exec is mandatory to send signals to the process // not using array notation because "Fixture::getCliCommandBase" contains parameters $process = ProcessSymfony::fromShellCommandline(sprintf( - 'exec %s core:archive -vvv', - Fixture::getCliCommandBase() + 'exec %s core:archive -vvv %s', + Fixture::getCliCommandBase(), + self::METHOD_SYNC_CLI === $method ? '--concurrent-requests-per-website=1' : '' )); $process->setEnv([CoreArchiverProcessSignalFixture::ENV_TRIGGER => '1']); @@ -255,8 +328,11 @@ private function startCoreArchiver(): ProcessSymfony /** * @param array{segment: string, period: string, date: string} $blockSpec */ - private function waitForArchivingToStart(ProcessSymfony $process, array $blockSpec): void - { + private function waitForArchivingToStart( + ProcessSymfony $process, + string $method, + array $blockSpec + ): void { $segment = new Segment($blockSpec['segment'], [self::$fixture->idSite]); $doneFlag = Rules::getDoneFlagArchiveContainsAllPlugins($segment); @@ -279,15 +355,20 @@ private function waitForArchivingToStart(ProcessSymfony $process, array $blockSp self::assertTrue($result, 'Invalidation did not start for: ' . json_encode($blockSpec)); $result = self::$fixture->stepControl->waitForSuccess( - static function () use ($process, $blockSpec): bool { + static function () use ($process, $method, $blockSpec): bool { $processOutput = $process->getOutput(); $needles = [ - 'Running command', 'date=' . $blockSpec['date'], 'period=' . $blockSpec['period'] ]; + if (self::METHOD_CURL === $method) { + $needles[] = 'Execute HTTP API request'; + } else { + $needles[] = 'Running command'; + } + if ('' !== $blockSpec['segment']) { $needles[] = 'segment=' . urlencode($blockSpec['segment']); } From a156ed6b1c8c119afa892d5354f4f6682937413f Mon Sep 17 00:00:00 2001 From: Marc Neudert Date: Mon, 26 Aug 2024 19:47:23 +0200 Subject: [PATCH 06/10] Add tests for core:archive sigint fallback if sigterm is not fully supported --- .../CoreArchiverProcessSignalTest.php | 65 ++++++++++++++++--- 1 file changed, 57 insertions(+), 8 deletions(-) diff --git a/plugins/CoreAdminHome/tests/Integration/Commands/CoreArchiverProcessSignalTest.php b/plugins/CoreAdminHome/tests/Integration/Commands/CoreArchiverProcessSignalTest.php index f39f5745644..46b47e3ece6 100644 --- a/plugins/CoreAdminHome/tests/Integration/Commands/CoreArchiverProcessSignalTest.php +++ b/plugins/CoreAdminHome/tests/Integration/Commands/CoreArchiverProcessSignalTest.php @@ -99,6 +99,7 @@ public function getArchivingWithoutSignalData(): iterable /** * @dataProvider getSigintDuringArchivingData + * @dataProvider getSigtermDuringArchivingUnsupportedFallbackToSigintData * * @param array{segment: string, period: string, date: string} $blockSpec * @param array{inProgress: int, total: int} $invalidationCountIntermediate @@ -108,8 +109,12 @@ public function testSigintDuringArchiving( string $method, array $blockSpec, array $invalidationCountIntermediate, - array $invalidationCountFinal + array $invalidationCountFinal, + bool $sigtermToSigintFallback = false ): void { + $signalToProcess = $sigtermToSigintFallback ? \SIGTERM : \SIGINT; + $signalOutput = \SIGINT; + self::$fixture->stepControl->blockCronArchiveStart(); self::$fixture->stepControl->blockAPIArchiveReports($blockSpec); @@ -121,12 +126,12 @@ public function testSigintDuringArchiving( $this->waitForArchivingToStart($process, $method, $blockSpec); $this->assertArchiveInvalidationCount($invalidationCountIntermediate); - $this->sendSignalToProcess($process, \SIGINT, $method); + $this->sendSignalToProcess($process, $signalToProcess, $method); self::$fixture->stepControl->unblockAPIArchiveReports(); $this->waitForProcessToStop($process); - $this->assertArchivingOutput($process, $method, \SIGINT, $blockSpec); + $this->assertArchivingOutput($process, $method, $signalToProcess, $signalOutput, $blockSpec); $this->assertArchiveInvalidationCount($invalidationCountFinal); } @@ -169,6 +174,45 @@ public function getSigintDuringArchivingData(): iterable ]; } + public function getSigtermDuringArchivingUnsupportedFallbackToSigintData(): iterable + { + // keep in sync with getSigintDuringArchivingData + // - remove "symfony process" case + // - add "sigtermToSigintFallback = true" to all cases + + $specToday = ['segment' => '', 'period' => 'day', 'date' => self::$fixture->today]; + + yield 'default process (single process) - signal fallback' => [ + 'method' => self::METHOD_SYNC_CLI, + 'blockSpec' => $specToday, + 'invalidationCountIntermediate' => ['inProgress' => 1, 'total' => 12], + 'invalidationCountFinal' => ['inProgress' => 0, 'total' => 11], + 'sigtermToSigintFallback' => true, + ]; + + // empty day segment will always run as a single process + // so we use a non-empty segment for testing asyncCli + yield 'default process (multi process) - signal fallback' => [ + 'method' => self::METHOD_ASYNC_CLI, + 'blockSpec' => [ + 'segment' => CoreArchiverProcessSignalFixture::TEST_SEGMENT_CH, + 'period' => 'day', + 'date' => self::$fixture->today, + ], + 'invalidationCountIntermediate' => ['inProgress' => 2, 'total' => 11], + 'invalidationCountFinal' => ['inProgress' => 0, 'total' => 9], + 'sigtermToSigintFallback' => true, + ]; + + yield 'curl - signal fallback' => [ + 'method' => self::METHOD_CURL, + 'blockSpec' => $specToday, + 'invalidationCountIntermediate' => ['inProgress' => 1, 'total' => 12], + 'invalidationCountFinal' => ['inProgress' => 0, 'total' => 11], + 'sigtermToSigintFallback' => true, + ]; + } + /** * @dataProvider getSigtermDuringArchivingData * @@ -190,7 +234,7 @@ public function testSigtermDuringArchiving(string $method, array $blockSpec): vo $this->sendSignalToProcess($process, \SIGTERM, $method); $this->waitForProcessToStop($process); - $this->assertArchivingOutput($process, $method, \SIGTERM, $blockSpec); + $this->assertArchivingOutput($process, $method, \SIGTERM, \SIGTERM, $blockSpec); $this->assertArchiveInvalidationCount(['inProgress' => 0, 'total' => 12]); } @@ -223,7 +267,8 @@ private function assertArchiveInvalidationCount(array $expectedCounts): void private function assertArchivingOutput( ProcessSymfony $process, string $method, - int $signal, + int $signalToProcess, + int $signalOutput, array $blockSpec ): void { $idSite = self::$fixture->idSite; @@ -241,11 +286,15 @@ private function assertArchivingOutput( if (self::METHOD_CURL !== $method) { // curl handling does not acknowledge signals properly // so only check this output was posted for all other methods - self::assertStringContainsString('Received system signal to stop archiving: ' . $signal, $processOutput); + self::assertStringContainsString( + 'Received system signal to stop archiving: ' . $signalToProcess, + $processOutput + ); + self::assertStringContainsString('Trying to stop running cli processes...', $processOutput); } - if (\SIGINT === $signal) { + if (\SIGINT === $signalOutput) { self::assertStringContainsString(sprintf( "Archived website id %u, period = %s, date = %s, segment = '%s'", $idSite, @@ -255,7 +304,7 @@ private function assertArchivingOutput( ), $processOutput); } - if (\SIGTERM === $signal) { + if (\SIGTERM === $signalOutput) { self::assertRegExp('/Aborting command.*\[method = ' . $method . ']/', $processOutput); self::assertStringContainsString('Archiving process killed, reset invalidation', $processOutput); } From 0975917c4c46121da7b17c796dde2ca4f6e248dc Mon Sep 17 00:00:00 2001 From: Marc Neudert Date: Mon, 26 Aug 2024 16:57:58 +0200 Subject: [PATCH 07/10] Add tests for signal handling during core:archive init --- .../CoreArchiverProcessSignalTest.php | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/plugins/CoreAdminHome/tests/Integration/Commands/CoreArchiverProcessSignalTest.php b/plugins/CoreAdminHome/tests/Integration/Commands/CoreArchiverProcessSignalTest.php index 46b47e3ece6..4bcea23bf0b 100644 --- a/plugins/CoreAdminHome/tests/Integration/Commands/CoreArchiverProcessSignalTest.php +++ b/plugins/CoreAdminHome/tests/Integration/Commands/CoreArchiverProcessSignalTest.php @@ -97,6 +97,40 @@ public function getArchivingWithoutSignalData(): iterable yield 'curl' => [self::METHOD_CURL]; } + /** + * @dataProvider getArchivingStoppedDuringInitData + */ + public function testArchivingStoppedDuringInit(int $signal): void + { + self::$fixture->stepControl->blockCronArchiveStart(); + + // we don't care for the exact method, pick one with low setup complexity + $this->setUpArchivingMethod(self::METHOD_ASYNC_CLI); + + $process = $this->startCoreArchiver(self::METHOD_ASYNC_CLI); + + // wait until initialization step is reached + $result = self::$fixture->stepControl->waitForSuccess(static function () use ($process): bool { + return false !== strpos($process->getOutput(), 'Async process archiving supported'); + }); + + self::assertTrue($result, 'Archiving initialization check did not succeed'); + + $this->sendSignalToProcess($process, $signal, self::METHOD_ASYNC_CLI); + $this->waitForProcessToStop($process); + + $processOutput = $process->getOutput(); + + self::assertStringContainsString('Received system signal to stop archiving: ' . $signal, $processOutput); + self::assertStringContainsString('Archiving stopped', $processOutput); + } + + public function getArchivingStoppedDuringInitData(): iterable + { + yield 'stop using sigint' => [\SIGINT]; + yield 'stop using sigterm' => [\SIGTERM]; + } + /** * @dataProvider getSigintDuringArchivingData * @dataProvider getSigtermDuringArchivingUnsupportedFallbackToSigintData From 45b76b6aaf51bb4f15a157db3c754cb3f4b3221f Mon Sep 17 00:00:00 2001 From: Marc Neudert Date: Mon, 26 Aug 2024 17:31:30 +0200 Subject: [PATCH 08/10] Add tests for signal handling during core:archive scheduled tasks --- .../Fixtures/CoreArchiverProcessSignal.php | 8 +++ .../CoreArchiverProcessSignal/StepControl.php | 53 +++++++++++++++++++ .../CoreArchiverProcessSignalTest.php | 46 ++++++++++++++++ 3 files changed, 107 insertions(+) diff --git a/plugins/CoreAdminHome/tests/Fixtures/CoreArchiverProcessSignal.php b/plugins/CoreAdminHome/tests/Fixtures/CoreArchiverProcessSignal.php index aade969bf04..c0629c1b815 100644 --- a/plugins/CoreAdminHome/tests/Fixtures/CoreArchiverProcessSignal.php +++ b/plugins/CoreAdminHome/tests/Fixtures/CoreArchiverProcessSignal.php @@ -118,6 +118,14 @@ public function provideContainerConfig(): array 'CronArchive.init.finish', DI::value(Closure::fromCallable([$this->stepControl, 'handleCronArchiveStart'])), ], + [ + 'ScheduledTasks.execute', + DI::value(Closure::fromCallable([$this->stepControl, 'handleScheduledTasksExecute'])), + ], + [ + 'ScheduledTasks.shouldExecuteTask', + DI::value(Closure::fromCallable([$this->stepControl, 'handleScheduledTasksShouldExecute'])), + ] ]), ]; } diff --git a/plugins/CoreAdminHome/tests/Fixtures/CoreArchiverProcessSignal/StepControl.php b/plugins/CoreAdminHome/tests/Fixtures/CoreArchiverProcessSignal/StepControl.php index 4151bc45ec3..57c2871b3a7 100644 --- a/plugins/CoreAdminHome/tests/Fixtures/CoreArchiverProcessSignal/StepControl.php +++ b/plugins/CoreAdminHome/tests/Fixtures/CoreArchiverProcessSignal/StepControl.php @@ -20,6 +20,8 @@ class StepControl private const OPTION_ARCHIVE_REPORTS_BLOCKED = self::OPTION_PREFIX . 'ArchiveReportsBlocked'; private const OPTION_CRON_ARCHIVE_BLOCKED = self::OPTION_PREFIX . 'CronArchiveBlocked'; + private const OPTION_SCHEDULED_TASKS_BLOCKED = self::OPTION_PREFIX . 'ScheduledTasksBlocked'; + private const OPTION_SCHEDULED_TASKS_EXECUTE = self::OPTION_PREFIX . 'ScheduledTasksExecute'; /** * Block proceeding from the "API.CoreAdminHome.archiveReports" event. @@ -31,6 +33,14 @@ public function blockAPIArchiveReports(array $blockSpec): void Option::set(self::OPTION_ARCHIVE_REPORTS_BLOCKED, json_encode($blockSpec)); } + /** + * Block proceeding from the "ScheduledTasks.execute" event. + */ + public function blockScheduledTasks(): void + { + Option::set(self::OPTION_SCHEDULED_TASKS_BLOCKED, true); + } + /** * Block proceeding from the "CronArchive.init.finish" event. */ @@ -39,6 +49,14 @@ public function blockCronArchiveStart(): void Option::set(self::OPTION_CRON_ARCHIVE_BLOCKED, true); } + /** + * Force scheduled tasks to execute. + */ + public function executeScheduledTasks(): void + { + Option::set(self::OPTION_SCHEDULED_TASKS_EXECUTE, true); + } + /** * DI hook intercepting the "API.CoreAdminHome.archiveReports" event. */ @@ -84,6 +102,33 @@ public function handleCronArchiveStart(): void } } + /** + * DI hook intercepting the "ScheduledTasks.execute" event. + */ + public function handleScheduledTasksExecute(): void + { + $continue = $this->waitForSuccess(static function (): bool { + // force reading from database + Option::clearCachedOption(self::OPTION_SCHEDULED_TASKS_BLOCKED); + + return false === Option::get(self::OPTION_SCHEDULED_TASKS_BLOCKED); + }); + + if (!$continue) { + throw new RuntimeException('Waiting for ScheduledTask option took too long!'); + } + } + + /** + * DI hook intercepting the "ScheduledTasks.shouldExecuteTask" event. + */ + public function handleScheduledTasksShouldExecute(bool &$shouldExecuteTask): void + { + if (Option::get(self::OPTION_SCHEDULED_TASKS_EXECUTE)) { + $shouldExecuteTask = true; + } + } + /** * Remove all internal blocks. */ @@ -108,6 +153,14 @@ public function unblockCronArchiveStart(): void Option::delete(self::OPTION_CRON_ARCHIVE_BLOCKED); } + /** + * Allow proceeding past the "ScheduledTasks.execute" event. + */ + public function unblockScheduledTasks(): void + { + Option::delete(self::OPTION_SCHEDULED_TASKS_BLOCKED); + } + /** * Wait until a callable returns true or a timeout is reached. */ diff --git a/plugins/CoreAdminHome/tests/Integration/Commands/CoreArchiverProcessSignalTest.php b/plugins/CoreAdminHome/tests/Integration/Commands/CoreArchiverProcessSignalTest.php index 4bcea23bf0b..3f85315130b 100644 --- a/plugins/CoreAdminHome/tests/Integration/Commands/CoreArchiverProcessSignalTest.php +++ b/plugins/CoreAdminHome/tests/Integration/Commands/CoreArchiverProcessSignalTest.php @@ -15,8 +15,10 @@ use Piwik\DataAccess\Model; use Piwik\Db; use Piwik\Piwik; +use Piwik\Plugins\CoreAdminHome\Tasks; use Piwik\Plugins\CoreAdminHome\tests\Fixtures\CoreArchiverProcessSignal as CoreArchiverProcessSignalFixture; use Piwik\Plugins\CoreConsole\FeatureFlags\CliMultiProcessSymfony; +use Piwik\Scheduler\Task; use Piwik\Segment; use Piwik\Tests\Framework\Fixture; use Piwik\Tests\Framework\TestCase\IntegrationTestCase; @@ -280,6 +282,50 @@ public function getSigtermDuringArchivingData(): iterable ]; } + /** + * @dataProvider getScheduledTasksStoppedData + */ + public function testScheduledTasksStopped(int $signal): void + { + self::$fixture->stepControl->blockScheduledTasks(); + self::$fixture->stepControl->executeScheduledTasks(); + + // we don't care for the exact method, pick one with low setup complexity + $this->setUpArchivingMethod(self::METHOD_ASYNC_CLI); + + $process = $this->startCoreArchiver(self::METHOD_ASYNC_CLI); + + // wait until scheduled tasks are running + $result = self::$fixture->stepControl->waitForSuccess(static function () use ($process): bool { + return false !== strpos($process->getOutput(), 'Scheduler: executing task'); + }, $timeoutInSeconds = 60); + + self::assertTrue($result, 'Scheduled tasks did not start'); + + $this->sendSignalToProcess($process, $signal, self::METHOD_ASYNC_CLI); + + self::$fixture->stepControl->unblockScheduledTasks(); + + $this->waitForProcessToStop($process); + + $processOutput = $process->getOutput(); + $expectedExecutedTask = Task::getTaskName(Tasks::class, 'invalidateOutdatedArchives', null); + $expectedSkippedTask = Task::getTaskName(Tasks::class, 'purgeOutdatedArchives', null); + + self::assertStringContainsString('executing task ' . $expectedExecutedTask, $processOutput); + self::assertStringNotContainsString('executing task ' . $expectedSkippedTask, $processOutput); + + self::assertStringContainsString('Received system signal to stop archiving: ' . $signal, $processOutput); + self::assertStringContainsString('Trying to stop running tasks...', $processOutput); + self::assertStringContainsString('Scheduler: Aborting due to received signal', $processOutput); + } + + public function getScheduledTasksStoppedData(): iterable + { + yield 'stop using sigint' => [\SIGINT]; + yield 'stop using sigterm' => [\SIGTERM]; + } + /** * @param array{inProgress: int, total: int} $expectedCounts */ From 291474569587e5b912315efd8e17d6deea2efff9 Mon Sep 17 00:00:00 2001 From: Marc Neudert Date: Tue, 27 Aug 2024 21:04:06 +0200 Subject: [PATCH 09/10] Stop scheduled tasks after receiving SIGINT/SIGTERM --- .../Commands/RunScheduledTasks.php | 41 +++++++++++++++++-- 1 file changed, 37 insertions(+), 4 deletions(-) diff --git a/plugins/CoreAdminHome/Commands/RunScheduledTasks.php b/plugins/CoreAdminHome/Commands/RunScheduledTasks.php index 01792a2e8c7..bb514386864 100644 --- a/plugins/CoreAdminHome/Commands/RunScheduledTasks.php +++ b/plugins/CoreAdminHome/Commands/RunScheduledTasks.php @@ -11,11 +11,45 @@ use Piwik\Container\StaticContainer; use Piwik\FrontController; +use Piwik\Log\LoggerInterface; use Piwik\Plugin\ConsoleCommand; use Piwik\Scheduler\Scheduler; class RunScheduledTasks extends ConsoleCommand { + /** + * @var LoggerInterface + */ + private $logger; + + /** + * @var Scheduler|null + */ + private $scheduler = null; + + public function __construct(LoggerInterface $logger = null) + { + parent::__construct(); + + $this->logger = $logger ?: StaticContainer::get(LoggerInterface::class); + } + + public function getSystemSignalsToHandle(): array + { + return [\SIGINT, \SIGTERM]; + } + + public function handleSystemSignal(int $signal): void + { + if (null === $this->scheduler) { + // scheduled tasks have not yet started, stop immediately + exit; + } + + $this->logger->info('Received system signal to stop scheduled tasks: ' . $signal); + $this->scheduler->handleSignal($signal); + } + protected function configure() { $this->setName('scheduled-tasks:run'); @@ -37,15 +71,14 @@ protected function doExecute(): int FrontController::getInstance()->init(); // TODO use dependency injection - /** @var Scheduler $scheduler */ - $scheduler = StaticContainer::get('Piwik\Scheduler\Scheduler'); + $this->scheduler = StaticContainer::get(Scheduler::class); $task = $input->getArgument('task'); if ($task) { - $this->runSingleTask($scheduler, $task); + $this->runSingleTask($this->scheduler, $task); } else { - $scheduler->run(); + $this->scheduler->run(); } $this->writeSuccessMessage('Scheduled Tasks executed'); From f5dcce5ccf302dad345492103efb6e07a0137d26 Mon Sep 17 00:00:00 2001 From: Marc Neudert Date: Tue, 27 Aug 2024 23:44:52 +0200 Subject: [PATCH 10/10] Add tests for signal handling during scheduled-tasks:run --- .../RunScheduledTasksProcessSignal.php | 83 +++++++++++ .../StepControl.php | 84 +++++++++++ .../RunScheduledTasksProcessSignalTest.php | 131 ++++++++++++++++++ 3 files changed, 298 insertions(+) create mode 100644 plugins/CoreAdminHome/tests/Fixtures/RunScheduledTasksProcessSignal.php create mode 100644 plugins/CoreAdminHome/tests/Fixtures/RunScheduledTasksProcessSignal/StepControl.php create mode 100644 plugins/CoreAdminHome/tests/Integration/Commands/RunScheduledTasksProcessSignalTest.php diff --git a/plugins/CoreAdminHome/tests/Fixtures/RunScheduledTasksProcessSignal.php b/plugins/CoreAdminHome/tests/Fixtures/RunScheduledTasksProcessSignal.php new file mode 100644 index 00000000000..06f1c313487 --- /dev/null +++ b/plugins/CoreAdminHome/tests/Fixtures/RunScheduledTasksProcessSignal.php @@ -0,0 +1,83 @@ +inTestEnv = (bool) getenv(self::ENV_TRIGGER); + + $this->stepControl = new StepControl(); + } + + public function setUp(): void + { + Fixture::createSuperUser(); + + if (!self::siteCreated($this->idSite)) { + self::createWebsite('2021-01-01'); + } + } + + public function tearDown(): void + { + // empty + } + + public function provideContainerConfig(): array + { + if (!$this->inTestEnv) { + return []; + } + + return [ + 'ini.tests.enable_logging' => 1, + 'log.handlers' => static function (Container $c) { + return [$c->get(EchoHandler::class)]; + }, + 'observers.global' => DI::add([ + [ + 'ScheduledTasks.execute', + DI::value(Closure::fromCallable([$this->stepControl, 'handleScheduledTasksExecute'])), + ], + ]), + ]; + } +} diff --git a/plugins/CoreAdminHome/tests/Fixtures/RunScheduledTasksProcessSignal/StepControl.php b/plugins/CoreAdminHome/tests/Fixtures/RunScheduledTasksProcessSignal/StepControl.php new file mode 100644 index 00000000000..41df6653560 --- /dev/null +++ b/plugins/CoreAdminHome/tests/Fixtures/RunScheduledTasksProcessSignal/StepControl.php @@ -0,0 +1,84 @@ +waitForSuccess(static function (): bool { + // force reading from database + Option::clearCachedOption(self::OPTION_SCHEDULED_TASKS_BLOCKED); + + return false === Option::get(self::OPTION_SCHEDULED_TASKS_BLOCKED); + }); + + if (!$continue) { + throw new RuntimeException('Waiting for ScheduledTask option took too long!'); + } + } + + /** + * Remove all internal blocks. + */ + public function reset(): void + { + Option::deleteLike(self::OPTION_PREFIX . '%'); + } + + /** + * Allow proceeding past the "ScheduledTasks.execute" event. + */ + public function unblockScheduledTasks(): void + { + Option::delete(self::OPTION_SCHEDULED_TASKS_BLOCKED); + } + + /** + * Wait until a callable returns true or a timeout is reached. + */ + public function waitForSuccess(callable $check, int $timeoutInSeconds = 10): bool + { + $start = time(); + + do { + $now = time(); + + if ($check()) { + return true; + } + + // 250 millisecond sleep + usleep(250 * 1000); + } while ($timeoutInSeconds > $now - $start); + + return false; + } +} diff --git a/plugins/CoreAdminHome/tests/Integration/Commands/RunScheduledTasksProcessSignalTest.php b/plugins/CoreAdminHome/tests/Integration/Commands/RunScheduledTasksProcessSignalTest.php new file mode 100644 index 00000000000..d141465eff2 --- /dev/null +++ b/plugins/CoreAdminHome/tests/Integration/Commands/RunScheduledTasksProcessSignalTest.php @@ -0,0 +1,131 @@ +markTestSkipped('signal test cannot run without ext-pcntl'); + } + + parent::setUp(); + + self::$fixture->stepControl->reset(); + } + + /** + * @dataProvider getScheduledTasksStoppedData + */ + public function testScheduledTasksStopped(int $signal): void + { + self::$fixture->stepControl->blockScheduledTasks(); + + $process = $this->startScheduledTasks(); + + // wait until scheduled tasks are running + $result = self::$fixture->stepControl->waitForSuccess(static function () use ($process): bool { + return false !== strpos($process->getOutput(), 'Scheduler: executing task'); + }, $timeoutInSeconds = 30); + + self::assertTrue($result, 'Scheduled tasks did not start'); + + $this->sendSignalToProcess($process, $signal); + + self::$fixture->stepControl->unblockScheduledTasks(); + + $this->waitForProcessToStop($process); + + $processOutput = $process->getOutput(); + $expectedExecutedTask = Task::getTaskName(Tasks::class, 'invalidateOutdatedArchives', null); + $expectedSkippedTask = Task::getTaskName(Tasks::class, 'purgeOutdatedArchives', null); + + self::assertStringContainsString('executing task ' . $expectedExecutedTask, $processOutput); + self::assertStringNotContainsString('executing task ' . $expectedSkippedTask, $processOutput); + + self::assertStringContainsString( + 'Received system signal to stop scheduled tasks: ' . $signal, + $processOutput + ); + + self::assertStringContainsString('Scheduler: Aborting due to received signal', $processOutput); + } + + public function getScheduledTasksStoppedData(): iterable + { + yield 'stop using sigint' => [\SIGINT]; + yield 'stop using sigterm' => [\SIGTERM]; + } + + private function sendSignalToProcess(ProcessSymfony $process, int $signal): void + { + $process->signal($signal); + + $result = self::$fixture->stepControl->waitForSuccess( + static function () use ($process, $signal): bool { + return false !== strpos( + $process->getOutput(), + 'Received system signal to stop scheduled tasks: ' . $signal + ); + } + ); + + self::assertTrue($result, 'Process did not acknowledge signal'); + } + + private function startScheduledTasks(): ProcessSymfony + { + // exec is mandatory to send signals to the process + // not using array notation because "Fixture::getCliCommandBase" contains parameters + $process = ProcessSymfony::fromShellCommandline(sprintf( + 'exec %s scheduled-tasks:run -vvv --force', + Fixture::getCliCommandBase() + )); + + $process->setEnv([RunScheduledTasksProcessSignalFixture::ENV_TRIGGER => '1']); + $process->setTimeout(null); + $process->start(); + + self::assertTrue($process->isRunning()); + self::assertNotNull($process->getPid()); + + return $process; + } + + private function waitForProcessToStop(ProcessSymfony $process): void + { + $result = self::$fixture->stepControl->waitForSuccess(static function () use ($process): bool { + return !$process->isRunning(); + }); + + self::assertTrue($result, 'Archiving process did not stop'); + self::assertSame(0, $process->getExitCode()); + } +} + +RunScheduledTasksProcessSignalTest::$fixture = new RunScheduledTasksProcessSignalFixture();