Skip to content

Commit

Permalink
Merge branch 'main' into add_early_linky
Browse files Browse the repository at this point in the history
  • Loading branch information
hekmon authored Nov 14, 2024
2 parents 591595b + 9f57992 commit ef97941
Show file tree
Hide file tree
Showing 9 changed files with 967 additions and 1,167 deletions.
77 changes: 67 additions & 10 deletions custom_components/linkytic/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
"""The linkytic integration."""

from __future__ import annotations
import asyncio

import logging

from homeassistant.config_entries import ConfigEntry
from homeassistant.config_entries import ConfigEntry, ConfigEntryNotReady
from homeassistant.const import EVENT_HOMEASSISTANT_STOP, Platform
from homeassistant.core import HomeAssistant
from homeassistant.components import usb

from .const import (
DOMAIN,
Expand All @@ -15,6 +18,7 @@
SETUP_TICMODE,
SETUP_PRODUCER,
TICMODE_STANDARD,
LINKY_IO_ERRORS,
)
from .serial_reader import LinkyTICReader

Expand All @@ -26,15 +30,39 @@
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up linkytic from a config entry."""
# Create the serial reader thread and start it
serial_reader = LinkyTICReader(
title=entry.title,
port=entry.data.get(SETUP_SERIAL),
std_mode=entry.data.get(SETUP_TICMODE) == TICMODE_STANDARD,
producer_mode=entry.data.get(SETUP_PRODUCER),
three_phase=entry.data.get(SETUP_THREEPHASE),
real_time=entry.options.get(OPTIONS_REALTIME),
)
serial_reader.start()
port = entry.data.get(SETUP_SERIAL)
try:
serial_reader = LinkyTICReader(
title=entry.title,
port=port,
std_mode=entry.data.get(SETUP_TICMODE) == TICMODE_STANDARD,
producer_mode=entry.data.get(SETUP_PRODUCER),
three_phase=entry.data.get(SETUP_THREEPHASE),
real_time=entry.options.get(OPTIONS_REALTIME),
)
serial_reader.start()

async def read_serial_number(serial: LinkyTICReader):
while serial.serial_number is None:
await asyncio.sleep(1)
return serial.serial_number

s_n = await asyncio.wait_for(read_serial_number(serial_reader), timeout=5)
# TODO: check if S/N is the one saved in config entry, if not this is a different meter!

# Error when opening serial port.
except LINKY_IO_ERRORS as e:
raise ConfigEntryNotReady(f"Couldn't open serial port {port}: {e}") from e

# Timeout waiting for S/N to be read.
except TimeoutError as e:
serial_reader.signalstop("linkytic_timeout")
raise ConfigEntryNotReady(
"Connected to serial port but coulnd't read serial number before timeout: check if TIC is connected and active."
) from e

_LOGGER.info(f"Device connected with serial number: {s_n}")

hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, serial_reader.signalstop)
# Add options callback
entry.async_on_unload(entry.add_update_listener(update_listener))
Expand Down Expand Up @@ -71,3 +99,32 @@ async def update_listener(hass: HomeAssistant, entry: ConfigEntry):
return
# Update its options
serial_reader.update_options(entry.options.get(OPTIONS_REALTIME))


async def async_migrate_entry(hass: HomeAssistant, config_entry: ConfigEntry):
"""Migrate old entry."""
_LOGGER.info("Migrating from version %d.%d", config_entry.version, config_entry.minor_version)

if config_entry.version == 1:
new = {**config_entry.data}

if config_entry.minor_version < 2:
# Migrate to serial by-id.
serial_by_id = await hass.async_add_executor_job(usb.get_serial_by_id, new[SETUP_SERIAL])
if serial_by_id == new[SETUP_SERIAL]:
_LOGGER.warning(
f"Couldn't find a persistent /dev/serial/by-id alias for {serial_by_id}. "
"Problems might occur at startup if device names are not persistent."
)
else:
new[SETUP_SERIAL] = serial_by_id

config_entry.minor_version = 2
hass.config_entries.async_update_entry(config_entry, data=new)

_LOGGER.info(
"Migration to version %d.%d successful",
config_entry.version,
config_entry.minor_version,
)
return True
230 changes: 179 additions & 51 deletions custom_components/linkytic/binary_sensor.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,73 @@
"""Binary sensors for linkytic integration."""

from __future__ import annotations

import asyncio
import logging
from typing import Optional, cast

from homeassistant.components.binary_sensor import (
BinarySensorDeviceClass,
BinarySensorEntity,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity import DeviceInfo, EntityCategory
from homeassistant.helpers.entity import EntityCategory
from homeassistant.helpers.entity_platform import AddEntitiesCallback

from .const import (
DID_CONNECTION_TYPE,
DID_CONSTRUCTOR,
DID_DEFAULT_NAME,
DID_REGNUMBER,
DID_TYPE,
DOMAIN,
)
from .const import DOMAIN, SETUP_TICMODE, TICMODE_STANDARD
from .serial_reader import LinkyTICReader
from .entity import LinkyTICEntity
from .status_register import StatusRegister

_LOGGER = logging.getLogger(__name__)

STATUS_REGISTER_SENSORS = (
(
StatusRegister.CONTACT_SEC,
"Contact sec",
BinarySensorDeviceClass.OPENING,
"mdi:electric-switch-closed",
"mdi-electric-switch",
False,
),
(
StatusRegister.ETAT_DU_CACHE_BORNE_DISTRIBUTEUR,
"Cache-borne",
BinarySensorDeviceClass.OPENING,
"mdi:toy-brick",
"mdi:toy-brick-outline",
False,
),
(
StatusRegister.SURTENSION_SUR_UNE_DES_PHASES,
"Surtension",
BinarySensorDeviceClass.PRESENCE,
"mdi:flash-triangle-outline",
"mdi:flash-triangle",
False,
),
(
StatusRegister.DEPASSEMENT_PUISSANCE_REFERENCE,
"Dépassement puissance",
BinarySensorDeviceClass.PRESENCE,
"mdi:transmission-tower",
"mdi:transmission-tower-off",
False,
),
(StatusRegister.PRODUCTEUR_CONSOMMATEUR, "Producteur", None, "mdi:transmission-tower-export", None, False),
(StatusRegister.SENS_ENERGIE_ACTIVE, "Sens énergie active", None, "mdi:transmission-tower-export", None, False),
(
StatusRegister.MODE_DEGRADE_HORLOGE,
"Synchronisation horloge",
BinarySensorDeviceClass.LOCK,
"mdi:sync",
"mdi:sync-off",
False,
),
(StatusRegister.MODE_TIC, "Mode historique", None, "mdi:tag", None, False),
(StatusRegister.SYNCHRO_CPL, "Synchronisation CPL", BinarySensorDeviceClass.LOCK, "mdi:sync", "mdi:sync-off", True),
)


# config flow setup
async def async_setup_entry(
Expand All @@ -43,67 +86,152 @@ async def async_setup_entry(
config_entry.title,
)
return
# Wait a bit for the controller to feed on serial frames (home assistant warns after 10s)
_LOGGER.debug(
"%s: waiting at most 9s before setting up binary sensor plateform in order for the async serial reader to have time to parse a full frame",
config_entry.title,
)
for i in range(9):
await asyncio.sleep(1)
if serial_reader.has_read_full_frame():
_LOGGER.debug(
"%s: a full frame has been read, initializing sensors",
config_entry.title,
)
break
if i == 8:
_LOGGER.warning(
"%s: wait time is over but a full frame has yet to be read: initializing sensors anyway",
config_entry.title,
)
# Init sensors
async_add_entities(
[SerialConnectivity(config_entry.title, config_entry.entry_id, serial_reader)],
True,
)
sensors: list[BinarySensorEntity] = [SerialConnectivity(config_entry.title, config_entry.entry_id, serial_reader)]

if config_entry.data.get(SETUP_TICMODE) == TICMODE_STANDARD:
sensors.extend(
StatusRegisterBinarySensor(
name=name,
config_title=config_entry.title,
field=field,
serial_reader=serial_reader,
unique_id=config_entry.entry_id,
device_class=devclass,
icon_off=icon_off,
icon_on=icon_on,
inverted=inverted,
)
for field, name, devclass, icon_off, icon_on, inverted in STATUS_REGISTER_SENSORS
)

async_add_entities(sensors, True)

class SerialConnectivity(BinarySensorEntity):

class SerialConnectivity(LinkyTICEntity, BinarySensorEntity):
"""Serial connectivity to the Linky TIC serial interface."""

# Generic properties
# https://developers.home-assistant.io/docs/core/entity#generic-properties
_attr_has_entity_name = True
_attr_entity_category = EntityCategory.DIAGNOSTIC
_attr_name = "Connectivité du lien série"
_attr_should_poll = True

# Binary sensor properties
# https://developers.home-assistant.io/docs/core/entity/binary-sensor/#properties
_attr_device_class = BinarySensorDeviceClass.CONNECTIVITY

def __init__(
self, title: str, uniq_id: str | None, serial_reader: LinkyTICReader
) -> None:
def __init__(self, title: str, unique_id: str, serial_reader: LinkyTICReader) -> None:
"""Initialize the SerialConnectivity binary sensor."""
_LOGGER.debug("%s: initializing Serial Connectivity binary sensor", title)
super().__init__(serial_reader)
self._title = title
self._attr_unique_id = f"{DOMAIN}_{uniq_id}_serial_connectivity"
self._serial_controller = serial_reader
self._device_uniq_id = uniq_id if uniq_id is not None else "yaml_legacy"
self._attr_unique_id = f"{DOMAIN}_{unique_id}_serial_connectivity"

@property
def device_info(self) -> DeviceInfo:
"""Return the device info."""
return DeviceInfo(
# connections={(DID_CONNECTION_TYPE, self._serial_controller._port)},
identifiers={(DOMAIN, self._serial_controller.device_identification[DID_REGNUMBER])},
manufacturer=self._serial_controller.device_identification[DID_CONSTRUCTOR],
model=self._serial_controller.device_identification[DID_TYPE],
name=DID_DEFAULT_NAME,
)
def is_on(self) -> bool:
"""Value of the sensor."""
return self._serial_controller.is_connected


class StatusRegisterBinarySensor(LinkyTICEntity, BinarySensorEntity):
"""Binary sensor for binary status register fields."""

_attr_entity_category = EntityCategory.DIAGNOSTIC

_binary_state: bool
_tag = "STGE"

def __init__(
self,
name: str,
config_title: str,
unique_id: str,
serial_reader: LinkyTICReader,
field: StatusRegister,
device_class: BinarySensorDeviceClass | None = None,
icon_on: str | None = None,
icon_off: str | None = None,
inverted: bool = False,
) -> None:
"""Initialize the status register binary sensor."""
_LOGGER.debug("%s: initializing %s binary sensor", config_title, field.name)
super().__init__(serial_reader)

self._config_title = config_title
self._binary_state = False # Default state.
self._inverted = inverted
self._field = field
self._attr_name = name
self._attr_unique_id = f"{DOMAIN}_{unique_id}_{field.name.lower()}"
if device_class:
self._attr_device_class = device_class

self._icon_on = icon_on
self._icon_off = icon_off

@property
def is_on(self) -> bool:
"""Value of the sensor."""
return self._serial_controller.is_connected()
return self._binary_state ^ self._inverted

@property
def icon(self) -> str | None:
"""Return icon of the sensor."""
if not self._icon_off or not self._icon_on:
return self._icon_on or self._icon_off or super().icon

if self.is_on:
return self._icon_on
else:
return self._icon_off

def update(self) -> None:
"""Update the state of the sensor."""
value, _ = self._update()
if not value:
return
self._binary_state = cast(bool, self._field.value.get_status(value))

# TODO: factor _update function to remove copy from sensors entities
def _update(self) -> tuple[Optional[str], Optional[str]]:
"""Get value and/or timestamp from cached data. Responsible for updating sensor availability."""
value, timestamp = self._serial_controller.get_values(self._tag)
_LOGGER.debug(
"%s: retrieved %s value from serial controller: (%s, %s)", self._config_title, self._tag, value, timestamp
)

if not value and not timestamp: # No data returned.
if not self.available:
# Sensor is already unavailable, no need to check why.
return None, None
if not self._serial_controller.is_connected:
_LOGGER.debug(
"%s: marking the %s sensor as unavailable: serial connection lost",
self._config_title,
self._tag,
)
self._attr_available = False
elif self._serial_controller.has_read_full_frame:
_LOGGER.info(
"%s: marking the %s sensor as unavailable: a full frame has been read but %s has not been found",
self._config_title,
self._tag,
self._tag,
)
self._attr_available = False
else:
# A frame has not been read yet (it should!) or is already unavailable and no new data was fetched.
# Let sensor in current availability state.
pass
return None, None

if not self.available:
# Data is available, so is sensor
self._attr_available = True
_LOGGER.info(
"%s: marking the %s sensor as available now !",
self._config_title,
self._tag,
)

return value, timestamp
Loading

0 comments on commit ef97941

Please sign in to comment.