From a835bee61b587692413753f2ca8e49e6ef89a293 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Oliv=C3=A9r=20Falvai?= Date: Sun, 5 Dec 2021 18:51:54 +0100 Subject: [PATCH] Brute force encryption key automatically (#52) * Brute force encryption key automatically * Fix config flow error handling * Add config flow tests * Cleanup --- custom_components/candy/client/__init__.py | 57 +++++-- custom_components/candy/client/decryption.py | 63 ++++++++ custom_components/candy/config_flow.py | 47 +++--- custom_components/candy/strings.json | 3 +- custom_components/candy/translations/en.json | 3 +- tests/common.py | 13 +- tests/conftest.py | 23 --- tests/test_client.py | 78 ++++++++- tests/test_config_flow.py | 159 +++++++++++++++++++ tests/test_decryption.py | 29 ++++ 10 files changed, 407 insertions(+), 68 deletions(-) create mode 100644 custom_components/candy/client/decryption.py create mode 100644 tests/test_config_flow.py create mode 100644 tests/test_decryption.py diff --git a/custom_components/candy/client/__init__.py b/custom_components/candy/client/__init__.py index cd75d60..927e917 100644 --- a/custom_components/candy/client/__init__.py +++ b/custom_components/candy/client/__init__.py @@ -1,12 +1,13 @@ -import codecs import json import logging -from typing import Union +from json import JSONDecodeError +from typing import Union, Optional import aiohttp import backoff from aiohttp import ClientSession +from .decryption import decrypt, Encryption, find_key from .model import WashingMachineStatus, TumbleDryerStatus, DishwasherStatus, OvenStatus _LOGGER = logging.getLogger(__name__) @@ -22,16 +23,15 @@ def __init__(self, session: ClientSession, device_ip: str, encryption_key: str, @backoff.on_exception(backoff.expo, aiohttp.ClientError, max_tries=10, logger=__name__) @backoff.on_exception(backoff.expo, TimeoutError, max_tries=10, logger=__name__) - async def status_with_retry(self) -> Union[WashingMachineStatus, TumbleDryerStatus]: + async def status_with_retry(self) -> Union[WashingMachineStatus, TumbleDryerStatus, DishwasherStatus, OvenStatus]: return await self.status() - async def status(self) -> Union[WashingMachineStatus, TumbleDryerStatus]: - url = f"http://{self.device_ip}/http-read.json?encrypted={1 if self.use_encryption else 0}" + async def status(self) -> Union[WashingMachineStatus, TumbleDryerStatus, DishwasherStatus, OvenStatus]: + url = _status_url(self.device_ip, self.use_encryption) async with self.session.get(url) as resp: - if self.use_encryption: - resp_bytes = await resp.read() - resp_hex = codecs.decode(resp_bytes, encoding="hex") - decrypted_text = self.decrypt(resp_hex, self.encryption_key) + if self.encryption_key != "": + resp_hex = await resp.text() # Response is hex encoded encrypted data + decrypted_text = decrypt(self.encryption_key.encode(), bytes.fromhex(resp_hex)) resp_json = json.loads(decrypted_text) else: resp_json = await resp.json(content_type="text/html") @@ -51,9 +51,38 @@ async def status(self) -> Union[WashingMachineStatus, TumbleDryerStatus]: return status - def decrypt(self, cipher_text, key): - decrypted = "" - for i in range(len(cipher_text)): - decrypted += chr(cipher_text[i] ^ ord(key[i % len(key)])) - return decrypted +async def detect_encryption(session: aiohttp.ClientSession, device_ip: str) -> (Encryption, Optional[str]): + # noinspection PyBroadException + try: + _LOGGER.info("Trying to get a response without encryption (encrypted=0)...") + url = _status_url(device_ip, use_encryption=False) + async with session.get(url) as resp: + resp_json = await resp.json(content_type="text/html") + assert resp_json.get("response") != "BAD REQUEST" + _LOGGER.info("Received unencrypted JSON response, no need to use key for decryption") + return Encryption.NO_ENCRYPTION, None + except Exception as e: + _LOGGER.debug(e) + _LOGGER.info("Failed to get a valid response without encryption, let's try with encrypted=1...") + url = _status_url(device_ip, use_encryption=True) + async with session.get(url) as resp: + resp_hex = await resp.text() # Response is hex encoded encrypted data + try: + json.loads(bytes.fromhex(resp_hex)) + _LOGGER.info("Response is not encrypted (despite encryption=1 in request), no need to brute force " + "the key") + return Encryption.ENCRYPTION_WITHOUT_KEY, None + except JSONDecodeError: + _LOGGER.info("Brute force decryption key from the encrypted response...") + _LOGGER.debug(f"Response: {resp_hex}") + key = find_key(bytes.fromhex(resp_hex)) + if key is None: + raise ValueError("Couldn't brute force key") + + _LOGGER.info("Using key with encrypted=1 for future requests") + return Encryption.ENCRYPTION, key + + +def _status_url(device_ip: str, use_encryption: bool) -> str: + return f"http://{device_ip}/http-read.json?encrypted={1 if use_encryption else 0}" diff --git a/custom_components/candy/client/decryption.py b/custom_components/candy/client/decryption.py new file mode 100644 index 0000000..f627dea --- /dev/null +++ b/custom_components/candy/client/decryption.py @@ -0,0 +1,63 @@ +import itertools +import json +import logging +import math +import string +from enum import Enum + +from typing import Optional, Iterable + +# Adapted from https://www.online-python.com/pm93n5Sqg4 + +_LOGGER = logging.getLogger(__name__) + +KEY_LEN = 16 +KEY_CHARSET_CODEPOINTS: list[int] = [ord(c) for c in string.ascii_letters + string.digits] +PLAINTEXT_CHARSET_CODEPOINTS: list[int] = [ord(c) for c in string.printable] + + +class Encryption(Enum): + NO_ENCRYPTION = 1 # Use `encrypted=0` in request, response is plaintext JSON + ENCRYPTION = 2 # Use `encrypted=1` in request, response is encrypted bytes in hex encoding + ENCRYPTION_WITHOUT_KEY = 3 # Use `encrypted=1` in request, response is unencrypted hex bytes (https://github.com/ofalvai/home-assistant-candy/issues/35#issuecomment-965557116) + + +def find_key(encrypted_response: bytes) -> Optional[str]: + candidate_key_codepoints: list[list[int]] = [ + list(_find_candidate_key_codepoints(encrypted_response, i)) for i in range(16) + ] + + number_of_keys = math.prod(len(l) for l in candidate_key_codepoints) + _LOGGER.info(f"{number_of_keys} keys to test") + + for key in itertools.product(*candidate_key_codepoints): + decrypted = decrypt(key, encrypted_response) + if _is_valid_json(decrypted): + key_str = "".join(chr(point) for point in key) + _LOGGER.info(f"Potential key found: {key_str}") + return key_str + + return None + + +def decrypt(key: bytes, encrypted_response: bytes) -> bytes: + key_len = len(key) + decrypted: list[int] = [] + for (i, byte) in enumerate(encrypted_response): + decrypted.append(byte ^ key[i % key_len]) + return bytes(decrypted) + + +def _find_candidate_key_codepoints(encrypted_response: bytes, key_offset: int) -> Iterable[int]: + bytes_to_check: bytes = encrypted_response[key_offset::KEY_LEN] + for point in KEY_CHARSET_CODEPOINTS: + if all(point ^ byte in PLAINTEXT_CHARSET_CODEPOINTS for byte in bytes_to_check): + yield point + + +def _is_valid_json(decrypted: bytes) -> bool: + try: + json.loads(decrypted) + except json.JSONDecodeError: + return False + return True diff --git a/custom_components/candy/config_flow.py b/custom_components/candy/config_flow.py index e066561..2b5aff0 100644 --- a/custom_components/candy/config_flow.py +++ b/custom_components/candy/config_flow.py @@ -4,31 +4,24 @@ import logging from typing import Any +import async_timeout import voluptuous as vol - from homeassistant import config_entries -from homeassistant.core import HomeAssistant, callback from homeassistant.const import CONF_IP_ADDRESS, CONF_PASSWORD from homeassistant.data_entry_flow import FlowResult -from homeassistant.helpers import config_validation as cv +from homeassistant.helpers.aiohttp_client import async_get_clientsession +from .client import detect_encryption +from .client.decryption import Encryption from .const import * _LOGGER = logging.getLogger(__name__) STEP_DATA_SCHEMA = vol.Schema({ vol.Required(CONF_IP_ADDRESS): str, - vol.Required(CONF_KEY_USE_ENCRYPTION, default=True): bool, - vol.Optional(CONF_PASSWORD): str }) -async def validate_input(hass: HomeAssistant, data: dict[str, Any]) -> bool: - """Validate the user input allows us to connect.""" - # Everything is validated in the schema - return True - - class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): """Handle a config flow for Candy.""" @@ -40,19 +33,33 @@ async def async_step_user( ) -> FlowResult: """Handle the initial step.""" if user_input is None: - return self.async_show_form( - step_id="user", data_schema=STEP_DATA_SCHEMA - ) + return self.async_show_form(step_id="user", data_schema=STEP_DATA_SCHEMA) - errors = {} + config_data = { + CONF_IP_ADDRESS: user_input[CONF_IP_ADDRESS] + } + errors = {} try: - await validate_input(self.hass, user_input) - except Exception: # pylint: disable=broad-except - _LOGGER.exception("Unexpected exception") - errors["base"] = "unknown" + async with async_timeout.timeout(40): + encryption_type, key = await detect_encryption( + session=async_get_clientsession(self.hass), + device_ip=user_input[CONF_IP_ADDRESS] + ) + except Exception as e: # pylint: disable=broad-except + _LOGGER.exception(e) + errors["base"] = "detect_encryption" else: - return self.async_create_entry(title=CONF_INTEGRATION_TITLE, data=user_input) + if encryption_type == Encryption.ENCRYPTION: + config_data[CONF_KEY_USE_ENCRYPTION] = True + config_data[CONF_PASSWORD] = key + elif encryption_type == Encryption.NO_ENCRYPTION: + config_data[CONF_KEY_USE_ENCRYPTION] = False + elif encryption_type == Encryption.ENCRYPTION_WITHOUT_KEY: + config_data[CONF_KEY_USE_ENCRYPTION] = True + config_data[CONF_PASSWORD] = "" + + return self.async_create_entry(title=CONF_INTEGRATION_TITLE, data=config_data) return self.async_show_form( step_id="user", data_schema=STEP_DATA_SCHEMA, errors=errors diff --git a/custom_components/candy/strings.json b/custom_components/candy/strings.json index ff0d916..0b73efc 100644 --- a/custom_components/candy/strings.json +++ b/custom_components/candy/strings.json @@ -11,7 +11,8 @@ } }, "error": { - "unknown": "[%key:common::config_flow::error::unknown%]" + "unknown": "[%key:common::config_flow::error::unknown%]", + "detect_encryption": "Failed to detect encryption, check logs" }, "abort": { "already_configured": "[%key:common::config_flow::abort::already_configured_device%]" diff --git a/custom_components/candy/translations/en.json b/custom_components/candy/translations/en.json index 18bc480..78a962a 100644 --- a/custom_components/candy/translations/en.json +++ b/custom_components/candy/translations/en.json @@ -4,7 +4,8 @@ "already_configured": "Device is already configured" }, "error": { - "unknown": "Unexpected error" + "unknown": "Unexpected error", + "detect_encryption": "Failed to detect encryption, check logs" }, "step": { "user": { diff --git a/tests/common.py b/tests/common.py index e0777f1..e288ca1 100644 --- a/tests/common.py +++ b/tests/common.py @@ -1,12 +1,19 @@ import aresponses -from pytest_homeassistant_custom_component.common import load_fixture, MockConfigEntry from homeassistant.const import CONF_IP_ADDRESS, CONF_PASSWORD from homeassistant.core import HomeAssistant +from pytest_homeassistant_custom_component.common import load_fixture, MockConfigEntry from custom_components.candy import DOMAIN, CONF_KEY_USE_ENCRYPTION TEST_IP = "192.168.0.66" -TEST_ENCRYPTION_KEY = "" +TEST_ENCRYPTION_KEY_EMPTY = "" +TEST_ENCRYPTION_KEY = "fbfjlbmmfklfaikm" +TEST_ENCRYPTED_HEX_RESPONSE = """ +1D6F6C634E11190C121E1F2A001F0A19140B050F4E5816606C62654436002D043516071E19114F57445B4E4A6C636264442714184E584F5D4447616C6860492007010E270840574F5749406B6B60624F361044504E534F416B61656F4339193D0E405C485C4041606C626544311B2802020744504E535E5B4447616C6860493E2A07100F0040574F5649406B6B60624F32070B1A4E584F595649406B6B60624F35120F043F124F5744534E4A6C636264442D161E5D40574F5649406B6B60624F291212584E584F5D4447616C686049221616554856405D4F4A66666F684B241D125644504E524F416B61656F43261B1953405C485C4041606C6265442E191F5B4458445A4E4E60676F624E29111D5C4F5C405648406F67646F49231615514957445244466168646444241C12584B514F56404A67666B644F351F09070C4B514F56404A67666B644F221915324353495D444E6B60656B4F2903073A070D4B514F56404A67666B644F340E013208040E4F5C40545958524F416B61656F433B0E0E0F1203230840574F5649406B6B60624F2A03080D4E584F5C4447616C6860492B0F0E0A384E584F5D4447616C686049290F11320F1F16220344514E56434566676F6B442E05113908151F3E03124B514F56404A67666B644F250309050A3C1B3E1203120F4E584F5D4447616C68604939563544504E524F416B61656F433D223A4458445A4E4E60676F624E32513B495744524446616864644405190B274B514F56404A67666B644F13050E204353495D444E6B60656B4F1808092F445B4B5B4F4A6F6C6365402319053C4E5C43585C59444E6B60656B4F23120828445B4B5D4F4A6F6C636540000212384E5C435949416B686F634E233D3E090D0A445B4B5B4F4A6F6C6365402C3D350D1E03104B514F5053505D5D4041606C62654402010A1F122E44504E524F606C62116B6B14 +""" +TEST_UNENCRYPTED_HEX_RESPONSE = """ +7B0D0A0922737461747573466F726E6F223A7B0D0A090922537461746F57694669223A2230222C0D0A090922436F646963654572726F7265223A224530222C0D0A0909225265636970654964223A2230222C0D0A09092252656369706553746570223A2230222C0D0A090922537461727453746F70223A2230222C0D0A0909225061757361223A2230222C0D0A0909225369637572657A7A6142616D62696E69223A2230222C0D0A09092253656C6574746F7265223A2230222C0D0A09092250726F6772616D223A2230222C0D0A09092254656D70536574223A2230222C0D0A09092254656D7052656164223A22323130222C0D0A09092254656D705365745261676769756E7461223A2230222C0D0A09092244656C61795374617274223A2230222C0D0A09092252656D61696E696E6754696D6550726F6772223A223635353335222C0D0A0909226F7261223A223135222C0D0A0909226D696E223A223232222C0D0A090922736563223A223138222C0D0A0909224657766572223A2230303141222C0D0A0909227473223A2230220D0A097D0D0A7D +""" def status_response(filename): @@ -23,7 +30,7 @@ async def init_integration(hass: HomeAssistant, aioclient_mock, status_response: data={ CONF_IP_ADDRESS: "192.168.0.66", CONF_KEY_USE_ENCRYPTION: False, - CONF_PASSWORD: "asdasdasd", + CONF_PASSWORD: "", } ) diff --git a/tests/conftest.py b/tests/conftest.py index f733ad4..7f97f1b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -38,26 +38,3 @@ def skip_notifications_fixture(): "homeassistant.components.persistent_notification.async_dismiss" ): yield - - -# This fixture, when used, will result in calls to async_get_data to return None. To have the call -# return a value, we would add the `return_value=` parameter to the patch call. -@pytest.fixture(name="bypass_get_data") -def bypass_get_data_fixture(): - """Skip calls to get data from API.""" - with patch( - "custom_components.integration_blueprint.IntegrationBlueprintApiClient.async_get_data" - ): - yield - - -# In this fixture, we are forcing calls to async_get_data to raise an Exception. This is useful -# for exception handling. -@pytest.fixture(name="error_on_get_data") -def error_get_data_fixture(): - """Simulate error when retrieving data from API.""" - with patch( - "custom_components.integration_blueprint.IntegrationBlueprintApiClient.async_get_data", - side_effect=Exception, - ): - yield \ No newline at end of file diff --git a/tests/test_client.py b/tests/test_client.py index 26dc639..87f2ba2 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1,11 +1,10 @@ import aiohttp - import pytest from aresponses import ResponsesMockServer -from .common import TEST_IP, TEST_ENCRYPTION_KEY, status_response -from custom_components.candy.client import CandyClient +from custom_components.candy.client import CandyClient, detect_encryption, Encryption from custom_components.candy.client.model import MachineState, WashProgramState, WashingMachineStatus +from .common import * @pytest.mark.asyncio @@ -18,7 +17,7 @@ async def test_idle(aresponses: ResponsesMockServer): ) async with aiohttp.ClientSession() as session: - client = CandyClient(session, device_ip=TEST_IP, encryption_key=TEST_ENCRYPTION_KEY, use_encryption=False) + client = CandyClient(session, device_ip=TEST_IP, encryption_key=TEST_ENCRYPTION_KEY_EMPTY, use_encryption=False) status = await client.status() assert type(status) is WashingMachineStatus @@ -40,7 +39,7 @@ async def test_delayed_start_wait(aresponses: ResponsesMockServer): ) async with aiohttp.ClientSession() as session: - client = CandyClient(session, device_ip=TEST_IP, encryption_key=TEST_ENCRYPTION_KEY, use_encryption=False) + client = CandyClient(session, device_ip=TEST_IP, encryption_key=TEST_ENCRYPTION_KEY_EMPTY, use_encryption=False) status = await client.status() assert type(status) is WashingMachineStatus @@ -59,10 +58,77 @@ async def test_no_fillr_property(aresponses: ResponsesMockServer): ) async with aiohttp.ClientSession() as session: - client = CandyClient(session, device_ip=TEST_IP, encryption_key=TEST_ENCRYPTION_KEY, use_encryption=False) + client = CandyClient(session, device_ip=TEST_IP, encryption_key=TEST_ENCRYPTION_KEY_EMPTY, use_encryption=False) status = await client.status() assert type(status) is WashingMachineStatus assert status.machine_state is MachineState.IDLE assert status.fill_percent is None + +@pytest.mark.asyncio +async def test_detect_no_encryption(aresponses: ResponsesMockServer): + aresponses.add( + TEST_IP, + path_pattern="/http-read.json?encrypted=0", + response=status_response("washing_machine/idle.json"), + match_querystring=True + ) + + async with aiohttp.ClientSession() as session: + encryption_type, key = await detect_encryption(session, TEST_IP) + + assert encryption_type is Encryption.NO_ENCRYPTION + assert key is None + + aresponses.assert_plan_strictly_followed() + + +@pytest.mark.asyncio +async def test_detect_encryption_key(aresponses: ResponsesMockServer): + aresponses.add( + TEST_IP, + path_pattern="/http-read.json?encrypted=0", + response={"response": "BAD REQUEST"}, + match_querystring=True + ) + + aresponses.add( + TEST_IP, + path_pattern="/http-read.json?encrypted=1", + response=TEST_ENCRYPTED_HEX_RESPONSE, + match_querystring=True + ) + + async with aiohttp.ClientSession() as session: + encryption_type, key = await detect_encryption(session, TEST_IP) + + assert encryption_type is Encryption.ENCRYPTION + assert key == TEST_ENCRYPTION_KEY + + aresponses.assert_plan_strictly_followed() + + +@pytest.mark.asyncio +async def test_detect_encryption_without_key(aresponses: ResponsesMockServer): + aresponses.add( + TEST_IP, + path_pattern="/http-read.json?encrypted=0", + response={"response": "BAD REQUEST"}, + match_querystring=True + ) + + aresponses.add( + TEST_IP, + path_pattern="/http-read.json?encrypted=1", + response=TEST_UNENCRYPTED_HEX_RESPONSE, + match_querystring=True + ) + + async with aiohttp.ClientSession() as session: + encryption_type, key = await detect_encryption(session, TEST_IP) + + assert encryption_type is Encryption.ENCRYPTION_WITHOUT_KEY + assert key is None + + aresponses.assert_plan_strictly_followed() diff --git a/tests/test_config_flow.py b/tests/test_config_flow.py new file mode 100644 index 0000000..d29b4d0 --- /dev/null +++ b/tests/test_config_flow.py @@ -0,0 +1,159 @@ +from unittest.mock import patch + +import pytest +from homeassistant import config_entries, data_entry_flow +from homeassistant.const import CONF_IP_ADDRESS, CONF_PASSWORD + +from custom_components.candy import DOMAIN, CONF_KEY_USE_ENCRYPTION +from custom_components.candy.client import Encryption + + +# This fixture bypasses the actual setup of the integration +# since we only want to test the config flow. We test the +# actual functionality of the integration in other test modules. +@pytest.fixture(autouse=True) +def bypass_setup_fixture(): + """Prevent setup.""" + with patch( + "custom_components.candy.async_setup_entry", + return_value=True, + ): + yield + + +@pytest.fixture(name="detect_no_encryption", autouse=False) +def detect_no_encryption_fixture(): + with patch( + "custom_components.candy.config_flow.detect_encryption", + return_value=(Encryption.NO_ENCRYPTION, None) + ): + yield + + +@pytest.fixture(name="detect_encryption_find_key", autouse=False) +def detect_encryption_find_key_fixture(): + with patch( + "custom_components.candy.config_flow.detect_encryption", + return_value=(Encryption.ENCRYPTION, "testkey") + ): + yield + + +@pytest.fixture(name="detect_encryption_key_not_found", autouse=False) +def detect_encryption_key_not_found_fixture(): + with patch( + "custom_components.candy.config_flow.detect_encryption", + side_effect=ValueError + ): + yield + + +@pytest.fixture(name="detect_encryption_without_key", autouse=False) +def detect_encryption_without_key_fixture(): + with patch( + "custom_components.candy.config_flow.detect_encryption", + return_value=(Encryption.ENCRYPTION_WITHOUT_KEY, None) + ): + yield + + +async def test_no_encryption_detected(hass, detect_no_encryption): + """Test a successful config flow when detected encryption is no encryption.""" + + # Initialize a config flow + result = await hass.config_entries.flow.async_init( + DOMAIN, context={"source": config_entries.SOURCE_USER} + ) + + # Check that the config flow shows the user form as the first step + assert result["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result["step_id"] == "user" + + result = await hass.config_entries.flow.async_configure( + result["flow_id"], user_input={CONF_IP_ADDRESS: "192.168.0.66"} + ) + + # Check that the config flow is complete and a new entry is created with + # the input data + assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY + assert result["title"] == "Candy" + assert result["data"] == { + CONF_IP_ADDRESS: "192.168.0.66", + CONF_KEY_USE_ENCRYPTION: False, + } + assert result["result"] + + +async def test_detected_encryption_and_key_found(hass, detect_encryption_find_key): + """Test a successful config flow when encryption is detected and key is found.""" + + # Initialize a config flow + result = await hass.config_entries.flow.async_init( + DOMAIN, context={"source": config_entries.SOURCE_USER} + ) + + # Check that the config flow shows the user form as the first step + assert result["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result["step_id"] == "user" + + result = await hass.config_entries.flow.async_configure( + result["flow_id"], user_input={CONF_IP_ADDRESS: "192.168.0.66"} + ) + + # Check that the config flow is complete and a new entry is created with + # the input data + assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY + assert result["title"] == "Candy" + assert result["data"] == { + CONF_IP_ADDRESS: "192.168.0.66", + CONF_KEY_USE_ENCRYPTION: True, + CONF_PASSWORD: "testkey" + } + assert result["result"] + + +async def test_detected_encryption_and_key_not_found(hass, detect_encryption_key_not_found): + """Test a failing config flow when encryption is detected and key is not found.""" + # Initialize a config flow + result = await hass.config_entries.flow.async_init( + DOMAIN, context={"source": config_entries.SOURCE_USER} + ) + + # Check that the config flow shows the user form as the first step + assert result["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result["step_id"] == "user" + + result = await hass.config_entries.flow.async_configure( + result["flow_id"], user_input={CONF_IP_ADDRESS: "192.168.0.66"} + ) + + assert result["type"] == data_entry_flow.RESULT_TYPE_FORM + + assert result["errors"] == {"base": "detect_encryption"} + + +async def test_detected_encryption_without_key(hass, detect_encryption_without_key): + """Test a successful config flow when encryption is detected without using a key.""" + # Initialize a config flow + result = await hass.config_entries.flow.async_init( + DOMAIN, context={"source": config_entries.SOURCE_USER} + ) + + # Check that the config flow shows the user form as the first step + assert result["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result["step_id"] == "user" + + result = await hass.config_entries.flow.async_configure( + result["flow_id"], user_input={CONF_IP_ADDRESS: "192.168.0.66"} + ) + + # Check that the config flow is complete and a new entry is created with + # the input data + assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY + assert result["title"] == "Candy" + assert result["data"] == { + CONF_IP_ADDRESS: "192.168.0.66", + CONF_KEY_USE_ENCRYPTION: True, + CONF_PASSWORD: "" + } + assert result["result"] diff --git a/tests/test_decryption.py b/tests/test_decryption.py new file mode 100644 index 0000000..7298b34 --- /dev/null +++ b/tests/test_decryption.py @@ -0,0 +1,29 @@ + +from custom_components.candy.client.decryption import find_key + + +def test_find_key_1(): + response = bytes.fromhex(""" +1D6F6C634E11190C121E1F2A001F0A19140B050F4E5816606C62654436002D043516071E19114F57445B4E4A6C636264442714184E584F5D4447616C6860492007010E270840574F5749406B6B60624F361044504E534F416B61656F4339193D0E405C485C4041606C626544311B2802020744504E535E5B4447616C6860493E2A07100F0040574F5649406B6B60624F32070B1A4E584F595649406B6B60624F35120F043F124F5744534E4A6C636264442D161E5D40574F5649406B6B60624F291212584E584F5D4447616C686049221616554856405D4F4A66666F684B241D125644504E524F416B61656F43261B1953405C485C4041606C6265442E191F5B4458445A4E4E60676F624E29111D5C4F5C405648406F67646F49231615514957445244466168646444241C12584B514F56404A67666B644F351F09070C4B514F56404A67666B644F221915324353495D444E6B60656B4F2903073A070D4B514F56404A67666B644F340E013208040E4F5C40545958524F416B61656F433B0E0E0F1203230840574F5649406B6B60624F2A03080D4E584F5C4447616C6860492B0F0E0A384E584F5D4447616C686049290F11320F1F16220344514E56434566676F6B442E05113908151F3E03124B514F56404A67666B644F250309050A3C1B3E1203120F4E584F5D4447616C68604939563544504E524F416B61656F433D223A4458445A4E4E60676F624E32513B495744524446616864644405190B274B514F56404A67666B644F13050E204353495D444E6B60656B4F1808092F445B4B5B4F4A6F6C6365402319053C4E5C43585C59444E6B60656B4F23120828445B4B5D4F4A6F6C636540000212384E5C435949416B686F634E233D3E090D0A445B4B5B4F4A6F6C6365402C3D350D1E03104B514F5053505D5D4041606C62654402010A1F122E44504E524F606C62116B6B14 +""") + key = find_key(response) + + assert key == "fbfjlbmmfklfaikm" + + +def test_find_key_2(): + response = bytes.fromhex(""" +2F7C6B441B390C094C3C42093A023429764B1A403343714A6B3D503902342E073D535B6F086854653240386F2E0C23283714243F4B250A0D1A7313085D416B4C5E78686F6A3E191C570D662C1E0B657B76434361344071611A0454390C2026333D120E6F0368484A14443B44644114353503151E4D25084A026B016F416E4D485D53353F5C23163D562613774F53656D597B68441B0F1B071A73137D4F4F4A4B5D78431D4B251F1A592413774F337263787C6B4430683D104C3B50091F1A657B76414361344071611A0641280327282E263E11391B705A581A653C47646A6505311D00346A3E191A4C6B0B6F5D416B4C5E78686F6B2F153C5124546F5741767364534D403343714A7520423E3E022B35764B437C1B66756231401300041034133D1F12281B705A581A653C47646A650E24140F0956250A4A026B016F416E4D485D5333284A2F0C4A026B016F416E4D485D5322255C29133D486B0B6F5D416B4C5E78686F4B7B5A521A7B136160694E487603536F0368484A14443B4464413572764B437F1B6675623140133F59417D6365534D403343714A4A7C13774F53656D597B68441B384E4A026B016F416E4D485D53137A1B705A5B1A653C47646A65336C535B6F086854653240386F1F5A657B763F3401756854653240386F1F5272636E53506F3440711535434C +""") + key = find_key(response) + + assert key == "TqaM9Jxh8I1MmcGA" + + +def test_find_key_error(): + response = bytes.fromhex(""" +2F7C6B441B390C094C3C42093A023429764B1A403343714A6B3D503902342E073D535B6F086854653240386F2E0C23283714243F4B250A0D1A7313085D416B4C5E78686F6A3E191C570D662C1E0B657B76434361344071611A0454390C2026333D120E6F0368484A14443B44644114353503151E4D25084A026B016F416E4D485D53353F5C23163D562613774F53656D597B68441B0F1B071A73137D4F4F4A4B5D78431D4B251F1A592413774F337263787C6B4430683D104C3B50091F1A657B76414361344071611A0641280327282E263E11391B705A581A653C47646A6505311D00346A3E191A4C6B0B6F5D416B4C5E78686F6B2F153C5124546F5741767364534D403343714A7520423E3E022B35764B437C1B66756231401300041034133D1F12281B705A581A653C47646A650E24140F0956250A4A026B016F416E4D485D5333284A2F0C4A026B016F416E4D485D5322255C29133D486B0B6F5D416B4C5E78686F4B7B5A521A7B136160694E487603536F0368484A14443B4464413572764B437F1B6675623140133F59417D6365534D403343714A4A7C13774F53656D597B68441B384E4A026B016F416E4D485D53137A1B705A5B1A653C47646A65336C535B6F086854653240386F1F5A657B763F3401756854653240386F1F527263 +""") + key = find_key(response) + + assert key is None