1
mirror of https://github.com/home-assistant/core synced 2024-07-18 12:02:20 +02:00

Fix auth_sign_path with query params (#73240)

Co-authored-by: J. Nick Koston <nick@koston.org>
This commit is contained in:
Christopher Bailey 2022-06-21 15:21:47 -04:00 committed by GitHub
parent adf0f62963
commit 67618311fa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 92 additions and 3 deletions

View File

@ -7,11 +7,11 @@ from ipaddress import ip_address
import logging
import secrets
from typing import Final
from urllib.parse import unquote
from aiohttp import hdrs
from aiohttp.web import Application, Request, StreamResponse, middleware
import jwt
from yarl import URL
from homeassistant.auth.const import GROUP_ID_READ_ONLY
from homeassistant.auth.models import User
@ -57,18 +57,24 @@ def async_sign_path(
else:
refresh_token_id = hass.data[STORAGE_KEY]
url = URL(path)
now = dt_util.utcnow()
params = dict(sorted(url.query.items()))
encoded = jwt.encode(
{
"iss": refresh_token_id,
"path": unquote(path),
"path": url.path,
"params": params,
"iat": now,
"exp": now + expiration,
},
secret,
algorithm="HS256",
)
return f"{path}?{SIGN_QUERY_PARAM}={encoded}"
params[SIGN_QUERY_PARAM] = encoded
url = url.with_query(params)
return f"{url.path}?{url.query_string}"
@callback
@ -176,6 +182,11 @@ async def async_setup_auth(hass: HomeAssistant, app: Application) -> None:
if claims["path"] != request.path:
return False
params = dict(sorted(request.query.items()))
del params[SIGN_QUERY_PARAM]
if claims["params"] != params:
return False
refresh_token = await hass.auth.async_get_refresh_token(claims["iss"])
if refresh_token is None:

View File

@ -17,6 +17,7 @@ from homeassistant.components import websocket_api
from homeassistant.components.http.auth import (
CONTENT_USER_NAME,
DATA_SIGN_SECRET,
SIGN_QUERY_PARAM,
STORAGE_KEY,
async_setup_auth,
async_sign_path,
@ -294,6 +295,83 @@ async def test_auth_access_signed_path_with_refresh_token(
assert req.status == HTTPStatus.UNAUTHORIZED
async def test_auth_access_signed_path_with_query_param(
hass, app, aiohttp_client, hass_access_token
):
"""Test access with signed url and query params."""
app.router.add_post("/", mock_handler)
app.router.add_get("/another_path", mock_handler)
await async_setup_auth(hass, app)
client = await aiohttp_client(app)
refresh_token = await hass.auth.async_validate_access_token(hass_access_token)
signed_path = async_sign_path(
hass, "/?test=test", timedelta(seconds=5), refresh_token_id=refresh_token.id
)
req = await client.get(signed_path)
assert req.status == HTTPStatus.OK
data = await req.json()
assert data["user_id"] == refresh_token.user.id
async def test_auth_access_signed_path_with_query_param_order(
hass, app, aiohttp_client, hass_access_token
):
"""Test access with signed url and query params different order."""
app.router.add_post("/", mock_handler)
app.router.add_get("/another_path", mock_handler)
await async_setup_auth(hass, app)
client = await aiohttp_client(app)
refresh_token = await hass.auth.async_validate_access_token(hass_access_token)
signed_path = async_sign_path(
hass,
"/?test=test&foo=bar",
timedelta(seconds=5),
refresh_token_id=refresh_token.id,
)
url = yarl.URL(signed_path)
signed_path = f"{url.path}?{SIGN_QUERY_PARAM}={url.query.get(SIGN_QUERY_PARAM)}&foo=bar&test=test"
req = await client.get(signed_path)
assert req.status == HTTPStatus.OK
data = await req.json()
assert data["user_id"] == refresh_token.user.id
@pytest.mark.parametrize(
"base_url,test_url",
[
("/?test=test", "/?test=test&foo=bar"),
("/", "/?test=test"),
("/?test=test&foo=bar", "/?test=test&foo=baz"),
("/?test=test&foo=bar", "/?test=test"),
],
)
async def test_auth_access_signed_path_with_query_param_tamper(
hass, app, aiohttp_client, hass_access_token, base_url: str, test_url: str
):
"""Test access with signed url and query params that have been tampered with."""
app.router.add_post("/", mock_handler)
app.router.add_get("/another_path", mock_handler)
await async_setup_auth(hass, app)
client = await aiohttp_client(app)
refresh_token = await hass.auth.async_validate_access_token(hass_access_token)
signed_path = async_sign_path(
hass, base_url, timedelta(seconds=5), refresh_token_id=refresh_token.id
)
url = yarl.URL(signed_path)
token = url.query.get(SIGN_QUERY_PARAM)
req = await client.get(f"{test_url}&{SIGN_QUERY_PARAM}={token}")
assert req.status == HTTPStatus.UNAUTHORIZED
async def test_auth_access_signed_path_via_websocket(
hass, app, hass_ws_client, hass_read_only_access_token
):