Skip to content

Commit

Permalink
Monitor actor health (#45)
Browse files Browse the repository at this point in the history
* Add Overwatcher helper to monitor and restart actors

* Do not observe or calibrate while troubleshooting

* Add task to monitor actor health

* Check actors in pre-observing recipe

* Small fixes

* Prevent redundant parking of telescopes in post-observing recipe

* Do not override internal config with actor config

* Missing indentation

* Try to read out exposure before cancelling observation

* If health module fails to restart actors, shutdown and disable

* Update changelog

* Fix typing issue in DailyTasks

* Merge branch 'main' into albireox-actor-health

* Merge branch 'main' into albireox-actor-health

* Merge branch 'main' into albireox-actor-health

* Typo checking if internet and lco are set or not

* Rename actor.py to health.py and improve import circularity

* Do not check overwatcher health

* Allow to set the verbosity level for console output in Overwatcher actor
  • Loading branch information
albireox authored Jan 1, 2025
1 parent 5f55393 commit 2bd55e1
Show file tree
Hide file tree
Showing 11 changed files with 351 additions and 10 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@

## Next version

### 🚀 New

* [#45](https://github.com/sdss/lvmgort/pull/45) Added a `health` module that emits a heartbeat to `lvmbeat` and monitors actor health by ping, restarting them if found unresponsive. The pre-observing task now also performs that check.

### ✨ Improved

* [#44](https://vscode.dev/github/sdss/lvmgort/pull/44) RORR RID-019: disables the Overwatcher if rain is detected and requires a human to re-enable it when conditions are safe.
* [#44](https://github.com/sdss/lvmgort/pull/44) RORR RID-019: disables the Overwatcher if rain is detected and requires a human to re-enable it when conditions are safe.
* [#46](https://github.com/sdss/lvmgort/pull/46) RORR RID-017: treat lost connectivity to the internet or LCO as an unsafe condition and close.
* Create the night log during the pre-observing task.
* The `gort overwatcher` command now accepts a `--verbose` flag that allows setting the verbosity level of the messages output to the console while the Overwatcher is running (the file log level is always DEBUG). The default level is now WARNING.
Expand Down
10 changes: 10 additions & 0 deletions src/gort/devices/spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,16 @@ async def are_idle(self):

return all(await self.call_device_method(Spectrograph.is_idle))

async def are_reading(self):
    """Returns :obj:`True` if at least one spectrograph reports it is reading."""

    # One entry per device, as returned by ``call_device_method``.
    reading_flags = await self.call_device_method(Spectrograph.is_reading)
    return any(reading_flags)

async def are_exposing(self):
    """Returns :obj:`True` if at least one spectrograph reports it is exposing."""

    # One entry per device, as returned by ``call_device_method``.
    exposing_flags = await self.call_device_method(Spectrograph.is_exposing)
    return any(exposing_flags)

async def are_errored(self):
"""Returns :obj:`True` if any of the spectrographs are errored."""

Expand Down
2 changes: 2 additions & 0 deletions src/gort/etc/lvmgort.yml
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,8 @@ overwatcher:
scheduler:
open_dome_buffer: 300

disabled_actors: null

slack:
notifications_channels:
- lvm-overwatcher
Expand Down
4 changes: 4 additions & 0 deletions src/gort/overwatcher/calibration.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,10 @@ async def get_next(self):
overwatcher = self.cals_overwatcher.overwatcher
open_dome_buffer = overwatcher.config["overwatcher.scheduler.open_dome_buffer"]

# Do not allow calibrations if the Overwatcher is troubleshooting.
if overwatcher.state.troubleshooting:
return None

now: float = time.time()
done_cals: set[str] = set()

Expand Down
102 changes: 101 additions & 1 deletion src/gort/overwatcher/health.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@

from typing import ClassVar

from lvmopstools.retrier import Retrier
from lvmopstools.utils import with_timeout

from gort.overwatcher.core import OverwatcherModule, OverwatcherModuleTask
from gort.overwatcher.helpers import get_failed_actors, restart_actors
from gort.tools import decap


Expand Down Expand Up @@ -49,9 +53,105 @@ async def task(self):
await asyncio.sleep(self.INTERVAL)


class ActorHealthMonitorTask(OverwatcherModuleTask["HealthOverwatcher"]):
    """Periodically checks for failed actors and tries to restart them."""

    name = "actor_health_monitor_task"
    keep_alive = True
    restart_on_error = True

    # Seconds to sleep between two consecutive health checks.
    INTERVAL: ClassVar[float] = 60

    def __init__(self):
        super().__init__()

    async def task(self):
        """Runs the actor health-check loop.

        On each iteration the list of failed actors is retrieved (ignoring
        disabled actors and the Overwatcher itself). If any are found and the
        Overwatcher is enabled, a restart is attempted; otherwise the failure
        is only logged. If anything raises after failed actors were found,
        the Overwatcher is shut down and disabled.

        """

        while True:
            failed: list[str] = []

            try:
                failed = await get_failed_actors(
                    discard_disabled=True,
                    discard_overwatcher=True,
                )

                if failed and self.overwatcher.state.enabled:
                    await self.restart_actors(failed)
                elif failed:
                    self.log.info(
                        f"Found unresponsible actors: {', '.join(failed)}. "
                        "Not restarting because the Overwatcher is disabled."
                    )

            except Exception as err:
                self.log.error(
                    f"Failed to check actor health: {err}",
                    exc_info=err,
                )

                # ``failed`` is only populated once the ping check has returned,
                # so a non-empty list here means the error happened afterwards
                # (most likely while restarting the actors).
                if failed:
                    await self.overwatcher.shutdown(
                        "actors found unresponsible and cannot be restarted.",
                        disable_overwatcher=True,
                    )

            finally:
                # Always leave troubleshooting mode, whatever the outcome.
                self.overwatcher.state.troubleshooting = False

            await asyncio.sleep(self.INTERVAL)

    @Retrier(max_attempts=2, delay=5)
    async def restart_actors(self, failed_actors):
        """Restarts actors that have failed.

        Flags the Overwatcher as troubleshooting, stops any ongoing observation
        or calibration, restarts the actors, and clears the troubleshooting
        flag when the restart completes.

        Parameters
        ----------
        failed_actors
            The names of the actors to restart.

        """

        overwatcher = self.overwatcher
        overwatcher.state.troubleshooting = True

        actors_join = ", ".join(failed_actors)
        was_observing = overwatcher.observer.is_observing
        running_cal = overwatcher.calibrations.get_running_calibration()
        was_calibrating = running_cal is not None

        await self.notify(f"Found unresponsible actors: {actors_join}.")

        if was_observing:
            specs = self.gort.specs

            # If an exposure is still integrating (not done and not yet being
            # read), wait up to a minute for it before cancelling.
            exposure_pending = (
                bool(specs.last_exposure) and not specs.last_exposure.done()
            )
            if exposure_pending and not await specs.are_reading():
                await self.notify("Waiting to read exposures before cancelling.")
                await with_timeout(
                    specs.last_exposure,  # type: ignore
                    timeout=60,
                    raise_on_timeout=False,
                )

            await overwatcher.observer.stop_observing(
                immediate=True,
                reason=f"Found unresponsible actors: {actors_join}. "
                "Cancelling observations and restarting them.",
            )

        if was_calibrating:
            await self.notify(
                f"Found unresponsible actors: {actors_join}. "
                "Cancelling calibrations and restarting them.",
            )
            await overwatcher.calibrations.cancel()

        await self.notify("Restarting actors.")
        await restart_actors(failed_actors, self.gort)

        await self.notify("Actor restart complete. Resuming normal operations.")

        overwatcher.state.troubleshooting = False


class HealthOverwatcher(OverwatcherModule):
    """Monitors health.

    Runs the heartbeat emitter and the actor health monitor tasks.

    """

    name = "health"

    # Single assignment: the earlier heartbeat-only list was immediately
    # overwritten and only created a discarded ``EmitHeartbeatTask`` instance.
    tasks = [EmitHeartbeatTask(), ActorHealthMonitorTask()]
    delay = 0
1 change: 1 addition & 0 deletions src/gort/overwatcher/helpers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@
from __future__ import annotations

from .dome import DomeHelper, DomeStatus
from .health import get_failed_actors, ping_actors, restart_actors
from .notifier import BasicNotifier, NotifierMixIn, OverwatcherProtocol
200 changes: 200 additions & 0 deletions src/gort/overwatcher/helpers/health.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego ([email protected])
# @Date: 2024-12-29
# @Filename: health.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

import asyncio
import time

from typing import TYPE_CHECKING, Sequence

from gort import config
from gort.tools import (
get_lvmapi_route,
kubernetes_list_deployments,
kubernetes_restart_deployment,
)


if TYPE_CHECKING:
from gort.gort import Gort


async def ping_actors() -> dict[str, bool]:
    """Queries the ``/actors/ping`` LVM API route and returns its response.

    Returns
    -------
    dict
        The route response; presumably a mapping of actor name to whether
        the actor replied to the ping (verify against the API).

    """

    ping_route = "/actors/ping"
    response = await get_lvmapi_route(ping_route)

    return response


async def get_failed_actors(
    discard_disabled: bool = False,
    discard_overwatcher: bool = False,
) -> list[str]:
    """Returns the actors whose ping result is falsy.

    Parameters
    ----------
    discard_disabled
        If :obj:`True`, actors listed in the ``overwatcher.disabled_actors``
        configuration entry are removed from the result.
    discard_overwatcher
        If :obj:`True`, ``lvm.overwatcher`` is removed from the result.

    Returns
    -------
    list
        The names of the failed actors, in no particular order.

    """

    # The configuration value may be null; treat that as "no disabled actors".
    disabled_actors = config["overwatcher.disabled_actors"] or []

    ping_results = await ping_actors()
    unresponsive = {name for name, alive in ping_results.items() if not alive}

    if discard_disabled:
        # Ignore actors we know are disabled.
        unresponsive.difference_update(disabled_actors)

    if discard_overwatcher:
        unresponsive -= {"lvm.overwatcher"}

    return list(unresponsive)


async def restart_actors(
    actors: str | Sequence[str],
    gort: Gort,
    post_restart_actions: bool = True,
    timeout: float = 60,
) -> None:
    """Restarts one or more actors.

    High-level helper that restarts the deployments behind a series of actors.
    Deployments are collected in a set, so several actors that share a
    deployment trigger a single restart of it.

    Parameters
    ----------
    actors
        The actor or actors to restart.
    gort
        The Gort instance. Used to refresh the command set in the affected actors.
    post_restart_actions
        Whether to run post-restart actions. This may include resetting
        spectrographs, homing devices, etc.
    timeout
        The timeout in seconds to wait for each deployment restart to complete.

    Raises
    ------
    RuntimeError
        If the actor-deployment mappings cannot be retrieved.
    ValueError
        If an actor or a deployment is missing from the mappings.

    """

    actor_names = [actors] if isinstance(actors, str) else actors

    try:
        deployment_of = await get_lvmapi_route("/actors/actor-to-deployment")
        actors_in = await get_lvmapi_route("/actors/deployment-to-actors")
    except Exception as ee:
        raise RuntimeError("Unable to retrieve actor-deployment mappings.") from ee

    # Validate every actor before any deployment is touched.
    pending_deployments = set()
    for name in actor_names:
        if name not in deployment_of:
            raise ValueError(f"Actor {name} not found in actor-to-deployment mapping.")
        pending_deployments.add(deployment_of[name])

    # Deployments are restarted one at a time.
    for deployment in pending_deployments:
        if deployment not in actors_in:
            raise ValueError(f"Actors for deployment {deployment!r} cannot be found.")

        await restart_deployment(
            gort,
            deployment,
            actors_in[deployment],
            timeout=timeout,
        )

    for name in actor_names:
        if name not in gort.actors:
            gort.log.warning(f"Actor {name!r} not found in Gort. Adding it.")
            gort.add_actor(name)
            await gort.actors[name].init()
            continue

        gort.log.debug(f"Refreshing command list for actor {name!r} in Gort.")
        await gort.actors[name].refresh()

    if post_restart_actions:
        gort.log.debug("Running post-restart actions.")
        await run_post_restart_actions(gort, actor_names)


async def restart_deployment(
    gort: Gort,
    deployment: str,
    actors: list[str],
    timeout: float = 60,
):
    """Restarts a deployment, waiting until all its actors ping.

    Parameters
    ----------
    gort
        The Gort instance. Used for logging and to send the ``ping`` commands.
    deployment
        The name of the deployment to restart.
    actors
        The actors that belong to the deployment and must reply to a ping
        before the restart is considered complete.
    timeout
        Maximum time, in seconds, counted from the start of the call, to wait
        for the deployment to be listed and for all its actors to ping.

    Raises
    ------
    RuntimeError
        If the deployment is not listed, or its actors do not all ping,
        within ``timeout`` seconds.

    """

    # Use a monotonic clock so that wall-clock adjustments cannot shorten
    # or extend the timeout.
    start_time = time.monotonic()

    gort.log.warning(f"Restarting deployment {deployment}.")
    await kubernetes_restart_deployment(deployment)

    # Initial grace period before polling (presumably lets the rollout begin).
    await asyncio.sleep(5)

    while True:
        if deployment in await kubernetes_list_deployments():
            gort.log.info(f"Deployment {deployment} restarted.")
            break

        if time.monotonic() - start_time > timeout:
            raise RuntimeError(
                f"Timed out waiting for deployment {deployment!r} to restart."
            )

        await asyncio.sleep(1)

    while True:
        actor_pings = {actor: False for actor in actors}

        for actor in actors:
            ping_cmd = await gort.send_command(actor, "ping", time_limit=2)
            if ping_cmd.status.did_succeed:
                actor_pings[actor] = True

        if all(actor_pings.values()):
            gort.log.info(f"All actors in deployment {deployment} are pinging.")
            break

        if time.monotonic() - start_time > timeout:
            raise RuntimeError(
                f"Timed out waiting actors in deployment {deployment!r} to ping."
            )

        # Pause between rounds so that pings that fail immediately do not
        # turn this loop into a busy-wait.
        await asyncio.sleep(1)


async def run_post_restart_actions(gort: Gort, actors: Sequence[str]):
    """Runs post-restart actions for a list of actors.

    Reconnects the AG cameras if any ``.agcam`` actor was restarted, aborts and
    fully resets the spectrographs if any ``lvmscp`` actor was restarted, and
    re-homes the K-mirror, fibre selector, and focuser of each telescope whose
    corresponding actor is in ``actors``.

    Parameters
    ----------
    gort
        The Gort instance used to access the devices and the log.
    actors
        The names of the actors that have been restarted.

    """

    # Start with the easy ones. If the AG actors or specs have been restarted,
    # reset them.
    agcam_restarted = any(".agcam" in name for name in actors)
    scp_restarted = any("lvmscp" in name for name in actors)

    if agcam_restarted:
        gort.log.info("AG actors restarted. Reconnecting AG cameras.")
        await gort.ags.reconnect()

    if scp_restarted:
        gort.log.info("SCP actors restarted. Resetting spectrographs.")
        await gort.specs.abort()
        await gort.specs.reset(full=True)

    # Now re-home telescope devices.
    for telescope in ["sci", "spec", "skye", "skyw"]:
        for device in ["km", "fibsel", "foc"]:
            if f"lvm.{telescope}.{device}" in actors:
                gort.log.info(f"Re-homing {device!r} on telescope {telescope}.")

                # The focuser actor is named "foc" but the telescope
                # attribute that holds the device is "focuser".
                if device == "foc":
                    attr_name = "focuser"
                else:
                    attr_name = device

                tel_dev = gort.telescopes[telescope]
                dev = getattr(tel_dev, attr_name)

                if dev is not None:
                    # Best effort: a failed homing is logged, not raised.
                    try:
                        await dev.home()
                    except Exception:
                        gort.log.warning(
                            f"Failed to home {device!r} on telescope {telescope}."
                        )
2 changes: 1 addition & 1 deletion src/gort/overwatcher/observer.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ async def task(self):
elif self.module.is_observing or self.module.is_cancelling:
pass

elif not state.enabled:
elif not state.enabled or state.troubleshooting:
pass

elif not ephemeris:
Expand Down
Loading

0 comments on commit 2bd55e1

Please sign in to comment.