Refactor client classes.

- Introduce UserInteraction to query for PIN, UV, etc. (replaces passing
PIN directly).
- Enable Extensions to require PIN/UV token (needed for LargeBlobs).
- Update examples and tests.
This commit is contained in:
Dain Nilsson 2021-08-27 15:31:59 +02:00
parent 6ce5291469
commit 07a65065e9
No known key found for this signature in database
GPG Key ID: F04367096FBA95E8
10 changed files with 482 additions and 331 deletions

View File

@ -30,7 +30,7 @@ Connects to the first FIDO device found which supports the CredBlob extension,
creates a new credential for it with the extension enabled, and stores some data.
"""
from fido2.hid import CtapHidDevice
from fido2.client import Fido2Client
from fido2.client import Fido2Client, UserInteraction
from fido2.server import Fido2Server
from getpass import getpass
import sys
@ -50,28 +50,33 @@ def enumerate_devices():
yield dev
# Handle user interaction
class CliInteraction(UserInteraction):
def prompt_up(self):
print("\nTouch your authenticator device now...\n")
def request_pin(self, permissions, rd_id):
return getpass("Enter PIN: ")
def request_uv(self, permissions, rd_id):
print("User Verification required.")
return True
# Locate a device
for dev in enumerate_devices():
client = Fido2Client(dev, "https://example.com")
client = Fido2Client(dev, "https://example.com", user_interaction=CliInteraction())
if "credBlob" in client.info.extensions:
break
else:
print("No Authenticator with the CredBlob extension found!")
sys.exit(1)
use_nfc = CtapPcscDevice and isinstance(dev, CtapPcscDevice)
# Prefer UV token if supported
pin = None
uv = "discouraged"
if client.info.options.get("pinUvAuthToken") and client.info.options.get("uv"):
if client.info.options.get("pinUvAuthToken") or client.info.options.get("uv"):
uv = "preferred"
print("Authenticator supports UV token")
elif client.info.options.get("clientPin"):
# Prompt for PIN if needed
pin = getpass("Please enter PIN: ")
else:
print("PIN not set, won't use")
server = Fido2Server({"id": "example.com", "name": "Example RP"})
@ -89,10 +94,7 @@ blob = os.urandom(32) # 32 random bytes
create_options["publicKey"].extensions = {"credBlob": blob}
# Create a credential
if not use_nfc:
print("\nTouch your authenticator device now...\n")
result = client.make_credential(create_options["publicKey"], pin=pin)
result = client.make_credential(create_options["publicKey"])
# Complete registration
auth_data = server.register_complete(
@ -113,11 +115,8 @@ request_options, state = server.authenticate_begin()
request_options["publicKey"].extensions = {"getCredBlob": True}
# Authenticate the credential
if not use_nfc:
print("\nTouch your authenticator device now...\n")
# Only one cred in allowCredentials, only one response.
result = client.get_assertion(request_options["publicKey"], pin=pin).get_response(0)
result = client.get_assertion(request_options["publicKey"]).get_response(0)
blob_res = result.authenticator_data.extensions.get("credBlob")

View File

@ -32,18 +32,25 @@ This works with both FIDO 2.0 devices as well as with U2F devices.
On Windows, the native WebAuthn API will be used.
"""
from fido2.hid import CtapHidDevice
from fido2.ctap import CtapError, STATUS
from fido2.client import Fido2Client, WindowsClient, PinRequiredError
from fido2.client import Fido2Client, WindowsClient, UserInteraction
from fido2.server import Fido2Server
from getpass import getpass
import sys
import ctypes
def on_keepalive(status):
if status == STATUS.UPNEEDED: # Waiting for touch
# Handle user interaction
class CliInteraction(UserInteraction):
def prompt_up(self):
print("\nTouch your authenticator device now...\n")
def request_pin(self, permissions, rd_id):
return getpass("Enter PIN: ")
def request_uv(self, permissions, rd_id):
print("User Verification required.")
return True
uv = "discouraged"
@ -69,10 +76,10 @@ else:
sys.exit(1)
# Set up a FIDO 2 client using the origin https://example.com
client = Fido2Client(dev, "https://example.com")
client = Fido2Client(dev, "https://example.com", user_interaction=CliInteraction())
# Prefer UV if supported and configured
if client.info.options.get("uv"):
if client.info.options.get("uv") or client.info.options.get("pinUvAuthToken"):
uv = "preferred"
print("Authenticator supports User Verification")
@ -81,24 +88,14 @@ server = Fido2Server({"id": "example.com", "name": "Example RP"}, attestation="d
user = {"id": b"user_id", "name": "A. User"}
# Prepare parameters for makeCredential
create_options, state = server.register_begin(
user, user_verification=uv, authenticator_attachment="cross-platform"
)
# Create a credential
try:
result = client.make_credential(
create_options["publicKey"], on_keepalive=on_keepalive
)
except PinRequiredError as e:
if isinstance(e.cause, CtapError):
print(e.cause)
result = client.make_credential(
create_options["publicKey"],
on_keepalive=on_keepalive,
pin=getpass("Enter PIN: "),
)
result = client.make_credential(create_options["publicKey"])
# Complete registration
auth_data = server.register_complete(
@ -118,18 +115,7 @@ print("CREDENTIAL DATA:", auth_data.credential_data)
request_options, state = server.authenticate_begin(credentials, user_verification=uv)
# Authenticate the credential
try:
result = client.get_assertion(
request_options["publicKey"], on_keepalive=on_keepalive
)
except PinRequiredError as e:
if isinstance(e.cause, CtapError):
print(e.cause)
result = client.get_assertion(
request_options["publicKey"],
on_keepalive=on_keepalive,
pin=getpass("Enter PIN: "),
)
result = client.get_assertion(request_options["publicKey"])
# Only one cred in allowCredentials, only one response.
result = result.get_response(0)

View File

@ -31,7 +31,7 @@ creates a new credential for it with the extension enabled, and uses it to
derive two separate secrets.
"""
from fido2.hid import CtapHidDevice
from fido2.client import Fido2Client
from fido2.client import Fido2Client, UserInteraction
from getpass import getpass
from binascii import b2a_hex
import sys
@ -51,32 +51,34 @@ def enumerate_devices():
yield dev
# Handle user interaction
class CliInteraction(UserInteraction):
def prompt_up(self):
print("\nTouch your authenticator device now...\n")
def request_pin(self, permissions, rd_id):
return getpass("Enter PIN: ")
def request_uv(self, permissions, rd_id):
print("User Verification required.")
return True
# Locate a device
for dev in enumerate_devices():
client = Fido2Client(dev, "https://example.com")
client = Fido2Client(dev, "https://example.com", user_interaction=CliInteraction())
if "hmac-secret" in client.info.extensions:
break
else:
print("No Authenticator with the HmacSecret extension found!")
sys.exit(1)
use_nfc = CtapPcscDevice and isinstance(dev, CtapPcscDevice)
# Prepare parameters for makeCredential
rp = {"id": "example.com", "name": "Example RP"}
user = {"id": b"user_id", "name": "A. User"}
challenge = b"Y2hhbGxlbmdl"
# Prompt for PIN if needed
pin = None
if client.info.options.get("clientPin"):
pin = getpass("Please enter PIN:")
else:
print("no pin")
# Create a credential with a HmacSecret
if not use_nfc:
print("\nTouch your authenticator device now...\n")
result = client.make_credential(
{
"rp": rp,
@ -85,7 +87,6 @@ result = client.make_credential(
"pubKeyCredParams": [{"type": "public-key", "alg": -7}],
"extensions": {"hmacCreateSecret": True},
},
pin=pin,
)
# HmacSecret result:
@ -105,9 +106,6 @@ salt = os.urandom(32)
print("Authenticate with salt:", b2a_hex(salt))
# Authenticate the credential
if not use_nfc:
print("\nTouch your authenticator device now...\n")
result = client.get_assertion(
{
"rpId": rp["id"],
@ -115,7 +113,6 @@ result = client.get_assertion(
"allowCredentials": allow_list,
"extensions": {"hmacGetSecret": {"salt1": salt}},
},
pin=pin,
).get_response(
0
) # Only one cred in allowList, only one response.
@ -129,9 +126,6 @@ print("Authenticated, secret:", b2a_hex(output1))
salt2 = os.urandom(32)
print("Authenticate with second salt:", b2a_hex(salt2))
if not use_nfc:
print("\nTouch your authenticator device now...\n")
# The first salt is reused, which should result in the same secret.
result = client.get_assertion(
{
@ -140,7 +134,6 @@ result = client.get_assertion(
"allowCredentials": allow_list,
"extensions": {"hmacGetSecret": {"salt1": salt, "salt2": salt2}},
},
pin=pin,
).get_response(
0
) # One cred in allowCredentials, single response.

View File

@ -32,9 +32,7 @@ This works with both FIDO 2.0 devices as well as with U2F devices.
On Windows, the native WebAuthn API will be used.
"""
from fido2.hid import CtapHidDevice
from fido2.ctap2 import LargeBlobs
from fido2.ctap2.pin import ClientPin
from fido2.client import Fido2Client
from fido2.client import Fido2Client, UserInteraction
from fido2.server import Fido2Server
from getpass import getpass
import sys
@ -54,17 +52,28 @@ def enumerate_devices():
yield dev
# Handle user interaction
class CliInteraction(UserInteraction):
def prompt_up(self):
print("\nTouch your authenticator device now...\n")
def request_pin(self, permissions, rd_id):
return getpass("Enter PIN: ")
def request_uv(self, permissions, rd_id):
print("User Verification required.")
return True
# Locate a device
for dev in enumerate_devices():
client = Fido2Client(dev, "https://example.com")
client = Fido2Client(dev, "https://example.com", user_interaction=CliInteraction())
if "largeBlobKey" in client.info.extensions:
break
else:
print("No Authenticator with the largeBlobKey extension found!")
sys.exit(1)
pin = None
uv = "discouraged"
if not client.info.options.get("largeBlobs"):
print("Authenticator does not support large blobs!")
@ -72,18 +81,13 @@ if not client.info.options.get("largeBlobs"):
# Prefer UV token if supported
if client.info.options.get("pinUvAuthToken") and client.info.options.get("uv"):
uv = "discouraged"
if client.info.options.get("pinUvAuthToken") or client.info.options.get("uv"):
uv = "preferred"
print("Authenticator supports UV token")
elif client.info.options.get("clientPin"):
# Prompt for PIN if needed
pin = getpass("Please enter PIN: ")
else:
print("PIN not set, won't use")
server = Fido2Server({"id": "example.com", "name": "Example RP"})
user = {"id": b"user_id", "name": "A. User"}
# Prepare parameters for makeCredential
@ -94,14 +98,14 @@ create_options, state = server.register_begin(
authenticator_attachment="cross-platform",
)
# Enable largeBlobKey
print("Creating a credential with LargeBlob support...")
# Enable largeBlob
options = create_options["publicKey"]
options.extensions = {"largeBlobKey": True}
options.extensions = {"largeBlob": {"support": "required"}}
# Create a credential
print("\nTouch your authenticator device now...\n")
result = client.make_credential(options, pin=pin)
result = client.make_credential(options)
# Complete registration
auth_data = server.register_complete(
@ -109,52 +113,37 @@ auth_data = server.register_complete(
)
credentials = [auth_data.credential_data]
print("New credential created!")
if not result.extension_results.get("supported"):
print("Credential does not support largeBlob, failure!")
sys.exit(1)
print("Credential created! Writing a blob...")
# Prepare parameters for getAssertion
request_options, state = server.authenticate_begin(user_verification=uv)
# Enable largeBlobKey
# Write a large blob
options = request_options["publicKey"]
options.extensions = {"largeBlobKey": True}
options.extensions = {"largeBlob": {"write": b"Here is some data to store!"}}
# Authenticate the credential
print("\nTouch your authenticator device now...\n")
selection = client.get_assertion(options)
selection = client.get_assertion(options, pin=pin)
# Only one cred in allowCredentials, only one response.
assertion = selection.get_assertions()[0]
result = selection.get_response(0)
if not result.extension_results.get("written"):
print("Failed to write blob!")
sys.exit(1)
# This should match the key from MakeCredential.
key = assertion.large_blob_key
print("Large Blob Key:", key.hex())
print("Blob written! Reading back the blob...")
# Get a PIN/UV token for writing a blob
client_pin = ClientPin(client.ctap2)
if pin:
token = client_pin.get_pin_token(pin, ClientPin.PERMISSION.LARGE_BLOB_WRITE)
else:
print("\nPerform User Verification now...\n")
token = client_pin.get_uv_token(ClientPin.PERMISSION.LARGE_BLOB_WRITE)
large_blobs = LargeBlobs(client.ctap2, client_pin.protocol, token)
# Read the blob
options = request_options["publicKey"]
options.extensions = {"largeBlob": {"read": True}}
# Write a large blob
print("Writing a large blob...")
large_blobs.put_blob(key, b"Here is some data to store!")
# Authenticate the credential
selection = client.get_assertion(options)
# Read the blob without providing a PIN token.
large_blobs = LargeBlobs(client.ctap2)
blob = large_blobs.get_blob(key)
print("Read blob", blob)
# Get a fresh PIN token
if pin:
token = client_pin.get_pin_token(pin, ClientPin.PERMISSION.LARGE_BLOB_WRITE)
else:
print("\nPerform User Verification now...\n")
token = client_pin.get_uv_token(ClientPin.PERMISSION.LARGE_BLOB_WRITE)
large_blobs = LargeBlobs(client.ctap2, client_pin.protocol, token)
# Clean up
large_blobs.delete_blob(key)
print("Blob deleted...")
# Only one cred in allowCredentials, only one response.
result = selection.get_response(0)
print("Read blob: ", result.extension_results.get("blob"))

View File

@ -30,9 +30,10 @@ Connects to each FIDO device found, and causes them all to blink until the user
triggers one to select it. A new credential is created for that authenticator,
and the operation is cancelled for the others.
"""
from fido2.hid import CtapHidDevice, STATUS
from fido2.client import Fido2Client, ClientError
from fido2.hid import CtapHidDevice
from fido2.client import Fido2Client, ClientError, UserInteraction
from threading import Event, Thread
from getpass import getpass
import sys
# Locate a device
@ -41,38 +42,38 @@ if not devs:
print("No FIDO device found")
sys.exit(1)
clients = [Fido2Client(d, "https://example.com") for d in devs]
# Handle user interaction
class CliInteraction(UserInteraction):
def prompt_up(self):
print("\nTouch your authenticator device now...\n")
def request_pin(self, permissions, rd_id):
return getpass("Enter PIN: ")
def request_uv(self, permissions, rd_id):
print("User Verification required.")
return True
clients = [
Fido2Client(d, "https://example.com", user_interaction=CliInteraction())
for d in devs
]
# Prepare parameters for makeCredential
rp = {"id": "example.com", "name": "Example RP"}
user = {"id": b"user_id", "name": "A. User"}
challenge = b"Y2hhbGxlbmdl"
cancel = Event()
result = None
has_prompted = False
selected = None
def on_keepalive(status):
global has_prompted # Don't prompt for each device.
if status == STATUS.UPNEEDED and not has_prompted:
print("\nTouch your authenticator device now...\n")
has_prompted = True
def work(client):
global result
def select(client):
global selected
try:
result = client.make_credential(
{
"rp": rp,
"user": user,
"challenge": challenge,
"pubKeyCredParams": [{"type": "public-key", "alg": -7}],
},
event=cancel,
on_keepalive=on_keepalive,
)
client.selection(cancel)
selected = client
except ClientError as e:
if e.code != ClientError.ERR.TIMEOUT:
raise
@ -81,9 +82,11 @@ def work(client):
cancel.set()
print("\nTouch the authenticator you wish to use...\n")
threads = []
for client in clients:
t = Thread(target=work, args=(client,))
t = Thread(target=select, args=(client,))
threads.append(t)
t.start()
@ -91,6 +94,17 @@ for t in threads:
t.join()
if cancel.is_set():
print("Authenticator selected, making credential...")
result = selected.make_credential(
{
"rp": rp,
"user": user,
"challenge": challenge,
"pubKeyCredParams": [{"type": "public-key", "alg": -7}],
},
)
print("New credential created!")
print("ATTESTATION OBJECT:", result.attestation_object)
print()

View File

@ -32,7 +32,7 @@ This works with both FIDO 2.0 devices as well as with U2F devices.
On Windows, the native WebAuthn API will be used.
"""
from fido2.hid import CtapHidDevice
from fido2.client import Fido2Client, WindowsClient
from fido2.client import Fido2Client, WindowsClient, UserInteraction
from fido2.server import Fido2Server
from getpass import getpass
import sys
@ -52,8 +52,19 @@ def enumerate_devices():
yield dev
use_prompt = False
pin = None
# Handle user interaction
class CliInteraction(UserInteraction):
def prompt_up(self):
print("\nTouch your authenticator device now...\n")
def request_pin(self, permissions, rd_id):
return getpass("Enter PIN: ")
def request_uv(self, permissions, rd_id):
print("User Verification required.")
return True
uv = "discouraged"
if WindowsClient.is_available() and not ctypes.windll.shell32.IsUserAnAdmin():
@ -62,9 +73,10 @@ if WindowsClient.is_available() and not ctypes.windll.shell32.IsUserAnAdmin():
else:
# Locate a device
for dev in enumerate_devices():
client = Fido2Client(dev, "https://example.com")
client = Fido2Client(
dev, "https://example.com", user_interaction=CliInteraction()
)
if client.info.options.get("rk"):
use_prompt = not (CtapPcscDevice and isinstance(dev, CtapPcscDevice))
break
else:
print("No Authenticator with support for resident key found!")
@ -74,11 +86,6 @@ else:
if client.info.options.get("uv"):
uv = "preferred"
print("Authenticator supports User Verification")
elif client.info.options.get("clientPin"):
# Prompt for PIN if needed
pin = getpass("Please enter PIN: ")
else:
print("PIN not set, won't use")
server = Fido2Server({"id": "example.com", "name": "Example RP"}, attestation="direct")
@ -94,10 +101,7 @@ create_options, state = server.register_begin(
)
# Create a credential
if use_prompt:
print("\nTouch your authenticator device now...\n")
result = client.make_credential(create_options["publicKey"], pin=pin)
result = client.make_credential(create_options["publicKey"])
# Complete registration
@ -118,10 +122,7 @@ print("CREDENTIAL DATA:", auth_data.credential_data)
request_options, state = server.authenticate_begin(user_verification=uv)
# Authenticate the credential
if use_prompt:
print("\nTouch your authenticator device now...\n")
selection = client.get_assertion(request_options["publicKey"], pin=pin)
selection = client.get_assertion(request_options["publicKey"])
result = selection.get_response(0) # There may be multiple responses, get the first.
print("USER ID:", result.user_handle)

View File

@ -35,7 +35,7 @@ Yubico FIDO root CA (this will only work for Yubico devices).
On Windows, the native WebAuthn API will be used.
"""
from fido2.hid import CtapHidDevice
from fido2.client import Fido2Client, WindowsClient
from fido2.client import Fido2Client, WindowsClient, UserInteraction
from fido2.server import Fido2Server, AttestationVerifier
from base64 import b64decode
from getpass import getpass
@ -80,10 +80,22 @@ class YubicoAttestationVerifier(AttestationVerifier):
return [YUBICO_CA]
use_prompt = False
pin = None
uv = "discouraged"
# Handle user interaction
class CliInteraction(UserInteraction):
def prompt_up(self):
print("\nTouch your authenticator device now...\n")
def request_pin(self, permissions, rd_id):
return getpass("Enter PIN: ")
def request_uv(self, permissions, rd_id):
print("User Verification required.")
return True
if WindowsClient.is_available() and not ctypes.windll.shell32.IsUserAnAdmin():
# Use the Windows WebAuthn API if available, and we're not running as admin
client = WindowsClient("https://example.com")
@ -107,17 +119,12 @@ else:
sys.exit(1)
# Set up a FIDO 2 client using the origin https://example.com
client = Fido2Client(dev, "https://example.com")
client = Fido2Client(dev, "https://example.com", user_interaction=CliInteraction())
# Prefer UV if supported
if client.info.options.get("uv"):
uv = "preferred"
print("Authenticator supports User Verification")
elif client.info.options.get("clientPin"):
# Prompt for PIN if needed
pin = getpass("Please enter PIN: ")
else:
print("PIN not set, won't use")
server = Fido2Server(
@ -137,7 +144,7 @@ create_options, state = server.register_begin(
if use_prompt:
print("\nTouch your authenticator device now...\n")
result = client.make_credential(create_options["publicKey"], pin=pin)
result = client.make_credential(create_options["publicKey"])
# Complete registration
auth_data = server.register_complete(

View File

@ -29,7 +29,7 @@ from .hid import STATUS
from .ctap import CtapDevice, CtapError
from .ctap1 import Ctap1, APDU, ApduError
from .ctap2 import Ctap2, AssertionResponse, Info
from .ctap2.pin import ClientPin
from .ctap2.pin import ClientPin, PinProtocol
from .ctap2.extensions import Ctap2Extension
from .webauthn import (
AttestationObject,
@ -53,14 +53,16 @@ from typing import (
Optional,
Mapping,
Sequence,
Tuple,
List,
Dict,
)
import abc
import json
import platform
import inspect
import logging
logger = logging.getLogger(__name__)
class ClientData(bytes):
@ -376,27 +378,32 @@ class AssertionSelection:
)
class Fido2ClientAssertionSelection(AssertionSelection):
def __init__(
class WebAuthnClient(abc.ABC):
@abc.abstractmethod
def make_credential(
self,
client_data: ClientData,
assertions: Sequence[AssertionResponse],
extensions: Sequence[Ctap2Extension],
):
super().__init__(client_data, assertions)
self._extensions = extensions
options: PublicKeyCredentialCreationOptions,
event: Optional[Event] = None,
) -> AuthenticatorAttestationResponse:
"""Creates a credential.
def _get_extension_results(self, assertion):
# Process extenstion outputs
extension_outputs = {}
try:
for ext in self._extensions:
output = ext.process_get_output(assertion.auth_data)
if output is not None:
extension_outputs.update(output)
except ValueError as e:
raise ClientError.ERR.CONFIGURATION_UNSUPPORTED(e)
return extension_outputs
:param options: PublicKeyCredentialCreationOptions data.
:param threading.Event event: (optional) Signal to abort the operation.
"""
raise NotImplementedError()
@abc.abstractmethod
def get_assertion(
self,
options: PublicKeyCredentialRequestOptions,
event: Optional[Event] = None,
):
"""Get an assertion.
:param options: PublicKeyCredentialRequestOptions data.
:param threading.Event event: (optional) Signal to abort the operation.
"""
raise NotImplementedError()
def _default_extensions() -> Sequence[Type[Ctap2Extension]]:
@ -405,23 +412,38 @@ def _default_extensions() -> Sequence[Type[Ctap2Extension]]:
]
class _ClientBackend:
class _ClientBackend(abc.ABC):
info: Info
def do_make_credential(self, *args) -> Tuple[AttestationObject, Dict[str, Any]]:
@abc.abstractmethod
def selection(self, event) -> None:
raise NotImplementedError()
def do_get_assertion(
self, *args
) -> Tuple[List[AssertionResponse], List[Ctap2Extension]]:
@abc.abstractmethod
def do_make_credential(self, *args) -> AuthenticatorAttestationResponse:
raise NotImplementedError()
@abc.abstractmethod
def do_get_assertion(self, *args) -> AssertionSelection:
raise NotImplementedError()
class _Ctap1ClientBackend(_ClientBackend):
def __init__(self, device):
def __init__(self, device, user_interaction):
self.ctap1 = Ctap1(device)
self.info = Info.create(versions=["U2F_V2"], aaguid=b"\0" * 32)
self._poll_delay = 0.25
self._on_keepalive = _user_keepalive(user_interaction)
def selection(self, event):
_call_polling(
self._poll_delay,
event,
None,
self.ctap1.register,
b"\0" * 32,
b"\0" * 32,
)
def do_make_credential(
self,
@ -433,9 +455,7 @@ class _Ctap1ClientBackend(_ClientBackend):
extensions,
rk,
user_verification,
pin,
event,
on_keepalive,
):
if (
rk
@ -457,25 +477,27 @@ class _Ctap1ClientBackend(_ClientBackend):
_call_polling(
self._poll_delay,
event,
on_keepalive,
self._on_keepalive,
self.ctap1.register,
dummy_param,
dummy_param,
)
raise ClientError.ERR.DEVICE_INELIGIBLE()
return (
AttestationObject.from_ctap1(
att_obj = AttestationObject.from_ctap1(
app_param,
_call_polling(
self._poll_delay,
event,
self._on_keepalive,
self.ctap1.register,
client_data.hash,
app_param,
_call_polling(
self._poll_delay,
event,
on_keepalive,
self.ctap1.register,
client_data.hash,
app_param,
),
),
)
return AuthenticatorAttestationResponse(
client_data,
AttestationObject.create(att_obj.fmt, att_obj.auth_data, att_obj.att_stmt),
{},
)
@ -486,9 +508,7 @@ class _Ctap1ClientBackend(_ClientBackend):
allow_list,
extensions,
user_verification,
pin,
event,
on_keepalive,
):
if user_verification == UserVerificationRequirement.REQUIRED or not allow_list:
raise CtapError(CtapError.ERR.UNSUPPORTED_OPTION)
@ -500,29 +520,110 @@ class _Ctap1ClientBackend(_ClientBackend):
auth_resp = _call_polling(
self._poll_delay,
event,
on_keepalive,
self._on_keepalive,
self.ctap1.authenticate,
client_param,
app_param,
cred["id"],
)
assertions = [AssertionResponse.from_ctap1(app_param, cred, auth_resp)]
return assertions, []
return AssertionSelection(client_data, assertions)
except ClientError as e:
if e.code == ClientError.ERR.TIMEOUT:
raise # Other errors are ignored so we move to the next.
raise ClientError.ERR.DEVICE_INELIGIBLE()
class UserInteraction:
"""Provides user interaction to the Client.
Users of Fido2Client should subclass this to implement asking the user to perform
specific actions, such as entering a PIN or touching their"""
def prompt_up(self) -> None:
"""Called when the authenticator is awaiting a user presence check."""
logger.info("User Presence check required.")
def request_pin(
self, permissions: ClientPin.PERMISSION, rp_id: Optional[str]
) -> Optional[str]:
"""Called when the client requires a PIN from the user.
Should return a PIN, or None/Empty to cancel."""
logger.info("PIN requested, but UserInteraction does not support it.")
return None
def request_uv(
self, permissions: ClientPin.PERMISSION, rp_id: Optional[str]
) -> bool:
"""Called when the client is about to request UV from the user.
Should return True if allowed, or False to cancel."""
logger.info("User Verification requested.")
return True
def _user_keepalive(user_interaction):
def on_keepalive(status):
if status == STATUS.UPNEEDED: # Waiting for touch
user_interaction.prompt_up()
return on_keepalive
class _Ctap2ClientAssertionSelection(AssertionSelection):
def __init__(
self,
client_data: ClientData,
assertions: Sequence[AssertionResponse],
extensions: Sequence[Ctap2Extension],
pin_token: Optional[str],
pin_protocol: Optional[PinProtocol],
):
super().__init__(client_data, assertions)
self._extensions = extensions
self._pin_token = pin_token
self._pin_protocol = pin_protocol
def _get_extension_results(self, assertion):
# Process extenstion outputs
extension_outputs = {}
try:
for ext in self._extensions:
output = ext.process_get_output(
assertion, self._pin_token, self._pin_protocol
)
if output is not None:
extension_outputs.update(output)
except ValueError as e:
raise ClientError.ERR.CONFIGURATION_UNSUPPORTED(e)
return extension_outputs
class _Ctap2ClientBackend(_ClientBackend):
def __init__(self, device, extensions):
def __init__(self, device, user_interaction, extensions):
self.ctap2 = Ctap2(device)
self.info = self.ctap2.info
self.extensions = extensions
try:
self.client_pin: ClientPin = ClientPin(self.ctap2)
except ValueError:
self.client_pin = None # type: ignore
self.user_interaction = user_interaction
def selection(self, event):
if "FIDO_2_1" in self.info.versions:
self.ctap2.selection(event)
else:
try:
self.ctap2.make_credential(
b"\0" * 32,
{"id": "example.com", "name": "example.com"},
{"id": b"dummy", "name": "dummy"},
[{"type": "public-key", "alg": -7}],
pin_uv_param=b"",
event=event,
)
except CtapError as e:
if e.code is CtapError.ERR.PIN_AUTH_INVALID:
return
raise
def _should_use_uv(self, user_verification, mc):
uv_supported = any(
@ -549,54 +650,59 @@ class _Ctap2ClientBackend(_ClientBackend):
return True
return False
def _get_token(self, permissions, rp_id, pin, event, on_keepalive):
if pin:
if self.info.options.get("clientPin"):
return self.client_pin.get_pin_token(pin, permissions, rp_id)
else:
raise ClientError.ERR.BAD_REQUEST("PIN provided, but not set/supported")
elif self.info.options.get("uv"):
if self.info.options.get("pinUvAuthToken") and self.info.options.get(
"bioEnroll"
):
try:
return self.client_pin.get_uv_token(
def _get_token(
self, client_pin, permissions, rp_id, event, on_keepalive, allow_internal_uv
):
# Prefer UV
if self.info.options.get("uv"):
if self.info.options.get("pinUvAuthToken"):
if self.user_interaction.request_uv(permissions, rp_id):
return client_pin.get_uv_token(
permissions, rp_id, event, on_keepalive
)
except CtapError as e:
raise _ctap2client_err(e, PinRequiredError)
else:
return None # No token, use uv=True
elif self.info.options.get("clientPin"):
elif allow_internal_uv:
if self.user_interaction.request_uv(permissions, rp_id):
return None # No token, use uv=True
# PIN if UV not supported/allowed.
if self.info.options.get("clientPin"):
pin = self.user_interaction.request_pin(permissions, rp_id)
if pin:
return client_pin.get_pin_token(pin, permissions, rp_id)
raise PinRequiredError()
# Client PIN not configured.
raise ClientError.ERR.CONFIGURATION_UNSUPPORTED(
"User verification not configured/supported"
"User verification not configured"
)
def _get_auth_params(
self, client_data, rp_id, user_verification, pin, event, on_keepalive
self, client_data, rp_id, user_verification, permissions, event, on_keepalive
):
mc = client_data.get("type") == WEBAUTHN_TYPE.MAKE_CREDENTIAL
self.info = self.ctap2.get_info() # Make sure we have "fresh" info
pin_auth = None
pin_protocol = None
pin_token = None
pin_auth = None
internal_uv = False
if self._should_use_uv(user_verification, mc):
permission = (
if self._should_use_uv(user_verification, mc) or permissions:
client_pin = ClientPin(self.ctap2)
allow_internal_uv = not permissions
permissions |= (
ClientPin.PERMISSION.MAKE_CREDENTIAL
if mc
else ClientPin.PERMISSION.GET_ASSERTION
)
token = self._get_token(permission, rp_id, pin, event, on_keepalive)
if token:
pin_protocol = self.client_pin.protocol.VERSION
pin_auth = self.client_pin.protocol.authenticate(
token, client_data.hash
)
pin_token = self._get_token(
client_pin, permissions, rp_id, event, on_keepalive, allow_internal_uv
)
if pin_token:
pin_protocol = client_pin.protocol
pin_auth = client_pin.protocol.authenticate(pin_token, client_data.hash)
else:
internal_uv = True
return pin_protocol, pin_auth, internal_uv
return pin_protocol, pin_token, pin_auth, internal_uv
def do_make_credential(
self,
@ -608,9 +714,7 @@ class _Ctap2ClientBackend(_ClientBackend):
extensions,
rk,
user_verification,
pin,
event,
on_keepalive,
):
if exclude_list:
# Filter out credential IDs which are too long
@ -627,18 +731,24 @@ class _Ctap2ClientBackend(_ClientBackend):
client_inputs = extensions or {}
extension_inputs = {}
used_extensions = []
permissions = ClientPin.PERMISSION(0)
try:
for ext in [cls(self.ctap2) for cls in self.extensions]:
auth_input = ext.process_create_input(client_inputs)
auth_input, req_perms = ext.process_create_input_with_permissions(
client_inputs
)
if auth_input is not None:
used_extensions.append(ext)
permissions |= req_perms
extension_inputs[ext.NAME] = auth_input
except ValueError as e:
raise ClientError.ERR.CONFIGURATION_UNSUPPORTED(e)
on_keepalive = _user_keepalive(self.user_interaction)
# Handle auth
pin_protocol, pin_auth, internal_uv = self._get_auth_params(
client_data, rp["id"], user_verification, pin, event, on_keepalive
pin_protocol, pin_token, pin_auth, internal_uv = self._get_auth_params(
client_data, rp["id"], user_verification, permissions, event, on_keepalive
)
if not (rk or internal_uv):
@ -659,7 +769,7 @@ class _Ctap2ClientBackend(_ClientBackend):
extension_inputs or None,
options,
pin_auth,
pin_protocol,
pin_protocol.VERSION if pin_protocol else None,
event,
on_keepalive,
)
@ -668,13 +778,17 @@ class _Ctap2ClientBackend(_ClientBackend):
extension_outputs = {}
try:
for ext in used_extensions:
output = ext.process_create_output(att_obj.auth_data)
output = ext.process_create_output(att_obj, pin_token, pin_protocol)
if output is not None:
extension_outputs.update(output)
except ValueError as e:
raise ClientError.ERR.CONFIGURATION_UNSUPPORTED(e)
return att_obj, extension_outputs
return AuthenticatorAttestationResponse(
client_data,
AttestationObject.create(att_obj.fmt, att_obj.auth_data, att_obj.att_stmt),
extension_outputs,
)
def do_get_assertion(
self,
@ -683,15 +797,8 @@ class _Ctap2ClientBackend(_ClientBackend):
allow_list,
extensions,
user_verification,
pin,
event,
on_keepalive,
):
pin_protocol, pin_auth, internal_uv = self._get_auth_params(
client_data, rp_id, user_verification, pin, event, on_keepalive
)
options = {"uv": True} if internal_uv else None
if allow_list:
# Filter out credential IDs which are too long
max_len = self.info.max_cred_id_length
@ -709,15 +816,26 @@ class _Ctap2ClientBackend(_ClientBackend):
client_inputs = extensions or {}
extension_inputs = {}
used_extensions = []
permissions = ClientPin.PERMISSION(0)
try:
for ext in [cls(self.ctap2) for cls in self.extensions]:
auth_input = ext.process_get_input(client_inputs)
auth_input, req_perms = ext.process_get_input_with_permissions(
client_inputs
)
if auth_input is not None:
used_extensions.append(ext)
permissions |= req_perms
extension_inputs[ext.NAME] = auth_input
except ValueError as e:
raise ClientError.ERR.CONFIGURATION_UNSUPPORTED(e)
on_keepalive = _user_keepalive(self.user_interaction)
pin_protocol, pin_token, pin_auth, internal_uv = self._get_auth_params(
client_data, rp_id, user_verification, permissions, event, on_keepalive
)
options = {"uv": True} if internal_uv else None
assertions = self.ctap2.get_assertions(
rp_id,
client_data.hash,
@ -725,15 +843,21 @@ class _Ctap2ClientBackend(_ClientBackend):
extension_inputs or None,
options,
pin_auth,
pin_protocol,
pin_protocol.VERSION if pin_protocol else None,
event,
on_keepalive,
)
return assertions, used_extensions
return _Ctap2ClientAssertionSelection(
client_data,
assertions,
used_extensions,
pin_token,
pin_protocol,
)
class Fido2Client(_BaseClient):
class Fido2Client(WebAuthnClient, _BaseClient):
"""WebAuthn-like client implementation.
The client allows registration and authentication of WebAuthn credentials against
@ -749,34 +873,37 @@ class Fido2Client(_BaseClient):
device: CtapDevice,
origin: str,
verify: Callable[[str, str], bool] = verify_rp_id,
extension_types: Sequence[Type[Ctap2Extension]] = [],
extension_types: Sequence[Type[Ctap2Extension]] = _default_extensions(),
user_interaction: UserInteraction = UserInteraction(),
):
super().__init__(origin, verify)
try:
self._backend: _ClientBackend = _Ctap2ClientBackend(
device, extension_types or _default_extensions()
device, user_interaction, extension_types
)
except (ValueError, CtapError):
self._backend = _Ctap1ClientBackend(device)
self._backend = _Ctap1ClientBackend(device, user_interaction)
@property
def info(self):
return self._backend.info
def selection(self, event):
try:
self._backend.selection(event)
except CtapError as e:
raise _ctap2client_err(e)
def make_credential(
self,
options: PublicKeyCredentialCreationOptions,
pin: Optional[str] = None,
event: Optional[Event] = None,
on_keepalive: Optional[Callable[[int], None]] = None,
):
) -> AuthenticatorAttestationResponse:
"""Creates a credential.
:param options: PublicKeyCredentialCreationOptions data.
:param pin: (optional) Used if PIN verification is required.
:param threading.Event event: (optional) Signal to abort the operation.
:param on_keepalive: (optional) function to call with CTAP status updates.
"""
options = PublicKeyCredentialCreationOptions._wrap(options)
@ -795,7 +922,7 @@ class Fido2Client(_BaseClient):
selection = options.authenticator_selection or AuthenticatorSelectionCriteria()
try:
att_resp, extension_outputs = self._backend.do_make_credential(
return self._backend.do_make_credential(
client_data,
options.rp,
options.user,
@ -804,16 +931,7 @@ class Fido2Client(_BaseClient):
options.extensions,
selection.require_resident_key,
selection.user_verification,
pin,
event,
on_keepalive,
)
return AuthenticatorAttestationResponse(
client_data,
AttestationObject.create(
att_resp.fmt, att_resp.auth_data, att_resp.att_stmt
),
extension_outputs,
)
except CtapError as e:
raise _ctap2client_err(e)
@ -824,16 +942,12 @@ class Fido2Client(_BaseClient):
def get_assertion(
self,
options: PublicKeyCredentialRequestOptions,
pin: Optional[str] = None,
event: Optional[Event] = None,
on_keepalive: Optional[Callable[[int], None]] = None,
):
"""Get an assertion.
:param options: PublicKeyCredentialRequestOptions data.
:param pin: (optional) Used if PIN verification is required.
:param threading.Event event: (optional) Signal to abort the operation.
:param on_keepalive: (optional) Not implemented.
"""
options = PublicKeyCredentialRequestOptions._wrap(options)
@ -850,20 +964,13 @@ class Fido2Client(_BaseClient):
)
try:
assertions, used_extensions = self._backend.do_get_assertion(
return self._backend.do_get_assertion(
client_data,
options.rp_id,
options.allow_credentials,
options.extensions,
options.user_verification,
pin,
event,
on_keepalive,
)
return Fido2ClientAssertionSelection(
client_data,
assertions,
used_extensions,
)
except CtapError as e:
raise _ctap2client_err(e)
@ -872,9 +979,6 @@ class Fido2Client(_BaseClient):
timer.cancel()
_WIN_INFO = Info.create(versions=["U2F_V2", "FIDO_2_0"], aaguid=b"\0" * 32)
if platform.system().lower() == "windows":
try:
from .win_api import (
@ -887,7 +991,7 @@ if platform.system().lower() == "windows":
pass
class WindowsClient(_BaseClient):
class WindowsClient(WebAuthnClient, _BaseClient):
"""Fido2Client-like class using the Windows WebAuthn API.
Note: This class only works on Windows 10 19H1 or later. This is also when Windows

View File

@ -25,8 +25,11 @@
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
from .pin import ClientPin
from .base import AttestationResponse, AssertionResponse
from .pin import ClientPin, PinProtocol
from .blob import LargeBlobs
from enum import Enum, unique
from typing import Dict, Tuple, Any, Optional
import abc
@ -41,27 +44,48 @@ class Ctap2Extension(abc.ABC):
def __init__(self, ctap):
self.ctap = ctap
def is_supported(self):
def is_supported(self) -> bool:
"""Whether or not the extension is supported by the authenticator."""
return self.NAME in self.ctap.info.extensions
def process_create_input(self, inputs):
def process_create_input(self, inputs: Dict[str, Any]) -> Any:
"""Returns a value to include in the authenticator extension input,
or None.
"""
return None
def process_create_output(self, auth_data):
"""Return client extension output given auth_data, or None."""
def process_create_input_with_permissions(
self, inputs: Dict[str, Any]
) -> Tuple[Any, ClientPin.PERMISSION]:
return self.process_create_input(inputs), ClientPin.PERMISSION(0)
def process_create_output(
self,
attestation_response: AttestationResponse,
token: Optional[str],
pin_protocol: Optional[PinProtocol],
) -> Optional[Dict[str, Any]]:
"""Return client extension output given attestation_response, or None."""
return None
def process_get_input(self, inputs):
def process_get_input(self, inputs: Dict[str, Any]) -> Any:
"""Returns a value to include in the authenticator extension input,
or None.
"""
return None
def process_get_output(self, auth_data):
"""Return client extension output given auth_data, or None."""
def process_get_input_with_permissions(
self, inputs: Dict[str, Any]
) -> Tuple[Any, ClientPin.PERMISSION]:
return self.process_get_input(inputs), ClientPin.PERMISSION(0)
def process_get_output(
self,
assertion_response: AssertionResponse,
token: Optional[str],
pin_protocol: Optional[PinProtocol],
) -> Optional[Dict[str, Any]]:
"""Return client extension output given assertion_response, or None."""
return None
@ -81,8 +105,8 @@ class HmacSecretExtension(Ctap2Extension):
if self.is_supported() and inputs.get("hmacCreateSecret") is True:
return True
def process_create_output(self, auth_data):
if auth_data.extensions.get(self.NAME):
def process_create_output(self, attestation_response, *args):
if attestation_response.auth_data.extensions.get(self.NAME):
return {"hmacCreateSecret": True}
def process_get_input(self, inputs):
@ -113,8 +137,8 @@ class HmacSecretExtension(Ctap2Extension):
4: self.pin_protocol.VERSION,
}
def process_get_output(self, auth_data):
value = auth_data.extensions.get(self.NAME)
def process_get_output(self, assertion_response, *args):
value = assertion_response.auth_data.extensions.get(self.NAME)
decrypted = self.pin_protocol.decrypt(self.shared_secret, value)
output1 = decrypted[: HmacSecretExtension.SALT_LEN]
@ -126,20 +150,54 @@ class HmacSecretExtension(Ctap2Extension):
return {"hmacGetSecret": outputs}
class LargeBlobKeyExtension(Ctap2Extension):
class LargeBlobExtension(Ctap2Extension):
"""
Implements the Large Blob Key CTAP2 extension.
Implements the Large Blob WebAuthn extension.
"""
NAME = "largeBlobKey"
def is_supported(self):
return super().is_supported() and self.ctap.info.options.get("largeBlobs")
def process_create_input(self, inputs):
if self.is_supported() and inputs.get("largeBlobKey") is True:
data = inputs.get("largeBlob", {})
if data:
if "read" in data or "write" in data:
raise ValueError("Invalid set of parameters")
is_supported = self.is_supported()
if data.get("support") == "required" and not is_supported:
raise ValueError("Authenticator does not support large blob storage")
return True
def process_get_input(self, inputs):
if self.is_supported() and inputs.get("largeBlobKey") is True:
return True
def process_create_output(self, attestation_response, *args):
return {"supported": attestation_response.large_blob_key is not None}
def process_get_input_with_permissions(self, inputs):
data = inputs.get("largeBlob", {})
permissions = ClientPin.PERMISSION(0)
if data:
if "support" in data or ("read" in data and "write" in data):
raise ValueError("Invalid set of parameters")
if not self.is_supported():
raise ValueError("Authenticator does not support large blob storage")
if data.get("read") is True:
self._action = True
else:
self._action = data.get("write")
permissions = ClientPin.PERMISSION.LARGE_BLOB_WRITE
return True if data else None, permissions
def process_get_output(self, assertion_response, token, pin_protocol):
blob_key = assertion_response.large_blob_key
if self._action is True: # Read
large_blobs = LargeBlobs(self.ctap)
blob = large_blobs.get_blob(blob_key)
return {"blob": blob}
elif self._action: # Write
large_blobs = LargeBlobs(self.ctap, pin_protocol, token)
large_blobs.put_blob(blob_key, self._action)
return {"written": True}
class CredBlobExtension(Ctap2Extension):

View File

@ -416,7 +416,7 @@ class TestFido2Client(unittest.TestCase):
None,
None,
mock.ANY,
None,
mock.ANY,
)
self.assertEqual(response.client_data.get("origin"), APP_ID)