Add metadata filtering.

This commit is contained in:
Dain Nilsson 2022-04-22 15:47:19 +02:00
parent 1d7e2b5a5e
commit 966e38ee72
No known key found for this signature in database
GPG Key ID: F04367096FBA95E8
2 changed files with 86 additions and 6 deletions

View File

@ -301,6 +301,36 @@ class MetadataBlobPayload(_CamelCaseDataObject):
entries: Sequence[MetadataBlobPayloadEntry]
EntryFilter = Callable[[MetadataBlobPayloadEntry], bool]
LookupFilter = Callable[[MetadataBlobPayloadEntry, Sequence[bytes]], bool]
def filter_revoked(entry: MetadataBlobPayloadEntry) -> bool:
"""Filters out any revoked metadata entry.
This filter will remove any metadata entry which has a status_report with
the REVOKED status.
"""
return not any(
r.status == AuthenticatorStatus.REVOKED for r in entry.status_reports
)
def filter_attestation_key_compromised(
entry: MetadataBlobPayloadEntry, certificate_chain: Sequence[bytes]
) -> bool:
"""Denies any attestation that has a compromised attestation key.
This filter checks the status reports of a metadata entry and ensures the
attestation isn't signed by a key which is marked as compromised.
"""
for r in entry.status_reports:
if r.status == AuthenticatorStatus.ATTESTATION_KEY_COMPROMISE:
if r.certificate in certificate_chain:
return False
return True
_last_entry: ContextVar[Optional[MetadataBlobPayloadEntry]] = ContextVar("_last_entry")
@ -308,21 +338,42 @@ class MdsAttestationVerifier(AttestationVerifier):
"""MDS3 implementation of an AttestationVerifier.
The entry_filter is an optional predicate used to filter which metadata entries to
allow. When set, all entries must pass this filter for the attestation to be
considered valid.
include in the lookup for verification. By default, a filter that removes any
entries that have a status report indicating the authenticator is REVOKED is used.
See: filter_revoked
The attestation_filter is an optional predicate used to filter metadata entries
while performing attestation validation, and may take into account the
Authenticators attestation trust_chain. By default, a filter that will fail any
verification that has a trust_chain where one of the certificates is marked as
compromised by the metadata statement is used.
See: filter_attestation_key_compromised
NOTE: The attestation_filter is not used when calling find_entry_by_aaguid nor
find_entry_by_chain as no attestation is being verified!
Setting either filter (including setting it to None) will replace it, removing
the default behavior.
:param blob: The MetadataBlobPayload to query for device metadata.
:param entry_filter: An optional filter to exclude entries from the result.
:param entry_filter: An optional filter to exclude entries from lookup.
:param attestation_filter: An optional filter to fail verification for a given
attestation.
:param attestation_types: A list of Attestation types to support.
"""
def __init__(
self,
blob: MetadataBlobPayload,
entry_filter: Optional[Callable[[MetadataBlobPayloadEntry], bool]] = None,
entry_filter: Optional[EntryFilter] = filter_revoked,
attestation_filter: Optional[LookupFilter] = filter_attestation_key_compromised,
attestation_types: Sequence[Attestation] = None,
):
super().__init__(attestation_types)
self._attestation_filter = attestation_filter or (
lambda a, b: True
) # No-op for None
entries = (
[e for e in blob.entries if entry_filter(e)]
if entry_filter
@ -339,6 +390,7 @@ class MdsAttestationVerifier(AttestationVerifier):
"""Find an entry by AAGUID.
Returns a MetadataBlobPayloadEntry with a matching aaguid field, if found.
This method does not take the attestation_filter into account.
"""
return self._aaguid_table.get(aaguid)
@ -350,6 +402,7 @@ class MdsAttestationVerifier(AttestationVerifier):
Returns a MetadataBlobPayloadEntry containing an
attestationCertificateKeyIdentifier which matches one of the certificates in the
given chain, if found.
This method does not take the attestation_filter into account.
"""
for der in certificate_chain:
cert = x509.load_der_x509_certificate(der, default_backend())
@ -370,6 +423,11 @@ class MdsAttestationVerifier(AttestationVerifier):
if entry:
logging.debug(f"Found entry: {entry}")
# Check attestation filter
if not self._attestation_filter(entry, result.trust_path):
logging.debug("Matched entry did not pass attestation filter")
return None
# Figure out which root to use
if not entry.metadata_statement:
logging.warn("Matched entry has no metadata_statement, can't validate!")
@ -394,7 +452,8 @@ class MdsAttestationVerifier(AttestationVerifier):
) -> Optional[MetadataBlobPayloadEntry]:
"""Lookup a Metadata entry based on an Attestation.
Returns the first Metadata entry matching the given attestation and verifies it.
Returns the first Metadata entry matching the given attestation and verifies it,
including checking it against the attestation_filter.
"""
token = _last_entry.set(None)
try:

View File

@ -257,6 +257,9 @@ Jj4B6PwIhAM3RtYg4CaGkcbFJrcJeCbAXCAC7LbfQSr8EdM79GyGw
).encode()
AAGUID = bytes.fromhex("0132d110bf4e4208a403ab4f5f12efe5")
def test_parse_blob():
data = parse_blob(EXAMPLE_BLOB, EXAMPLE_CA)
assert data.no == 15
@ -266,7 +269,7 @@ def test_parse_blob():
def test_find_by_aaguid():
data = parse_blob(EXAMPLE_BLOB, EXAMPLE_CA)
mds = MdsAttestationVerifier(data)
entry = mds.find_entry_by_aaguid(bytes.fromhex("0132d110bf4e4208a403ab4f5f12efe5"))
entry = mds.find_entry_by_aaguid(AAGUID)
assert (
entry.metadata_statement.description
== "FIDO Alliance Sample FIDO2 Authenticator"
@ -285,3 +288,21 @@ def test_find_by_chain_miss():
mds = MdsAttestationVerifier(data)
entry = mds.find_entry_by_chain([EXAMPLE_CA])
assert entry is None
def test_filter_entries():
data = parse_blob(EXAMPLE_BLOB, EXAMPLE_CA)
mds = MdsAttestationVerifier(data, entry_filter=lambda e: e.aaguid != AAGUID)
entry = mds.find_entry_by_aaguid(AAGUID)
assert entry is None
mds = MdsAttestationVerifier(data, entry_filter=lambda e: e.aaguid == AAGUID)
assert mds.find_entry_by_aaguid(AAGUID)
def test_lookup_filter_does_not_affect_find_entry_by_aaguid():
data = parse_blob(EXAMPLE_BLOB, EXAMPLE_CA)
mds = MdsAttestationVerifier(
data, attestation_filter=lambda e, _: e.aaguid != AAGUID
)
assert mds.find_entry_by_aaguid(AAGUID)