Source code for lvmnps.actor.actor

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: Mingyeong YANG (mingyeong@khu.ac.kr), Florian Briegel (briegel@mpia.de)
# @Date: 2021-03-22
# @Filename: lvmnps/actor/actor.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

import asyncio
import os
from typing import ClassVar, Dict

import click

from clu import Command
from clu.actor import AMQPActor

from lvmnps.actor.commands import parser as nps_command_parser
from lvmnps.switch.factory import powerSwitchFactory


__all__ = ["NPSActor"]


[docs]class NPSActor(AMQPActor): """LVM network power switches base actor. Subclassed from the `.AMQPActor` class. """ parser: ClassVar[click.Group] = nps_command_parser BASE_CONFIG: ClassVar[str | Dict | None] = None def __init__(self, *args, **kwargs): if "schema" not in kwargs: kwargs["schema"] = os.path.join( os.path.dirname(__file__), "../etc/schema.json", ) super().__init__(*args, **kwargs) self.connect_timeout = 3
[docs] async def start(self): """Start the actor and connect the power switches.""" await super().start() if "timeouts" in self.config and "switch_connect" in self.config["timeouts"]: self.connect_timeout = self.config["timeouts"]["switch_connect"] assert len(self.parser_args) == 1 switches = list(self.parser_args[0].values()) # self.parser_args[0] is the list of switch instances tasks = [ asyncio.wait_for(switch.start(), timeout=self.connect_timeout) for switch in switches ] results = await asyncio.gather(*tasks, return_exceptions=True) valid_switches = [] for ii, result in enumerate(results): switch_name = switches[ii].name if isinstance(result, Exception): self.log.error( f"Unexpected exception of type {type(result)} while initialising " f"switch {switch_name}: {str(result)}" ) else: valid_switches.append(switches[ii]) self.parser_args[0] = {switch.name: switch for switch in valid_switches} all_names = [ outlet.name.lower() for switch in valid_switches for outlet in switch.outlets ] if len(all_names) != len(list(set(all_names))): self.log.warning("Repeated outlet names. This may lead to errors.") self.log.debug("Start done")
[docs] async def stop(self): """Stop the actor and disconnect the power switches.""" for switch in self.parser_args[0].values(): try: self.log.debug(f"Stop {switch.name} ...") await asyncio.wait_for(switch.stop(), timeout=self.connect_timeout) except Exception as ex: self.log.error(f"Unexpected exception of {type(ex)}: {ex}") return await super().stop()
[docs] @classmethod def from_config(cls, config, *args, **kwargs): """Creates an actor from a configuration file.""" if config is None: if cls.BASE_CONFIG is None: raise RuntimeError("The class does not have a base configuration.") config = cls.BASE_CONFIG instance = super(NPSActor, cls).from_config(config, *args, **kwargs) assert isinstance(instance, NPSActor) assert isinstance(instance.config, dict) switches = {} if "switches" in instance.config: for (key, swconfig) in instance.config["switches"].items(): if "name" in swconfig: name = swconfig["name"] else: name = key instance.log.info(f"Instance {name}: {swconfig}") try: switches[name] = powerSwitchFactory(name, swconfig, instance.log) except Exception as ex: instance.log.error(f"Power switch factory error {type(ex)}: {ex}") instance.parser_args = [switches] return instance
NPSCommand = Command[NPSActor]