Cleanup supervisor/udev device action / add GPIO (#2488)

* Cleanup supervisor/udev device action / add GPIO

* Apply francks comment
This commit is contained in:
Pascal Vizeli 2021-01-31 14:24:22 +01:00 committed by GitHub
parent 5cc47c9222
commit babcc0de0c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 40 additions and 17 deletions

View File

@ -37,8 +37,18 @@ class PolicyGroup(str, Enum):
AUDIO = "audio"
class UdevAction(str, Enum):
"""Udev device action."""
class HardwareAction(str, Enum):
"""Hardware device action."""
ADD = "add"
REMOVE = "remove"
class UdevKernelAction(str, Enum):
"""Udev kernel device action."""
ADD = "add"
REMOVE = "remove"
CHANGE = "change"
BIND = "bind"
UNBIND = "unbind"

View File

@ -11,7 +11,7 @@ from ..const import CoreState
from ..coresys import CoreSys, CoreSysAttributes
from ..exceptions import HardwareNotFound
from ..resolution.const import UnhealthyReason
from .const import PolicyGroup, UdevAction, UdevSubsystem
from .const import HardwareAction, PolicyGroup, UdevKernelAction, UdevSubsystem
from .data import Device
_LOGGER: logging.Logger = logging.getLogger(__name__)
@ -71,23 +71,26 @@ class HwMonitor(CoreSysAttributes):
):
return
forward_action = None
hw_action = None
device = None
##
# Remove
if action in ("remove", "unbind") and udev is None:
if (
action in (UdevKernelAction.REMOVE, UdevKernelAction.UNBIND)
and udev is None
):
try:
device = self.sys_hardware.get_by_path(Path(kernel.sys_path))
except HardwareNotFound:
return
else:
self.sys_hardware.delete_device(device)
forward_action = UdevAction.REMOVE
hw_action = HardwareAction.REMOVE
##
# Add
if action in ("add", "bind") and udev is not None:
if action in (UdevKernelAction.ADD, UdevKernelAction.BIND) and udev is not None:
device = Device(
udev.sys_name,
Path(udev.device_node),
@ -97,38 +100,42 @@ class HwMonitor(CoreSysAttributes):
{attr: udev.properties[attr] for attr in udev.properties},
)
self.sys_hardware.update_device(device)
forward_action = UdevAction.ADD
hw_action = HardwareAction.ADD
# Process Action
if (
device
and forward_action
and hw_action
and self.sys_core.state in (CoreState.RUNNING, CoreState.FREEZE)
):
# New Sound device
if device.subsystem == UdevSubsystem.AUDIO:
self._action_sound(device, forward_action)
self._action_sound(device, hw_action)
# serial device
elif device.subsystem == UdevSubsystem.SERIAL:
self._action_tty(device, forward_action)
self._action_tty(device, hw_action)
# input device
elif device.subsystem == UdevSubsystem.INPUT:
self._action_input(device, forward_action)
self._action_input(device, hw_action)
# USB device
elif device.subsystem == UdevSubsystem.USB:
self._action_usb(device, forward_action)
self._action_usb(device, hw_action)
def _action_sound(self, device: Device, action: UdevAction):
# GPIO device
elif device.subsystem == UdevSubsystem.GPIO:
self._action_gpio(device, hw_action)
def _action_sound(self, device: Device, action: HardwareAction):
"""Process sound actions."""
if not self.sys_hardware.policy.is_match_cgroup(PolicyGroup.AUDIO, device):
return
_LOGGER.info("Detecting %s audio hardware - %s", action, device.path)
self.sys_loop.call_later(2, self.sys_create_task, self.sys_host.sound.update())
def _action_tty(self, device: Device, action: UdevAction):
def _action_tty(self, device: Device, action: HardwareAction):
"""Process tty actions."""
if not device.by_id or not self.sys_hardware.policy.is_match_cgroup(
PolicyGroup.UART, device
@ -138,7 +145,7 @@ class HwMonitor(CoreSysAttributes):
"Detecting %s serial hardware %s - %s", action, device.path, device.by_id
)
def _action_input(self, device: Device, action: UdevAction):
def _action_input(self, device: Device, action: HardwareAction):
"""Process input actions."""
if not device.by_id:
return
@ -146,8 +153,14 @@ class HwMonitor(CoreSysAttributes):
"Detecting %s serial hardware %s - %s", action, device.path, device.by_id
)
def _action_usb(self, device: Device, action: UdevAction):
def _action_usb(self, device: Device, action: HardwareAction):
"""Process usb actions."""
if not self.sys_hardware.policy.is_match_cgroup(PolicyGroup.USB, device):
return
_LOGGER.info("Detecting %s usb hardware %s", action, device.path)
def _action_gpio(self, device: Device, action: HardwareAction):
"""Process gpio actions."""
if not self.sys_hardware.policy.is_match_cgroup(PolicyGroup.GPIO, device):
return
_LOGGER.info("Detecting %s GPIO hardware %s", action, device.path)