diff --git a/CHANGELOG.md b/CHANGELOG.md index bfa8d4fb..fcfa774e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,9 +2,13 @@ ## Next version +### 🚀 New + +* [#45](https://github.com/sdss/lvmgort/pull/45) Added a `health` module that emits a heartbeat to `lvmbeat` and monitors actor health by ping, restarting them if found unresponsive. The pre-observing task now also performs that check. + ### ✨ Improved -* [#44](https://vscode.dev/github/sdss/lvmgort/pull/44) RORR RID-019: disables the Overwatcher if rain is detected and requires a human to re-enable it when conditions are safe. +* [#44](https://github.com/sdss/lvmgort/pull/44) RORR RID-019: disables the Overwatcher if rain is detected and requires a human to re-enable it when conditions are safe. * [#46](https://github.com/sdss/lvmgort/pull/46) RORR RID-017: treat lost connectivity to the internet or LCO as an unsafe condition and close. * Create the night log during the pre-observing task. * The `gort overwatcher` command now accepts a `--verbose` flat that allow to set the verbosity level of the messages output to the console while the Overwatcher is running (the file log level is always DEBUG). The default level is now WARNING. 
diff --git a/src/gort/devices/spec.py b/src/gort/devices/spec.py index a6673d65..8ab4fe05 100644 --- a/src/gort/devices/spec.py +++ b/src/gort/devices/spec.py @@ -381,6 +381,16 @@ async def are_idle(self): return all(await self.call_device_method(Spectrograph.is_idle)) + async def are_reading(self): + """Returns :obj:`True` if any of the spectrographs are reading.""" + + return any(await self.call_device_method(Spectrograph.is_reading)) + + async def are_exposing(self): + """Returns :obj:`True` if any of the spectrographs are exposing.""" + + return any(await self.call_device_method(Spectrograph.is_exposing)) + async def are_errored(self): """Returns :obj:`True` if any of the spectrographs are errored.""" diff --git a/src/gort/etc/lvmgort.yml b/src/gort/etc/lvmgort.yml index 62951410..790cd774 100644 --- a/src/gort/etc/lvmgort.yml +++ b/src/gort/etc/lvmgort.yml @@ -287,6 +287,8 @@ overwatcher: scheduler: open_dome_buffer: 300 + disabled_actors: null + slack: notifications_channels: - lvm-overwatcher diff --git a/src/gort/overwatcher/calibration.py b/src/gort/overwatcher/calibration.py index d1842663..2e669b54 100644 --- a/src/gort/overwatcher/calibration.py +++ b/src/gort/overwatcher/calibration.py @@ -391,6 +391,10 @@ async def get_next(self): overwatcher = self.cals_overwatcher.overwatcher open_dome_buffer = overwatcher.config["overwatcher.scheduler.open_dome_buffer"] + # Do not allow calibrations if the Overwatcher is troubleshooting. 
+ if overwatcher.state.troubleshooting: + return None + now: float = time.time() done_cals: set[str] = set() diff --git a/src/gort/overwatcher/health.py b/src/gort/overwatcher/health.py index e4d1186b..6f791ce5 100644 --- a/src/gort/overwatcher/health.py +++ b/src/gort/overwatcher/health.py @@ -12,7 +12,11 @@ from typing import ClassVar +from lvmopstools.retrier import Retrier +from lvmopstools.utils import with_timeout + from gort.overwatcher.core import OverwatcherModule, OverwatcherModuleTask +from gort.overwatcher.helpers import get_failed_actors, restart_actors from gort.tools import decap @@ -49,9 +53,105 @@ async def task(self): await asyncio.sleep(self.INTERVAL) +class ActorHealthMonitorTask(OverwatcherModuleTask["HealthOverwatcher"]): + """Monitors the health of actors.""" + + name = "actor_health_monitor_task" + keep_alive = True + restart_on_error = True + + INTERVAL: ClassVar[float] = 60 + + def __init__(self): + super().__init__() + + async def task(self): + """Monitors the health of actors.""" + + while True: + failed: list[str] = [] + + try: + failed = await get_failed_actors( + discard_disabled=True, + discard_overwatcher=True, + ) + if len(failed) > 0: + if self.overwatcher.state.enabled: + await self.restart_actors(failed) + else: + self.log.info( + f"Found unresponsible actors: {', '.join(failed)}. " + "Not restarting because the Overwatcher is disabled." 
+ ) + + except Exception as err: + self.log.error( + f"Failed to check actor health: {err}", + exc_info=err, + ) + + if len(failed) > 0: + await self.overwatcher.shutdown( + "actors found unresponsible and cannot be restarted.", + disable_overwatcher=True, + ) + + finally: + self.overwatcher.state.troubleshooting = False + + await asyncio.sleep(self.INTERVAL) + + @Retrier(max_attempts=2, delay=5) + async def restart_actors(self, failed_actors): + """Restarts actors that have failed.""" + + ow = self.overwatcher + ow.state.troubleshooting = True + + actors_join = ", ".join(failed_actors) + is_observing = ow.observer.is_observing + is_calibrating = ow.calibrations.get_running_calibration() is not None + + await self.notify(f"Found unresponsible actors: {', '.join(failed_actors)}.") + + if is_observing: + if ( + self.gort.specs.last_exposure + and not self.gort.specs.last_exposure.done() + and not await self.gort.specs.are_reading() + ): + await self.notify("Waiting to read exposures before cancelling.") + await with_timeout( + self.gort.specs.last_exposure, # type: ignore + timeout=60, + raise_on_timeout=False, + ) + + await ow.observer.stop_observing( + immediate=True, + reason=f"Found unresponsible actors: {actors_join}. " + "Cancelling observations and restarting them.", + ) + + if is_calibrating: + await self.notify( + f"Found unresponsible actors: {actors_join}. " + "Cancelling calibrations and restarting them.", + ) + await ow.calibrations.cancel() + + await self.notify("Restarting actors.") + await restart_actors(failed_actors, self.gort) + + await self.notify("Actor restart complete. 
Resuming normal operations.") + + ow.state.troubleshooting = False + + class HealthOverwatcher(OverwatcherModule): """Monitors health.""" name = "health" - tasks = [EmitHeartbeatTask()] + tasks = [EmitHeartbeatTask(), ActorHealthMonitorTask()] delay = 0 diff --git a/src/gort/overwatcher/helpers/__init__.py b/src/gort/overwatcher/helpers/__init__.py index b2b80394..fa46162d 100644 --- a/src/gort/overwatcher/helpers/__init__.py +++ b/src/gort/overwatcher/helpers/__init__.py @@ -9,4 +9,5 @@ from __future__ import annotations from .dome import DomeHelper, DomeStatus +from .health import get_failed_actors, ping_actors, restart_actors from .notifier import BasicNotifier, NotifierMixIn, OverwatcherProtocol diff --git a/src/gort/overwatcher/helpers/health.py b/src/gort/overwatcher/helpers/health.py new file mode 100644 index 00000000..7372bf81 --- /dev/null +++ b/src/gort/overwatcher/helpers/health.py @@ -0,0 +1,200 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# @Author: José Sánchez-Gallego (gallegoj@uw.edu) +# @Date: 2024-12-29 +# @Filename: health.py +# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause) + +from __future__ import annotations + +import asyncio +import time + +from typing import TYPE_CHECKING, Sequence + +from gort import config +from gort.tools import ( + get_lvmapi_route, + kubernetes_list_deployments, + kubernetes_restart_deployment, +) + + +if TYPE_CHECKING: + from gort.gort import Gort + + +async def ping_actors() -> dict[str, bool]: + """Pings all actors in the system.""" + + return await get_lvmapi_route("/actors/ping") + + +async def get_failed_actors( + discard_disabled: bool = False, + discard_overwatcher: bool = False, +) -> list[str]: + """Returns a list of failed actors.""" + + disabled_actors = config["overwatcher.disabled_actors"] or [] + + actor_status = await ping_actors() + + failed_actors = set([ac for ac in actor_status if not actor_status[ac]]) + + if discard_disabled: + for actor in disabled_actors: # 
Ignore actors we know are disabled. + failed_actors.discard(actor) + + if discard_overwatcher: + failed_actors.discard("lvm.overwatcher") + + return list(failed_actors) + + +async def restart_actors( + actors: str | Sequence[str], + gort: Gort, + post_restart_actions: bool = True, + timeout: float = 60, +) -> None: + """Restarts one or more actors. + + This is a high-level function that tries to restart a series of actors, along + with their deployments, in an optimal way. If multiple actors correspond to the + same deployment, only one deployment restart is triggered. + + Parameters + ---------- + actors + The actor or actors to restart. + gort + The Gort instance. Used to refresh the command set in the affected actors. + post_restart_actions + Whether to run post-restart actions. This may include resetting spectrographs, + homing devices, etc. + timeout + The timeout in seconds to wait for the restart to complete. + + """ + + if isinstance(actors, str): + actors = [actors] + + try: + actor_to_deployment = await get_lvmapi_route("/actors/actor-to-deployment") + deployment_to_actors = await get_lvmapi_route("/actors/deployment-to-actors") + except Exception as ee: + raise RuntimeError("Unable to retrieve actor-deployment mappings.") from ee + + deployments_to_restart = set() + for actor in actors: + if actor not in actor_to_deployment: + raise ValueError(f"Actor {actor} not found in actor-to-deployment mapping.") + deployments_to_restart.add(actor_to_deployment[actor]) + + for deployment in deployments_to_restart: + if deployment not in deployment_to_actors: + raise ValueError(f"Actors for deployment {deployment!r} cannot be found.") + + await restart_deployment( + gort, + deployment, + deployment_to_actors[deployment], + timeout=timeout, + ) + + for actor in actors: + if actor in gort.actors: + gort.log.debug(f"Refreshing command list for actor {actor!r} in Gort.") + await gort.actors[actor].refresh() + else: + gort.log.warning(f"Actor {actor!r} not found in Gort. 
Adding it.") + gort.add_actor(actor) + await gort.actors[actor].init() + + if post_restart_actions: + gort.log.debug("Running post-restart actions.") + await run_post_restart_actions(gort, actors) + + +async def restart_deployment( + gort: Gort, + deployment: str, + actors: list[str], + timeout: float = 60, +): + """Restarts a deployment, waiting until all its actors ping.""" + + start_time = time.time() + + gort.log.warning(f"Restarting deployment {deployment}.") + await kubernetes_restart_deployment(deployment) + + await asyncio.sleep(5) + + while True: + if deployment in await kubernetes_list_deployments(): + gort.log.info(f"Deployment {deployment} restarted.") + break + + if time.time() - start_time > timeout: + raise RuntimeError( + f"Timed out waiting for deployment {deployment!r} to restart." + ) + + await asyncio.sleep(1) + + while True: + actor_pings = {actor: False for actor in actors} + + for actor in actors: + ping_cmd = await gort.send_command(actor, "ping", time_limit=2) + if ping_cmd.status.did_succeed: + actor_pings[actor] = True + + if all(actor_pings.values()): + gort.log.info(f"All actors in deployment {deployment} are pinging.") + break + + if time.time() - start_time > timeout: + raise RuntimeError( + f"Timed out waiting actors in deployment {deployment!r} to ping." + ) + + +async def run_post_restart_actions(gort: Gort, actors: Sequence[str]): + """Runs post-restart actions for a list of actors.""" + + # Start with the easy ones. If the AG actors or specs have been restarted, + # reset them. + agcam_restarted = any(".agcam" in actor for actor in actors) + scp_restarted = any("lvmscp" in actor for actor in actors) + + if agcam_restarted: + gort.log.info("AG actors restarted. Reconnecting AG cameras.") + await gort.ags.reconnect() + + if scp_restarted: + gort.log.info("SCP actors restarted. Resetting spectrographs.") + await gort.specs.abort() + await gort.specs.reset(full=True) + + # Now re-home telescope devices. 
+ for telescope in ["sci", "spec", "skye", "skyw"]: + for device in ["km", "fibsel", "foc"]: + if f"lvm.{telescope}.{device}" not in actors: + continue + + gort.log.info(f"Re-homing {device!r} on telescope {telescope}.") + + tel_dev = gort.telescopes[telescope] + dev = getattr(tel_dev, "focuser" if device == "foc" else device) + if dev is None: + continue + + try: + await dev.home() + except Exception: + gort.log.warning(f"Failed to home {device!r} on telescope {telescope}.") diff --git a/src/gort/overwatcher/observer.py b/src/gort/overwatcher/observer.py index b35745c1..822ac544 100644 --- a/src/gort/overwatcher/observer.py +++ b/src/gort/overwatcher/observer.py @@ -61,7 +61,7 @@ async def task(self): elif self.module.is_observing or self.module.is_cancelling: pass - elif not state.enabled: + elif not state.enabled or state.troubleshooting: pass elif not ephemeris: diff --git a/src/gort/overwatcher/overwatcher.py b/src/gort/overwatcher/overwatcher.py index 87706dff..19527be1 100644 --- a/src/gort/overwatcher/overwatcher.py +++ b/src/gort/overwatcher/overwatcher.py @@ -20,7 +20,6 @@ from sdsstools.utils import GatheringTaskGroup from gort.exceptions import GortError -from gort.gort import Gort from gort.overwatcher.alerts import ActiveAlert from gort.overwatcher.core import OverwatcherBaseTask, OverwatcherModule from gort.overwatcher.helpers import DomeHelper @@ -31,6 +30,7 @@ if TYPE_CHECKING: + from gort.gort import Gort from gort.overwatcher.helpers.notifier import NotificationLevel @@ -90,12 +90,15 @@ async def task(self): try: is_safe, _ = ow.alerts.is_safe() is_night = ow.ephemeris.is_night() + is_troubleshooting = ow.troubleshooter.is_troubleshooting() ow.state.night = is_night ow.state.safe = is_safe ow.state.observing = ow.observer.is_observing ow.state.focusing = ow.observer.focusing - ow.state.troubleshooting = ow.troubleshooter.is_troubleshooting() + ow.state.troubleshooting = ( + ow.state.troubleshooting or is_troubleshooting + ) running_calibration = 
ow.calibrations.get_running_calibration() ow.state.calibrating = running_calibration is not None @@ -294,6 +297,7 @@ def __init__( dry_run: bool = False, **kwargs, ): + from gort import Gort # Needs to be imported here to avoid circular imports. from gort.overwatcher import ( AlertsOverwatcher, CalibrationsOverwatcher, diff --git a/src/gort/overwatcher/transparency.py b/src/gort/overwatcher/transparency.py index 0bb72aa8..60a87cb0 100644 --- a/src/gort/overwatcher/transparency.py +++ b/src/gort/overwatcher/transparency.py @@ -20,8 +20,7 @@ from sdsstools.utils import GatheringTaskGroup from gort.exceptions import GortError -from gort.overwatcher.core import OverwatcherModuleTask -from gort.overwatcher.overwatcher import OverwatcherModule +from gort.overwatcher.core import OverwatcherModule, OverwatcherModuleTask from gort.tools import cancel_task, decap, get_lvmapi_route diff --git a/src/gort/recipes/operations.py b/src/gort/recipes/operations.py index 56965ddd..0286f3c1 100644 --- a/src/gort/recipes/operations.py +++ b/src/gort/recipes/operations.py @@ -16,6 +16,7 @@ from sdsstools.utils import GatheringTaskGroup +from gort.overwatcher.helpers import get_failed_actors, restart_actors from gort.tools import decap, get_lvmapi_route, overwatcher_is_running from .base import BaseRecipe @@ -292,9 +293,22 @@ class PreObservingRecipe(BaseRecipe): name = "pre-observing" - async def recipe(self): + async def recipe(self, check_actors: bool = True): """Runs the pre-observing sequence.""" + if check_actors: + self.gort.log.info("Checking actors.") + failed_actors = await get_failed_actors(discard_disabled=True) + + if len(failed_actors) > 0: + self.gort.log.warning(f"Failed to ping actors: {failed_actors}.") + self.gort.log.info("Restarting actors.") + await restart_actors(list(failed_actors), self.gort) + await asyncio.sleep(5) + self.gort.log.info("Restart complete.") + else: + self.gort.log.info("All actors are pinging.") + # Run a clean-up first in case there are any 
issues with the specs. await self.gort.cleanup(readout=False) @@ -342,7 +356,7 @@ class PostObservingRecipe(BaseRecipe): email_route: ClassVar[str] = "/logs/night-logs/0/email" - async def recipe(self, send_email: bool = True): + async def recipe(self, send_email: bool = True, force_park: bool = False): """Runs the post-observing sequence.""" tasks = [] @@ -351,7 +365,10 @@ async def recipe(self, send_email: bool = True): if not closed: tasks.append(self.gort.enclosure.close(retry_without_parking=True)) - tasks.append(self.gort.telescopes.park()) + parked = [await tel.is_parked() for tel in self.gort.telescopes.values()] + if force_park or not all(parked): + tasks.append(self.gort.telescopes.park()) + tasks.append(self.gort.nps.calib.all_off()) tasks.append(self.gort.guiders.stop())