python-fido2/fido2/client.py

579 lines
17 KiB
Python

# Copyright (c) 2018 Yubico AB
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
from __future__ import absolute_import, unicode_literals
from .hid import STATUS
from .ctap import CtapError
from .ctap1 import CTAP1, APDU, ApduError
from .ctap2 import CTAP2, PinProtocolV1, AttestationObject, AssertionResponse, Info
from .cose import ES256
from .rpid import verify_rp_id, verify_app_id
from .utils import Timeout, sha256, hmac_sha256, websafe_decode, websafe_encode
from enum import Enum, IntEnum, unique
import json
import six
class ClientData(bytes):
def __init__(self, _):
super(ClientData, self).__init__()
self.data = json.loads(self.decode())
def get(self, key):
return self.data[key]
@property
def challenge(self):
return websafe_decode(self.get("challenge"))
@property
def b64(self):
return websafe_encode(self)
@property
def hash(self):
return sha256(self)
@classmethod
def build(cls, **kwargs):
return cls(json.dumps(kwargs).encode())
@classmethod
def from_b64(cls, data):
return cls(websafe_decode(data))
def __repr__(self):
return self.decode()
def __str__(self):
return self.decode()
class ClientError(Exception):
@unique
class ERR(IntEnum):
OTHER_ERROR = 1
BAD_REQUEST = 2
CONFIGURATION_UNSUPPORTED = 3
DEVICE_INELIGIBLE = 4
TIMEOUT = 5
def __call__(self, cause=None):
return ClientError(self, cause)
def __init__(self, code, cause=None):
self.code = ClientError.ERR(code)
self.cause = cause
def __repr__(self):
r = "Client error: {0} - {0.name}".format(self.code)
if self.cause:
r += ". Caused by {}".format(self.cause)
return r
def _ctap2client_err(e):
if e.code in [CtapError.ERR.CREDENTIAL_EXCLUDED, CtapError.ERR.NO_CREDENTIALS]:
ce = ClientError.ERR.DEVICE_INELIGIBLE
elif e.code in [
CtapError.ERR.KEEPALIVE_CANCEL,
CtapError.ERR.ACTION_TIMEOUT,
CtapError.ERR.USER_ACTION_TIMEOUT,
]:
ce = ClientError.ERR.TIMEOUT
elif e.code in [
CtapError.ERR.UNSUPPORTED_ALGORITHM,
CtapError.ERR.UNSUPPORTED_OPTION,
CtapError.ERR.UNSUPPORTED_EXTENSION,
CtapError.ERR.KEY_STORE_FULL,
]:
ce = ClientError.ERR.CONFIGURATION_UNSUPPORTED
elif e.code in [
CtapError.ERR.INVALID_COMMAND,
CtapError.ERR.CBOR_UNEXPECTED_TYPE,
CtapError.ERR.INVALID_CBOR,
CtapError.ERR.MISSING_PARAMETER,
CtapError.ERR.INVALID_OPTION,
CtapError.ERR.PIN_REQUIRED,
CtapError.ERR.PIN_INVALID,
CtapError.ERR.PIN_BLOCKED,
CtapError.ERR.PIN_NOT_SET,
CtapError.ERR.PIN_POLICY_VIOLATION,
CtapError.ERR.PIN_TOKEN_EXPIRED,
CtapError.ERR.PIN_AUTH_INVALID,
CtapError.ERR.PIN_AUTH_BLOCKED,
CtapError.ERR.REQUEST_TOO_LARGE,
CtapError.ERR.OPERATION_DENIED,
]:
ce = ClientError.ERR.BAD_REQUEST
else:
ce = ClientError.ERR.OTHER_ERROR
return ce(e)
def _call_polling(poll_delay, timeout, on_keepalive, func, *args, **kwargs):
with Timeout(timeout or 30) as event:
while not event.is_set():
try:
return func(*args, **kwargs)
except ApduError as e:
if e.code == APDU.USE_NOT_SATISFIED:
if on_keepalive:
on_keepalive(STATUS.UPNEEDED)
on_keepalive = None
event.wait(poll_delay)
else:
raise ClientError.ERR.OTHER_ERROR(e)
except CtapError as e:
raise _ctap2client_err(e)
raise ClientError.ERR.TIMEOUT()
@unique
class U2F_TYPE(six.text_type, Enum):
REGISTER = "navigator.id.finishEnrollment"
SIGN = "navigator.id.getAssertion"
class U2fClient(object):
def __init__(self, device, origin, verify=verify_app_id):
self.poll_delay = 0.25
self.ctap = CTAP1(device)
self.origin = origin
self._verify = verify
def _verify_app_id(self, app_id):
try:
if self._verify(app_id, self.origin):
return
except Exception:
pass # Fall through to ClientError
raise ClientError.ERR.BAD_REQUEST()
def register(
self,
app_id,
register_requests,
registered_keys,
timeout=None,
on_keepalive=None,
):
self._verify_app_id(app_id)
version = self.ctap.get_version()
dummy_param = b"\0" * 32
for key in registered_keys:
if key["version"] != version:
continue
key_app_id = key.get("appId", app_id)
app_param = sha256(key_app_id.encode())
self._verify_app_id(key_app_id)
key_handle = websafe_decode(key["keyHandle"])
try:
self.ctap.authenticate(dummy_param, app_param, key_handle, True)
raise ClientError.ERR.DEVICE_INELIGIBLE() # Bad response
except ApduError as e:
if e.code == APDU.USE_NOT_SATISFIED:
raise ClientError.ERR.DEVICE_INELIGIBLE()
except CtapError as e:
raise _ctap2client_err(e)
for request in register_requests:
if request["version"] == version:
challenge = request["challenge"]
break
else:
raise ClientError.ERR.DEVICE_INELIGIBLE()
client_data = ClientData.build(
typ=U2F_TYPE.REGISTER, challenge=challenge, origin=self.origin
)
app_param = sha256(app_id.encode())
reg_data = _call_polling(
self.poll_delay,
timeout,
on_keepalive,
self.ctap.register,
client_data.hash,
app_param,
)
return {"registrationData": reg_data.b64, "clientData": client_data.b64}
def sign(self, app_id, challenge, registered_keys, timeout=None, on_keepalive=None):
client_data = ClientData.build(
typ=U2F_TYPE.SIGN, challenge=challenge, origin=self.origin
)
version = self.ctap.get_version()
for key in registered_keys:
if key["version"] == version:
key_app_id = key.get("appId", app_id)
self._verify_app_id(key_app_id)
key_handle = websafe_decode(key["keyHandle"])
app_param = sha256(key_app_id.encode())
try:
signature_data = _call_polling(
self.poll_delay,
timeout,
on_keepalive,
self.ctap.authenticate,
client_data.hash,
app_param,
key_handle,
)
break
except ClientError:
pass # Ignore and try next key
else:
raise ClientError.ERR.DEVICE_INELIGIBLE()
return {
"clientData": client_data.b64,
"signatureData": signature_data.b64,
"keyHandle": key["keyHandle"],
}
@unique
class WEBAUTHN_TYPE(six.text_type, Enum):
MAKE_CREDENTIAL = "webauthn.create"
GET_ASSERTION = "webauthn.get"
_CTAP1_INFO = b"\xa2\x01\x81\x66\x55\x32\x46\x5f\x56\x32\x03\x50" + b"\0" * 16
class Fido2Client(object):
def __init__(self, device, origin, verify=verify_rp_id):
self.ctap1_poll_delay = 0.25
self.origin = origin
self._verify = verify
try:
self.ctap2 = CTAP2(device)
self.info = self.ctap2.get_info()
if PinProtocolV1.VERSION in self.info.pin_protocols:
self.pin_protocol = PinProtocolV1(self.ctap2)
else:
self.pin_protocol = None
self._do_make_credential = self._ctap2_make_credential
self._do_get_assertion = self._ctap2_get_assertion
except ValueError:
self.ctap1 = CTAP1(device)
self.info = Info(_CTAP1_INFO)
self._do_make_credential = self._ctap1_make_credential
self._do_get_assertion = self._ctap1_get_assertion
def _verify_rp_id(self, rp_id):
try:
if self._verify(rp_id, self.origin):
return
except Exception:
pass # Fall through to ClientError
raise ClientError.ERR.BAD_REQUEST()
def make_credential(
self,
rp,
user,
challenge,
algos=[ES256.ALGORITHM],
exclude_list=None,
extensions=None,
rk=False,
uv=False,
pin=None,
timeout=None,
on_keepalive=None,
):
self._verify_rp_id(rp["id"])
client_data = ClientData.build(
type=WEBAUTHN_TYPE.MAKE_CREDENTIAL,
clientExtensions={},
challenge=challenge,
origin=self.origin,
)
try:
return (
self._do_make_credential(
client_data,
rp,
user,
algos,
exclude_list,
extensions,
rk,
uv,
pin,
timeout,
on_keepalive,
),
client_data,
)
except CtapError as e:
raise _ctap2client_err(e)
def _ctap2_make_credential(
self,
client_data,
rp,
user,
algos,
exclude_list,
extensions,
rk,
uv,
pin,
timeout,
on_keepalive,
):
key_params = [{"type": "public-key", "alg": alg} for alg in algos]
pin_auth = None
pin_protocol = None
if pin:
pin_protocol = self.pin_protocol.VERSION
pin_token = self.pin_protocol.get_pin_token(pin)
pin_auth = hmac_sha256(pin_token, client_data.hash)[:16]
elif self.info.options.get("clientPin"):
raise ValueError("PIN required!")
if not (rk or uv):
options = None
else:
options = {}
if rk:
options["rk"] = True
if uv:
options["uv"] = True
# Filter out credential IDs which are too long
max_len = self.info.max_cred_id_length
if max_len and exclude_list:
exclude_list = [e for e in exclude_list if len(e) <= max_len]
# Reject the request if too many credentials remain.
max_creds = self.info.max_creds_in_list
if max_creds and len(exclude_list or ()) > max_creds:
raise ClientError.ERR.BAD_REQUEST("exclude_list too long")
return self.ctap2.make_credential(
client_data.hash,
rp,
user,
key_params,
exclude_list,
extensions,
options,
pin_auth,
pin_protocol,
timeout,
on_keepalive,
)
def _ctap1_make_credential(
self,
client_data,
rp,
user,
algos,
exclude_list,
extensions,
rk,
uv,
pin,
timeout,
on_keepalive,
):
if rk or uv or ES256.ALGORITHM not in algos:
raise CtapError(CtapError.ERR.UNSUPPORTED_OPTION)
app_param = sha256(rp["id"].encode())
dummy_param = b"\0" * 32
for cred in exclude_list or []:
key_handle = cred["id"]
try:
self.ctap1.authenticate(dummy_param, app_param, key_handle, True)
raise ClientError.ERR.OTHER_ERROR() # Shouldn't happen
except ApduError as e:
if e.code == APDU.USE_NOT_SATISFIED:
_call_polling(
self.ctap1_poll_delay,
timeout,
on_keepalive,
self.ctap1.register,
dummy_param,
dummy_param,
)
raise ClientError.ERR.DEVICE_INELIGIBLE()
return AttestationObject.from_ctap1(
app_param,
_call_polling(
self.ctap1_poll_delay,
timeout,
on_keepalive,
self.ctap1.register,
client_data.hash,
app_param,
),
)
def get_assertion(
self,
rp_id,
challenge,
allow_list=None,
extensions=None,
up=True,
uv=False,
pin=None,
timeout=None,
on_keepalive=None,
):
self._verify_rp_id(rp_id)
client_data = ClientData.build(
type=WEBAUTHN_TYPE.GET_ASSERTION,
clientExtensions={},
challenge=challenge,
origin=self.origin,
)
try:
return (
self._do_get_assertion(
client_data,
rp_id,
allow_list,
extensions,
up,
uv,
pin,
timeout,
on_keepalive,
),
client_data,
)
except CtapError as e:
raise _ctap2client_err(e)
def _ctap2_get_assertion(
self,
client_data,
rp_id,
allow_list,
extensions,
up,
uv,
pin,
timeout,
on_keepalive,
):
pin_auth = None
pin_protocol = None
if pin:
pin_protocol = self.pin_protocol.VERSION
pin_token = self.pin_protocol.get_pin_token(pin)
pin_auth = hmac_sha256(pin_token, client_data.hash)[:16]
elif self.info.options.get("clientPin"):
raise ValueError("PIN required!")
options = {}
if not up:
options["up"] = False
if uv:
options["uv"] = True
if len(options) == 0:
options = None
# Filter out credential IDs which are too long
max_len = self.info.max_cred_id_length
if max_len:
allow_list = [e for e in allow_list if len(e) <= max_len]
if not allow_list:
raise CtapError(CtapError.ERR.NO_CREDENTIALS)
# Reject the request if too many credentials remain.
max_creds = self.info.max_creds_in_list
if max_creds and len(allow_list) > max_creds:
raise ClientError.ERR.BAD_REQUEST("allow_list too long")
return self.ctap2.get_assertions(
rp_id,
client_data.hash,
allow_list,
extensions,
options,
pin_auth,
pin_protocol,
timeout,
on_keepalive,
)
def _ctap1_get_assertion(
self,
client_data,
rp_id,
allow_list,
extensions,
up,
uv,
pin,
timeout,
on_keepalive,
):
if (not up) or uv or not allow_list:
raise CtapError(CtapError.ERR.UNSUPPORTED_OPTION)
app_param = sha256(rp_id.encode())
client_param = client_data.hash
for cred in allow_list:
try:
auth_resp = _call_polling(
self.ctap1_poll_delay,
timeout,
on_keepalive,
self.ctap1.authenticate,
client_param,
app_param,
cred["id"],
)
return [AssertionResponse.from_ctap1(app_param, cred, auth_resp)]
except ClientError as e:
if e.code == ClientError.ERR.TIMEOUT:
raise # Other errors are ignored so we move to the next.
raise ClientError.ERR.DEVICE_INELIGIBLE()