diff --git a/homeassistant/components/stream/__init__.py b/homeassistant/components/stream/__init__.py
index b842eb7fb78..ef68ea7bcae 100644
--- a/homeassistant/components/stream/__init__.py
+++ b/homeassistant/components/stream/__init__.py
@@ -57,9 +57,15 @@ from .const import (
     SOURCE_TIMEOUT,
     STREAM_RESTART_INCREMENT,
     STREAM_RESTART_RESET_TIME,
-    TARGET_SEGMENT_DURATION_NON_LL_HLS,
 )
-from .core import PROVIDERS, IdleTimer, KeyFrameConverter, StreamOutput, StreamSettings
+from .core import (
+    PROVIDERS,
+    STREAM_SETTINGS_NON_LL_HLS,
+    IdleTimer,
+    KeyFrameConverter,
+    StreamOutput,
+    StreamSettings,
+)
 from .diagnostics import Diagnostics
 from .hls import HlsStreamOutput, async_setup_hls
 
@@ -224,14 +230,7 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
             hls_part_timeout=2 * conf[CONF_PART_DURATION],
         )
     else:
-        hass.data[DOMAIN][ATTR_SETTINGS] = StreamSettings(
-            ll_hls=False,
-            min_segment_duration=TARGET_SEGMENT_DURATION_NON_LL_HLS
-            - SEGMENT_DURATION_ADJUSTER,
-            part_target_duration=TARGET_SEGMENT_DURATION_NON_LL_HLS,
-            hls_advance_part_limit=3,
-            hls_part_timeout=TARGET_SEGMENT_DURATION_NON_LL_HLS,
-        )
+        hass.data[DOMAIN][ATTR_SETTINGS] = STREAM_SETTINGS_NON_LL_HLS
 
     # Setup HLS
     hls_endpoint = async_setup_hls(hass)
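A minimal usage sketch, not part of the patch: it shows how code elsewhere in the integration can pick up whichever StreamSettings instance async_setup stored, whether the LL-HLS variant or the shared STREAM_SETTINGS_NON_LL_HLS default. It assumes ATTR_SETTINGS and DOMAIN are exported by the component's const module, as the import block above suggests; the helper name is made up for illustration.

from homeassistant.components.stream.const import ATTR_SETTINGS, DOMAIN
from homeassistant.core import HomeAssistant


def configured_part_duration(hass: HomeAssistant) -> float:
    """Return the part target duration from the StreamSettings stored at setup."""
    settings = hass.data[DOMAIN][ATTR_SETTINGS]
    return settings.part_target_duration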
diff --git a/homeassistant/components/stream/core.py b/homeassistant/components/stream/core.py
index 09d9a9d5031..8c456af91aa 100644
--- a/homeassistant/components/stream/core.py
+++ b/homeassistant/components/stream/core.py
@@ -5,6 +5,7 @@ import asyncio
 from collections import deque
 from collections.abc import Callable, Coroutine, Iterable
 import datetime
+import logging
 from typing import TYPE_CHECKING, Any
 
 from aiohttp import web
@@ -16,13 +17,20 @@
 from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback
 from homeassistant.helpers.event import async_call_later
 from homeassistant.util.decorator import Registry
 
-from .const import ATTR_STREAMS, DOMAIN
+from .const import (
+    ATTR_STREAMS,
+    DOMAIN,
+    SEGMENT_DURATION_ADJUSTER,
+    TARGET_SEGMENT_DURATION_NON_LL_HLS,
+)
 
 if TYPE_CHECKING:
     from av import CodecContext, Packet
 
     from . import Stream
 
+_LOGGER = logging.getLogger(__name__)
+
 PROVIDERS: Registry[str, type[StreamOutput]] = Registry()
 
 
@@ -37,6 +45,15 @@ class StreamSettings:
     hls_part_timeout: float = attr.ib()
 
 
+STREAM_SETTINGS_NON_LL_HLS = StreamSettings(
+    ll_hls=False,
+    min_segment_duration=TARGET_SEGMENT_DURATION_NON_LL_HLS - SEGMENT_DURATION_ADJUSTER,
+    part_target_duration=TARGET_SEGMENT_DURATION_NON_LL_HLS,
+    hls_advance_part_limit=3,
+    hls_part_timeout=TARGET_SEGMENT_DURATION_NON_LL_HLS,
+)
+
+
 @attr.s(slots=True)
 class Part:
     """Represent a segment part."""
@@ -426,12 +443,22 @@
             return
         packet = self.packet
         self.packet = None
-        # decode packet (flush afterwards)
-        frames = self._codec_context.decode(packet)
-        for _i in range(2):
-            if frames:
+        for _ in range(2):  # Retry once if codec context needs to be flushed
+            try:
+                # decode packet (flush afterwards)
+                frames = self._codec_context.decode(packet)
+                for _i in range(2):
+                    if frames:
+                        break
+                    frames = self._codec_context.decode(None)
                 break
-            frames = self._codec_context.decode(None)
+            except EOFError:
+                _LOGGER.debug("Codec context needs flushing, attempting to reopen")
+                self._codec_context.close()
+                self._codec_context.open()
+        else:
+            _LOGGER.debug("Unable to decode keyframe")
+            return
         if frames:
             frame = frames[0]
             if width and height:
diff --git a/homeassistant/components/stream/fmp4utils.py b/homeassistant/components/stream/fmp4utils.py
index f136784cf87..313f5632841 100644
--- a/homeassistant/components/stream/fmp4utils.py
+++ b/homeassistant/components/stream/fmp4utils.py
@@ -2,6 +2,10 @@
 from __future__ import annotations
 
 from collections.abc import Generator
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from io import BytesIO
 
 
 def find_box(
@@ -135,3 +139,11 @@ def get_codec_string(mp4_bytes: bytes) -> str:
         codecs.append(codec)
 
     return ",".join(codecs)
+
+
+def read_init(bytes_io: BytesIO) -> bytes:
+    """Read the init from a mp4 file."""
+    bytes_io.seek(24)
+    moov_len = int.from_bytes(bytes_io.read(4), byteorder="big")
+    bytes_io.seek(0)
+    return bytes_io.read(24 + moov_len)
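The read_init helper above assumes a fixed fragmented-MP4 layout: a 24-byte ftyp box followed directly by the moov box, whose first four bytes carry its big-endian size. A quick self-contained sanity check of that assumption, not part of the patch and using made-up box payloads, could look like:

from io import BytesIO

from homeassistant.components.stream.fmp4utils import read_init

# 24-byte ftyp box: size + type + major brand + minor version + two compatible brands
ftyp = (24).to_bytes(4, "big") + b"ftyp" + b"mp42" + (0).to_bytes(4, "big") + b"mp41mp42"
# moov box whose size field (first 4 bytes) covers header + payload
moov_payload = b"\x00" * 100  # stand-in for the real moov contents
moov = (8 + len(moov_payload)).to_bytes(4, "big") + b"moov" + moov_payload
# anything after the moov (e.g. an mdat) must not end up in the init section
mdat = (16).to_bytes(4, "big") + b"mdat" + b"\x00" * 8

assert read_init(BytesIO(ftyp + moov + mdat)) == ftyp + moov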
diff --git a/homeassistant/components/stream/worker.py b/homeassistant/components/stream/worker.py
index e46d83542f7..aa49a6de7ae 100644
--- a/homeassistant/components/stream/worker.py
+++ b/homeassistant/components/stream/worker.py
@@ -5,11 +5,12 @@ from collections import defaultdict, deque
 from collections.abc import Callable, Generator, Iterator, Mapping
 import contextlib
 import datetime
-from io import BytesIO
+from io import SEEK_END, BytesIO
 import logging
 from threading import Event
 from typing import Any, cast
 
+import attr
 import av
 
 from homeassistant.core import HomeAssistant
@@ -24,8 +25,16 @@ from .const import (
     SEGMENT_CONTAINER_FORMAT,
     SOURCE_TIMEOUT,
 )
-from .core import KeyFrameConverter, Part, Segment, StreamOutput, StreamSettings
+from .core import (
+    STREAM_SETTINGS_NON_LL_HLS,
+    KeyFrameConverter,
+    Part,
+    Segment,
+    StreamOutput,
+    StreamSettings,
+)
 from .diagnostics import Diagnostics
+from .fmp4utils import read_init
 from .hls import HlsStreamOutput
 
 _LOGGER = logging.getLogger(__name__)
@@ -108,7 +117,7 @@ class StreamMuxer:
         hass: HomeAssistant,
         video_stream: av.video.VideoStream,
         audio_stream: av.audio.stream.AudioStream | None,
-        audio_bsf: av.BitStreamFilterContext | None,
+        audio_bsf: av.BitStreamFilter | None,
         stream_state: StreamState,
         stream_settings: StreamSettings,
     ) -> None:
@@ -120,6 +129,7 @@ class StreamMuxer:
         self._input_video_stream: av.video.VideoStream = video_stream
         self._input_audio_stream: av.audio.stream.AudioStream | None = audio_stream
         self._audio_bsf = audio_bsf
+        self._audio_bsf_context: av.BitStreamFilterContext = None
         self._output_video_stream: av.video.VideoStream = None
         self._output_audio_stream: av.audio.stream.AudioStream | None = None
         self._segment: Segment | None = None
@@ -151,7 +161,7 @@
                 **{
                     # Removed skip_sidx - see https://github.com/home-assistant/core/pull/39970
                     # "cmaf" flag replaces several of the movflags used, but too recent to use for now
-                    "movflags": "frag_custom+empty_moov+default_base_moof+frag_discont+negative_cts_offsets+skip_trailer",
+                    "movflags": "frag_custom+empty_moov+default_base_moof+frag_discont+negative_cts_offsets+skip_trailer+delay_moov",
                     # Sometimes the first segment begins with negative timestamps, and this setting just
                     # adjusts the timestamps in the output from that segment to start from 0. Helps from
                     # having to make some adjustments in test_durations
@@ -164,7 +174,7 @@
                 # Fragment durations may exceed the 15% allowed variance but it seems ok
                 **(
                     {
-                        "movflags": "empty_moov+default_base_moof+frag_discont+negative_cts_offsets+skip_trailer",
+                        "movflags": "empty_moov+default_base_moof+frag_discont+negative_cts_offsets+skip_trailer+delay_moov",
                         # Create a fragment every TARGET_PART_DURATION. The data from each fragment is stored in
                         # a "Part" that can be combined with the data from all the other "Part"s, plus an init
                         # section, to reconstitute the data in a "Segment".
@@ -194,8 +204,11 @@
         # Check if audio is requested
         output_astream = None
         if input_astream:
+            if self._audio_bsf:
+                self._audio_bsf_context = self._audio_bsf.create()
+                self._audio_bsf_context.set_input_stream(input_astream)
             output_astream = container.add_stream(
-                template=self._audio_bsf or input_astream
+                template=self._audio_bsf_context or input_astream
             )
         return container, output_vstream, output_astream
 
@@ -238,15 +251,29 @@
                 self._part_has_keyframe |= packet.is_keyframe
 
         elif packet.stream == self._input_audio_stream:
-            if self._audio_bsf:
-                self._audio_bsf.send(packet)
-                while packet := self._audio_bsf.recv():
+            if self._audio_bsf_context:
+                self._audio_bsf_context.send(packet)
+                while packet := self._audio_bsf_context.recv():
                     packet.stream = self._output_audio_stream
                     self._av_output.mux(packet)
                 return
             packet.stream = self._output_audio_stream
             self._av_output.mux(packet)
 
+    def create_segment(self) -> None:
+        """Create a segment when the moov is ready."""
+        self._segment = Segment(
+            sequence=self._stream_state.sequence,
+            stream_id=self._stream_state.stream_id,
+            init=read_init(self._memory_file),
+            # Fetch the latest StreamOutputs, which may have changed since the
+            # worker started.
+            stream_outputs=self._stream_state.outputs,
+            start_time=self._start_time,
+        )
+        self._memory_file_pos = self._memory_file.tell()
+        self._memory_file.seek(0, SEEK_END)
+
     def check_flush_part(self, packet: av.Packet) -> None:
         """Check for and mark a part segment boundary and record its duration."""
         if self._memory_file_pos == self._memory_file.tell():
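Worth noting how the two audio hunks above fit together: get_audio_bitstream_filter now hands the muxer a bare av.BitStreamFilter, and the muxer builds the per-stream context itself once it knows the input audio stream, then drains it packet by packet. A condensed sketch of that flow, not part of the patch, with hypothetical helper names and only the PyAV calls the patch itself uses:

from __future__ import annotations

import av


def make_adts_filter_context(
    input_astream: av.audio.stream.AudioStream,
) -> av.BitStreamFilterContext:
    """Bind an aac_adtstoasc filter context to the input audio stream."""
    bsf = av.BitStreamFilter("aac_adtstoasc")  # what get_audio_bitstream_filter returns
    ctx = bsf.create()
    ctx.set_input_stream(input_astream)
    return ctx


def filter_audio_packet(ctx: av.BitStreamFilterContext, packet: av.Packet) -> list[av.Packet]:
    """Push one packet through the filter and collect whatever it emits."""
    ctx.send(packet)
    out = []
    while filtered := ctx.recv():
        out.append(filtered)
    return out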
@@ -254,16 +281,10 @@
             return
         if self._segment is None:
             # We have our first non-zero byte position. This means the init has just
             # been written. Create a Segment and put it to the queue of each output.
-            self._segment = Segment(
-                sequence=self._stream_state.sequence,
-                stream_id=self._stream_state.stream_id,
-                init=self._memory_file.getvalue(),
-                # Fetch the latest StreamOutputs, which may have changed since the
-                # worker started.
-                stream_outputs=self._stream_state.outputs,
-                start_time=self._start_time,
-            )
-            self._memory_file_pos = self._memory_file.tell()
+            self.create_segment()
+            # When using delay_moov, the moov is not written until a moof is also ready
+            # Flush the moof
+            self.flush(packet, last_part=False)
         else:
             # These are the ends of the part segments
             self.flush(packet, last_part=False)
@@ -297,6 +318,10 @@
             # Closing the av_output will write the remaining buffered data to the
             # memory_file as a new moof/mdat.
             self._av_output.close()
+            # With delay_moov, this may be the first time the file pointer has
+            # moved, so the segment may not yet have been created
+            if not self._segment:
+                self.create_segment()
         elif not self._part_has_keyframe:
             # Parts which are not the last part or an independent part should
             # not have durations below 0.85 of the part target duration.
@@ -305,6 +330,9 @@
                 self._part_start_dts
                 + 0.85 * self._stream_settings.part_target_duration / packet.time_base,
             )
+        # Undo dts adjustments if we don't have ll_hls
+        if not self._stream_settings.ll_hls:
+            adjusted_dts = packet.dts
         assert self._segment
         self._memory_file.seek(self._memory_file_pos)
         self._hass.loop.call_soon_threadsafe(
@@ -445,10 +473,7 @@
                 _LOGGER.debug(
                     "ADTS AAC detected. Adding aac_adtstoaac bitstream filter"
                 )
-                bsf = av.BitStreamFilter("aac_adtstoasc")
-                bsf_context = bsf.create()
-                bsf_context.set_input_stream(audio_stream)
-                return bsf_context
+                return av.BitStreamFilter("aac_adtstoasc")
             break
     return None
 
@@ -489,7 +514,12 @@
     audio_stream = None
     # Disable ll-hls for hls inputs
     if container.format.name == "hls":
-        stream_settings.ll_hls = False
+        for field in attr.fields(StreamSettings):
+            setattr(
+                stream_settings,
+                field.name,
+                getattr(STREAM_SETTINGS_NON_LL_HLS, field.name),
+            )
     stream_state.diagnostics.set_value("container_format", container.format.name)
     stream_state.diagnostics.set_value("video_codec", video_stream.name)
     if audio_stream:
diff --git a/tests/components/stream/test_worker.py b/tests/components/stream/test_worker.py
index d887a165b44..94d77e7657e 100644
--- a/tests/components/stream/test_worker.py
+++ b/tests/components/stream/test_worker.py
@@ -755,7 +755,9 @@ async def test_durations(hass, worker_finished_stream):
         },
     )
 
-    source = generate_h264_video(duration=SEGMENT_DURATION + 1)
+    source = generate_h264_video(
+        duration=round(SEGMENT_DURATION + target_part_duration + 1)
+    )
     worker_finished, mock_stream = worker_finished_stream
     with patch("homeassistant.components.stream.Stream", wraps=mock_stream):
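For readers unfamiliar with the attrs idiom used in stream_worker above (copying every field from STREAM_SETTINGS_NON_LL_HLS onto the live stream_settings object instead of flipping only ll_hls), here is a small self-contained illustration; the Settings class and its values are made up for the example:

import attr


@attr.s(slots=True)
class Settings:
    """Stand-in for StreamSettings with just two of its fields."""

    ll_hls: bool = attr.ib()
    part_target_duration: float = attr.ib()


defaults = Settings(ll_hls=False, part_target_duration=2.0)
current = Settings(ll_hls=True, part_target_duration=0.5)

# Overwrite every attrs field in place, exactly like the loop in stream_worker
for field in attr.fields(Settings):
    setattr(current, field.name, getattr(defaults, field.name))

assert current == defaults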