2018-07-03 14:57:00 +02:00
|
|
|
# Copyright (c) 2018 Yubico AB
|
|
|
|
# All rights reserved.
|
|
|
|
#
|
|
|
|
# Redistribution and use in source and binary forms, with or
|
|
|
|
# without modification, are permitted provided that the following
|
|
|
|
# conditions are met:
|
|
|
|
#
|
|
|
|
# 1. Redistributions of source code must retain the above copyright
|
|
|
|
# notice, this list of conditions and the following disclaimer.
|
|
|
|
# 2. Redistributions in binary form must reproduce the above
|
|
|
|
# copyright notice, this list of conditions and the following
|
|
|
|
# disclaimer in the documentation and/or other materials provided
|
|
|
|
# with the distribution.
|
|
|
|
#
|
|
|
|
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
|
|
|
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
|
|
|
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
|
|
|
|
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
|
|
|
|
# COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
|
|
|
|
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
|
|
|
|
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
|
|
|
|
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
|
|
|
|
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
|
|
|
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
|
|
|
|
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
|
|
|
# POSSIBILITY OF SUCH DAMAGE.
|
|
|
|
|
|
|
|
from __future__ import absolute_import, unicode_literals
|
|
|
|
|
|
|
|
from .rpid import verify_rp_id
|
|
|
|
from .cose import ES256
|
2018-08-21 17:07:16 +02:00
|
|
|
from .client import WEBAUTHN_TYPE
|
2018-07-03 14:57:00 +02:00
|
|
|
from .utils import sha256
|
2018-11-30 13:22:07 +01:00
|
|
|
from .ctap2 import AttestedCredentialData
|
2018-07-03 14:57:00 +02:00
|
|
|
|
|
|
|
import os
|
2018-08-21 17:07:16 +02:00
|
|
|
import six
|
|
|
|
from enum import Enum, unique
|
2018-07-03 14:57:00 +02:00
|
|
|
from cryptography.hazmat.primitives import constant_time
|
|
|
|
|
|
|
|
|
|
|
|
def _verify_origin_for_rp(rp_id):
|
|
|
|
return lambda o: verify_rp_id(rp_id, o)
|
|
|
|
|
|
|
|
|
2018-08-21 17:07:16 +02:00
|
|
|
@unique
class ATTESTATION(six.text_type, Enum):
    """String-valued enumeration of the attestation settings of a server.

    Members mix in ``six.text_type``, so each member is itself a text string
    equal to its value. ``@unique`` rejects duplicate values at class
    creation time.

    Presumably mirrors WebAuthn's AttestationConveyancePreference values --
    confirm against the specification.
    """
    # No attestation is requested.
    NONE = 'none'
    # NOTE(review): exact semantics of 'indirect' vs 'direct' are defined by
    # the WebAuthn specification, not by this module.
    INDIRECT = 'indirect'
    DIRECT = 'direct'
|
|
|
|
|
|
|
|
|
2018-08-21 17:38:25 +02:00
|
|
|
@unique
class USER_VERIFICATION(six.text_type, Enum):
    """String-valued enumeration of user verification levels.

    Members mix in ``six.text_type``, so each member is itself a text string
    equal to its value. ``@unique`` rejects duplicate values at class
    creation time.

    Presumably mirrors WebAuthn's UserVerificationRequirement values --
    confirm against the specification.
    """
    DISCOURAGED = 'discouraged'
    PREFERRED = 'preferred'
    REQUIRED = 'required'
|
|
|
|
|
|
|
|
|
2018-08-24 16:45:08 +02:00
|
|
|
class RelyingParty(object):
    """Representation of relying party data.

    See https://www.w3.org/TR/webauthn/#sctn-rp-credential-params for details.

    :ivar ident: Unique identifier of the relying party,
        see https://www.w3.org/TR/webauthn/#rp-id for details
    :ivar name: Name of the relying party; falls back to `ident` when no
        (or an empty) name is given
    :ivar icon: URL with the relying party icon
    """

    def __init__(self, ident, name=None, icon=None):
        """Store the relying party attributes.

        :param ident: Unique identifier of the relying party.
        :param name: Optional name; `ident` is used when this is falsy.
        :param icon: Optional URL of the relying party icon.
        """
        self.ident = ident
        self.name = name or ident
        self.icon = icon

    @property
    def id_hash(self):
        """Return SHA256 hash of the identifier."""
        # Encode explicitly as UTF-8: on Python 2 a bare .encode() uses the
        # ASCII default codec and raises for non-ASCII identifiers.
        return sha256(self.ident.encode('utf-8'))
