Add Minio component (#23567)

* Add minio implementation

* Static check changes

* Added docstrings

* Update docstrings

* Update docstrings

* Fix linter errors

* Finally fix all docstring errors

* Create services.yaml

* Update CODEOWNERS

* Final changes

* Remove double underscores

* Minor changes

* Update config.yml

* Review changes

* Added tests

* Fix lint errors

* Move tests from unittest to pytest

* Add minio as test requirement

* Update test_minio_helper.py

* Better event thread handling, added hass test

* Update tests

* Fixed lint errors

* Update test_minio.py

* Review changes

* More review changes

* Removed tests

* Applied code style changes

* Reformat test code
This commit is contained in:
Tomas Kislan 2019-08-20 11:56:11 +02:00 committed by Martin Hjelmare
parent 93a800a612
commit eee2b2d543
12 changed files with 769 additions and 0 deletions

View File

@ -378,6 +378,7 @@ omit =
homeassistant/components/miflora/sensor.py
homeassistant/components/mikrotik/*
homeassistant/components/mill/climate.py
homeassistant/components/minio/*
homeassistant/components/mitemp_bt/sensor.py
homeassistant/components/mjpeg/camera.py
homeassistant/components/mobile_app/*

View File

@ -173,6 +173,7 @@ homeassistant/components/meteoalarm/* @rolfberkenbosch
homeassistant/components/miflora/* @danielhiversen @ChristianKuehnel
homeassistant/components/mill/* @danielhiversen
homeassistant/components/min_max/* @fabaff
homeassistant/components/minio/* @tkislan
homeassistant/components/mobile_app/* @robbiet480
homeassistant/components/monoprice/* @etsinko
homeassistant/components/moon/* @fabaff

View File

@ -0,0 +1,265 @@
"""Minio component."""
import logging
import os
import threading
from queue import Queue
from typing import List
import voluptuous as vol
from homeassistant.const import EVENT_HOMEASSISTANT_START, EVENT_HOMEASSISTANT_STOP
import homeassistant.helpers.config_validation as cv
from .minio_helper import create_minio_client, MinioEventThread
_LOGGER = logging.getLogger(__name__)
DOMAIN = "minio"
CONF_HOST = "host"
CONF_PORT = "port"
CONF_ACCESS_KEY = "access_key"
CONF_SECRET_KEY = "secret_key"
CONF_SECURE = "secure"
CONF_LISTEN = "listen"
CONF_LISTEN_BUCKET = "bucket"
CONF_LISTEN_PREFIX = "prefix"
CONF_LISTEN_SUFFIX = "suffix"
CONF_LISTEN_EVENTS = "events"
ATTR_BUCKET = "bucket"
ATTR_KEY = "key"
ATTR_FILE_PATH = "file_path"
DEFAULT_LISTEN_PREFIX = ""
DEFAULT_LISTEN_SUFFIX = ".*"
DEFAULT_LISTEN_EVENTS = "s3:ObjectCreated:*"
CONFIG_SCHEMA = vol.Schema(
{
DOMAIN: vol.Schema(
{
vol.Required(CONF_HOST): cv.string,
vol.Required(CONF_PORT): cv.port,
vol.Required(CONF_ACCESS_KEY): cv.string,
vol.Required(CONF_SECRET_KEY): cv.string,
vol.Required(CONF_SECURE): cv.boolean,
vol.Optional(CONF_LISTEN, default=[]): vol.All(
cv.ensure_list,
[
vol.Schema(
{
vol.Required(CONF_LISTEN_BUCKET): cv.string,
vol.Optional(
CONF_LISTEN_PREFIX, default=DEFAULT_LISTEN_PREFIX
): cv.string,
vol.Optional(
CONF_LISTEN_SUFFIX, default=DEFAULT_LISTEN_SUFFIX
): cv.string,
vol.Optional(
CONF_LISTEN_EVENTS, default=DEFAULT_LISTEN_EVENTS
): cv.string,
}
)
],
),
}
)
},
extra=vol.ALLOW_EXTRA,
)
BUCKET_KEY_SCHEMA = vol.Schema(
{vol.Required(ATTR_BUCKET): cv.template, vol.Required(ATTR_KEY): cv.template}
)
BUCKET_KEY_FILE_SCHEMA = BUCKET_KEY_SCHEMA.extend(
{vol.Required(ATTR_FILE_PATH): cv.template}
)
def setup(hass, config):
"""Set up MinioClient and event listeners."""
conf = config[DOMAIN]
host = conf[CONF_HOST]
port = conf[CONF_PORT]
access_key = conf[CONF_ACCESS_KEY]
secret_key = conf[CONF_SECRET_KEY]
secure = conf[CONF_SECURE]
queue_listener = QueueListener(hass)
queue = queue_listener.queue
hass.bus.listen_once(EVENT_HOMEASSISTANT_START, queue_listener.start_handler)
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, queue_listener.stop_handler)
def _setup_listener(listener_conf):
bucket = listener_conf[CONF_LISTEN_BUCKET]
prefix = listener_conf[CONF_LISTEN_PREFIX]
suffix = listener_conf[CONF_LISTEN_SUFFIX]
events = listener_conf[CONF_LISTEN_EVENTS]
minio_listener = MinioListener(
queue,
get_minio_endpoint(host, port),
access_key,
secret_key,
secure,
bucket,
prefix,
suffix,
events,
)
hass.bus.listen_once(EVENT_HOMEASSISTANT_START, minio_listener.start_handler)
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, minio_listener.stop_handler)
for listen_conf in conf[CONF_LISTEN]:
_setup_listener(listen_conf)
minio_client = create_minio_client(
get_minio_endpoint(host, port), access_key, secret_key, secure
)
def _render_service_value(service, key):
value = service.data[key]
value.hass = hass
return value.async_render()
def put_file(service):
"""Upload file service."""
bucket = _render_service_value(service, ATTR_BUCKET)
key = _render_service_value(service, ATTR_KEY)
file_path = _render_service_value(service, ATTR_FILE_PATH)
if not hass.config.is_allowed_path(file_path):
_LOGGER.error("Invalid file_path %s", file_path)
return
minio_client.fput_object(bucket, key, file_path)
def get_file(service):
"""Download file service."""
bucket = _render_service_value(service, ATTR_BUCKET)
key = _render_service_value(service, ATTR_KEY)
file_path = _render_service_value(service, ATTR_FILE_PATH)
if not hass.config.is_allowed_path(file_path):
_LOGGER.error("Invalid file_path %s", file_path)
return
minio_client.fget_object(bucket, key, file_path)
def remove_file(service):
"""Delete file service."""
bucket = _render_service_value(service, ATTR_BUCKET)
key = _render_service_value(service, ATTR_KEY)
minio_client.remove_object(bucket, key)
hass.services.register(DOMAIN, "put", put_file, schema=BUCKET_KEY_FILE_SCHEMA)
hass.services.register(DOMAIN, "get", get_file, schema=BUCKET_KEY_FILE_SCHEMA)
hass.services.register(DOMAIN, "remove", remove_file, schema=BUCKET_KEY_SCHEMA)
return True
def get_minio_endpoint(host: str, port: int) -> str:
"""Create minio endpoint from host and port."""
return "{}:{}".format(host, port)
class QueueListener(threading.Thread):
"""Forward events from queue into HASS event bus."""
def __init__(self, hass):
"""Create queue."""
super().__init__()
self._hass = hass
self._queue = Queue()
def run(self):
"""Listen to queue events, and forward them to HASS event bus."""
_LOGGER.info("Running QueueListener")
while True:
event = self._queue.get()
if event is None:
break
_, file_name = os.path.split(event[ATTR_KEY])
_LOGGER.debug(
"Sending event %s, %s, %s",
event["event_name"],
event[ATTR_BUCKET],
event[ATTR_KEY],
)
self._hass.bus.fire(DOMAIN, {"file_name": file_name, **event})
@property
def queue(self):
"""Return wrapped queue."""
return self._queue
def stop(self):
"""Stop run by putting None into queue and join the thread."""
_LOGGER.info("Stopping QueueListener")
self._queue.put(None)
self.join()
_LOGGER.info("Stopped QueueListener")
def start_handler(self, _):
"""Start handler helper method."""
self.start()
def stop_handler(self, _):
"""Stop handler helper method."""
self.stop()
class MinioListener:
"""MinioEventThread wrapper with helper methods."""
def __init__(
self,
queue: Queue,
endpoint: str,
access_key: str,
secret_key: str,
secure: bool,
bucket_name: str,
prefix: str,
suffix: str,
events: List[str],
):
"""Create Listener."""
self._queue = queue
self._endpoint = endpoint
self._access_key = access_key
self._secret_key = secret_key
self._secure = secure
self._bucket_name = bucket_name
self._prefix = prefix
self._suffix = suffix
self._events = events
self._minio_event_thread = None
def start_handler(self, _):
"""Create and start the event thread."""
self._minio_event_thread = MinioEventThread(
self._queue,
self._endpoint,
self._access_key,
self._secret_key,
self._secure,
self._bucket_name,
self._prefix,
self._suffix,
self._events,
)
self._minio_event_thread.start()
def stop_handler(self, _):
"""Issue stop and wait for thread to join."""
if self._minio_event_thread is not None:
self._minio_event_thread.stop()

View File

@ -0,0 +1,12 @@
{
"domain": "minio",
"name": "Minio",
"documentation": "https://www.home-assistant.io/components/minio",
"requirements": [
"minio==4.0.9"
],
"dependencies": [],
"codeowners": [
"@tkislan"
]
}

View File

@ -0,0 +1,209 @@
"""Minio helper methods."""
import time
from collections.abc import Iterable
import json
import logging
import re
import threading
from queue import Queue
from typing import Iterator, List
from urllib.parse import unquote
from minio import Minio
from urllib3.exceptions import HTTPError
_LOGGER = logging.getLogger(__name__)
_METADATA_RE = re.compile("x-amz-meta-(.*)", re.IGNORECASE)
def normalize_metadata(metadata: dict) -> dict:
"""Normalize object metadata by stripping the prefix."""
new_metadata = {}
for meta_key, meta_value in metadata.items():
match = _METADATA_RE.match(meta_key)
if not match:
continue
new_metadata[match.group(1).lower()] = meta_value
return new_metadata
def create_minio_client(
endpoint: str, access_key: str, secret_key: str, secure: bool
) -> Minio:
"""Create Minio client."""
return Minio(endpoint, access_key, secret_key, secure)
def get_minio_notification_response(
minio_client, bucket_name: str, prefix: str, suffix: str, events: List[str]
):
"""Start listening to minio events. Copied from minio-py."""
query = {"prefix": prefix, "suffix": suffix, "events": events}
# pylint: disable=protected-access
return minio_client._url_open(
"GET", bucket_name=bucket_name, query=query, preload_content=False
)
class MinioEventStreamIterator(Iterable):
"""Iterator wrapper over notification http response stream."""
def __iter__(self) -> Iterator:
"""Return self."""
return self
def __init__(self, response):
"""Init."""
self._response = response
self._stream = response.stream()
def __next__(self):
"""Get next not empty line."""
while True:
line = next(self._stream)
if line.strip():
event = json.loads(line.decode("utf-8"))
if event["Records"] is not None:
return event
def close(self):
"""Close the response."""
self._response.close()
class MinioEventThread(threading.Thread):
"""Thread wrapper around minio notification blocking stream."""
def __init__(
self,
queue: Queue,
endpoint: str,
access_key: str,
secret_key: str,
secure: bool,
bucket_name: str,
prefix: str,
suffix: str,
events: List[str],
):
"""Copy over all Minio client options."""
super().__init__()
self._queue = queue
self._endpoint = endpoint
self._access_key = access_key
self._secret_key = secret_key
self._secure = secure
self._bucket_name = bucket_name
self._prefix = prefix
self._suffix = suffix
self._events = events
self._event_stream_it = None
self._should_stop = False
def __enter__(self):
"""Start the thread."""
self.start()
def __exit__(self, exc_type, exc_val, exc_tb):
"""Stop and join the thread."""
self.stop()
def run(self):
"""Create MinioClient and run the loop."""
_LOGGER.info("Running MinioEventThread")
self._should_stop = False
minio_client = create_minio_client(
self._endpoint, self._access_key, self._secret_key, self._secure
)
while not self._should_stop:
_LOGGER.info("Connecting to minio event stream")
response = None
try:
response = get_minio_notification_response(
minio_client,
self._bucket_name,
self._prefix,
self._suffix,
self._events,
)
self._event_stream_it = MinioEventStreamIterator(response)
self._iterate_event_stream(self._event_stream_it, minio_client)
except json.JSONDecodeError:
if response:
response.close()
except HTTPError as error:
_LOGGER.error("Failed to connect to Minio endpoint: %s", error)
# Wait before attempting to connect again.
time.sleep(1)
except AttributeError:
# When response is closed, iterator will fail to access
# the underlying socket descriptor.
break
def _iterate_event_stream(self, event_stream_it, minio_client):
for event in event_stream_it:
for event_name, bucket, key, metadata in iterate_objects(event):
presigned_url = ""
try:
presigned_url = minio_client.presigned_get_object(bucket, key)
# Fail gracefully. If for whatever reason this stops working,
# it shouldn't prevent it from firing events.
# pylint: disable=broad-except
except Exception as error:
_LOGGER.error("Failed to generate presigned url: %s", error)
queue_entry = {
"event_name": event_name,
"bucket": bucket,
"key": key,
"presigned_url": presigned_url,
"metadata": metadata,
}
_LOGGER.debug("Queue entry, %s", queue_entry)
self._queue.put(queue_entry)
def stop(self):
"""Cancel event stream and join the thread."""
_LOGGER.debug("Stopping event thread")
self._should_stop = True
if self._event_stream_it is not None:
self._event_stream_it.close()
self._event_stream_it = None
_LOGGER.debug("Joining event thread")
self.join()
_LOGGER.debug("Event thread joined")
def iterate_objects(event):
"""
Iterate over file records of notification event.
Most of the time it should still be only one record.
"""
records = event.get("Records", [])
for record in records:
event_name = record.get("eventName")
bucket = record.get("s3", {}).get("bucket", {}).get("name")
key = record.get("s3", {}).get("object", {}).get("key")
metadata = normalize_metadata(
record.get("s3", {}).get("object", {}).get("userMetadata", {})
)
if not bucket or not key:
_LOGGER.warning("Invalid bucket and/or key, %s, %s", bucket, key)
continue
key = unquote(key)
yield event_name, bucket, key, metadata

View File

@ -0,0 +1,35 @@
get:
description: Download file from Minio.
fields:
bucket:
description: Bucket to use.
example: camera-files
key:
description: Object key of the file.
example: front_camera/2018/01/02/snapshot_12512514.jpg
file_path:
description: File path on local filesystem.
example: /data/camera_files/snapshot.jpg
put:
description: Upload file to Minio.
fields:
bucket:
description: Bucket to use.
example: camera-files
key:
description: Object key of the file.
example: front_camera/2018/01/02/snapshot_12512514.jpg
file_path:
description: File path on local filesystem.
example: /data/camera_files/snapshot.jpg
remove:
description: Delete file from Minio.
fields:
bucket:
description: Bucket to use.
example: camera-files
key:
description: Object key of the file.
example: front_camera/2018/01/02/snapshot_12512514.jpg

View File

@ -797,6 +797,9 @@ miflora==0.4.0
# homeassistant.components.mill
millheater==0.3.4
# homeassistant.components.minio
minio==4.0.9
# homeassistant.components.mitemp_bt
mitemp_bt==0.0.1

View File

@ -213,6 +213,9 @@ mbddns==0.1.2
# homeassistant.components.mfi
mficlient==0.3.0
# homeassistant.components.minio
minio==4.0.9
# homeassistant.components.discovery
# homeassistant.components.ssdp
netdisco==2.6.0

View File

@ -99,6 +99,7 @@ TEST_REQUIREMENTS = (
"pyMetno",
"mbddns",
"mficlient",
"minio",
"netdisco",
"numpy",
"oauth2client",

View File

@ -0,0 +1 @@
"""Tests for the minio component."""

View File

@ -0,0 +1,48 @@
"""Minio Test event."""
TEST_EVENT = {
"Records": [
{
"eventVersion": "2.0",
"eventSource": "minio:s3",
"awsRegion": "",
"eventTime": "2019-05-02T11:05:07Z",
"eventName": "s3:ObjectCreated:Put",
"userIdentity": {"principalId": "SO9KNO6YT9OGE39PQCZW"},
"requestParameters": {
"accessKey": "SO9KNO6YT9OGE39PQCZW",
"region": "",
"sourceIPAddress": "172.27.0.1",
},
"responseElements": {
"x-amz-request-id": "159AD8E6F6805783",
"x-minio-deployment-id": "90b265b8-bac5-413a-b12a-8915469fd769",
"x-minio-origin-endpoint": "http://172.27.0.2:9000",
},
"s3": {
"s3SchemaVersion": "1.0",
"configurationId": "Config",
"bucket": {
"name": "test",
"ownerIdentity": {"principalId": "SO9KNO6YT9OGE39PQCZW"},
"arn": "arn:aws:s3:::test",
},
"object": {
"key": "5jJkTAo.jpg",
"size": 108368,
"eTag": "1af324731637228cbbb0b2e8c07d4e50",
"contentType": "image/jpeg",
"userMetadata": {"content-type": "image/jpeg"},
"versionId": "1",
"sequencer": "159AD8E6F76DD9C4",
},
},
"source": {
"host": "",
"port": "",
"userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_3) "
"AppleWebKit/605.1.15 (KHTML, like Gecko) "
"Version/12.0.3 Safari/605.1.15",
},
}
]
}

View File

@ -0,0 +1,190 @@
"""Tests for Minio Hass related code."""
import asyncio
import json
from unittest.mock import MagicMock
import pytest
from asynctest import patch, call
from homeassistant.components.minio import (
QueueListener,
DOMAIN,
CONF_HOST,
CONF_PORT,
CONF_ACCESS_KEY,
CONF_SECRET_KEY,
CONF_SECURE,
CONF_LISTEN,
CONF_LISTEN_BUCKET,
)
from homeassistant.core import callback
from homeassistant.setup import async_setup_component
from tests.components.minio.common import TEST_EVENT
@pytest.fixture(name="minio_client")
def minio_client_fixture():
"""Patch Minio client."""
with patch("homeassistant.components.minio.minio_helper.Minio") as minio_mock:
minio_client_mock = minio_mock.return_value
yield minio_client_mock
@pytest.fixture(name="minio_client_event")
def minio_client_event_fixture():
"""Patch helper function for minio notification stream."""
with patch("homeassistant.components.minio.minio_helper.Minio") as minio_mock:
minio_client_mock = minio_mock.return_value
response_mock = MagicMock()
stream_mock = MagicMock()
stream_mock.__next__.side_effect = [
"",
"",
bytearray(json.dumps(TEST_EVENT), "utf-8"),
]
response_mock.stream.return_value = stream_mock
minio_client_mock._url_open.return_value = response_mock
yield minio_client_mock
async def test_minio_services(hass, caplog, minio_client):
"""Test Minio services."""
hass.config.whitelist_external_dirs = set("/tmp")
await async_setup_component(
hass,
DOMAIN,
{
DOMAIN: {
CONF_HOST: "localhost",
CONF_PORT: "9000",
CONF_ACCESS_KEY: "abcdef",
CONF_SECRET_KEY: "0123456789",
CONF_SECURE: "true",
}
},
)
await hass.async_start()
await hass.async_block_till_done()
assert "Setup of domain minio took" in caplog.text
# Call services
await hass.services.async_call(
DOMAIN,
"put",
{"file_path": "/tmp/some_file", "key": "some_key", "bucket": "some_bucket"},
blocking=True,
)
assert minio_client.fput_object.call_args == call(
"some_bucket", "some_key", "/tmp/some_file"
)
minio_client.reset_mock()
await hass.services.async_call(
DOMAIN,
"get",
{"file_path": "/tmp/some_file", "key": "some_key", "bucket": "some_bucket"},
blocking=True,
)
assert minio_client.fget_object.call_args == call(
"some_bucket", "some_key", "/tmp/some_file"
)
minio_client.reset_mock()
await hass.services.async_call(
DOMAIN, "remove", {"key": "some_key", "bucket": "some_bucket"}, blocking=True
)
assert minio_client.remove_object.call_args == call("some_bucket", "some_key")
minio_client.reset_mock()
async def test_minio_listen(hass, caplog, minio_client_event):
"""Test minio listen on notifications."""
minio_client_event.presigned_get_object.return_value = "http://url"
events = []
@callback
def event_callback(event):
"""Handle event callbback."""
events.append(event)
hass.bus.async_listen("minio", event_callback)
await async_setup_component(
hass,
DOMAIN,
{
DOMAIN: {
CONF_HOST: "localhost",
CONF_PORT: "9000",
CONF_ACCESS_KEY: "abcdef",
CONF_SECRET_KEY: "0123456789",
CONF_SECURE: "true",
CONF_LISTEN: [{CONF_LISTEN_BUCKET: "test"}],
}
},
)
await hass.async_start()
await hass.async_block_till_done()
assert "Setup of domain minio took" in caplog.text
while not events:
await asyncio.sleep(0)
assert 1 == len(events)
event = events[0]
assert DOMAIN == event.event_type
assert "s3:ObjectCreated:Put" == event.data["event_name"]
assert "5jJkTAo.jpg" == event.data["file_name"]
assert "test" == event.data["bucket"]
assert "5jJkTAo.jpg" == event.data["key"]
assert "http://url" == event.data["presigned_url"]
assert 0 == len(event.data["metadata"])
async def test_queue_listener():
"""Tests QueueListener firing events on Hass event bus."""
hass = MagicMock()
queue_listener = QueueListener(hass)
queue_listener.start()
queue_entry = {
"event_name": "s3:ObjectCreated:Put",
"bucket": "some_bucket",
"key": "some_dir/some_file.jpg",
"presigned_url": "http://host/url?signature=secret",
"metadata": {},
}
queue_listener.queue.put(queue_entry)
queue_listener.stop()
call_domain, call_event = hass.bus.fire.call_args[0]
expected_event = {
"event_name": "s3:ObjectCreated:Put",
"file_name": "some_file.jpg",
"bucket": "some_bucket",
"key": "some_dir/some_file.jpg",
"presigned_url": "http://host/url?signature=secret",
"metadata": {},
}
assert DOMAIN == call_domain
assert json.dumps(expected_event, sort_keys=True) == json.dumps(
call_event, sort_keys=True
)