Update azure_event_hub (#31448)

This commit is contained in:
Eduard van Valkenburg 2020-06-03 10:32:14 +02:00 committed by GitHub
parent a8e7bf6cf7
commit 5e2b87866e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 196 additions and 49 deletions

View File

@ -1,89 +1,223 @@
"""Support for Azure Event Hubs."""
import asyncio
import json
import logging
import time
from typing import Any, Dict
from azure.eventhub import EventData, EventHubClientAsync
from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient, EventHubSharedKeyCredential
from azure.eventhub.exceptions import EventHubError
import voluptuous as vol
from homeassistant.const import (
EVENT_HOMEASSISTANT_STOP,
EVENT_STATE_CHANGED,
MATCH_ALL,
STATE_UNAVAILABLE,
STATE_UNKNOWN,
)
from homeassistant.core import Event, HomeAssistant
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entityfilter import FILTER_SCHEMA
from homeassistant.helpers.event import async_call_later
from homeassistant.helpers.json import JSONEncoder
from .const import (
ADDITIONAL_ARGS,
CONF_EVENT_HUB_CON_STRING,
CONF_EVENT_HUB_INSTANCE_NAME,
CONF_EVENT_HUB_NAMESPACE,
CONF_EVENT_HUB_SAS_KEY,
CONF_EVENT_HUB_SAS_POLICY,
CONF_FILTER,
CONF_MAX_DELAY,
CONF_SEND_INTERVAL,
DOMAIN,
)
_LOGGER = logging.getLogger(__name__)
DOMAIN = "azure_event_hub"
CONF_EVENT_HUB_NAMESPACE = "event_hub_namespace"
CONF_EVENT_HUB_INSTANCE_NAME = "event_hub_instance_name"
CONF_EVENT_HUB_SAS_POLICY = "event_hub_sas_policy"
CONF_EVENT_HUB_SAS_KEY = "event_hub_sas_key"
CONF_FILTER = "filter"
CONFIG_SCHEMA = vol.Schema(
{
DOMAIN: vol.Schema(
{
vol.Required(CONF_EVENT_HUB_NAMESPACE): cv.string,
vol.Required(CONF_EVENT_HUB_INSTANCE_NAME): cv.string,
vol.Required(CONF_EVENT_HUB_SAS_POLICY): cv.string,
vol.Required(CONF_EVENT_HUB_SAS_KEY): cv.string,
vol.Required(CONF_FILTER): FILTER_SCHEMA,
}
vol.Exclusive(CONF_EVENT_HUB_CON_STRING, "setup_methods"): cv.string,
vol.Exclusive(CONF_EVENT_HUB_NAMESPACE, "setup_methods"): cv.string,
vol.Optional(CONF_EVENT_HUB_INSTANCE_NAME): cv.string,
vol.Optional(CONF_EVENT_HUB_SAS_POLICY): cv.string,
vol.Optional(CONF_EVENT_HUB_SAS_KEY): cv.string,
vol.Optional(CONF_SEND_INTERVAL, default=5): cv.positive_int,
vol.Optional(CONF_MAX_DELAY, default=30): cv.positive_int,
vol.Optional(CONF_FILTER, default={}): FILTER_SCHEMA,
},
cv.has_at_least_one_key(
CONF_EVENT_HUB_CON_STRING, CONF_EVENT_HUB_NAMESPACE
),
)
},
extra=vol.ALLOW_EXTRA,
)
async def async_setup(hass: HomeAssistant, yaml_config: Dict[str, Any]):
async def async_setup(hass, yaml_config):
"""Activate Azure EH component."""
config = yaml_config[DOMAIN]
if config.get(CONF_EVENT_HUB_CON_STRING):
client_args = {"conn_str": config[CONF_EVENT_HUB_CON_STRING]}
conn_str_client = True
else:
client_args = {
"fully_qualified_namespace": f"{config[CONF_EVENT_HUB_NAMESPACE]}.servicebus.windows.net",
"credential": EventHubSharedKeyCredential(
policy=config[CONF_EVENT_HUB_SAS_POLICY],
key=config[CONF_EVENT_HUB_SAS_KEY],
),
"eventhub_name": config[CONF_EVENT_HUB_INSTANCE_NAME],
}
conn_str_client = False
event_hub_address = (
f"amqps://{config[CONF_EVENT_HUB_NAMESPACE]}"
f".servicebus.windows.net/{config[CONF_EVENT_HUB_INSTANCE_NAME]}"
instance = hass.data[DOMAIN] = AzureEventHub(
hass,
client_args,
conn_str_client,
config[CONF_FILTER],
config[CONF_SEND_INTERVAL],
config[CONF_MAX_DELAY],
)
entities_filter = config[CONF_FILTER]
client = EventHubClientAsync(
event_hub_address,
debug=True,
username=config[CONF_EVENT_HUB_SAS_POLICY],
password=config[CONF_EVENT_HUB_SAS_KEY],
)
async_sender = client.add_async_sender()
await client.run_async()
hass.async_create_task(instance.async_start())
return True
encoder = JSONEncoder()
async def async_send_to_event_hub(event: Event):
"""Send states to Event Hub."""
class AzureEventHub:
"""A event handler class for Azure Event Hub."""
def __init__(
self,
hass: HomeAssistant,
client_args: Dict[str, Any],
conn_str_client: bool,
entities_filter: vol.Schema,
send_interval: int,
max_delay: int,
):
"""Initialize the listener."""
self.hass = hass
self.queue = asyncio.PriorityQueue()
self._client_args = client_args
self._conn_str_client = conn_str_client
self._entities_filter = entities_filter
self._send_interval = send_interval
self._max_delay = max_delay + send_interval
self._listener_remover = None
self._next_send_remover = None
self.shutdown = False
async def async_start(self):
"""Start the recorder, suppress logging and register the callbacks and do the first send after five seconds, to capture the startup events."""
# suppress the INFO and below logging on the underlying packages, they are very verbose, even at INFO
logging.getLogger("uamqp").setLevel(logging.WARNING)
logging.getLogger("azure.eventhub").setLevel(logging.WARNING)
self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self.async_shutdown)
self._listener_remover = self.hass.bus.async_listen(
MATCH_ALL, self.async_listen
)
# schedule the first send after 10 seconds to capture startup events, after that each send will schedule the next after the interval.
self._next_send_remover = async_call_later(self.hass, 10, self.async_send)
async def async_shutdown(self, _: Event):
"""Shut down the AEH by queueing None and calling send."""
if self._next_send_remover:
self._next_send_remover()
if self._listener_remover:
self._listener_remover()
await self.queue.put((3, (time.monotonic(), None)))
await self.async_send(None)
async def async_listen(self, event: Event):
"""Listen for new messages on the bus and queue them for AEH."""
await self.queue.put((2, (time.monotonic(), event)))
async def async_send(self, _):
"""Write preprocessed events to eventhub, with retry."""
client = self._get_client()
async with client:
while not self.queue.empty():
data_batch, dequeue_count = await self.fill_batch(client)
_LOGGER.debug(
"Sending %d event(s), out of %d events in the queue",
len(data_batch),
dequeue_count,
)
if data_batch:
try:
await client.send_batch(data_batch)
except EventHubError as exc:
_LOGGER.error("Error in sending events to Event Hub: %s", exc)
finally:
for _ in range(dequeue_count):
self.queue.task_done()
await client.close()
if not self.shutdown:
self._next_send_remover = async_call_later(
self.hass, self._send_interval, self.async_send
)
async def fill_batch(self, client):
"""Return a batch of events formatted for writing.
Uses get_nowait instead of await get, because the functions batches and doesn't wait for each single event, the send function is called.
Throws ValueError on add to batch when the EventDataBatch object reaches max_size. Put the item back in the queue and the next batch will include it.
"""
event_batch = await client.create_batch()
dequeue_count = 0
dropped = 0
while not self.shutdown:
try:
_, (timestamp, event) = self.queue.get_nowait()
except asyncio.QueueEmpty:
break
dequeue_count += 1
if not event:
self.shutdown = True
break
event_data = self._event_to_filtered_event_data(event)
if not event_data:
continue
if time.monotonic() - timestamp <= self._max_delay:
try:
event_batch.add(event_data)
except ValueError:
self.queue.put_nowait((1, (timestamp, event)))
break
else:
dropped += 1
if dropped:
_LOGGER.warning(
"Dropped %d old events, consider increasing the max_delay", dropped
)
return event_batch, dequeue_count
def _event_to_filtered_event_data(self, event: Event):
"""Filter event states and create EventData object."""
state = event.data.get("new_state")
if (
state is None
or state.state in (STATE_UNKNOWN, "", STATE_UNAVAILABLE)
or not entities_filter(state.entity_id)
or not self._entities_filter(state.entity_id)
):
return
return None
return EventData(json.dumps(obj=state, cls=JSONEncoder).encode("utf-8"))
event_data = EventData(
json.dumps(obj=state.as_dict(), default=encoder.encode).encode("utf-8")
)
await async_sender.send(event_data)
async def async_shutdown(event: Event):
"""Shut down the client."""
await client.stop_async()
hass.bus.async_listen(EVENT_STATE_CHANGED, async_send_to_event_hub)
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, async_shutdown)
return True
def _get_client(self):
"""Get a Event Producer Client."""
if self._conn_str_client:
return EventHubProducerClient.from_connection_string(
**self._client_args, **ADDITIONAL_ARGS
)
return EventHubProducerClient(**self._client_args, **ADDITIONAL_ARGS)