|
|
|
|
|
|
|
|
|
2018-07-03 14:57:00 +02:00
|
|
|
class Fido2Server(object):
    """FIDO2 server

    :ivar rp: Relying party data as `RelyingParty` instance.
    :ivar timeout: Operation timeout; multiplied by 1000 when written to the
        'timeout' field of the generated options (presumably seconds to
        milliseconds). Defaults to 30.
    :ivar attestation: The `ATTESTATION` setting sent in registration options
        and enforced in `register_complete`.
    :ivar allowed_algorithms: Algorithm identifiers offered in
        'pubKeyCredParams'. Defaults to ``[ES256.ALGORITHM]``.
    """

    def __init__(
        self,
        rp,
        attestation=ATTESTATION.NONE,
        verify_origin=None,
    ):
        """Create a server for the given relying party.

        :param rp: Relying party data as `RelyingParty` instance.
        :param attestation: An `ATTESTATION` member or its string value.
        :param verify_origin: Optional callable taking the client data origin
            and returning a truthy value when it is acceptable. When omitted,
            a checker bound to ``rp.ident`` is created.
        """
        self.rp = rp
        self._verify = verify_origin or _verify_origin_for_rp(rp.ident)
        self.timeout = 30
        # Normalizes plain strings to enum members; raises ValueError for
        # values that are not valid ATTESTATION values.
        self.attestation = ATTESTATION(attestation)
        self.allowed_algorithms = [ES256.ALGORITHM]

    def register_begin(self, user, credentials=None, resident_key=False,
                       user_verification=USER_VERIFICATION.PREFERRED):
        """Return a PublicKeyCredentialCreationOptions registration object and
        the internal state dictionary that needs to be passed as is to the
        corresponding `register_complete` call.

        :param user: The dict containing the user data.
        :param credentials: The list of previously registered credentials.
        :param resident_key: True to request a resident credential.
        :param user_verification: The desired USER_VERIFICATION level.
        :return: Registration data, internal state.
        :raises ValueError: If the server has no allowed algorithms, or
            `user_verification` is not a valid USER_VERIFICATION value."""
        if not self.allowed_algorithms:
            raise ValueError('Server has no allowed algorithms.')

        uv = USER_VERIFICATION(user_verification)
        challenge = os.urandom(32)

        # Serialize RP
        rp_data = {'id': self.rp.ident, 'name': self.rp.name}
        if self.rp.icon:
            rp_data['icon'] = self.rp.icon

        data = {
            'publicKey': {
                'rp': rp_data,
                'user': user,
                'challenge': challenge,
                'pubKeyCredParams': [
                    {
                        'type': 'public-key',
                        'alg': alg
                    } for alg in self.allowed_algorithms
                ],
                'excludeCredentials': [
                    {
                        'type': 'public-key',
                        'id': cred.credential_id
                    } for cred in credentials or []
                ],
                'timeout': int(self.timeout * 1000),
                'attestation': self.attestation,
                'authenticatorSelection': {
                    'requireResidentKey': resident_key,
                    'userVerification': uv
                }
            }
        }

        state = self._make_internal_state(challenge, uv)

        return data, state

    def register_complete(self, state, client_data, attestation_object):
        """Verify the correctness of the registration data received from
        the client.

        :param state: The state data returned by the corresponding
            `register_begin`.
        :param client_data: The client data.
        :param attestation_object: The attestation object.
        :return: The authenticator data
        :raises ValueError: If any of the checks on the client data,
            challenge, RP ID hash, attestation or user verification fails."""
        if client_data.get('type') != WEBAUTHN_TYPE.MAKE_CREDENTIAL:
            raise ValueError('Incorrect type in ClientData.')
        if not self._verify(client_data.get('origin')):
            raise ValueError('Invalid origin in ClientData.')
        if not constant_time.bytes_eq(state['challenge'],
                                      client_data.challenge):
            raise ValueError('Wrong challenge in response.')
        if not constant_time.bytes_eq(self.rp.id_hash,
                                      attestation_object.auth_data.rp_id_hash):
            raise ValueError('Wrong RP ID hash in response.')
        if attestation_object.fmt == ATTESTATION.NONE \
                and self.attestation != ATTESTATION.NONE:
            raise ValueError('Attestation required, but not provided.')
        attestation_object.verify(client_data.hash)

        # Compare by value, not identity: if the state was serialized and
        # restored, the stored level is a plain string and an `is` test
        # would silently skip this check.
        if state['user_verification'] == USER_VERIFICATION.REQUIRED and \
                not attestation_object.auth_data.is_user_verified():
            raise ValueError(
                'User verification required, but User verified flag not set.')

        return attestation_object.auth_data

    def authenticate_begin(self, credentials,
                           user_verification=USER_VERIFICATION.PREFERRED):
        """Return a PublicKeyCredentialRequestOptions assertion object and
        the internal state dictionary that needs to be passed as is to the
        corresponding `authenticate_complete` call.

        :param credentials: The list of previously registered credentials.
        :param user_verification: The desired USER_VERIFICATION level.
        :return: Assertion data, internal state.
        :raises ValueError: If `user_verification` is not a valid
            USER_VERIFICATION value."""
        uv = USER_VERIFICATION(user_verification)
        challenge = os.urandom(32)

        data = {
            'publicKey': {
                'rpId': self.rp.ident,
                'challenge': challenge,
                'allowCredentials': [
                    {
                        'type': 'public-key',
                        'id': cred.credential_id
                    } for cred in credentials
                ],
                'timeout': int(self.timeout * 1000),
                'userVerification': uv
            }
        }

        state = self._make_internal_state(challenge, uv,
                                          credentials=credentials)

        return data, state

    def authenticate_complete(self, state, credential_id,
                              client_data, auth_data, signature):
        """Verify the correctness of the assertion data received from
        the client.

        :param state: The state data returned by the corresponding
            `authenticate_begin`.
        :param credential_id: The credential id reported by the client; it is
            matched against the credentials stored in `state`.
        :param client_data: The client data.
        :param auth_data: The authenticator data.
        :param signature: The signature provided by the client.
        :return: The credential from `state` whose id matches
            `credential_id`, wrapped as `AttestedCredentialData`.
        :raises ValueError: If any of the checks on the client data,
            challenge, RP ID hash or user verification fails, or if no stored
            credential has the given id. Errors raised by the signature
            verification itself propagate unchanged."""
        if client_data.get('type') != WEBAUTHN_TYPE.GET_ASSERTION:
            raise ValueError('Incorrect type in ClientData.')
        if not self._verify(client_data.get('origin')):
            raise ValueError('Invalid origin in ClientData.')
        # Same constant-time comparison as in register_complete.
        if not constant_time.bytes_eq(state['challenge'],
                                      client_data.challenge):
            raise ValueError('Wrong challenge in response.')
        if not constant_time.bytes_eq(self.rp.id_hash, auth_data.rp_id_hash):
            raise ValueError('Wrong RP ID hash in response.')

        # Compare by value, not identity: if the state was serialized and
        # restored, the stored level is a plain string and an `is` test
        # would silently skip this check.
        if state['user_verification'] == USER_VERIFICATION.REQUIRED and \
                not auth_data.is_user_verified():
            raise ValueError(
                'User verification required, but user verified flag not set.')

        for cred in state['credentials']:
            credential = AttestedCredentialData(cred)
            if credential.credential_id == credential_id:
                credential.public_key.verify(auth_data + client_data.hash,
                                             signature)
                return credential
        raise ValueError('Unknown credential ID.')

    @staticmethod
    def _make_internal_state(challenge, user_verification, credentials=None):
        """Build the state dictionary handed back from the `*_begin` calls.

        :param challenge: The challenge generated for this operation.
        :param user_verification: The USER_VERIFICATION level requested.
        :param credentials: Credentials to remember for the completion step;
            a new empty list is used when omitted.
        :return: Dict with 'challenge', 'user_verification' and 'credentials'.
        """
        # A fresh list per call avoids sharing one mutable default object
        # between every state dictionary.
        if credentials is None:
            credentials = []
        return {
            'challenge': challenge,
            'user_verification': user_verification,
            'credentials': credentials
        }
|