stream.hls: fix maps with and without keys

Add the `key` attribute to the `Map` class, so segment maps can be
encrypted independently or left as plain text, depending on the order
of the `EXT-X-MAP` and `EXT-X-KEY` tags in the playlist.

TODO:
Rewrite the `HLSSegment`, `HLSStreamWriter` and `HLSStreamWorker`
classes based on the logic of the DASH implementation, where
init segments are queued by the worker separately.
This commit is contained in:
bastimeyer 2024-02-24 18:02:07 +01:00 committed by Sebastian Meyer
parent ef25765c8d
commit de981a4a64
4 changed files with 56 additions and 4 deletions

View File

@ -243,9 +243,12 @@ class HLSStreamWriter(SegmentedStreamWriter[HLSSegment, Response]):
self.reader.pause()
def _write(self, segment: HLSSegment, result: Response, is_map: bool):
if segment.key and segment.key.method != "NONE":
# TODO: Rewrite HLSSegment, HLSStreamWriter and HLSStreamWorker based on independent initialization section segments,
# similar to the DASH implementation
key = segment.map.key if is_map and segment.map else segment.key
if key and key.method != "NONE":
try:
decryptor = self.create_decryptor(segment.key, segment.num)
decryptor = self.create_decryptor(key, segment.num)
except (StreamError, ValueError) as err:
log.error(f"Failed to create decryptor: {err}")
self.close()

View File

@ -353,6 +353,7 @@ class M3U8Parser(Generic[TM3U8_co, THLSSegment_co, THLSPlaylist_co], metaclass=M
byterange = self.parse_byterange(attr.get("BYTERANGE", ""))
self._map = Map(
uri=self.uri(uri),
key=self._key,
byterange=byterange,
)

View File

@ -46,6 +46,7 @@ class Key(NamedTuple):
# EXT-X-MAP
class Map(NamedTuple):
uri: str
key: Optional[Key]
byterange: Optional[ByteRange]

View File

@ -1,3 +1,4 @@
import itertools
import os
import typing
import unittest
@ -59,7 +60,7 @@ class TagMapEnc(EncryptedBase, TagMap):
class TagKey(Tag):
path = "encryption.key"
_id = itertools.count()
def __init__(self, method="NONE", uri=None, iv=None, keyformat=None, keyformatversions=None):
attrs = {"METHOD": method}
@ -73,6 +74,7 @@ class TagKey(Tag):
attrs["KEYFORMATVERSIONS"] = self.val_quoted_string(keyformatversions)
super().__init__("EXT-X-KEY", attrs)
self.uri = uri
self.path = f"encryption{next(self._id)}.key"
def url(self, namespace):
return self.uri.format(namespace=namespace) if self.uri else super().url(namespace)
@ -718,7 +720,6 @@ class TestHLSStreamEncrypted(TestMixinStreamHLS, unittest.TestCase):
self.mock("GET", self.url(map1), content=map1.content)
self.mock("GET", self.url(map2), content=map2.content)
# noinspection PyTypeChecker
segments = self.subject([
Playlist(0, [key, map1] + [SegmentEnc(num, aesKey, aesIv) for num in range(2)]),
Playlist(2, [key, map2] + [SegmentEnc(num, aesKey, aesIv) for num in range(2, 4)], end=True),
@ -732,6 +733,52 @@ class TestHLSStreamEncrypted(TestMixinStreamHLS, unittest.TestCase):
map1, segments[0], segments[1], map2, segments[2], segments[3],
], prop="content_plain")
def test_hls_encrypted_aes128_with_differently_encrypted_map(self):
aesKey1, aesIv1, key1 = self.gen_key() # init key
aesKey2, aesIv2, key2 = self.gen_key() # media key
map1 = TagMapEnc(1, namespace=self.id(), key=aesKey1, iv=aesIv1)
map2 = TagMapEnc(2, namespace=self.id(), key=aesKey1, iv=aesIv1)
self.mock("GET", self.url(map1), content=map1.content)
self.mock("GET", self.url(map2), content=map2.content)
segments = self.subject([
Playlist(0, [key1, map1, key2] + [SegmentEnc(num, aesKey2, aesIv2) for num in range(2)]),
Playlist(2, [key1, map2, key2] + [SegmentEnc(num, aesKey2, aesIv2) for num in range(2, 4)], end=True),
])
self.await_write(1 + 2 + 1 + 2) # 1 map, 2 segments, 1 map, 2 segments
data = self.await_read(read_all=True)
self.await_close()
assert data == self.content([
map1, segments[0], segments[1], map2, segments[2], segments[3],
], prop="content_plain")
def test_hls_encrypted_aes128_with_plaintext_map(self):
aesKey, aesIv, key = self.gen_key()
map1 = TagMap(1, namespace=self.id())
map2 = TagMap(2, namespace=self.id())
self.mock("GET", self.url(map1), content=map1.content)
self.mock("GET", self.url(map2), content=map2.content)
segments = self.subject([
Playlist(0, [map1, key] + [SegmentEnc(num, aesKey, aesIv) for num in range(2)]),
Playlist(2, [map2, key] + [SegmentEnc(num, aesKey, aesIv) for num in range(2, 4)], end=True),
])
self.await_write(1 + 2 + 1 + 2) # 1 map, 2 segments, 1 map, 2 segments
data = self.await_read(read_all=True)
self.await_close()
assert data == (
map1.content
+ segments[0].content_plain
+ segments[1].content_plain
+ map2.content
+ segments[2].content_plain
+ segments[3].content_plain
)
def test_hls_encrypted_aes128_key_uri_override(self):
aesKey, aesIv, key = self.gen_key(uri="http://real-mocked/{namespace}/encryption.key?foo=bar")
aesKeyInvalid = bytes(ord(aesKey[i:i + 1]) ^ 0xFF for i in range(16))