View File

@ -0,0 +1,13 @@
"""Constants and shared schema for the Azure Event Hub integration."""
DOMAIN = "azure_event_hub"
CONF_EVENT_HUB_NAMESPACE = "event_hub_namespace"
CONF_EVENT_HUB_INSTANCE_NAME = "event_hub_instance_name"
CONF_EVENT_HUB_SAS_POLICY = "event_hub_sas_policy"
CONF_EVENT_HUB_SAS_KEY = "event_hub_sas_key"
CONF_EVENT_HUB_CON_STRING = "event_hub_connection_string"
CONF_SEND_INTERVAL = "send_interval"
CONF_MAX_DELAY = "max_delay"
CONF_FILTER = "filter"
ADDITIONAL_ARGS = {"logging_enable": False}

View File

@ -2,6 +2,6 @@
"domain": "azure_event_hub",
"name": "Azure Event Hub",
"documentation": "https://www.home-assistant.io/integrations/azure_event_hub",
"requirements": ["azure-eventhub==1.3.1"],
"requirements": ["azure-eventhub==5.1.0"],
"codeowners": ["@eavanvalkenburg"]
}

View File

@ -309,7 +309,7 @@ avri-api==0.1.7
axis==29
# homeassistant.components.azure_event_hub
azure-eventhub==1.3.1
azure-eventhub==5.1.0
# homeassistant.components.azure_service_bus
azure-servicebus==0.50.1