Revert "Simplified"

This reverts commit fde4624e07.
This commit is contained in:
Aaron Bach 2019-07-11 12:54:18 -06:00
parent fde4624e07
commit 5ae57e64c2
1 changed files with 35 additions and 64 deletions

View File

@ -1,11 +1,8 @@
"""Support for Apache Kafka."""
import asyncio
from datetime import datetime
import json
import logging
from aiokafka import AIOKafkaProducer
from aiokafka.errors import KafkaError
import voluptuous as vol
from homeassistant.const import (
@ -31,24 +28,46 @@ CONFIG_SCHEMA = vol.Schema({
}, extra=vol.ALLOW_EXTRA)
async def async_setup(hass, config):
def setup(hass, config):
"""Activate the Apache Kafka integration."""
from aiokafka import AIOKafkaProducer
conf = config[DOMAIN]
topic_name = conf[CONF_TOPIC]
entities_filter = conf[CONF_FILTER]
kafka = hass.data[DOMAIN] = KafkaManager(
hass,
conf[CONF_IP_ADDRESS],
conf[CONF_PORT],
conf[CONF_TOPIC],
conf[CONF_FILTER])
producer = AIOKafkaProducer(
loop=hass.loop,
bootstrap_servers="{0}:{1}".format(
conf[CONF_IP_ADDRESS], conf[CONF_PORT]),
compression_type="gzip",
)
hass.bus.listen(EVENT_HOMEASSISTANT_STOP, kafka.shutdown())
encoder = DateTimeJSONEncoder()
try:
await kafka.start()
except asyncio.TimeoutError:
_LOGGER.error('Timed out while connecting to Kafka')
return False
async def send_to_pubsub(event):
"""Send states to Pub/Sub."""
await producer.start()
state = event.data.get('new_state')
if (state is None
or state.state in (STATE_UNKNOWN, '', STATE_UNAVAILABLE)
or not entities_filter(state.entity_id)):
return
as_dict = state.as_dict()
data = json.dumps(
obj=as_dict,
default=encoder.encode
).encode('utf-8')
try:
await producer.send_and_wait(topic_name, data)
finally:
producer.stop()
hass.bus.listen(EVENT_HOMEASSISTANT_STOP, producer.stop())
hass.bus.listen(EVENT_STATE_CHANGED, send_to_pubsub)
return True
@ -64,51 +83,3 @@ class DateTimeJSONEncoder(json.JSONEncoder):
if isinstance(o, datetime):
return o.isoformat()
return super().default(o)
class KafkaManager:
"""Define a manager to buffer events to Kafka."""
def __init__(
self,
hass,
ip_address,
port,
topic,
entities_filter):
"""Initialize."""
self._encoder = DateTimeJSONEncoder()
self._entities_filter = entities_filter
self._producer = AIOKafkaProducer(
loop=hass.loop,
bootstrap_servers="{0}:{1}".format(ip_address, port),
compression_type="gzip",
)
self._topic = topic
hass.bus.listen(EVENT_STATE_CHANGED, self._write_to_kafka)
def _encode_event(self, event):
"""Translate events into a binary JSON payload."""
state = event.data.get('new_state')
if (state is None
or state.state in (STATE_UNKNOWN, '', STATE_UNAVAILABLE)
or not self._entities_filter(state.entity_id)):
return
return json.dumps(
obj=state.as_dict(),
default=self._encoder.encode
).encode('utf-8')
async def _write_to_kafka(self, event):
"""Write a binary payload to Kafka."""
await self._producer.send_and_wait(self._topic, event)
async def start(self):
"""Start the Kafka manager."""
asyncio.wait_for(self._producer.start(), timeout=5)
async def shutdown(self):
"""Shut the manager down."""
await self._producer.stop()