Move ping classes to their own module (#102448)

This commit is contained in:
Jan-Philipp Benecke 2023-10-22 10:12:59 +02:00 committed by GitHub
parent 973b8900a9
commit 62d8472757
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 166 additions and 153 deletions

View File

@ -956,6 +956,7 @@ omit =
homeassistant/components/ping/__init__.py
homeassistant/components/ping/binary_sensor.py
homeassistant/components/ping/device_tracker.py
homeassistant/components/ping/helpers.py
homeassistant/components/pioneer/media_player.py
homeassistant/components/plaato/__init__.py
homeassistant/components/plaato/binary_sensor.py

View File

@ -1,14 +1,10 @@
"""Tracks the latency of a host by sending ICMP echo requests (ping)."""
from __future__ import annotations
import asyncio
from contextlib import suppress
from datetime import timedelta
import logging
import re
from typing import TYPE_CHECKING, Any
from typing import Any
from icmplib import NameLookupError, async_ping
import voluptuous as vol
from homeassistant.components.binary_sensor import (
@ -24,7 +20,8 @@ from homeassistant.helpers.restore_state import RestoreEntity
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from . import PingDomainData
from .const import DOMAIN, ICMP_TIMEOUT, PING_TIMEOUT
from .const import DOMAIN
from .helpers import PingDataICMPLib, PingDataSubProcess
_LOGGER = logging.getLogger(__name__)
@ -43,16 +40,6 @@ SCAN_INTERVAL = timedelta(minutes=5)
PARALLEL_UPDATES = 50
PING_MATCHER = re.compile(
r"(?P<min>\d+.\d+)\/(?P<avg>\d+.\d+)\/(?P<max>\d+.\d+)\/(?P<mdev>\d+.\d+)"
)
PING_MATCHER_BUSYBOX = re.compile(
r"(?P<min>\d+.\d+)\/(?P<avg>\d+.\d+)\/(?P<max>\d+.\d+)"
)
WIN32_PING_MATCHER = re.compile(r"(?P<min>\d+)ms.+(?P<max>\d+)ms.+(?P<avg>\d+)ms")
PLATFORM_SCHEMA = PARENT_PLATFORM_SCHEMA.extend(
{
vol.Required(CONF_HOST): cv.string,
@ -142,140 +129,3 @@ class PingBinarySensor(RestoreEntity, BinarySensorEntity):
"avg": attributes[ATTR_ROUND_TRIP_TIME_AVG],
"mdev": attributes[ATTR_ROUND_TRIP_TIME_MDEV],
}
class PingData:
"""The base class for handling the data retrieval."""
def __init__(self, hass: HomeAssistant, host: str, count: int) -> None:
"""Initialize the data object."""
self.hass = hass
self._ip_address = host
self._count = count
self.data: dict[str, Any] | None = None
self.is_alive = False
class PingDataICMPLib(PingData):
"""The Class for handling the data retrieval using icmplib."""
def __init__(
self, hass: HomeAssistant, host: str, count: int, privileged: bool | None
) -> None:
"""Initialize the data object."""
super().__init__(hass, host, count)
self._privileged = privileged
async def async_update(self) -> None:
"""Retrieve the latest details from the host."""
_LOGGER.debug("ping address: %s", self._ip_address)
try:
data = await async_ping(
self._ip_address,
count=self._count,
timeout=ICMP_TIMEOUT,
privileged=self._privileged,
)
except NameLookupError:
self.is_alive = False
return
self.is_alive = data.is_alive
if not self.is_alive:
self.data = None
return
self.data = {
"min": data.min_rtt,
"max": data.max_rtt,
"avg": data.avg_rtt,
"mdev": "",
}
class PingDataSubProcess(PingData):
"""The Class for handling the data retrieval using the ping binary."""
def __init__(
self, hass: HomeAssistant, host: str, count: int, privileged: bool | None
) -> None:
"""Initialize the data object."""
super().__init__(hass, host, count)
self._ping_cmd = [
"ping",
"-n",
"-q",
"-c",
str(self._count),
"-W1",
self._ip_address,
]
async def async_ping(self) -> dict[str, Any] | None:
"""Send ICMP echo request and return details if success."""
pinger = await asyncio.create_subprocess_exec(
*self._ping_cmd,
stdin=None,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
close_fds=False, # required for posix_spawn
)
try:
async with asyncio.timeout(self._count + PING_TIMEOUT):
out_data, out_error = await pinger.communicate()
if out_data:
_LOGGER.debug(
"Output of command: `%s`, return code: %s:\n%s",
" ".join(self._ping_cmd),
pinger.returncode,
out_data,
)
if out_error:
_LOGGER.debug(
"Error of command: `%s`, return code: %s:\n%s",
" ".join(self._ping_cmd),
pinger.returncode,
out_error,
)
if pinger.returncode and pinger.returncode > 1:
# returncode of 1 means the host is unreachable
_LOGGER.exception(
"Error running command: `%s`, return code: %s",
" ".join(self._ping_cmd),
pinger.returncode,
)
if "max/" not in str(out_data):
match = PING_MATCHER_BUSYBOX.search(
str(out_data).rsplit("\n", maxsplit=1)[-1]
)
if TYPE_CHECKING:
assert match is not None
rtt_min, rtt_avg, rtt_max = match.groups()
return {"min": rtt_min, "avg": rtt_avg, "max": rtt_max, "mdev": ""}
match = PING_MATCHER.search(str(out_data).rsplit("\n", maxsplit=1)[-1])
if TYPE_CHECKING:
assert match is not None
rtt_min, rtt_avg, rtt_max, rtt_mdev = match.groups()
return {"min": rtt_min, "avg": rtt_avg, "max": rtt_max, "mdev": rtt_mdev}
except asyncio.TimeoutError:
_LOGGER.exception(
"Timed out running command: `%s`, after: %ss",
self._ping_cmd,
self._count + PING_TIMEOUT,
)
if pinger:
with suppress(TypeError):
await pinger.kill() # type: ignore[func-returns-value]
del pinger
return None
except AttributeError:
return None
async def async_update(self) -> None:
"""Retrieve the latest details from the host."""
self.data = await self.async_ping()
self.is_alive = self.data is not None

View File

@ -0,0 +1,162 @@
"""Ping classes shared between platforms."""
import asyncio
from contextlib import suppress
import logging
import re
from typing import TYPE_CHECKING, Any
from icmplib import NameLookupError, async_ping
from homeassistant.core import HomeAssistant
from .const import ICMP_TIMEOUT, PING_TIMEOUT
_LOGGER = logging.getLogger(__name__)
PING_MATCHER = re.compile(
r"(?P<min>\d+.\d+)\/(?P<avg>\d+.\d+)\/(?P<max>\d+.\d+)\/(?P<mdev>\d+.\d+)"
)
PING_MATCHER_BUSYBOX = re.compile(
r"(?P<min>\d+.\d+)\/(?P<avg>\d+.\d+)\/(?P<max>\d+.\d+)"
)
WIN32_PING_MATCHER = re.compile(r"(?P<min>\d+)ms.+(?P<max>\d+)ms.+(?P<avg>\d+)ms")
class PingData:
"""The base class for handling the data retrieval."""
data: dict[str, Any] | None = None
is_alive: bool = False
def __init__(self, hass: HomeAssistant, host: str, count: int) -> None:
"""Initialize the data object."""
self.hass = hass
self._ip_address = host
self._count = count
class PingDataICMPLib(PingData):
"""The Class for handling the data retrieval using icmplib."""
def __init__(
self, hass: HomeAssistant, host: str, count: int, privileged: bool | None
) -> None:
"""Initialize the data object."""
super().__init__(hass, host, count)
self._privileged = privileged
async def async_update(self) -> None:
"""Retrieve the latest details from the host."""
_LOGGER.debug("ping address: %s", self._ip_address)
try:
data = await async_ping(
self._ip_address,
count=self._count,
timeout=ICMP_TIMEOUT,
privileged=self._privileged,
)
except NameLookupError:
self.is_alive = False
return
self.is_alive = data.is_alive
if not self.is_alive:
self.data = None
return
self.data = {
"min": data.min_rtt,
"max": data.max_rtt,
"avg": data.avg_rtt,
"mdev": "",
}
class PingDataSubProcess(PingData):
"""The Class for handling the data retrieval using the ping binary."""
def __init__(
self, hass: HomeAssistant, host: str, count: int, privileged: bool | None
) -> None:
"""Initialize the data object."""
super().__init__(hass, host, count)
self._ping_cmd = [
"ping",
"-n",
"-q",
"-c",
str(self._count),
"-W1",
self._ip_address,
]
async def async_ping(self) -> dict[str, Any] | None:
"""Send ICMP echo request and return details if success."""
pinger = await asyncio.create_subprocess_exec(
*self._ping_cmd,
stdin=None,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
close_fds=False, # required for posix_spawn
)
try:
async with asyncio.timeout(self._count + PING_TIMEOUT):
out_data, out_error = await pinger.communicate()
if out_data:
_LOGGER.debug(
"Output of command: `%s`, return code: %s:\n%s",
" ".join(self._ping_cmd),
pinger.returncode,
out_data,
)
if out_error:
_LOGGER.debug(
"Error of command: `%s`, return code: %s:\n%s",
" ".join(self._ping_cmd),
pinger.returncode,
out_error,
)
if pinger.returncode and pinger.returncode > 1:
# returncode of 1 means the host is unreachable
_LOGGER.exception(
"Error running command: `%s`, return code: %s",
" ".join(self._ping_cmd),
pinger.returncode,
)
if "max/" not in str(out_data):
match = PING_MATCHER_BUSYBOX.search(
str(out_data).rsplit("\n", maxsplit=1)[-1]
)
if TYPE_CHECKING:
assert match is not None
rtt_min, rtt_avg, rtt_max = match.groups()
return {"min": rtt_min, "avg": rtt_avg, "max": rtt_max, "mdev": ""}
match = PING_MATCHER.search(str(out_data).rsplit("\n", maxsplit=1)[-1])
if TYPE_CHECKING:
assert match is not None
rtt_min, rtt_avg, rtt_max, rtt_mdev = match.groups()
return {"min": rtt_min, "avg": rtt_avg, "max": rtt_max, "mdev": rtt_mdev}
except asyncio.TimeoutError:
_LOGGER.exception(
"Timed out running command: `%s`, after: %ss",
self._ping_cmd,
self._count + PING_TIMEOUT,
)
if pinger:
with suppress(TypeError):
await pinger.kill() # type: ignore[func-returns-value]
del pinger
return None
except AttributeError:
return None
async def async_update(self) -> None:
"""Retrieve the latest details from the host."""
self.data = await self.async_ping()
self.is_alive = self.data is not None