Make attestation verification behavior pluggable.

This commit is contained in:
Dain Nilsson 2021-01-11 14:35:09 +01:00
parent 03031a5bad
commit 1fe7864317
No known key found for this signature in database
GPG Key ID: F04367096FBA95E8
9 changed files with 136 additions and 58 deletions

View File

@ -36,6 +36,8 @@ from .base import ( # noqa: F401
InvalidSignature,
UnsupportedType,
UnsupportedAttestation,
UntrustedAttestation,
verify_x509_chain,
)
from .apple import AppleAttestation # noqa: F401
from .android import AndroidSafetynetAttestation # noqa: F401

View File

@ -32,6 +32,7 @@ from .base import (
AttestationType,
AttestationResult,
InvalidData,
catch_builtins,
)
from ..cose import CoseKey
from ..utils import sha256, websafe_decode
@ -49,6 +50,7 @@ class AndroidSafetynetAttestation(Attestation):
def __init__(self, allow_rooted=False):
self.allow_rooted = allow_rooted
@catch_builtins
def verify(self, statement, auth_data, client_data_hash):
jwt = statement["response"]
header, payload, sig = (websafe_decode(x) for x in jwt.split(b"."))

View File

@ -32,6 +32,7 @@ from .base import (
AttestationType,
AttestationResult,
InvalidData,
catch_builtins,
)
from ..utils import sha256
@ -46,6 +47,7 @@ OID_APPLE = x509.ObjectIdentifier("1.2.840.113635.100.8.2")
class AppleAttestation(Attestation):
FORMAT = "apple"
@catch_builtins
def verify(self, statement, auth_data, client_data_hash):
x5c = statement["x5c"]
expected_nonce = sha256(auth_data + client_data_hash)

View File

