diff --git a/custom_components/linkytic/__init__.py b/custom_components/linkytic/__init__.py index 5616d82..f6685f3 100644 --- a/custom_components/linkytic/__init__.py +++ b/custom_components/linkytic/__init__.py @@ -1,11 +1,14 @@ """The linkytic integration.""" + from __future__ import annotations +import asyncio import logging -from homeassistant.config_entries import ConfigEntry +from homeassistant.config_entries import ConfigEntry, ConfigEntryNotReady from homeassistant.const import EVENT_HOMEASSISTANT_STOP, Platform from homeassistant.core import HomeAssistant +from homeassistant.components import usb from .const import ( DOMAIN, @@ -15,6 +18,7 @@ SETUP_TICMODE, SETUP_PRODUCER, TICMODE_STANDARD, + LINKY_IO_ERRORS, ) from .serial_reader import LinkyTICReader @@ -26,15 +30,39 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: """Set up linkytic from a config entry.""" # Create the serial reader thread and start it - serial_reader = LinkyTICReader( - title=entry.title, - port=entry.data.get(SETUP_SERIAL), - std_mode=entry.data.get(SETUP_TICMODE) == TICMODE_STANDARD, - producer_mode=entry.data.get(SETUP_PRODUCER), - three_phase=entry.data.get(SETUP_THREEPHASE), - real_time=entry.options.get(OPTIONS_REALTIME), - ) - serial_reader.start() + port = entry.data.get(SETUP_SERIAL) + try: + serial_reader = LinkyTICReader( + title=entry.title, + port=port, + std_mode=entry.data.get(SETUP_TICMODE) == TICMODE_STANDARD, + producer_mode=entry.data.get(SETUP_PRODUCER), + three_phase=entry.data.get(SETUP_THREEPHASE), + real_time=entry.options.get(OPTIONS_REALTIME), + ) + serial_reader.start() + + async def read_serial_number(serial: LinkyTICReader): + while serial.serial_number is None: + await asyncio.sleep(1) + return serial.serial_number + + s_n = await asyncio.wait_for(read_serial_number(serial_reader), timeout=5) + # TODO: check if S/N is the one saved in config entry, if not this is a different meter! + + # Error when opening serial port. + except LINKY_IO_ERRORS as e: + raise ConfigEntryNotReady(f"Couldn't open serial port {port}: {e}") from e + + # Timeout waiting for S/N to be read. + except TimeoutError as e: + serial_reader.signalstop("linkytic_timeout") + raise ConfigEntryNotReady( + "Connected to serial port but coulnd't read serial number before timeout: check if TIC is connected and active." + ) from e + + _LOGGER.info(f"Device connected with serial number: {s_n}") + hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, serial_reader.signalstop) # Add options callback entry.async_on_unload(entry.add_update_listener(update_listener)) @@ -71,3 +99,32 @@ async def update_listener(hass: HomeAssistant, entry: ConfigEntry): return # Update its options serial_reader.update_options(entry.options.get(OPTIONS_REALTIME)) + + +async def async_migrate_entry(hass: HomeAssistant, config_entry: ConfigEntry): + """Migrate old entry.""" + _LOGGER.info("Migrating from version %d.%d", config_entry.version, config_entry.minor_version) + + if config_entry.version == 1: + new = {**config_entry.data} + + if config_entry.minor_version < 2: + # Migrate to serial by-id. + serial_by_id = await hass.async_add_executor_job(usb.get_serial_by_id, new[SETUP_SERIAL]) + if serial_by_id == new[SETUP_SERIAL]: + _LOGGER.warning( + f"Couldn't find a persistent /dev/serial/by-id alias for {serial_by_id}. " + "Problems might occur at startup if device names are not persistent." + ) + else: + new[SETUP_SERIAL] = serial_by_id + + config_entry.minor_version = 2 + hass.config_entries.async_update_entry(config_entry, data=new) + + _LOGGER.info( + "Migration to version %d.%d successful", + config_entry.version, + config_entry.minor_version, + ) + return True diff --git a/custom_components/linkytic/binary_sensor.py b/custom_components/linkytic/binary_sensor.py index 79b9525..2b003d6 100644 --- a/custom_components/linkytic/binary_sensor.py +++ b/custom_components/linkytic/binary_sensor.py @@ -1,8 +1,9 @@ """Binary sensors for linkytic integration.""" + from __future__ import annotations -import asyncio import logging +from typing import Optional, cast from homeassistant.components.binary_sensor import ( BinarySensorDeviceClass, @@ -10,21 +11,63 @@ ) from homeassistant.config_entries import ConfigEntry from homeassistant.core import HomeAssistant -from homeassistant.helpers.entity import DeviceInfo, EntityCategory +from homeassistant.helpers.entity import EntityCategory from homeassistant.helpers.entity_platform import AddEntitiesCallback -from .const import ( - DID_CONNECTION_TYPE, - DID_CONSTRUCTOR, - DID_DEFAULT_NAME, - DID_REGNUMBER, - DID_TYPE, - DOMAIN, -) +from .const import DOMAIN, SETUP_TICMODE, TICMODE_STANDARD from .serial_reader import LinkyTICReader +from .entity import LinkyTICEntity +from .status_register import StatusRegister _LOGGER = logging.getLogger(__name__) +STATUS_REGISTER_SENSORS = ( + ( + StatusRegister.CONTACT_SEC, + "Contact sec", + BinarySensorDeviceClass.OPENING, + "mdi:electric-switch-closed", + "mdi-electric-switch", + False, + ), + ( + StatusRegister.ETAT_DU_CACHE_BORNE_DISTRIBUTEUR, + "Cache-borne", + BinarySensorDeviceClass.OPENING, + "mdi:toy-brick", + "mdi:toy-brick-outline", + False, + ), + ( + StatusRegister.SURTENSION_SUR_UNE_DES_PHASES, + "Surtension", + BinarySensorDeviceClass.PRESENCE, + "mdi:flash-triangle-outline", + "mdi:flash-triangle", + False, + ), + ( + StatusRegister.DEPASSEMENT_PUISSANCE_REFERENCE, + "Dépassement puissance", + BinarySensorDeviceClass.PRESENCE, + "mdi:transmission-tower", + "mdi:transmission-tower-off", + False, + ), + (StatusRegister.PRODUCTEUR_CONSOMMATEUR, "Producteur", None, "mdi:transmission-tower-export", None, False), + (StatusRegister.SENS_ENERGIE_ACTIVE, "Sens énergie active", None, "mdi:transmission-tower-export", None, False), + ( + StatusRegister.MODE_DEGRADE_HORLOGE, + "Synchronisation horloge", + BinarySensorDeviceClass.LOCK, + "mdi:sync", + "mdi:sync-off", + False, + ), + (StatusRegister.MODE_TIC, "Mode historique", None, "mdi:tag", None, False), + (StatusRegister.SYNCHRO_CPL, "Synchronisation CPL", BinarySensorDeviceClass.LOCK, "mdi:sync", "mdi:sync-off", True), +) + # config flow setup async def async_setup_entry( @@ -43,67 +86,152 @@ async def async_setup_entry( config_entry.title, ) return - # Wait a bit for the controller to feed on serial frames (home assistant warns after 10s) - _LOGGER.debug( - "%s: waiting at most 9s before setting up binary sensor plateform in order for the async serial reader to have time to parse a full frame", - config_entry.title, - ) - for i in range(9): - await asyncio.sleep(1) - if serial_reader.has_read_full_frame(): - _LOGGER.debug( - "%s: a full frame has been read, initializing sensors", - config_entry.title, - ) - break - if i == 8: - _LOGGER.warning( - "%s: wait time is over but a full frame has yet to be read: initializing sensors anyway", - config_entry.title, - ) # Init sensors - async_add_entities( - [SerialConnectivity(config_entry.title, config_entry.entry_id, serial_reader)], - True, - ) + sensors: list[BinarySensorEntity] = [SerialConnectivity(config_entry.title, config_entry.entry_id, serial_reader)] + + if config_entry.data.get(SETUP_TICMODE) == TICMODE_STANDARD: + sensors.extend( + StatusRegisterBinarySensor( + name=name, + config_title=config_entry.title, + field=field, + serial_reader=serial_reader, + unique_id=config_entry.entry_id, + device_class=devclass, + icon_off=icon_off, + icon_on=icon_on, + inverted=inverted, + ) + for field, name, devclass, icon_off, icon_on, inverted in STATUS_REGISTER_SENSORS + ) + async_add_entities(sensors, True) -class SerialConnectivity(BinarySensorEntity): + +class SerialConnectivity(LinkyTICEntity, BinarySensorEntity): """Serial connectivity to the Linky TIC serial interface.""" # Generic properties # https://developers.home-assistant.io/docs/core/entity#generic-properties - _attr_has_entity_name = True _attr_entity_category = EntityCategory.DIAGNOSTIC _attr_name = "Connectivité du lien série" - _attr_should_poll = True # Binary sensor properties # https://developers.home-assistant.io/docs/core/entity/binary-sensor/#properties _attr_device_class = BinarySensorDeviceClass.CONNECTIVITY - def __init__( - self, title: str, uniq_id: str | None, serial_reader: LinkyTICReader - ) -> None: + def __init__(self, title: str, unique_id: str, serial_reader: LinkyTICReader) -> None: """Initialize the SerialConnectivity binary sensor.""" _LOGGER.debug("%s: initializing Serial Connectivity binary sensor", title) + super().__init__(serial_reader) self._title = title - self._attr_unique_id = f"{DOMAIN}_{uniq_id}_serial_connectivity" - self._serial_controller = serial_reader - self._device_uniq_id = uniq_id if uniq_id is not None else "yaml_legacy" + self._attr_unique_id = f"{DOMAIN}_{unique_id}_serial_connectivity" @property - def device_info(self) -> DeviceInfo: - """Return the device info.""" - return DeviceInfo( - # connections={(DID_CONNECTION_TYPE, self._serial_controller._port)}, - identifiers={(DOMAIN, self._serial_controller.device_identification[DID_REGNUMBER])}, - manufacturer=self._serial_controller.device_identification[DID_CONSTRUCTOR], - model=self._serial_controller.device_identification[DID_TYPE], - name=DID_DEFAULT_NAME, - ) + def is_on(self) -> bool: + """Value of the sensor.""" + return self._serial_controller.is_connected + + +class StatusRegisterBinarySensor(LinkyTICEntity, BinarySensorEntity): + """Binary sensor for binary status register fields.""" + + _attr_entity_category = EntityCategory.DIAGNOSTIC + + _binary_state: bool + _tag = "STGE" + + def __init__( + self, + name: str, + config_title: str, + unique_id: str, + serial_reader: LinkyTICReader, + field: StatusRegister, + device_class: BinarySensorDeviceClass | None = None, + icon_on: str | None = None, + icon_off: str | None = None, + inverted: bool = False, + ) -> None: + """Initialize the status register binary sensor.""" + _LOGGER.debug("%s: initializing %s binary sensor", config_title, field.name) + super().__init__(serial_reader) + + self._config_title = config_title + self._binary_state = False # Default state. + self._inverted = inverted + self._field = field + self._attr_name = name + self._attr_unique_id = f"{DOMAIN}_{unique_id}_{field.name.lower()}" + if device_class: + self._attr_device_class = device_class + + self._icon_on = icon_on + self._icon_off = icon_off @property def is_on(self) -> bool: """Value of the sensor.""" - return self._serial_controller.is_connected() + return self._binary_state ^ self._inverted + + @property + def icon(self) -> str | None: + """Return icon of the sensor.""" + if not self._icon_off or not self._icon_on: + return self._icon_on or self._icon_off or super().icon + + if self.is_on: + return self._icon_on + else: + return self._icon_off + + def update(self) -> None: + """Update the state of the sensor.""" + value, _ = self._update() + if not value: + return + self._binary_state = cast(bool, self._field.value.get_status(value)) + + # TODO: factor _update function to remove copy from sensors entities + def _update(self) -> tuple[Optional[str], Optional[str]]: + """Get value and/or timestamp from cached data. Responsible for updating sensor availability.""" + value, timestamp = self._serial_controller.get_values(self._tag) + _LOGGER.debug( + "%s: retrieved %s value from serial controller: (%s, %s)", self._config_title, self._tag, value, timestamp + ) + + if not value and not timestamp: # No data returned. + if not self.available: + # Sensor is already unavailable, no need to check why. + return None, None + if not self._serial_controller.is_connected: + _LOGGER.debug( + "%s: marking the %s sensor as unavailable: serial connection lost", + self._config_title, + self._tag, + ) + self._attr_available = False + elif self._serial_controller.has_read_full_frame: + _LOGGER.info( + "%s: marking the %s sensor as unavailable: a full frame has been read but %s has not been found", + self._config_title, + self._tag, + self._tag, + ) + self._attr_available = False + else: + # A frame has not been read yet (it should!) or is already unavailable and no new data was fetched. + # Let sensor in current availability state. + pass + return None, None + + if not self.available: + # Data is available, so is sensor + self._attr_available = True + _LOGGER.info( + "%s: marking the %s sensor as available now !", + self._config_title, + self._tag, + ) + + return value, timestamp diff --git a/custom_components/linkytic/config_flow.py b/custom_components/linkytic/config_flow.py index 8e57d6c..79898fa 100644 --- a/custom_components/linkytic/config_flow.py +++ b/custom_components/linkytic/config_flow.py @@ -1,4 +1,5 @@ """Config flow for linkytic integration.""" + from __future__ import annotations # import dataclasses @@ -13,6 +14,7 @@ from homeassistant.core import callback from homeassistant.data_entry_flow import FlowResult from homeassistant.helpers import selector +from homeassistant.components import usb from .const import ( DOMAIN, @@ -35,21 +37,17 @@ STEP_USER_DATA_SCHEMA = vol.Schema( { - vol.Required(SETUP_SERIAL, default=SETUP_SERIAL_DEFAULT): str, - vol.Required(SETUP_TICMODE, default=TICMODE_HISTORIC): selector.SelectSelector( + vol.Required(SETUP_SERIAL, default=SETUP_SERIAL_DEFAULT): str, # type: ignore + vol.Required(SETUP_TICMODE, default=TICMODE_HISTORIC): selector.SelectSelector( # type: ignore selector.SelectSelectorConfig( options=[ - selector.SelectOptionDict( - value=TICMODE_HISTORIC, label=TICMODE_HISTORIC_LABEL - ), - selector.SelectOptionDict( - value=TICMODE_STANDARD, label=TICMODE_STANDARD_LABEL - ), + selector.SelectOptionDict(value=TICMODE_HISTORIC, label=TICMODE_HISTORIC_LABEL), + selector.SelectOptionDict(value=TICMODE_STANDARD, label=TICMODE_STANDARD_LABEL), ] ), ), - vol.Required(SETUP_PRODUCER, default=SETUP_PRODUCER_DEFAULT): bool, - vol.Required(SETUP_THREEPHASE, default=SETUP_THREEPHASE_DEFAULT): bool, + vol.Required(SETUP_PRODUCER, default=SETUP_PRODUCER_DEFAULT): bool, # type: ignore + vol.Required(SETUP_THREEPHASE, default=SETUP_THREEPHASE_DEFAULT): bool, # type: ignore } ) @@ -58,43 +56,41 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): """Handle a config flow for linkytic.""" VERSION = 1 + MINOR_VERSION = 2 - async def async_step_user( - self, user_input: dict[str, Any] | None = None - ) -> FlowResult: + async def async_step_user(self, user_input: dict[str, Any] | None = None) -> FlowResult: """Handle the initial step.""" # No input if user_input is None: - return self.async_show_form( - step_id="user", data_schema=STEP_USER_DATA_SCHEMA - ) + return self.async_show_form(step_id="user", data_schema=STEP_USER_DATA_SCHEMA) # Validate input await self.async_set_unique_id(DOMAIN + "_" + user_input[SETUP_SERIAL]) self._abort_if_unique_id_configured() + + # Search for serial/by-id, which SHOULD be a persistent name to serial interface. + _port = await self.hass.async_add_executor_job(usb.get_serial_by_id, user_input[SETUP_SERIAL]) + errors = {} title = user_input[SETUP_SERIAL] try: linky_tic_tester( - device=user_input[SETUP_SERIAL], + device=_port, std_mode=user_input[SETUP_TICMODE] == TICMODE_STANDARD, ) except CannotConnect as cannot_connect: _LOGGER.error("%s: can not connect: %s", title, cannot_connect) errors["base"] = "cannot_connect" except CannotRead as cannot_read: - _LOGGER.error( - "%s: can not read a line after connection: %s", title, cannot_read - ) + _LOGGER.error("%s: can not read a line after connection: %s", title, cannot_read) errors["base"] = "cannot_read" except Exception as exc: # pylint: disable=broad-except _LOGGER.exception("Unexpected exception: %s", exc) errors["base"] = "unknown" else: + user_input[SETUP_SERIAL] = _port return self.async_create_entry(title=title, data=user_input) - return self.async_show_form( - step_id="user", data_schema=STEP_USER_DATA_SCHEMA, errors=errors - ) + return self.async_show_form(step_id="user", data_schema=STEP_USER_DATA_SCHEMA, errors=errors) # async def async_step_usb(self, discovery_info: UsbServiceInfo) -> FlowResult: # """Handle a flow initialized by USB discovery.""" @@ -116,9 +112,7 @@ def __init__(self, config_entry: config_entries.ConfigEntry) -> None: """Initialize options flow.""" self.config_entry = config_entry - async def async_step_init( - self, user_input: dict[str, Any] | None = None - ) -> FlowResult: + async def async_step_init(self, user_input: dict[str, Any] | None = None) -> FlowResult: """Manage the options.""" if user_input is not None: return self.async_create_entry(title="", data=user_input) @@ -129,7 +123,7 @@ async def async_step_init( { vol.Required( OPTIONS_REALTIME, - default=self.config_entry.options.get(OPTIONS_REALTIME), + default=self.config_entry.options.get(OPTIONS_REALTIME), # type: ignore ): bool } ), diff --git a/custom_components/linkytic/const.py b/custom_components/linkytic/const.py index b1e1ee8..07d845d 100644 --- a/custom_components/linkytic/const.py +++ b/custom_components/linkytic/const.py @@ -1,9 +1,12 @@ """Constants for the linkytic integration.""" -import serial +from termios import error +from serial import SerialException, SEVENBITS, PARITY_EVEN, STOPBITS_ONE DOMAIN = "linkytic" +# Some termios exceptions are uncaught by pyserial +LINKY_IO_ERRORS = (SerialException, error) # Config Flow @@ -25,9 +28,9 @@ # Protocol configuration # # https://www.enedis.fr/media/2035/download -BYTESIZE = serial.SEVENBITS -PARITY = serial.PARITY_EVEN -STOPBITS = serial.STOPBITS_ONE +BYTESIZE = SEVENBITS +PARITY = PARITY_EVEN +STOPBITS = STOPBITS_ONE MODE_STANDARD_BAUD_RATE = 9600 MODE_STANDARD_FIELD_SEPARATOR = b"\x09" diff --git a/custom_components/linkytic/entity.py b/custom_components/linkytic/entity.py new file mode 100644 index 0000000..93543f7 --- /dev/null +++ b/custom_components/linkytic/entity.py @@ -0,0 +1,42 @@ +"""Entity for linkytic integration.""" + +from __future__ import annotations +from typing import cast + +from homeassistant.helpers.entity import Entity +from homeassistant.helpers.device_registry import DeviceInfo + +from .const import ( + DID_DEFAULT_MANUFACTURER, + DID_CONSTRUCTOR, + DID_DEFAULT_MODEL, + DID_DEFAULT_NAME, + DID_REGNUMBER, + DID_TYPE, + DOMAIN, +) +from .serial_reader import LinkyTICReader + + +class LinkyTICEntity(Entity): + """Base class for all linkytic entities.""" + + _serial_controller: LinkyTICReader + _attr_should_poll = True + _attr_has_entity_name = True + + def __init__(self, reader: LinkyTICReader): + """Init Linkytic entity.""" + self._serial_controller = reader + + @property + def device_info(self) -> DeviceInfo: + """Return a device description for device registry.""" + did = self._serial_controller.device_identification + + return DeviceInfo( + identifiers={(DOMAIN, cast(str, did.get(DID_REGNUMBER)))}, + manufacturer=did.get(DID_CONSTRUCTOR, DID_DEFAULT_MANUFACTURER), + model=did.get(DID_TYPE, DID_DEFAULT_MODEL), + name=DID_DEFAULT_NAME, + ) diff --git a/custom_components/linkytic/manifest.json b/custom_components/linkytic/manifest.json index 51fbc6c..f5a5642 100644 --- a/custom_components/linkytic/manifest.json +++ b/custom_components/linkytic/manifest.json @@ -5,7 +5,9 @@ "@hekmon" ], "config_flow": true, - "dependencies": [], + "dependencies": [ + "usb" + ], "documentation": "https://github.com/hekmon/linkytic/tree/v3.0.0-beta3", "iot_class": "local_polling", "issue_tracker": "https://github.com/hekmon/linkytic/issues", diff --git a/custom_components/linkytic/sensor.py b/custom_components/linkytic/sensor.py index 01b9bf6..d58dd78 100644 --- a/custom_components/linkytic/sensor.py +++ b/custom_components/linkytic/sensor.py @@ -1,10 +1,10 @@ """Sensors for Linky TIC integration.""" + from __future__ import annotations -import asyncio -from enum import Enum +from collections.abc import Callable import logging -from typing import Callable +from typing import Generic, Optional, TypeVar, cast from homeassistant.components.sensor import ( SensorDeviceClass, @@ -16,51 +16,29 @@ EntityCategory, UnitOfApparentPower, UnitOfElectricCurrent, + UnitOfElectricPotential, UnitOfEnergy, UnitOfPower, - UnitOfElectricPotential, ) from homeassistant.core import HomeAssistant, callback -from homeassistant.helpers.entity import DeviceInfo from homeassistant.helpers.entity_platform import AddEntitiesCallback +from .entity import LinkyTICEntity from .const import ( - DID_CONNECTION_TYPE, DID_CONSTRUCTOR, DID_CONSTRUCTOR_CODE, - DID_DEFAULT_NAME, DID_REGNUMBER, DID_TYPE, DID_TYPE_CODE, DID_YEAR, DOMAIN, + SETUP_PRODUCER, SETUP_THREEPHASE, SETUP_TICMODE, - SETUP_PRODUCER, TICMODE_STANDARD, ) - from .serial_reader import LinkyTICReader - -class StatusRegister(Enum): - CONTACT_SEC = 1, - ORGANE_DE_COUPURE = 2, - ETAT_DU_CACHE_BORNE_DISTRIBUTEUR = 3, - SURTENSION_SUR_UNE_DES_PHASES = 4, - DEPASSEMENT_PUISSANCE_REFERENCE = 5, - PRODUCTEUR_CONSOMMATEUR = 6, - SENS_ENERGIE_ACTIVE = 7, - TARIF_CONTRAT_FOURNITURE = 8, - TARIF_CONTRAT_DISTRIBUTEUR = 9, - MODE_DEGRADE_HORLOGE = 10, - MODE_TIC = 11, - ETAT_SORTIE_COMMUNICATION_EURIDIS = 12, - STATUS_CPL = 13, - SYNCHRO_CPL = 14, - COULEUR_JOUR_CONTRAT_TEMPO = 15, - COULEUR_LENDEMAIN_CONTRAT_TEMPO = 16, - PREAVIS_POINTES_MOBILES = 17, - POINTE_MOBILE = 18 +from .status_register import StatusRegister _LOGGER = logging.getLogger(__name__) @@ -82,24 +60,6 @@ async def async_setup_entry( config_entry.title, ) return - # Wait a bit for the controller to feed on serial frames (home assistant warns after 10s) - _LOGGER.debug( - "%s: waiting at most 9s before setting up sensor plateform in order for the async serial reader to have time to parse a full frame", - config_entry.title, - ) - for i in range(9): - await asyncio.sleep(1) - if serial_reader.has_read_full_frame(): - _LOGGER.debug( - "%s: a full frame has been read, initializing sensors", - config_entry.title, - ) - break - if i == 8: - _LOGGER.warning( - "%s: wait time is over but a full frame has yet to be read: initializing sensors anyway", - config_entry.title, - ) # Init sensors sensors = [] if config_entry.data.get(SETUP_TICMODE) == TICMODE_STANDARD: @@ -111,38 +71,34 @@ async def async_setup_entry( config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, ), - RegularStrSensor( + LinkyTICStringSensor( tag="VTIC", name="Version de la TIC", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, icon="mdi:tag", - category=EntityCategory.DIAGNOSTIC, ), DateEtHeureSensor( config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - category=EntityCategory.DIAGNOSTIC, ), - RegularStrSensor( + LinkyTICStringSensor( tag="NGTF", name="Nom du calendrier tarifaire fournisseur", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, icon="mdi:cash-check", - category=EntityCategory.DIAGNOSTIC, ), - RegularStrSensor( + LinkyTICStringSensor( tag="LTARF", name="Libellé tarif fournisseur en cours", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, icon="mdi:cash-check", - category=EntityCategory.DIAGNOSTIC, ), EnergyIndexSensor( tag="EAST", @@ -242,463 +198,422 @@ async def async_setup_entry( config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, ), - RegularIntSensor( + CurrentSensor( tag="IRMS1", name="Courant efficace, phase 1", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - device_class=SensorDeviceClass.CURRENT, - native_unit_of_measurement=UnitOfElectricCurrent.AMPERE, state_class=SensorStateClass.MEASUREMENT, register_callback=True, ), - RegularIntSensor( + VoltageSensor( tag="URMS1", name="Tension efficace, phase 1", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - device_class=SensorDeviceClass.VOLTAGE, - native_unit_of_measurement=UnitOfElectricPotential.VOLT, state_class=SensorStateClass.MEASUREMENT, register_callback=True, ), - RegularIntSensor( + ApparentPowerSensor( tag="PREF", name="Puissance app. de référence", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - device_class=SensorDeviceClass.APPARENT_POWER, - native_unit_of_measurement=UnitOfApparentPower.VOLT_AMPERE, - state_class=SensorStateClass.MEASUREMENT, register_callback=True, - conversion_function=(lambda x: x * 1000) # kVA conversion + category=EntityCategory.DIAGNOSTIC, + conversion_function=(lambda x: x * 1000), # kVA conversion ), - RegularIntSensor( + ApparentPowerSensor( tag="PCOUP", name="Puissance app. de coupure", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - device_class=SensorDeviceClass.APPARENT_POWER, - native_unit_of_measurement=UnitOfApparentPower.VOLT_AMPERE, + register_callback=True, + category=EntityCategory.DIAGNOSTIC, + conversion_function=(lambda x: x * 1000), # kVA conversion + ), + ApparentPowerSensor( + tag="SINSTS", + name="Puissance app. instantanée soutirée", + config_title=config_entry.title, + config_uniq_id=config_entry.entry_id, + serial_reader=serial_reader, state_class=SensorStateClass.MEASUREMENT, register_callback=True, - conversion_function=(lambda x: x * 1000) # kVA conversion ), - RegularIntSensor( + ApparentPowerSensor( + tag="SMAXSN", + name="Puissance app. max. soutirée n", + config_title=config_entry.title, + config_uniq_id=config_entry.entry_id, + serial_reader=serial_reader, + register_callback=True, + ), + ApparentPowerSensor( + tag="SMAXSN-1", + name="Puissance app. max. soutirée n-1", + config_title=config_entry.title, + config_uniq_id=config_entry.entry_id, + serial_reader=serial_reader, + register_callback=True, + ), + PowerSensor( tag="CCASN", name="Point n de la courbe de charge active soutirée", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - device_class=SensorDeviceClass.POWER, - native_unit_of_measurement=UnitOfPower.WATT, ), - RegularIntSensor( + PowerSensor( tag="CCASN-1", name="Point n-1 de la courbe de charge active soutirée", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - device_class=SensorDeviceClass.POWER, - native_unit_of_measurement=UnitOfPower.WATT, ), - RegularIntSensor( + VoltageSensor( tag="UMOY1", name="Tension moy. ph. 1", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - device_class=SensorDeviceClass.VOLTAGE, - native_unit_of_measurement=UnitOfElectricPotential.VOLT, - state_class=SensorStateClass.MEASUREMENT, + state_class=SensorStateClass.MEASUREMENT, # Is this a curent value? register_callback=True, ), - RegularStrSensor( + LinkyTICStringSensor( tag="DPM1", name="Début pointe mobile 1", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, icon="mdi:clock-start", - category=EntityCategory.DIAGNOSTIC, ), - RegularStrSensor( + LinkyTICStringSensor( tag="FPM1", name="Fin pointe mobile 1", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, icon="mdi:clock-end", - category=EntityCategory.DIAGNOSTIC, ), - RegularStrSensor( + LinkyTICStringSensor( tag="DPM2", name="Début pointe mobile 2", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, icon="mdi:clock-start", - category=EntityCategory.DIAGNOSTIC, ), - RegularStrSensor( + LinkyTICStringSensor( tag="FPM2", name="Fin pointe mobile 2", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, icon="mdi:clock-end", - category=EntityCategory.DIAGNOSTIC, ), - RegularStrSensor( + LinkyTICStringSensor( tag="DPM3", name="Début pointe mobile 3", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, icon="mdi:clock-start", - category=EntityCategory.DIAGNOSTIC, ), - RegularStrSensor( + LinkyTICStringSensor( tag="FPM3", name="Fin pointe mobile 3", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, icon="mdi:clock-end", - category=EntityCategory.DIAGNOSTIC, ), - RegularStrSensor( + LinkyTICStringSensor( tag="MSG1", name="Message court", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, icon="mdi:message-text-outline", - category=EntityCategory.DIAGNOSTIC, ), - RegularStrSensor( + LinkyTICStringSensor( tag="MSG2", name="Message Ultra court", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, icon="mdi:message-text-outline", - category=EntityCategory.DIAGNOSTIC, ), - RegularStrSensor( + LinkyTICStringSensor( tag="PRM", name="PRM", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, icon="mdi:tag", - category=EntityCategory.DIAGNOSTIC, ), - RegularStrSensor( + LinkyTICStringSensor( tag="RELAIS", name="Relais", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, icon="mdi:electric-switch", - category=EntityCategory.DIAGNOSTIC, ), - RegularStrSensor( + LinkyTICStringSensor( tag="NTARF", name="Numéro de l’index tarifaire en cours", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, icon="mdi:cash-check", - category=EntityCategory.DIAGNOSTIC, ), - RegularStrSensor( + LinkyTICStringSensor( tag="NJOURF", name="Numéro du jour en cours calendrier fournisseur", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, icon="mdi:calendar-month-outline", - category=EntityCategory.DIAGNOSTIC, ), - RegularStrSensor( + LinkyTICStringSensor( tag="NJOURF+1", name="Numéro du prochain jour calendrier fournisseur", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, icon="mdi:calendar-month-outline", - category=EntityCategory.DIAGNOSTIC, ), ProfilDuProchainJourCalendrierFournisseurSensor( config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - category=EntityCategory.DIAGNOSTIC, ), - RegularStrSensor( + LinkyTICStringSensor( tag="PPOINTE", name="Profil du prochain jour de pointe", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, icon="mdi:calendar-month-outline", - category=EntityCategory.DIAGNOSTIC, ), - RegularStrSensor( + LinkyTICStringSensor( tag="STGE", name="Registre de statuts", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, icon="mdi:list-status", - category=EntityCategory.DIAGNOSTIC, - ), - StatusRegisterData( - name="Statut contact sec", - config_title=config_entry.title, - config_uniq_id=config_entry.entry_id, - serial_reader=serial_reader, - icon="mdi:electric-switch", - data=StatusRegister.CONTACT_SEC, - category=EntityCategory.DIAGNOSTIC, ), - StatusRegisterData( + # LinkyTICStatusRegisterSensor( + # name="Statut contact sec", + # config_title=config_entry.title, + # config_uniq_id=config_entry.entry_id, + # serial_reader=serial_reader, + # icon="mdi:electric-switch", + # field=StatusRegister.CONTACT_SEC, + # ), + LinkyTICStatusRegisterSensor( name="Statut organe de coupure", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, icon="mdi:connection", - data=StatusRegister.ORGANE_DE_COUPURE, - category=EntityCategory.DIAGNOSTIC, - ), - StatusRegisterData( - name="Statut état du cache-bornes distributeur", - config_title=config_entry.title, - config_uniq_id=config_entry.entry_id, - serial_reader=serial_reader, - icon="mdi:toy-brick-outline", - data=StatusRegister.ETAT_DU_CACHE_BORNE_DISTRIBUTEUR, - category=EntityCategory.DIAGNOSTIC, - ), - StatusRegisterData( - name="Statut surtension sur une des phases", - config_title=config_entry.title, - config_uniq_id=config_entry.entry_id, - serial_reader=serial_reader, - icon="mdi:flash-alert", - data=StatusRegister.SURTENSION_SUR_UNE_DES_PHASES, - category=EntityCategory.DIAGNOSTIC, - ), - StatusRegisterData( - name="Statut dépassement de la puissance de référence", - config_title=config_entry.title, - config_uniq_id=config_entry.entry_id, - serial_reader=serial_reader, - icon="mdi:flash-alert", - data=StatusRegister.DEPASSEMENT_PUISSANCE_REFERENCE, - category=EntityCategory.DIAGNOSTIC, - ), - StatusRegisterData( - name="Statut producteur/consommateur", - config_title=config_entry.title, - config_uniq_id=config_entry.entry_id, - serial_reader=serial_reader, - icon="mdi:transmission-tower", - data=StatusRegister.PRODUCTEUR_CONSOMMATEUR, - category=EntityCategory.DIAGNOSTIC, - ), - StatusRegisterData( - name="Statut sens de l’énergie active", - config_title=config_entry.title, - config_uniq_id=config_entry.entry_id, - serial_reader=serial_reader, - icon="mdi:transmission-tower", - data=StatusRegister.SENS_ENERGIE_ACTIVE, - category=EntityCategory.DIAGNOSTIC, - ), - StatusRegisterData( + field=StatusRegister.ORGANE_DE_COUPURE, + ), + # LinkyTICStatusRegisterSensor( + # name="Statut état du cache-bornes distributeur", + # config_title=config_entry.title, + # config_uniq_id=config_entry.entry_id, + # serial_reader=serial_reader, + # icon="mdi:toy-brick-outline", + # field=StatusRegister.ETAT_DU_CACHE_BORNE_DISTRIBUTEUR, + # ), + # LinkyTICStatusRegisterSensor( + # name="Statut surtension sur une des phases", + # config_title=config_entry.title, + # config_uniq_id=config_entry.entry_id, + # serial_reader=serial_reader, + # icon="mdi:flash-alert", + # field=StatusRegister.SURTENSION_SUR_UNE_DES_PHASES, + # ), + # LinkyTICStatusRegisterSensor( + # name="Statut dépassement de la puissance de référence", + # config_title=config_entry.title, + # config_uniq_id=config_entry.entry_id, + # serial_reader=serial_reader, + # icon="mdi:flash-alert", + # field=StatusRegister.DEPASSEMENT_PUISSANCE_REFERENCE, + # ), + # LinkyTICStatusRegisterSensor( + # name="Statut producteur/consommateur", + # config_title=config_entry.title, + # config_uniq_id=config_entry.entry_id, + # serial_reader=serial_reader, + # icon="mdi:transmission-tower", + # field=StatusRegister.PRODUCTEUR_CONSOMMATEUR, + # ), + # LinkyTICStatusRegisterSensor( + # name="Statut sens de l’énergie active", + # config_title=config_entry.title, + # config_uniq_id=config_entry.entry_id, + # serial_reader=serial_reader, + # icon="mdi:transmission-tower", + # field=StatusRegister.SENS_ENERGIE_ACTIVE, + # ), + LinkyTICStatusRegisterSensor( name="Statut tarif contrat fourniture", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, icon="mdi:cash-check", - data=StatusRegister.TARIF_CONTRAT_FOURNITURE, - category=EntityCategory.DIAGNOSTIC, + field=StatusRegister.TARIF_CONTRAT_FOURNITURE, ), - StatusRegisterData( + LinkyTICStatusRegisterSensor( name="Statut tarif contrat distributeur", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, icon="mdi:cash-check", - data=StatusRegister.TARIF_CONTRAT_DISTRIBUTEUR, - category=EntityCategory.DIAGNOSTIC, - ), - StatusRegisterData( - name="Statut mode dégradée de l'horloge", - config_title=config_entry.title, - config_uniq_id=config_entry.entry_id, - serial_reader=serial_reader, - icon="mdi:clock-alert-outline", - data=StatusRegister.MODE_DEGRADE_HORLOGE, - category=EntityCategory.DIAGNOSTIC, - ), - StatusRegisterData( - name="Statut sortie télé-information", - config_title=config_entry.title, - config_uniq_id=config_entry.entry_id, - serial_reader=serial_reader, - icon="mdi:tag", - data=StatusRegister.MODE_TIC, - category=EntityCategory.DIAGNOSTIC, - ), - StatusRegisterData( + field=StatusRegister.TARIF_CONTRAT_DISTRIBUTEUR, + ), + # LinkyTICStatusRegisterSensor( + # name="Statut mode dégradée de l'horloge", + # config_title=config_entry.title, + # config_uniq_id=config_entry.entry_id, + # serial_reader=serial_reader, + # icon="mdi:clock-alert-outline", + # field=StatusRegister.MODE_DEGRADE_HORLOGE, + # ), + # LinkyTICStatusRegisterSensor( + # name="Statut sortie télé-information", + # config_title=config_entry.title, + # config_uniq_id=config_entry.entry_id, + # serial_reader=serial_reader, + # icon="mdi:tag", + # field=StatusRegister.MODE_TIC, + # ), + LinkyTICStatusRegisterSensor( name="Statut sortie communication Euridis", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, icon="mdi:tag", - data=StatusRegister.ETAT_SORTIE_COMMUNICATION_EURIDIS, - category=EntityCategory.DIAGNOSTIC, + field=StatusRegister.ETAT_SORTIE_COMMUNICATION_EURIDIS, ), - StatusRegisterData( + LinkyTICStatusRegisterSensor( name="Statut CPL", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, icon="mdi:tag", - data=StatusRegister.STATUS_CPL, - category=EntityCategory.DIAGNOSTIC, - ), - StatusRegisterData( - name="Statut synchronisation CPL", - config_title=config_entry.title, - config_uniq_id=config_entry.entry_id, - serial_reader=serial_reader, - icon="mdi:sync", - data=StatusRegister.SYNCHRO_CPL, - category=EntityCategory.DIAGNOSTIC, - ), - StatusRegisterData( + field=StatusRegister.STATUS_CPL, + ), + # LinkyTICStatusRegisterSensor( + # name="Statut synchronisation CPL", + # config_title=config_entry.title, + # config_uniq_id=config_entry.entry_id, + # serial_reader=serial_reader, + # icon="mdi:sync", + # field=StatusRegister.SYNCHRO_CPL, + # ), + LinkyTICStatusRegisterSensor( name="Statut couleur du jour tempo", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, icon="mdi:palette", - data=StatusRegister.COULEUR_JOUR_CONTRAT_TEMPO, - category=EntityCategory.DIAGNOSTIC, + field=StatusRegister.COULEUR_JOUR_CONTRAT_TEMPO, ), - StatusRegisterData( + LinkyTICStatusRegisterSensor( name="Statut couleur du lendemain tempo", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, icon="mdi:palette", - data=StatusRegister.COULEUR_LENDEMAIN_CONTRAT_TEMPO, - category=EntityCategory.DIAGNOSTIC, + field=StatusRegister.COULEUR_LENDEMAIN_CONTRAT_TEMPO, ), - StatusRegisterData( + LinkyTICStatusRegisterSensor( name="Statut préavis pointes mobiles", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, icon="mdi:clock-alert-outline", - data=StatusRegister.PREAVIS_POINTES_MOBILES, - category=EntityCategory.DIAGNOSTIC, + field=StatusRegister.PREAVIS_POINTES_MOBILES, ), - StatusRegisterData( + LinkyTICStatusRegisterSensor( name="Statut pointe mobile", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, icon="mdi:progress-clock", - data=StatusRegister.POINTE_MOBILE, - category=EntityCategory.DIAGNOSTIC, + field=StatusRegister.POINTE_MOBILE, ), ] # Handle protocol deviation for experimental/pilote modules if (serial_reader.device_identification[DID_TYPE_CODE]=="67"): sensors.append( - RegularIntSensor( + ApparentPowerSensor( tag="SINST1", name="Puissance app. instantanée soutirée", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - device_class=SensorDeviceClass.APPARENT_POWER, - native_unit_of_measurement=UnitOfApparentPower.VOLT_AMPERE, - state_class=SensorStateClass.MEASUREMENT, register_callback=True, ) ) sensors.append( - RegularIntSensor( + ApparentPowerSensor( tag="SMAXN", name="Puissance app. max. soutirée n", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - device_class=SensorDeviceClass.APPARENT_POWER, - native_unit_of_measurement=UnitOfApparentPower.VOLT_AMPERE, - state_class=SensorStateClass.MEASUREMENT, register_callback=True, ) ) sensors.append( - RegularIntSensor( + ApparentPowerSensor( tag="SMAXN-1", name="Puissance app. max. soutirée n-1", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - device_class=SensorDeviceClass.APPARENT_POWER, - native_unit_of_measurement=UnitOfApparentPower.VOLT_AMPERE, - state_class=SensorStateClass.MEASUREMENT, register_callback=True, ) ) else: sensors.append( - RegularIntSensor( + ApparentPowerSensor( tag="SINSTS", name="Puissance app. instantanée soutirée", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - device_class=SensorDeviceClass.APPARENT_POWER, - native_unit_of_measurement=UnitOfApparentPower.VOLT_AMPERE, - state_class=SensorStateClass.MEASUREMENT, register_callback=True, ) ) sensors.append( - RegularIntSensor( + ApparentPowerSensor( tag="SMAXSN", name="Puissance app. max. soutirée n", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - device_class=SensorDeviceClass.APPARENT_POWER, - native_unit_of_measurement=UnitOfApparentPower.VOLT_AMPERE, - state_class=SensorStateClass.MEASUREMENT, register_callback=True, ) ) sensors.append( - RegularIntSensor( + ApparentPowerSensor( tag="SMAXSN-1", name="Puissance app. max. soutirée n-1", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - device_class=SensorDeviceClass.APPARENT_POWER, - native_unit_of_measurement=UnitOfApparentPower.VOLT_AMPERE, - state_class=SensorStateClass.MEASUREMENT, register_callback=True, ) ) @@ -755,264 +670,216 @@ async def async_setup_entry( ) ) sensors.append( - RegularIntSensor( + ApparentPowerSensor( tag="SINSTI", name="Puissance app. instantanée injectée", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - device_class=SensorDeviceClass.APPARENT_POWER, - native_unit_of_measurement=UnitOfApparentPower.VOLT_AMPERE, state_class=SensorStateClass.MEASUREMENT, register_callback=True, icon="mdi:transmission-tower-import", ) ) sensors.append( - RegularIntSensor( + ApparentPowerSensor( tag="SMAXIN", name="Puissance app. max. injectée n", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - device_class=SensorDeviceClass.APPARENT_POWER, - native_unit_of_measurement=UnitOfApparentPower.VOLT_AMPERE, - state_class=SensorStateClass.MEASUREMENT, register_callback=True, icon="mdi:transmission-tower-import", ) ) sensors.append( - RegularIntSensor( + ApparentPowerSensor( tag="SMAXIN-1", name="Puissance app. max. injectée n-1", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - device_class=SensorDeviceClass.APPARENT_POWER, - native_unit_of_measurement=UnitOfApparentPower.VOLT_AMPERE, - state_class=SensorStateClass.MEASUREMENT, register_callback=True, icon="mdi:transmission-tower-import", ) ) sensors.append( - RegularIntSensor( + PowerSensor( tag="CCAIN", name="Point n de la courbe de charge active injectée", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - device_class=SensorDeviceClass.POWER, - native_unit_of_measurement=UnitOfPower.WATT, icon="mdi:transmission-tower-import", ) ) sensors.append( - RegularIntSensor( + PowerSensor( tag="CCAIN-1", name="Point n-1 de la courbe de charge active injectée", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - device_class=SensorDeviceClass.POWER, - native_unit_of_measurement=UnitOfPower.WATT, icon="mdi:transmission-tower-import", ) ) # Add three-phase specific sensors if bool(config_entry.data.get(SETUP_THREEPHASE)): sensors.append( - RegularIntSensor( + CurrentSensor( tag="IRMS2", name="Courant efficace, phase 2", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - device_class=SensorDeviceClass.CURRENT, - native_unit_of_measurement=UnitOfElectricCurrent.AMPERE, state_class=SensorStateClass.MEASUREMENT, register_callback=True, ) ) sensors.append( - RegularIntSensor( + CurrentSensor( tag="IRMS3", name="Courant efficace, phase 3", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - device_class=SensorDeviceClass.CURRENT, - native_unit_of_measurement=UnitOfElectricCurrent.AMPERE, state_class=SensorStateClass.MEASUREMENT, register_callback=True, ) ) sensors.append( - RegularIntSensor( + VoltageSensor( tag="URMS2", name="Tension efficace, phase 2", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - device_class=SensorDeviceClass.VOLTAGE, - native_unit_of_measurement=UnitOfElectricPotential.VOLT, state_class=SensorStateClass.MEASUREMENT, register_callback=True, ) ) sensors.append( - RegularIntSensor( + VoltageSensor( tag="URMS3", name="Tension efficace, phase 3", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - device_class=SensorDeviceClass.VOLTAGE, - native_unit_of_measurement=UnitOfElectricPotential.VOLT, state_class=SensorStateClass.MEASUREMENT, register_callback=True, ) ) sensors.append( - RegularIntSensor( + ApparentPowerSensor( tag="SINSTS1", name="Puissance app. instantanée soutirée phase 1", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - device_class=SensorDeviceClass.APPARENT_POWER, - native_unit_of_measurement=UnitOfApparentPower.VOLT_AMPERE, state_class=SensorStateClass.MEASUREMENT, register_callback=True, ) ) sensors.append( - RegularIntSensor( + ApparentPowerSensor( tag="SINSTS2", name="Puissance app. instantanée soutirée phase 2", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - device_class=SensorDeviceClass.APPARENT_POWER, - native_unit_of_measurement=UnitOfApparentPower.VOLT_AMPERE, state_class=SensorStateClass.MEASUREMENT, register_callback=True, ) ) sensors.append( - RegularIntSensor( + ApparentPowerSensor( tag="SINSTS3", name="Puissance app. instantanée soutirée phase 3", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - device_class=SensorDeviceClass.APPARENT_POWER, - native_unit_of_measurement=UnitOfApparentPower.VOLT_AMPERE, state_class=SensorStateClass.MEASUREMENT, register_callback=True, ) ) sensors.append( - RegularIntSensor( + ApparentPowerSensor( tag="SMAXSN1", name="Puissance app max. soutirée n phase 1", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - device_class=SensorDeviceClass.APPARENT_POWER, - native_unit_of_measurement=UnitOfApparentPower.VOLT_AMPERE, - state_class=SensorStateClass.MEASUREMENT, register_callback=True, ) ) sensors.append( - RegularIntSensor( + ApparentPowerSensor( tag="SMAXSN2", name="Puissance app max. soutirée n phase 2", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - device_class=SensorDeviceClass.APPARENT_POWER, - native_unit_of_measurement=UnitOfApparentPower.VOLT_AMPERE, - state_class=SensorStateClass.MEASUREMENT, register_callback=True, ) ) sensors.append( - RegularIntSensor( + ApparentPowerSensor( tag="SMAXSN3", name="Puissance app max. soutirée n phase 3", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - device_class=SensorDeviceClass.APPARENT_POWER, - native_unit_of_measurement=UnitOfApparentPower.VOLT_AMPERE, - state_class=SensorStateClass.MEASUREMENT, register_callback=True, ) ) sensors.append( - RegularIntSensor( + ApparentPowerSensor( tag="SMAXSN1-1", name="Puissance app max. soutirée n-1 phase 1", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - device_class=SensorDeviceClass.APPARENT_POWER, - native_unit_of_measurement=UnitOfApparentPower.VOLT_AMPERE, - state_class=SensorStateClass.MEASUREMENT, register_callback=True, ) ) sensors.append( - RegularIntSensor( + ApparentPowerSensor( tag="SMAXSN2-1", name="Puissance app max. soutirée n-1 phase 2", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - device_class=SensorDeviceClass.APPARENT_POWER, - native_unit_of_measurement=UnitOfApparentPower.VOLT_AMPERE, - state_class=SensorStateClass.MEASUREMENT, register_callback=True, ) ) sensors.append( - RegularIntSensor( + ApparentPowerSensor( tag="SMAXSN3-1", name="Puissance app max. soutirée n-1 phase 3", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - device_class=SensorDeviceClass.APPARENT_POWER, - native_unit_of_measurement=UnitOfApparentPower.VOLT_AMPERE, - state_class=SensorStateClass.MEASUREMENT, register_callback=True, ) ) sensors.append( - RegularIntSensor( + VoltageSensor( tag="UMOY2", name="Tension moy. ph. 2", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - device_class=SensorDeviceClass.VOLTAGE, - native_unit_of_measurement=UnitOfElectricPotential.VOLT, state_class=SensorStateClass.MEASUREMENT, register_callback=True, ) ) sensors.append( - RegularIntSensor( + VoltageSensor( tag="UMOY3", name="Tension moy. ph. 3", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - device_class=SensorDeviceClass.VOLTAGE, - native_unit_of_measurement=UnitOfElectricPotential.VOLT, state_class=SensorStateClass.MEASUREMENT, register_callback=True, ) @@ -1027,7 +894,7 @@ async def async_setup_entry( config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, ), - RegularStrSensor( + LinkyTICStringSensor( tag="OPTARIF", name="Option tarifaire choisie", config_title=config_entry.title, @@ -1036,15 +903,13 @@ async def async_setup_entry( icon="mdi:cash-check", category=EntityCategory.DIAGNOSTIC, ), - RegularIntSensor( + CurrentSensor( tag="ISOUSC", name="Intensité souscrite", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, category=EntityCategory.DIAGNOSTIC, - device_class=SensorDeviceClass.CURRENT, - native_unit_of_measurement=UnitOfElectricCurrent.AMPERE, ), EnergyIndexSensor( tag="BASE", @@ -1069,8 +934,7 @@ async def async_setup_entry( ), EnergyIndexSensor( tag="EJPHN", - name="Index option EJP - Heures Normal" - + "es", # workaround for codespell in HA pre commit hook + name="Index option EJP - Heures Normal" + "es", # workaround for codespell in HA pre commit hook config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, @@ -1129,7 +993,7 @@ async def async_setup_entry( config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, ), - RegularStrSensor( + LinkyTICStringSensor( tag="PTEC", name="Période Tarifaire en cours", config_title=config_entry.title, @@ -1137,7 +1001,7 @@ async def async_setup_entry( serial_reader=serial_reader, icon="mdi:calendar-expand-horizontal", ), - RegularStrSensor( + LinkyTICStringSensor( tag="DEMAIN", name="Couleur du lendemain", config_title=config_entry.title, @@ -1145,18 +1009,16 @@ async def async_setup_entry( serial_reader=serial_reader, icon="mdi:palette", ), - RegularIntSensor( + ApparentPowerSensor( tag="PAPP", name="Puissance apparente", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - device_class=SensorDeviceClass.APPARENT_POWER, - native_unit_of_measurement=UnitOfApparentPower.VOLT_AMPERE, state_class=SensorStateClass.MEASUREMENT, register_callback=True, ), - RegularStrSensor( + LinkyTICStringSensor( tag="HHPHC", name="Horaire Heures Pleines Heures Creuses", config_title=config_entry.title, @@ -1165,10 +1027,9 @@ async def async_setup_entry( icon="mdi:clock-outline", enabled_by_default=False, ), - RegularStrSensor( + LinkyTICStringSensor( tag="MOTDETAT", - name="Mo" - + "t d'état du compteur", # workaround for codespell in HA pre commit hook + name="Mo" + "t d'état du compteur", # workaround for codespell in HA pre commit hook config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, @@ -1181,90 +1042,76 @@ async def async_setup_entry( if bool(config_entry.data.get(SETUP_THREEPHASE)): # three-phase - concat specific sensors sensors.append( - RegularIntSensor( + CurrentSensor( tag="IINST1", name="Intensité Instantanée (phase 1)", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - device_class=SensorDeviceClass.CURRENT, - native_unit_of_measurement=UnitOfElectricCurrent.AMPERE, state_class=SensorStateClass.MEASUREMENT, register_callback=True, ) ) sensors.append( - RegularIntSensor( + CurrentSensor( tag="IINST2", name="Intensité Instantanée (phase 2)", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - device_class=SensorDeviceClass.CURRENT, - native_unit_of_measurement=UnitOfElectricCurrent.AMPERE, state_class=SensorStateClass.MEASUREMENT, register_callback=True, ) ) sensors.append( - RegularIntSensor( + CurrentSensor( tag="IINST3", name="Intensité Instantanée (phase 3)", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - device_class=SensorDeviceClass.CURRENT, - native_unit_of_measurement=UnitOfElectricCurrent.AMPERE, state_class=SensorStateClass.MEASUREMENT, register_callback=True, ) ) sensors.append( - RegularIntSensor( + CurrentSensor( tag="IMAX1", name="Intensité maximale appelée (phase 1)", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - device_class=SensorDeviceClass.CURRENT, - native_unit_of_measurement=UnitOfElectricCurrent.AMPERE, ) ) sensors.append( - RegularIntSensor( + CurrentSensor( tag="IMAX2", name="Intensité maximale appelée (phase 2)", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - device_class=SensorDeviceClass.CURRENT, - native_unit_of_measurement=UnitOfElectricCurrent.AMPERE, ) ) sensors.append( - RegularIntSensor( + CurrentSensor( tag="IMAX3", name="Intensité maximale appelée (phase 3)", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - device_class=SensorDeviceClass.CURRENT, - native_unit_of_measurement=UnitOfElectricCurrent.AMPERE, ) ) sensors.append( - RegularIntSensor( + PowerSensor( # documentation says unit is Watt but description talks about VoltAmp :/ tag="PMAX", name="Puissance maximale triphasée atteinte (jour n-1)", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - device_class=SensorDeviceClass.POWER, - native_unit_of_measurement=UnitOfPower.WATT, # documentation says unit is Watt but description talks about VoltAmp :/ ) ) sensors.append( - RegularStrSensor( + LinkyTICStringSensor( tag="PPOT", name="Présence des potentiels", config_title=config_entry.title, @@ -1275,135 +1122,164 @@ async def async_setup_entry( ) # Burst sensors sensors.append( - RegularIntSensor( + CurrentSensor( tag="ADIR1", name="Avertissement de Dépassement d'intensité de réglage (phase 1)", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - device_class=SensorDeviceClass.CURRENT, - native_unit_of_measurement=UnitOfElectricCurrent.AMPERE, register_callback=True, ) ) sensors.append( - RegularIntSensor( + CurrentSensor( tag="ADIR2", name="Avertissement de Dépassement d'intensité de réglage (phase 2)", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - device_class=SensorDeviceClass.CURRENT, - native_unit_of_measurement=UnitOfElectricCurrent.AMPERE, register_callback=True, ) ) sensors.append( - RegularIntSensor( + CurrentSensor( tag="ADIR3", name="Avertissement de Dépassement d'intensité de réglage (phase 3)", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - device_class=SensorDeviceClass.CURRENT, - native_unit_of_measurement=UnitOfElectricCurrent.AMPERE, register_callback=True, ) ) - _LOGGER.info( - "Adding %d sensors for the three phase historic mode", len(sensors) - ) + _LOGGER.info("Adding %d sensors for the three phase historic mode", len(sensors)) else: # single phase - concat specific sensors sensors.append( - RegularIntSensor( + CurrentSensor( tag="IINST", name="Intensité Instantanée", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - device_class=SensorDeviceClass.CURRENT, - native_unit_of_measurement=UnitOfElectricCurrent.AMPERE, state_class=SensorStateClass.MEASUREMENT, register_callback=True, ) ) sensors.append( - RegularIntSensor( + CurrentSensor( tag="ADPS", name="Avertissement de Dépassement De Puissance Souscrite", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - device_class=SensorDeviceClass.CURRENT, - native_unit_of_measurement=UnitOfElectricCurrent.AMPERE, state_class=SensorStateClass.MEASUREMENT, register_callback=True, ) ) sensors.append( - RegularIntSensor( + CurrentSensor( tag="IMAX", name="Intensité maximale appelée", config_title=config_entry.title, config_uniq_id=config_entry.entry_id, serial_reader=serial_reader, - device_class=SensorDeviceClass.CURRENT, - native_unit_of_measurement=UnitOfElectricCurrent.AMPERE, ) ) - _LOGGER.info( - "Adding %d sensors for the single phase historic mode", len(sensors) - ) + _LOGGER.info("Adding %d sensors for the single phase historic mode", len(sensors)) # Add the entities to HA if len(sensors) > 0: async_add_entities(sensors, True) -class ADSSensor(SensorEntity): +T = TypeVar("T") + + +class LinkyTICSensor(LinkyTICEntity, SensorEntity, Generic[T]): + """Base class for all Linky TIC sensor entities.""" + + _attr_should_poll = True + _last_value: T | None + + def __init__(self, tag: str, config_title: str, reader: LinkyTICReader) -> None: + """Init sensor entity.""" + super().__init__(reader) + self._last_value = None + self._tag = tag + self._config_title = config_title + + @property + def native_value(self) -> T | None: + """Value of the sensor.""" + return self._last_value + + def _update(self) -> tuple[Optional[str], Optional[str]]: + """Get value and/or timestamp from cached data. Responsible for updating sensor availability.""" + value, timestamp = self._serial_controller.get_values(self._tag) + _LOGGER.debug( + "%s: retrieved %s value from serial controller: (%s, %s)", + self._config_title, + self._tag, + value, + timestamp + ) + + if not value and not timestamp: # No data returned. + if not self.available: + # Sensor is already unavailable, no need to check why. + return None, None + if not self._serial_controller.is_connected: + _LOGGER.debug( + "%s: marking the %s sensor as unavailable: serial connection lost", + self._config_title, + self._tag, + ) + self._attr_available = False + elif self._serial_controller.has_read_full_frame: + _LOGGER.info( + "%s: marking the %s sensor as unavailable: a full frame has been read but %s has not been found", + self._config_title, + self._tag, + self._tag, + ) + self._attr_available = False + else: + # A frame has not been read yet (it should!) or is already unavailable and no new data was fetched. + # Let sensor in current availability state. + pass + return None, None + + if not self.available: + # Data is available, so is sensor + self._attr_available = True + _LOGGER.info( + "%s: marking the %s sensor as available now !", + self._config_title, + self._tag, + ) + + return value, timestamp + + +class ADSSensor(LinkyTICSensor[str]): """Ad resse du compteur entity.""" + # ADSSensor is a subclass and not an instance of StringSensor because it binds to two tags. + # Generic properties # https://developers.home-assistant.io/docs/core/entity#generic-properties - _attr_has_entity_name = True + _attr_entity_category = EntityCategory.DIAGNOSTIC - _attr_name = ( - "A" + "dress" + "e du compteur" - ) # workaround for codespell in HA pre commit hook - _attr_should_poll = True + _attr_name = "A" + "dress" + "e du compteur" # workaround for codespell in HA pre commit hook _attr_icon = "mdi:tag" - def __init__( - self, config_title: str, tag: str, config_uniq_id: str, serial_reader: LinkyTICReader - ) -> None: + def __init__(self, config_title: str, tag: str, config_uniq_id: str, serial_reader: LinkyTICReader) -> None: """Initialize an ADCO/ADSC Sensor.""" _LOGGER.debug("%s: initializing %s sensor", config_title, tag) - # Linky TIC sensor properties - self._config_title = config_title - self._config_uniq_id = config_uniq_id - self._last_value: str | None = None - self._serial_controller = serial_reader - self._tag = tag + super().__init__(tag, config_title, serial_reader) # Generic entity properties self._attr_unique_id = f"{DOMAIN}_{config_uniq_id}_adco" self._extra: dict[str, str] = {} - @property - def device_info(self) -> DeviceInfo: - """Return the device info.""" - return DeviceInfo( - # connections={(DID_CONNECTION_TYPE, self._serial_controller._port)}, - identifiers={(DOMAIN, self._serial_controller.device_identification[DID_REGNUMBER] or "Unknown")}, - manufacturer=self._serial_controller.device_identification[DID_CONSTRUCTOR], - model=self._serial_controller.device_identification[DID_TYPE], - name=DID_DEFAULT_NAME, - ) - - @property - def native_value(self) -> str | None: - """Value of the sensor.""" - return self._last_value - @property def extra_state_attributes(self) -> dict[str, str]: """Get HA sensor extra attributes.""" @@ -1413,69 +1289,27 @@ def extra_state_attributes(self) -> dict[str, str]: def update(self): """Update the value of the sensor from the thread object memory cache.""" # Get last seen value from controller - value, _ = self._serial_controller.get_values(self._tag) - _LOGGER.debug( - "%s: retrieved %s value from serial controller: %s", - self._config_title, - self._tag, - repr(value), - ) - # Handle entity availability - if value is None: - if self._attr_available: - self._extra = {} - if not self._serial_controller.is_connected(): - _LOGGER.debug( - "%s: marking the %s sensor as unavailable: serial connection lost", - self._config_title, - self._tag, - ) - self._attr_available = False - elif self._serial_controller.has_read_full_frame(): - _LOGGER.info( - "%s: marking the %s sensor as unavailable: a full frame has been read but %s has not been found", - self._config_title, - self._tag, - self._tag, - ) - self._attr_available = False - # else: we are connected but a full frame has not been read yet, let's wait a little longer before marking it unavailable - else: - # Set this sensor extra attributes - constructor_str = ( - f"{self._serial_controller.device_identification[DID_CONSTRUCTOR]} ({self._serial_controller.device_identification[DID_CONSTRUCTOR_CODE]})" - if self._serial_controller.device_identification[DID_CONSTRUCTOR] is not None - else f"Inconnu ({self._serial_controller.device_identification[DID_CONSTRUCTOR_CODE]})" - ) - type_str = ( - f"{self._serial_controller.device_identification[DID_TYPE]} ({self._serial_controller.device_identification[DID_TYPE_CODE]})" - if self._serial_controller.device_identification[DID_TYPE] is not None - else f"Inconnu ({self._serial_controller.device_identification[DID_TYPE_CODE]})" - ) - self._extra = { - "constructeur": constructor_str, - "année de construction": f"20{self._serial_controller.device_identification[DID_YEAR]}", - "type de l'appareil": type_str, - "matricule de l'appareil": self._serial_controller.device_identification[DID_REGNUMBER] or "Unknown", - } - if not self._attr_available: - _LOGGER.info( - "%s: marking the %s sensor as available now !", - self._config_title, - self._tag, - ) - self._attr_available = True + value, _ = self._update() + + if not value: + return + + # Set this sensor extra attributes + did = self._serial_controller.device_identification + self._extra = { + "constructeur": f"{did[DID_CONSTRUCTOR] or 'Inconnu'} ({did[DID_CONSTRUCTOR_CODE]})", + "année de construction": f"20{did[DID_YEAR]}", + "type de l'appareil": f"{did[DID_TYPE] or 'Inconnu'} ({did[DID_TYPE_CODE]})", + "matricule de l'appareil": did[DID_REGNUMBER] or "Inconnu", + } # Save value self._last_value = value -class RegularStrSensor(SensorEntity): +class LinkyTICStringSensor(LinkyTICSensor[str]): """Common class for text sensor.""" - # Generic entity properties - # https://developers.home-assistant.io/docs/core/entity#generic-properties - _attr_has_entity_name = True - _attr_should_poll = True + _attr_entity_category = EntityCategory.DIAGNOSTIC def __init__( self, @@ -1490,12 +1324,8 @@ def __init__( ) -> None: """Initialize a Regular Str Sensor.""" _LOGGER.debug("%s: initializing %s sensor", config_title, tag.upper()) - # Linky TIC sensor properties - self._config_title = config_title - self._config_uniq_id = config_uniq_id - self._last_value: str | None = None - self._serial_controller = serial_reader - self._tag = tag.upper() + super().__init__(tag, config_title, serial_reader) + # Generic Entity properties self._attr_name = name self._attr_unique_id = f"{DOMAIN}_{config_uniq_id}_{tag.lower()}" @@ -1505,71 +1335,18 @@ def __init__( self._attr_entity_category = category self._attr_entity_registry_enabled_default = enabled_by_default - @property - def device_info(self) -> DeviceInfo: - """Return the device info.""" - return DeviceInfo( - # connections={(DID_CONNECTION_TYPE, self._serial_controller._port)}, - identifiers={(DOMAIN, self._serial_controller.device_identification[DID_REGNUMBER] or "Unknown")}, - manufacturer=self._serial_controller.device_identification[DID_CONSTRUCTOR], - model=self._serial_controller.device_identification[DID_TYPE], - name=DID_DEFAULT_NAME, - ) - - @property - def native_value(self) -> str | None: - """Value of the sensor.""" - return self._last_value - @callback def update(self): """Update the value of the sensor from the thread object memory cache.""" # Get last seen value from controller - value, _ = self._serial_controller.get_values(self._tag) - _LOGGER.debug( - "%s: retrieved %s value from serial controller: %s", - self._config_title, - self._tag, - repr(value), - ) - # Handle entity availability - if value is None: - if self._attr_available: - if not self._serial_controller.is_connected(): - _LOGGER.debug( - "%s: marking the %s sensor as unavailable: serial connection lost", - self._config_title, - self._tag, - ) - self._attr_available = False - elif self._serial_controller.has_read_full_frame(): - _LOGGER.info( - "%s: marking the %s sensor as unavailable: a full frame has been read but %s has not been found", - self._config_title, - self._tag, - self._tag, - ) - self._attr_available = False - # else: we are connected but a full frame has not been read yet, let's wait a little longer before marking it unavailable - else: - if not self._attr_available: - _LOGGER.info( - "%s: marking the %s sensor as available now !", - self._config_title, - self._tag, - ) - self._attr_available = True - # Save value - self._last_value = value - + value, _ = self._update() + if not value: + return + self._last_value = ' '.join(value.split()) -class RegularIntSensor(SensorEntity): - """Common class for energy index counters.""" - # Generic entity properties - # https://developers.home-assistant.io/docs/core/entity#generic-properties - _attr_has_entity_name = True - _attr_should_poll = True +class RegularIntSensor(LinkyTICSensor[int]): + """Common class for int sensors.""" def __init__( self, @@ -1584,25 +1361,18 @@ def __init__( native_unit_of_measurement: str | None = None, state_class: SensorStateClass | None = None, register_callback: bool = False, - conversion_function: Callable[[int], int] | None = None + conversion_function: Callable[[int], int] | None = None, ) -> None: """Initialize a Regular Int Sensor.""" _LOGGER.debug("%s: initializing %s sensor", config_title, tag.upper()) - # Linky TIC sensor properties - self._config_title = config_title - self._config_uniq_id = config_uniq_id - self._last_value: int | None = None - self._serial_controller = serial_reader - self._tag = tag.upper() + super().__init__(tag, config_title, serial_reader) + self._attr_name = name if register_callback: - self._serial_controller.register_push_notif( - self._tag, self.update_notification - ) + self._serial_controller.register_push_notif(self._tag, self.update_notification) # Generic Entity properties if category: self._attr_entity_category = category - self._attr_name = name self._attr_unique_id = f"{DOMAIN}_{config_uniq_id}_{tag.lower()}" if icon: self._attr_icon = icon @@ -1613,66 +1383,20 @@ def __init__( self._attr_native_unit_of_measurement = native_unit_of_measurement if state_class: self._attr_state_class = state_class - - self._conversion_function = conversion_function - @property - def device_info(self) -> DeviceInfo: - """Return the device info.""" - return DeviceInfo( - # connections={(DID_CONNECTION_TYPE, self._serial_controller._port)}, - identifiers={(DOMAIN, self._serial_controller.device_identification[DID_REGNUMBER] or "Unknown")}, - manufacturer=self._serial_controller.device_identification[DID_CONSTRUCTOR], - model=self._serial_controller.device_identification[DID_TYPE], - name=DID_DEFAULT_NAME, - ) - - @property - def native_value(self) -> int | None: - """Value of the sensor.""" - return self._last_value + self._conversion_function = conversion_function @callback def update(self): """Update the value of the sensor from the thread object memory cache.""" - # Get last seen value from controller - value, _ = self._serial_controller.get_values(self._tag) - _LOGGER.debug( - "%s: retrieved %s value from serial controller: %s", - self._config_title, - self._tag, - repr(value), - ) - # Handle entity availability and save value - if value is None: - if self._attr_available: - if not self._serial_controller.is_connected(): - _LOGGER.debug( - "%s: marking the %s sensor as unavailable: serial connection lost", - self._config_title, - self._tag, - ) - self._attr_available = False - elif self._serial_controller.has_read_full_frame(): - _LOGGER.info( - "%s: marking the %s sensor as unavailable: a full frame has been read but %s has not been found", - self._config_title, - self._tag, - self._tag, - ) - self._attr_available = False - # else: we are connected but a full frame has not been read yet, let's wait a little longer before marking it unavailable - # Nullify value - self._last_value = None - else: - self._last_value = int(value) if not self._conversion_function else self._conversion_function(int(value)) - if not self._attr_available: - _LOGGER.debug( - "%s: marking the %s sensor as available now !", - self._config_title, - self._tag, - ) - self._attr_available = True + value, _ = self._update() + if not value: + return + try: + value_int = int(value) + except ValueError: + return + self._last_value = self._conversion_function(value_int) if self._conversion_function else value_int def update_notification(self, realtime_option: bool) -> None: """Receive a notification from the serial reader when our tag has been read on the wire.""" @@ -1683,9 +1407,7 @@ def update_notification(self, realtime_option: bool) -> None: self._tag, ) if not self._attr_should_poll: - self._attr_should_poll = ( - True # realtime option disable, HA should poll us - ) + self._attr_should_poll = True # realtime option disable, HA should poll us return # Realtime on _LOGGER.debug( @@ -1693,242 +1415,161 @@ def update_notification(self, realtime_option: bool) -> None: self._tag, ) if self._attr_should_poll: - self._attr_should_poll = False # now that user has activated realtime, we will push data, no need for HA to poll us + self._attr_should_poll = ( + False # now that user has activated realtime, we will push data, no need for HA to poll us + ) self.schedule_update_ha_state(force_refresh=True) class EnergyIndexSensor(RegularIntSensor): - """Common class for energy index counters.""" + """Common class for energy index counters, in Watt-hours.""" - def __init__( - self, - tag: str, - name: str, - config_title: str, - config_uniq_id: str, - serial_reader: LinkyTICReader, - icon: str | None = "mdi:counter", - ) -> None: - """Initialize an Energy Index sensor.""" - super().__init__( - tag=tag, - name=name, - config_title=config_title, - config_uniq_id=config_uniq_id, - serial_reader=serial_reader, - icon=icon, - device_class=SensorDeviceClass.ENERGY, - native_unit_of_measurement=UnitOfEnergy.WATT_HOUR, - state_class=SensorStateClass.TOTAL_INCREASING, - ) + _attr_device_class = SensorDeviceClass.ENERGY + _attr_native_unit_of_measurement = UnitOfEnergy.WATT_HOUR + _attr_state_class = SensorStateClass.TOTAL_INCREASING + + +class VoltageSensor(RegularIntSensor): + """Common class for voltage sensors, in Volts.""" + + _attr_device_class = SensorDeviceClass.VOLTAGE + _attr_native_unit_of_measurement = UnitOfElectricPotential.VOLT + + +class CurrentSensor(RegularIntSensor): + """Common class for electric current sensors, in Amperes.""" + + _attr_device_class = SensorDeviceClass.CURRENT + _attr_native_unit_of_measurement = UnitOfElectricCurrent.AMPERE + + +class PowerSensor(RegularIntSensor): + """Common class for real power sensors, in Watts.""" + + _attr_device_class = SensorDeviceClass.POWER + _attr_native_unit_of_measurement = UnitOfPower.WATT + + +class ApparentPowerSensor(RegularIntSensor): + """Common class for apparent power sensors, in Volt-Amperes.""" + _attr_device_class = SensorDeviceClass.APPARENT_POWER + _attr_native_unit_of_measurement = UnitOfApparentPower.VOLT_AMPERE -class PEJPSensor(SensorEntity): + +class PEJPSensor(LinkyTICStringSensor): """Préavis Début EJP (30 min) sensor.""" # # This sensor could be improved I think (minutes as integer), but I do not have it to check and test its values # Leaving it as it is to facilitate future modifications # - - # Generic properties - # https://developers.home-assistant.io/docs/core/entity#generic-properties - _attr_has_entity_name = True - _attr_name = "Préavis Début EJP" - _attr_should_poll = True _attr_icon = "mdi:clock-start" - def __init__( - self, config_title: str, config_uniq_id: str, serial_reader: LinkyTICReader - ) -> None: + def __init__(self, config_title: str, config_uniq_id: str, serial_reader: LinkyTICReader) -> None: """Initialize a PEJP sensor.""" _LOGGER.debug("%s: initializing PEJP sensor", config_title) - # Linky TIC sensor properties - self._config_title = config_title - self._config_uniq_id = config_uniq_id - self._last_value: str | None = None - self._serial_controller = serial_reader - self._tag = "PEJP" - # Generic Entity properties - self._attr_unique_id = f"{DOMAIN}_{config_uniq_id}_{self._tag.lower()}" - - @property - def device_info(self) -> DeviceInfo: - """Return the device info.""" - return DeviceInfo( - # connections={(DID_CONNECTION_TYPE, self._serial_controller._port)}, - identifiers={(DOMAIN, self._serial_controller.device_identification[DID_REGNUMBER] or "Unknown")}, - manufacturer=self._serial_controller.device_identification[DID_CONSTRUCTOR], - model=self._serial_controller.device_identification[DID_TYPE], - name=DID_DEFAULT_NAME, + super().__init__( + tag="PEJP", + name="Préavis Début EJP", + config_title=config_title, + config_uniq_id=config_uniq_id, + serial_reader=serial_reader, ) - @property - def native_value(self) -> str | None: - """Value of the sensor.""" - return self._last_value - - @callback - def update(self): - """Update the value of the sensor from the thread object memory cache.""" - # Get last seen value from controller - value, _ = self._serial_controller.get_values(self._tag) - _LOGGER.debug( - "%s: retrieved %s value from serial controller: %s", - self._config_title, - self._tag, - repr(value), - ) - # Handle entity availability - if value is None: - if self._attr_available: - if not self._serial_controller.is_connected(): - _LOGGER.debug( - "%s: marking the %s sensor as unavailable: serial connection lost", - self._config_title, - self._tag, - ) - self._attr_available = False - elif self._serial_controller.has_read_full_frame(): - _LOGGER.info( - "%s: marking the %s sensor as unavailable: a full frame has been read but %s has not been found", - self._config_title, - self._tag, - self._tag, - ) - self._attr_available = False - # else: we are connected but a full frame has not been read yet, let's wait a little longer before marking it unavailable - else: - if not self._attr_available: - _LOGGER.info( - "%s: marking the %s sensor as available now !", - self._config_title, - self._tag, - ) - self._attr_available = True - # Save value - self._last_value = value + self._attr_unique_id = f"{DOMAIN}_{config_uniq_id}_{self._tag.lower()}" -class DateEtHeureSensor(RegularStrSensor): +class DateEtHeureSensor(LinkyTICStringSensor): """Date et heure courante sensor.""" + _attr_icon = "mdi:clock-outline" + def __init__( - self, config_title: str, config_uniq_id: str, serial_reader: LinkyTICReader, category: EntityCategory | None = None, + self, + config_title: str, + config_uniq_id: str, + serial_reader: LinkyTICReader, ) -> None: """Initialize a Date et heure sensor.""" _LOGGER.debug("%s: initializing Date et heure courante sensor", config_title) - super().__init__(tag="DATE", name="Date et heure courante", - config_title=config_title, config_uniq_id=config_uniq_id, - serial_reader=serial_reader, icon="mdi:clock-outline", - category=category) + super().__init__( + tag="DATE", + name="Date et heure courante", + config_title=config_title, + config_uniq_id=config_uniq_id, + serial_reader=serial_reader, + ) @callback def update(self): """Update the value of the sensor from the thread object memory cache.""" # Get last seen value from controller - _, horodate = self._serial_controller.get_values(self._tag) - _LOGGER.debug( - "%s: retrieved %s value from serial controller: %s", - self._config_title, - self._tag, - repr(horodate), - ) - # Handle entity availability - if horodate is None: - if self._attr_available: - if not self._serial_controller.is_connected(): - _LOGGER.debug( - "%s: marking the %s sensor as unavailable: serial connection lost", - self._config_title, - self._tag, - ) - self._attr_available = False - elif self._serial_controller.has_read_full_frame(): - _LOGGER.info( - "%s: marking the %s sensor as unavailable: a full frame has been read but %s has not been found", - self._config_title, - self._tag, - self._tag, - ) - self._attr_available = False - # else: we are connected but a full frame has not been read yet, let's wait a little longer before marking it unavailable - else: - if not self._attr_available: - _LOGGER.info( - "%s: marking the %s sensor as available now !", - self._config_title, - self._tag, - ) - self._attr_available = True - # Save value - saison = "" - if horodate[0:1] == 'E': + _, timestamp = self._update() + + if not timestamp: + return + # Save value + saison = "" + try: + if timestamp[0:1] == "E": saison = " (Eté)" - elif horodate[0:1] == 'H': + elif timestamp[0:1] == "H": saison = " (Hiver)" - self._last_value = horodate[5:7] + "/" + horodate[3:5] + "/" + horodate[1:3] + " " + horodate[7:9] + ":" + horodate[9:11] + saison + self._last_value = ( + timestamp[5:7] + + "/" + + timestamp[3:5] + + "/" + + timestamp[1:3] + + " " + + timestamp[7:9] + + ":" + + timestamp[9:11] + + saison + ) + except IndexError: + return -class ProfilDuProchainJourCalendrierFournisseurSensor(RegularStrSensor): +class ProfilDuProchainJourCalendrierFournisseurSensor(LinkyTICStringSensor): """Profil du prochain jour du calendrier fournisseur sensor.""" + _attr_icon = "mdi:calendar-month-outline" + def __init__( - self, config_title: str, config_uniq_id: str, serial_reader: LinkyTICReader, category: EntityCategory | None = None, + self, + config_title: str, + config_uniq_id: str, + serial_reader: LinkyTICReader, + category: EntityCategory | None = None, ) -> None: """Initialize a Profil du prochain jour du calendrier fournisseur sensor.""" _LOGGER.debug("%s: initializing Date et heure courante sensor", config_title) - super().__init__(tag="PJOURF+1", name="Profil du prochain jour calendrier fournisseur", - config_title=config_title, config_uniq_id=config_uniq_id, - serial_reader=serial_reader, icon="mdi:calendar-month-outline", - category=category) + super().__init__( + tag="PJOURF+1", + name="Profil du prochain jour calendrier fournisseur", + config_title=config_title, + config_uniq_id=config_uniq_id, + serial_reader=serial_reader, + ) @callback def update(self): """Update the value of the sensor from the thread object memory cache.""" # Get last seen value from controller - value, horodate = self._serial_controller.get_values(self._tag) - _LOGGER.debug( - "%s: retrieved %s value from serial controller: %s", - self._config_title, - self._tag, - repr(value), - ) - # Handle entity availability - if value is None: - if self._attr_available: - if not self._serial_controller.is_connected(): - _LOGGER.debug( - "%s: marking the %s sensor as unavailable: serial connection lost", - self._config_title, - self._tag, - ) - self._attr_available = False - elif self._serial_controller.has_read_full_frame(): - _LOGGER.info( - "%s: marking the %s sensor as unavailable: a full frame has been read but %s has not been found", - self._config_title, - self._tag, - self._tag, - ) - self._attr_available = False - # else: we are connected but a full frame has not been read yet, let's wait a little longer before marking it unavailable - else: - if not self._attr_available: - _LOGGER.info( - "%s: marking the %s sensor as available now !", - self._config_title, - self._tag, - ) - self._attr_available = True - # Save value - self._last_value = value.replace(" NONUTILE", "") + value, _ = self._update() + if not value: + return + self._last_value = value.replace("NONUTILE", "").strip() -class StatusRegisterData(RegularStrSensor): +class LinkyTICStatusRegisterSensor(LinkyTICStringSensor): """Data from status register.""" + _attr_has_entity_name = True _attr_should_poll = True + _attr_device_class = SensorDeviceClass.ENUM def __init__( self, @@ -1936,186 +1577,36 @@ def __init__( config_title: str, config_uniq_id: str, serial_reader: LinkyTICReader, - data: StatusRegister, + field: StatusRegister, enabled_by_default: bool = True, icon: str | None = None, - category: EntityCategory | None = None, ) -> None: """Initialize a status register data sensor.""" _LOGGER.debug("%s: initializing a status register data sensor", config_title) - self._data = data - super().__init__(tag="STGE", name=name, config_title=config_title, - config_uniq_id=config_uniq_id, serial_reader=serial_reader, - icon=icon, category=category, enabled_by_default=enabled_by_default) - self._attr_unique_id = f"{DOMAIN}_{config_uniq_id}_stge_{data.value}" + self._field = field + super().__init__( + tag="STGE", + name=name, + config_title=config_title, + config_uniq_id=config_uniq_id, + serial_reader=serial_reader, + icon=icon, + enabled_by_default=enabled_by_default, + ) + self._attr_unique_id = f"{DOMAIN}_{config_uniq_id}_{field.name.lower()}" # Breaking changes here. + # For SensorDeviceClass.ENUM, _attr_options contains all the possible values for the sensor. + self._attr_options = list(cast(dict[int, str], field.value.options).values()) @callback def update(self): """Update the value of the sensor from the thread object memory cache.""" # Get last seen value from controller - value, _ = self._serial_controller.get_values(self._tag) - _LOGGER.debug( - "%s: retrieved %s value from serial controller: %s", - self._config_title, - self._tag, - repr(value), - ) - # Handle entity availability - if value is None: - if self._attr_available: - if not self._serial_controller.is_connected(): - _LOGGER.debug( - "%s: marking the %s sensor as unavailable: serial connection lost", - self._config_title, - self._tag, - ) - self._attr_available = False - elif self._serial_controller.has_read_full_frame(): - _LOGGER.info( - "%s: marking the %s sensor as unavailable: a full frame has been read but %s has not been found", - self._config_title, - self._tag, - self._tag, - ) - self._attr_available = False - # else: we are connected but a full frame has not been read yet, let's wait a little longer before marking it unavailable - else: - if not self._attr_available: - _LOGGER.info( - "%s: marking the %s sensor as available now !", - self._config_title, - self._tag, - ) - self._attr_available = True - - try: - val = int(value, 16) - - # Save value - if self._data == StatusRegister.CONTACT_SEC: - self._last_value = "Ouvert" if (val & 0x01) else "Fermé" - - elif self._data == StatusRegister.ORGANE_DE_COUPURE: - val_organe_de_coupure = (val >> 1) & 0x07 - if val_organe_de_coupure == 0: - self._last_value = "Fermé" - elif val_organe_de_coupure == 1: - self._last_value = "Ouvert sur surpuissance" - elif val_organe_de_coupure == 2: - self._last_value = "Ouvert sur surtension" - elif val_organe_de_coupure == 3: - self._last_value = "Ouvert sur délestage" - elif val_organe_de_coupure == 4: - self._last_value = "Ouvert sur ordre CPL ou Euridis" - elif val_organe_de_coupure == 5: - self._last_value = "Ouvert sur une surchauffe (>Imax)" - elif val_organe_de_coupure == 6: - self._last_value = "Ouvert sur une surchauffe (> 4) & 0x01) else "Fermé" - - elif self._data == StatusRegister.SURTENSION_SUR_UNE_DES_PHASES: - self._last_value = "Surtension" if ((val >> 6) & 0x01) else "Pas de surtension" - - elif self._data == StatusRegister.DEPASSEMENT_PUISSANCE_REFERENCE: - self._last_value = "Dépassement en cours" if ((val >> 7) & 0x01) else "Pas de dépassement" - - elif self._data == StatusRegister.PRODUCTEUR_CONSOMMATEUR: - self._last_value = "Producteur" if ((val >> 8) & 0x01) else "Consommateur" - - elif self._data == StatusRegister.SENS_ENERGIE_ACTIVE: - self._last_value = "Energie active négative" if ((val >> 9) & 0x01) else "Energie active positive" - - elif self._data == StatusRegister.TARIF_CONTRAT_FOURNITURE: - index = (val >> 10) & 0x0F - self._last_value = "Energie ventillée sur index " + str(index + 1) - - elif self._data == StatusRegister.TARIF_CONTRAT_DISTRIBUTEUR: - index = (val >> 14) & 0x03 - self._last_value = "Energie ventillée sur index " + str(index + 1) - - elif self._data == StatusRegister.MODE_DEGRADE_HORLOGE: - self._last_value = "Horloge en mode dégradée" if ((val >> 16) & 0x01) else "Horloge correcte" - - elif self._data == StatusRegister.MODE_TIC: - self._last_value = "Mode standard" if ((val >> 17) & 0x01) else "Mode historique" - - elif self._data == StatusRegister.ETAT_SORTIE_COMMUNICATION_EURIDIS: - etat = (val >> 19) & 0x03 - if etat == 0: - self._last_value = "Désactivée" - elif etat == 1: - self._last_value = "Activée sans sécurité" - elif etat == 3: - self._last_value = "Activée avec sécurité" - else: - self._last_value = "Inconnue" - - elif self._data == StatusRegister.STATUS_CPL: - etat = (val >> 21) & 0x03 - if etat == 0: - self._last_value = "New/Unlock" - elif etat == 1: - self._last_value = "New/Lock" - elif etat == 2: - self._last_value = "Registered" - else: - self._last_value = "Inconnue" - - elif self._data == StatusRegister.SYNCHRO_CPL: - self._last_value = "Compteur synchronisé" if ((val >> 23) & 0x01) else "Compteur non synchronisé" - - elif self._data == StatusRegister.COULEUR_JOUR_CONTRAT_TEMPO: - etat = (val >> 24) & 0x03 - if etat == 0: - self._last_value = "Pas d'annonce" - elif etat == 1: - self._last_value = "Bleu" - elif etat == 2: - self._last_value = "Blanc" - else: - self._last_value = "Rouge" - - elif self._data == StatusRegister.COULEUR_LENDEMAIN_CONTRAT_TEMPO: - etat = (val >> 26) & 0x03 - if etat == 0: - self._last_value = "Pas d'annonce" - elif etat == 1: - self._last_value = "Bleu" - elif etat == 2: - self._last_value = "Blanc" - else: - self._last_value = "Rouge" - - elif self._data == StatusRegister.PREAVIS_POINTES_MOBILES: - etat = (val >> 28) & 0x03 - if etat == 0: - self._last_value = "Pas de préavis en cours" - elif etat == 1: - self._last_value = "Préavis PM1 en cours" - elif etat == 2: - self._last_value = "Préavis PM2 en cours" - else: - self._last_value = "Préavis PM3 en cours" - - elif self._data == StatusRegister.POINTE_MOBILE: - etat = (val >> 28) & 0x03 - if etat == 0: - self._last_value = "Pas de pointe mobile" - elif etat == 1: - self._last_value = "PM1 en cours" - elif etat == 2: - self._last_value = "PM2 en cours" - else: - self._last_value = "PM3 en cours" - - else: - self._last_value = self._data.name - - except ValueError: - _LOGGER.error( - "%s: Invalid status register : %s", - self._config_title, - value, - ) + value, _ = self._update() + + if not value: + return + + try: + self._last_value = cast(str, self._field.value.get_status(value)) + except IndexError: + pass # Failsafe, value is unchanged. diff --git a/custom_components/linkytic/serial_reader.py b/custom_components/linkytic/serial_reader.py index 4ffacbd..72ae33a 100644 --- a/custom_components/linkytic/serial_reader.py +++ b/custom_components/linkytic/serial_reader.py @@ -1,4 +1,5 @@ """The linkytic integration serial reader.""" + from __future__ import annotations from collections.abc import Callable @@ -23,6 +24,7 @@ DID_YEAR, FRAME_END, LINE_END, + LINKY_IO_ERRORS, MODE_HISTORIC_BAUD_RATE, MODE_HISTORIC_FIELD_SEPARATOR, MODE_STANDARD_BAUD_RATE, @@ -39,10 +41,8 @@ class LinkyTICReader(threading.Thread): """Implements the reading of a serial Linky TIC.""" - def __init__( - self, title: str, port, std_mode, producer_mode, three_phase, real_time: bool | None = False - ) -> None: - """Init the LinkyTIC thread serial reader.""" # Thread + def __init__(self, title: str, port, std_mode, producer_mode, three_phase, real_time: bool | None = False) -> None: + """Init the LinkyTIC thread serial reader.""" # Thread self._stopsignal = False self._title = title # Options @@ -51,13 +51,9 @@ def __init__( self._realtime = real_time # Build self._port = port - self._baudrate = ( - MODE_STANDARD_BAUD_RATE if std_mode else MODE_HISTORIC_BAUD_RATE - ) + self._baudrate = MODE_STANDARD_BAUD_RATE if std_mode else MODE_HISTORIC_BAUD_RATE self._std_mode = std_mode - self._producer_mode = ( - producer_mode if std_mode else False - ) + self._producer_mode = producer_mode if std_mode else False self._three_phase = three_phase # Run self._reader: serial.Serial | None = None @@ -74,11 +70,15 @@ def __init__( } # will be set by the ADCO/ADSC tag self._notif_callbacks: dict[str, Callable[[bool], None]] = {} # Init parent thread class + self._serial_number = None super().__init__(name=f"LinkyTIC for {title}") + # Open port: failure will be reported to async_setup_entry + self._open_serial() + def get_values(self, tag) -> tuple[str | None, str | None]: """Get tag value and timestamp from the thread memory cache.""" - if not self.is_connected(): + if not self.is_connected: return None, None try: payload = self._values[tag] @@ -86,84 +86,98 @@ def get_values(self, tag) -> tuple[str | None, str | None]: except KeyError: return None, None + @property def has_read_full_frame(self) -> bool: """Use to known if at least one complete frame has been read on the serial connection.""" return self._frames_read >= 1 + @property def is_connected(self) -> bool: """Use to know if the reader is actually connected to a serial connection.""" if self._reader is None: return False return self._reader.is_open + @property + def serial_number(self) -> str | None: + """Returns meter serial number (ADSC or ADCO tag).""" + return self._serial_number + + @property + def port(self) -> str: + """Returns serial port.""" + return self._port + def run(self): """Continuously read the the serial connection and extract TIC values.""" while not self._stopsignal: - try: - # Try to open a connection - if self._reader is None: - self._open_serial() - continue - # Now that we have a connection, read its output + # Reader should have been opened. + assert self._reader is not None + if not self._reader.is_open: + # NOTE: implement a maximum retry, and go in failure mode if the connection can't be renewed? try: - line = self._reader.readline() - except serial.SerialException as exc: - _LOGGER.exception( - "Error while reading serial device %s: %s. Will retry in 5s", - self._port, - exc, - ) - self._reset_state() + self._reader.open() + except LINKY_IO_ERRORS: + time.sleep(5) # Cooldown to prevent spamming logs. + _LOGGER.warning("Could not open port") + finally: continue - # Parse the line - tag = self._parse_line(line) + try: + line = self._reader.readline() + except LINKY_IO_ERRORS as exc: + _LOGGER.error( + "Error while reading serial device %s: %s. Will retry in 5s", + self._port, + exc, + ) + self._reset_state() + self._reader.close() + continue + + # Parse the line if non empty (prevent errors from read timeout that returns empty byte string) + if not line: + continue + tag = self._parse_line(line) + if tag is not None: + # Mark this tag as seen for end of frame cache cleanup + self._tags_seen.append(tag) + # Handle short burst for tri-phase historic mode + if ( + not self._std_mode + and self._three_phase + and not self._within_short_frame + and tag in SHORT_FRAME_DETECTION_TAGS + ): + _LOGGER.warning( + "Short trame burst detected (%s): switching to forced update mode", + tag, + ) + self._within_short_frame = True + # If we have a notification callback for this tag, call it + try: + notif_callback = self._notif_callbacks[tag] + _LOGGER.debug("We have a notification callback for %s: executing", tag) + forced_update = self._realtime + # Special case for forced_update: historic tree-phase short frame + if self._within_short_frame and tag in SHORT_FRAME_FORCED_UPDATE_TAGS: + forced_update = True + # Special case for forced_update: historic single-phase ADPS + if tag == "ADPS": + forced_update = True + notif_callback(forced_update) + except KeyError: + pass + # Handle frame end + if FRAME_END in line: + if self._within_short_frame: + # burst / short frame (exceptional) + self._within_short_frame = False + else: + # regular long frame + self._frames_read += 1 + self._cleanup_cache() if tag is not None: - # Mark this tag as seen for end of frame cache cleanup - self._tags_seen.append(tag) - # Handle short burst for tri-phase historic mode - if ( - not self._std_mode - and self._three_phase - and not self._within_short_frame - and tag in SHORT_FRAME_DETECTION_TAGS - ): - _LOGGER.warning( - "Short trame burst detected (%s): switching to forced update mode", - tag, - ) - self._within_short_frame = True - # If we have a notification callback for this tag, call it - try: - notif_callback = self._notif_callbacks[tag] - _LOGGER.debug( - "We have a notification callback for %s: executing", tag - ) - forced_update = self._realtime - # Special case for forced_update: historic tree-phase short frame - if ( - self._within_short_frame - and tag in SHORT_FRAME_FORCED_UPDATE_TAGS - ): - forced_update = True - # Special case for forced_update: historic single-phase ADPS - if tag == "ADPS": - forced_update = True - notif_callback(forced_update) - except KeyError: - pass - # Handle frame end - if FRAME_END in line: - if self._within_short_frame: - # burst / short frame (exceptional) - self._within_short_frame = False - else: - # regular long frame - self._frames_read += 1 - self._cleanup_cache() - if tag is not None: - _LOGGER.debug("End of frame, last tag read: %s", tag) - except Exception as e: - _LOGGER.exception("encountered an unexpected exception on the serial thread, catching it to avoid thread crash: %s", e) + _LOGGER.debug("End of frame, last tag read: %s", tag) # Stop flag as been activated _LOGGER.info("Thread stop: closing the serial connection") if self._reader: @@ -178,9 +192,7 @@ def register_push_notif(self, tag: str, notif_callback: Callable[[bool], None]): def signalstop(self, event): """Activate the stop flag in order to stop the thread from within.""" if self.is_alive(): - _LOGGER.info( - "Stopping %s serial thread reader (received %s)", self._title, event - ) + _LOGGER.info("Stopping %s serial thread reader (received %s)", self._title, event) self._stopsignal = True def update_options(self, real_time: bool): @@ -190,9 +202,7 @@ def update_options(self, real_time: bool): def _cleanup_cache(self): """Call to cleanup the data cache to allow some sensors to get back to undefined/unavailable if they are not present in the last frame.""" - for cached_tag in list( - self._values.keys() - ): # pylint: disable=consider-using-dict-items,consider-iterating-dictionary + for cached_tag in list(self._values.keys()): # pylint: disable=consider-using-dict-items,consider-iterating-dictionary if cached_tag not in self._tags_seen: _LOGGER.debug( "tag %s was present in cache but has not been seen in previous frame: removing from cache", @@ -210,29 +220,22 @@ def _cleanup_cache(self): def _open_serial(self): """Create (and open) the serial connection.""" - try: - self._reader = serial.serial_for_url( - url=self._port, - baudrate=self._baudrate, - bytesize=BYTESIZE, - parity=PARITY, - stopbits=STOPBITS, - timeout=1, - ) - _LOGGER.info("Serial connection is now open") - except serial.serialutil.SerialException as exc: - _LOGGER.error( - "Unable to connect to the serial device %s: %s", - self._port, - exc, - ) - self._reset_state() + self._reset_state() + self._reader = serial.serial_for_url( + url=self._port, + baudrate=self._baudrate, + bytesize=BYTESIZE, + parity=PARITY, + stopbits=STOPBITS, + timeout=1, + ) + _LOGGER.info("Serial connection is now open at %s", self._port) def _reset_state(self): """Reinitialize the controller (by nullifying it) and wait 5s for other methods to re start init after a pause.""" _LOGGER.debug("Resetting serial reader state and wait 10s") - self._reader = None self._values = {} + self._serial_number = None # Inform sensor in push mode to come fetch data (will get None and switch to unavailable) for notif_callback in self._notif_callbacks.values(): notif_callback(self._realtime) @@ -247,7 +250,6 @@ def _reset_state(self): DID_TYPE_CODE: None, DID_YEAR: None, } - time.sleep(10) def _parse_line(self, line) -> str | None: """Parse a line when a full line has been read from serial. It parses it as Linky TIC infos, validate its checksum and save internally the line infos.""" @@ -260,6 +262,8 @@ def _parse_line(self, line) -> str | None: _LOGGER.debug("line to parse: %s", repr(line)) # cleanup the line line = line.rstrip(LINE_END).rstrip(FRAME_END) + if not line: + return None # extract the fields by parsing the line given the mode timestamp = None if self._std_mode: @@ -323,9 +327,7 @@ def _parse_line(self, line) -> str | None: self.parse_ads(payload["value"]) return tag - def _validate_checksum( - self, tag: bytes, timestamp: bytes | None, value: bytes, checksum: bytes - ): + def _validate_checksum(self, tag: bytes, timestamp: bytes | None, value: bytes, checksum: bytes): # rebuild the frame if self._std_mode: sep = MODE_STANDARD_FIELD_SEPARATOR @@ -345,14 +347,10 @@ def _validate_checksum( # validate try: if computed_checksum != ord(checksum): - raise InvalidChecksum( - tag, timestamp, value, sum1, truncated, computed_checksum, checksum - ) + raise InvalidChecksum(tag, timestamp, value, sum1, truncated, computed_checksum, checksum) except TypeError as exc: # see https://github.com/hekmon/linkytic/issues/9 - _LOGGER.exception( - "Encountered an unexpected checksum (%s): %s", exc, checksum - ) + _LOGGER.exception("Encountered an unexpected checksum (%s): %s", exc, checksum) raise InvalidChecksum( tag, timestamp, @@ -360,9 +358,7 @@ def _validate_checksum( sum1, truncated, computed_checksum, - bytes( - "0", encoding="ascii" - ), # fake expected checksum to avoid type error on ord() + bytes("0", encoding="ascii"), # fake expected checksum to avoid type error on ord() ) from exc def parse_ads(self, ads): @@ -380,14 +376,20 @@ def parse_ads(self, ads): ads, ) return + + # Because S/N is a device identifier, only parse it once. + if self.serial_number: + return + + # Save serial number + self._serial_number = ads + # let's parse ADS as EURIDIS device_identification = {DID_YEAR: ads[2:4], DID_REGNUMBER: ads[6:]} # # Parse constructor code device_identification[DID_CONSTRUCTOR_CODE] = ads[0:2] try: - device_identification[DID_CONSTRUCTOR] = CONSTRUCTORS_CODES[ - device_identification[DID_CONSTRUCTOR_CODE] - ] + device_identification[DID_CONSTRUCTOR] = CONSTRUCTORS_CODES[device_identification[DID_CONSTRUCTOR_CODE]] except KeyError: _LOGGER.warning( "%s: constructor code is unknown: %s", @@ -400,9 +402,7 @@ def parse_ads(self, ads): try: device_identification[DID_TYPE] = f"{DEVICE_TYPES[device_identification[DID_TYPE_CODE]]}" except KeyError: - _LOGGER.warning( - "%s: ADS device type is unknown: %s", self._title, device_identification[DID_TYPE_CODE] - ) + _LOGGER.warning("%s: ADS device type is unknown: %s", self._title, device_identification[DID_TYPE_CODE]) device_identification[DID_TYPE] = None # # Update device infos self.device_identification = device_identification @@ -475,9 +475,7 @@ def linky_tic_tester(device: str, std_mode: bool) -> None: timeout=1, ) except serial.serialutil.SerialException as exc: - raise CannotConnect( - f"Unable to connect to the serial device {device}: {exc}" - ) from exc + raise CannotConnect(f"Unable to connect to the serial device {device}: {exc}") from exc # Try to read a line try: serial_reader.readline() @@ -491,7 +489,7 @@ def linky_tic_tester(device: str, std_mode: bool) -> None: class CannotConnect(Exception): """Error to indicate we cannot connect.""" - def __init__(self, message): + def __init__(self, message) -> None: """Initialize the CannotConnect error with an explanation message.""" super().__init__(message) @@ -499,6 +497,6 @@ def __init__(self, message): class CannotRead(Exception): """Error to indicate that the serial connection was open successfully but an error occurred while reading a line.""" - def __init__(self, message): + def __init__(self, message) -> None: """Initialize the CannotRead error with an explanation message.""" super().__init__(message) diff --git a/custom_components/linkytic/status_register.py b/custom_components/linkytic/status_register.py new file mode 100644 index 0000000..8d93fd5 --- /dev/null +++ b/custom_components/linkytic/status_register.py @@ -0,0 +1,85 @@ +"""Definition of status register fields and handlers.""" + +from enum import Enum +from typing import NamedTuple + + +class StatusRegisterEnumValueType(NamedTuple): + """Represent a field in the status register. + lsb is the position of the least significant bit of the field in the status register. + len if the length in bit of the field (default 1 bit). + options is a dictionary mapping field int value to string. If no options, the value is treated as a boolean. + """ + + lsb: int + len: int = 1 + options: dict[int, str] | None = None + + def get_status(self, register: str) -> str | bool: + try: + int_register = int(register, base=16) + except TypeError: + return False + val = (int_register >> self.lsb) & (1 << (self.len - 1)) + + if self.options is None: + return bool(val) + + return self.options[val] # Let IndexError propagate if val is unknwon. + + +organe_coupure = { + 0: "Fermé", + 1: "Ouvert sur surpuissance", + 2: "Ouvert sur surtension", + 3: "Ouvert sur délestage", + 4: "Ouvert sur ordre CPL ou Euridis", + 5: "Ouvert sur surchauffe (>Imax)", + 6: "Ouvert sur surchauffe (