remove pbkdf2 upgrade path (#18736)

This commit is contained in:
Matt Hamilton 2018-11-27 04:42:56 -05:00 committed by Paulus Schoutsen
parent c2f8dfcb9f
commit 4f2e7fc912
2 changed files with 0 additions and 120 deletions

View File

@ -1,8 +1,6 @@
"""Home Assistant auth provider.""" """Home Assistant auth provider."""
import base64 import base64
from collections import OrderedDict from collections import OrderedDict
import hashlib
import hmac
from typing import Any, Dict, List, Optional, cast from typing import Any, Dict, List, Optional, cast
import bcrypt import bcrypt
@ -11,7 +9,6 @@ import voluptuous as vol
from homeassistant.const import CONF_ID from homeassistant.const import CONF_ID
from homeassistant.core import callback, HomeAssistant from homeassistant.core import callback, HomeAssistant
from homeassistant.exceptions import HomeAssistantError from homeassistant.exceptions import HomeAssistantError
from homeassistant.util.async_ import run_coroutine_threadsafe
from . import AuthProvider, AUTH_PROVIDER_SCHEMA, AUTH_PROVIDERS, LoginFlow from . import AuthProvider, AUTH_PROVIDER_SCHEMA, AUTH_PROVIDERS, LoginFlow
@ -94,39 +91,11 @@ class Data:
user_hash = base64.b64decode(found['password']) user_hash = base64.b64decode(found['password'])
# if the hash is not a bcrypt hash...
# provide a transparant upgrade for old pbkdf2 hash format
if not (user_hash.startswith(b'$2a$')
or user_hash.startswith(b'$2b$')
or user_hash.startswith(b'$2x$')
or user_hash.startswith(b'$2y$')):
# IMPORTANT! validate the login, bail if invalid
hashed = self.legacy_hash_password(password)
if not hmac.compare_digest(hashed, user_hash):
raise InvalidAuth
# then re-hash the valid password with bcrypt
self.change_password(found['username'], password)
run_coroutine_threadsafe(
self.async_save(), self.hass.loop
).result()
user_hash = base64.b64decode(found['password'])
# bcrypt.checkpw is timing-safe # bcrypt.checkpw is timing-safe
if not bcrypt.checkpw(password.encode(), if not bcrypt.checkpw(password.encode(),
user_hash): user_hash):
raise InvalidAuth raise InvalidAuth
def legacy_hash_password(self, password: str,
for_storage: bool = False) -> bytes:
"""LEGACY password encoding."""
# We're no longer storing salts in data, but if one exists we
# should be able to retrieve it.
salt = self._data['salt'].encode() # type: ignore
hashed = hashlib.pbkdf2_hmac('sha512', password.encode(), salt, 100000)
if for_storage:
hashed = base64.b64encode(hashed)
return hashed
# pylint: disable=no-self-use # pylint: disable=no-self-use
def hash_password(self, password: str, for_storage: bool = False) -> bytes: def hash_password(self, password: str, for_storage: bool = False) -> bytes:
"""Encode a password.""" """Encode a password."""

View File

@ -1,7 +1,6 @@
"""Test the Home Assistant local auth provider.""" """Test the Home Assistant local auth provider."""
from unittest.mock import Mock from unittest.mock import Mock
import base64
import pytest import pytest
import voluptuous as vol import voluptuous as vol
@ -134,91 +133,3 @@ async def test_new_users_populate_values(hass, data):
user = await manager.async_get_or_create_user(credentials) user = await manager.async_get_or_create_user(credentials)
assert user.name == 'hello' assert user.name == 'hello'
assert user.is_active assert user.is_active
async def test_new_hashes_are_bcrypt(data, hass):
"""Test that newly created hashes are using bcrypt."""
data.add_auth('newuser', 'newpass')
found = None
for user in data.users:
if user['username'] == 'newuser':
found = user
assert found is not None
user_hash = base64.b64decode(found['password'])
assert (user_hash.startswith(b'$2a$')
or user_hash.startswith(b'$2b$')
or user_hash.startswith(b'$2x$')
or user_hash.startswith(b'$2y$'))
async def test_pbkdf2_to_bcrypt_hash_upgrade(hass_storage, hass):
"""Test migrating user from pbkdf2 hash to bcrypt hash."""
hass_storage[hass_auth.STORAGE_KEY] = {
'version': hass_auth.STORAGE_VERSION,
'key': hass_auth.STORAGE_KEY,
'data': {
'salt': '09c52f0b120eaa7dea5f73f9a9b985f3d493b30a08f3f2945ef613'
'0b08e6a3ea',
'users': [
{
'password': 'L5PAbehB8LAQI2Ixu+d+PDNJKmljqLnBcYWYw35onC/8D'
'BM1SpvT6A8ZFael5+deCt+s+43J08IcztnguouHSw==',
'username': 'legacyuser'
}
]
},
}
data = hass_auth.Data(hass)
await data.async_load()
# verify the correct (pbkdf2) password successfuly authenticates the user
await hass.async_add_executor_job(
data.validate_login, 'legacyuser', 'beer')
# ...and that the hashes are now bcrypt hashes
user_hash = base64.b64decode(
hass_storage[hass_auth.STORAGE_KEY]['data']['users'][0]['password'])
assert (user_hash.startswith(b'$2a$')
or user_hash.startswith(b'$2b$')
or user_hash.startswith(b'$2x$')
or user_hash.startswith(b'$2y$'))
async def test_pbkdf2_to_bcrypt_hash_upgrade_with_incorrect_pass(hass_storage,
hass):
"""Test migrating user from pbkdf2 hash to bcrypt hash."""
hass_storage[hass_auth.STORAGE_KEY] = {
'version': hass_auth.STORAGE_VERSION,
'key': hass_auth.STORAGE_KEY,
'data': {
'salt': '09c52f0b120eaa7dea5f73f9a9b985f3d493b30a08f3f2945ef613'
'0b08e6a3ea',
'users': [
{
'password': 'L5PAbehB8LAQI2Ixu+d+PDNJKmljqLnBcYWYw35onC/8D'
'BM1SpvT6A8ZFael5+deCt+s+43J08IcztnguouHSw==',
'username': 'legacyuser'
}
]
},
}
data = hass_auth.Data(hass)
await data.async_load()
orig_user_hash = base64.b64decode(
hass_storage[hass_auth.STORAGE_KEY]['data']['users'][0]['password'])
# Make sure invalid legacy passwords fail
with pytest.raises(hass_auth.InvalidAuth):
await hass.async_add_executor_job(
data.validate_login, 'legacyuser', 'wine')
# Make sure we don't change the password/hash when password is incorrect
with pytest.raises(hass_auth.InvalidAuth):
await hass.async_add_executor_job(
data.validate_login, 'legacyuser', 'wine')
same_user_hash = base64.b64decode(
hass_storage[hass_auth.STORAGE_KEY]['data']['users'][0]['password'])
assert orig_user_hash == same_user_hash