@ -29,6 +29,8 @@ from enum import Enum, auto
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import padding, ec, rsa
from cryptography.exceptions import InvalidSignature as _InvalidSignature
from functools import wraps
import abc
@ -45,6 +47,10 @@ class InvalidSignature(InvalidAttestation):
pass
class UntrustedAttestation(InvalidAttestation):
pass
class UnsupportedType(InvalidAttestation):
def __init__(self, auth_data, fmt=None):
super(UnsupportedType, self).__init__(
@ -61,18 +67,35 @@ class AttestationResult(object):
self.attestation_type = attestation_type
self.trust_path = trust_path
def verify_trust_path(self, ca=None):
if not self.trust_path and not ca:
return
certs = [
x509.load_der_x509_certificate(der, default_backend())
for der in self.trust_path + ([ca] if ca else [])
]
class AttestationType(Enum):
BASIC = auto()
SELF = auto()
ATT_CA = auto()
ANON_CA = auto()
NONE = auto
def catch_builtins(f):
@wraps(f)
def inner(*args, **kwargs):
try:
return f(*args, **kwargs)
except (ValueError, KeyError, IndexError) as e:
raise InvalidData(e)
return inner
@catch_builtins
def verify_x509_chain(chain):
certs = [x509.load_der_x509_certificate(der, default_backend()) for der in chain]
cert = certs.pop(0)
while certs:
child = cert
cert = certs.pop(0)
while certs:
child = cert
cert = certs.pop(0)
pub = cert.public_key()
pub = cert.public_key()
try:
if isinstance(pub, rsa.RSAPublicKey):
pub.verify(
child.signature,
@ -86,14 +109,8 @@ class AttestationResult(object):
child.tbs_certificate_bytes,
ec.ECDSA(child.signature_hash_algorithm),
)
class AttestationType(Enum):
BASIC = auto()
SELF = auto()
ATT_CA = auto()
ANON_CA = auto()
NONE = auto
except _InvalidSignature:
raise InvalidSignature()
class Attestation(abc.ABC):

View File

@ -33,6 +33,7 @@ from .base import (
AttestationResult,
InvalidData,
InvalidSignature,
catch_builtins,
_validate_cert_common,
)
from ..cose import CoseKey
@ -83,6 +84,7 @@ def _validate_packed_cert(cert, aaguid):
class PackedAttestation(Attestation):
FORMAT = "packed"
@catch_builtins
def verify(self, statement, auth_data, client_data_hash):
if "ecdaaKeyId" in statement:
raise NotImplementedError("ECDAA not implemented")

View File

@ -35,6 +35,7 @@ from .base import (
AttestationResult,
InvalidData,
InvalidSignature,
catch_builtins,
_validate_cert_common,
)
from ..cose import CoseKey
@ -561,6 +562,7 @@ def _validate_tpm_cert(cert):
class TpmAttestation(Attestation):
FORMAT = "tpm"
@catch_builtins
def verify(self, statement, auth_data, client_data_hash):
if "ecdaaKeyId" in statement:
raise NotImplementedError("ECDAA not implemented")

View File

@ -32,6 +32,7 @@ from .base import (
AttestationType,
AttestationResult,
InvalidSignature,
catch_builtins,
)
from ..cose import ES256
@ -43,6 +44,7 @@ from cryptography.exceptions import InvalidSignature as _InvalidSignature
class FidoU2FAttestation(Attestation):
FORMAT = "fido-u2f"
@catch_builtins
def verify(self, statement, auth_data, client_data_hash):
cd = auth_data.credential_data
pk = b"\x04" + cd.public_key[-2] + cd.public_key[-3]

View File

@ -31,7 +31,13 @@ from .rpid import verify_rp_id, verify_app_id
from .cose import CoseKey
from .ctap2 import AttestedCredentialData
from .client import WEBAUTHN_TYPE
from .attestation import Attestation, FidoU2FAttestation, UnsupportedAttestation
from .attestation import (
Attestation,
UnsupportedAttestation,
UntrustedAttestation,
InvalidSignature,
verify_x509_chain,
)
from .utils import websafe_encode, websafe_decode
from .webauthn import (
AttestationConveyancePreference,
@ -47,22 +53,15 @@ from .webauthn import (
import os
import abc
from cryptography.hazmat.primitives import constant_time
from cryptography.exceptions import InvalidSignature
from cryptography.exceptions import InvalidSignature as _InvalidSignature
def _verify_origin_for_rp(rp_id):
return lambda o: verify_rp_id(rp_id, o)
def _default_attestations():
return [
cls()
for cls in Attestation.__subclasses__()
if getattr(cls, "FORMAT", "none") != "none"
]
def _validata_challenge(challenge):
if challenge is None:
challenge = os.urandom(32)
@ -100,20 +99,86 @@ def _wrap_credentials(creds):
]
def _ignore_attestation(attestation_object, client_data_hash):
"""Ignore attestation."""
def _default_attestations():
return [
cls()
for cls in Attestation.__subclasses__()
if getattr(cls, "FORMAT", "none") != "none"
]
class AttestationVerifier(abc.ABC):
"""Base class for verifying attestation.
Override the ca_lookup method to provide a trusted root certificate (or chain) used
to verify the trust path from the attestation.
"""
def __init__(self, attestation_types=None):
self._attestation_types = attestation_types or _default_attestations()
@abc.abstractmethod
def ca_lookup(self, attestation_result, auth_data):
"""Lookup a CA certificate to be used to verify a trust path.
:param attestation_result: The result of the attestation
"""
raise NotImplementedError()
def verify_attestation(self, attestation_object, client_data_hash):
"""Verify attestation.
:param attestation_object: dict containing attestation data.
:param client_data_hash: SHA256 hash of the ClientData bytes.
"""
att_verifier = UnsupportedAttestation(attestation_object.fmt)
for at in self._attestation_types:
if getattr(at, "FORMAT", None) == attestation_object.fmt:
att_verifier = at
break
# An unsupported format causes an exception to be thrown, which
# includes the auth_data. The caller may choose to handle this case
# and allow the registration.
result = att_verifier.verify(
attestation_object.att_statement,
attestation_object.auth_data,
client_data_hash,
)
# Lookup CA to use for trust path verification
ca = self.ca_lookup(result, attestation_object.auth_data)
if ca is None:
raise UntrustedAttestation("No root found for Authneticator")
# Validate the trust chain
try:
verify_x509_chain(result.trust_path + ca)
except InvalidSignature as e:
raise UntrustedAttestation(e)
def __call__(self, *args):
"""Allows passing an instance to Fido2Server as verify_attestation"""
self.verify_attestation(*args)
class Fido2Server(object):
"""FIDO2 server
"""FIDO2 server.
:param rp: Relying party data as `PublicKeyCredentialRpEntity` instance.
:param attestation: (optional) Requirement on authenticator attestation.
:param verify_origin: (optional) Alternative function to validate an origin.
:param attestation_types: (optional) List of `Attestation` subclasses to use
to verify attestation. By default, all available subclasses of
`Attestation` will be used, excluding the NoneAttestation format. This
parameter is ignored if `attestation` is set to `none`.
:param verify_attestation: (optional) function to validate attestation, which is
invoked with attestation_object and client_data_hash. It should return nothing
and raise an exception on failure. By default, attestation is ignored.
Attestation is also ignored if `attestation` is set to `none`.
"""
def __init__(
self, rp, attestation=None, verify_origin=None, attestation_types=None
self, rp, attestation=None, verify_origin=None, verify_attestation=None
):
self.rp = PublicKeyCredentialRpEntity._wrap(rp)
self._verify = verify_origin or _verify_origin_for_rp(self.rp.id)
@ -123,7 +188,7 @@ class Fido2Server(object):
PublicKeyCredentialParameters("public-key", alg)
for alg in CoseKey.supported_algorithms()
]
self._attestation_types = attestation_types or _default_attestations()
self._verify_attestation = verify_attestation or _ignore_attestation
def register_begin(
self,
@ -208,19 +273,7 @@ class Fido2Server(object):
)
if self.attestation not in (None, AttestationConveyancePreference.NONE):
att_verifier = UnsupportedAttestation(attestation_object.fmt)
for at in self._attestation_types:
if getattr(at, "FORMAT", None) == attestation_object.fmt:
att_verifier = at
break
# An unsupported format causes an exception to be thrown, which
# includes the auth_data. The caller may choose to handle this case
# and allow the registration.
att_verifier.verify(
attestation_object.att_statement,
attestation_object.auth_data,
client_data.hash,
)
self._verify_attestation(attestation_object, client_data.hash)
# We simply ignore attestation if self.attestation == 'none', as not all
# clients strip the attestation.
@ -292,7 +345,7 @@ class Fido2Server(object):
if cred.credential_id == credential_id:
try:
cred.public_key.verify(auth_data + client_data.hash, signature)
except InvalidSignature:
except _InvalidSignature:
raise ValueError("Invalid signature.")
return cred
raise ValueError("Unknown credential ID.")
@ -320,7 +373,6 @@ class U2FFido2Server(Fido2Server):
def __init__(self, app_id, rp, verify_u2f_origin=None, *args, **kwargs):
super(U2FFido2Server, self).__init__(rp, *args, **kwargs)
kwargs["attestation_types"] = [FidoU2FAttestation()]
if verify_u2f_origin:
kwargs["verify_origin"] = verify_u2f_origin
else:

View File

@ -42,6 +42,7 @@ from fido2.attestation import (
InvalidData,
InvalidSignature,
UnsupportedType,
verify_x509_chain,
)
from binascii import a2b_hex
@ -74,7 +75,6 @@ class TestAttestationObject(unittest.TestCase):
res = attestation.verify({}, auth_data, b"deadbeef" * 8)
self.assertEqual(res.attestation_type, AttestationType.NONE)
self.assertEqual(res.trust_path, [])
res.verify_trust_path()
with self.assertRaises(InvalidData):
attestation.verify({"not": "empty"}, auth_data, b"deadbeef" * 8)
@ -91,7 +91,6 @@ class TestAttestationObject(unittest.TestCase):
res = attestation.verify({}, auth_data, b"deadbeef" * 8)
self.assertEqual(res.attestation_type, AttestationType.NONE)
self.assertEqual(res.trust_path, [])
res.verify_trust_path()
with self.assertRaises(InvalidData):
attestation.verify({"not": "empty"}, auth_data, b"deadbeef" * 8)
@ -229,7 +228,7 @@ ee18128ed50dd7a855e54d2459db005""".replace(
res = attestation.verify(statement, auth_data, client_param)
self.assertEqual(res.attestation_type, AttestationType.ATT_CA)
res.verify_trust_path()
verify_x509_chain(res.trust_path)
def test_fido_u2f_attestation(self):
attestation = Attestation.for_type("fido-u2f")()
@ -257,7 +256,6 @@ ee18128ed50dd7a855e54d2459db005""".replace(
res = attestation.verify(statement, auth_data, client_param)
self.assertEqual(res.attestation_type, AttestationType.BASIC)
self.assertEqual(len(res.trust_path), 1)
res.verify_trust_path()
statement["sig"] = b"a" * len(statement["sig"])
with self.assertRaises(InvalidSignature):
@ -290,7 +288,6 @@ ee18128ed50dd7a855e54d2459db005""".replace(
res = attestation.verify(statement, auth_data, client_param)
self.assertEqual(res.attestation_type, AttestationType.BASIC)
self.assertEqual(len(res.trust_path), 1)
res.verify_trust_path()
statement["sig"] = b"a" * len(statement["sig"])
with self.assertRaises(InvalidSignature):
@ -316,7 +313,7 @@ ee18128ed50dd7a855e54d2459db005""".replace(
res = attestation.verify(statement, auth_data, client_param)
self.assertEqual(res.attestation_type, AttestationType.BASIC)
res.verify_trust_path(_GSR2_DER)
verify_x509_chain(res.trust_path + [_GSR2_DER])
def test_apple_attestation(self):
attestation = Attestation.for_type("apple")()
@ -346,4 +343,4 @@ ee18128ed50dd7a855e54d2459db005""".replace(
res = attestation.verify(statement, auth_data, client_param)
self.assertEqual(res.attestation_type, AttestationType.ANON_CA)
self.assertEqual(len(res.trust_path), 2)
res.verify_trust_path()
verify_x509_chain(res.trust_path)