"""The Linky Tariff integration.""" from __future__ import annotations import logging from typing import Any from homeassistant.config_entries import ConfigEntry from homeassistant.const import CONF_IEEE from homeassistant.core import HomeAssistant from homeassistant.helpers import device_registry as dr from homeassistant.helpers.update_coordinator import DataUpdateCoordinator from .const import ( DOMAIN, CONF_POLL_INTERVAL, CLUSTER_ID, ATTRIBUTE_ID, ENDPOINT_ID, PLATFORMS, ) _LOGGER = logging.getLogger(__name__) async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: """Set up Linky Tariff from a config entry.""" hass.data.setdefault(DOMAIN, {}) coordinator = LinkyTariffCoordinator( hass, entry.data[CONF_IEEE], entry.options.get(CONF_POLL_INTERVAL, entry.data.get(CONF_POLL_INTERVAL, 60)), ) await coordinator.async_config_entry_first_refresh() hass.data[DOMAIN][entry.entry_id] = coordinator await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS) entry.async_on_unload(entry.add_update_listener(async_update_options)) return True async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: """Unload a config entry.""" if unload_ok := await hass.config_entries.async_unload_platforms(entry, PLATFORMS): hass.data[DOMAIN].pop(entry.entry_id) return unload_ok async def async_update_options(hass: HomeAssistant, entry: ConfigEntry) -> None: """Update options.""" await hass.config_entries.async_reload(entry.entry_id) class LinkyTariffCoordinator(DataUpdateCoordinator): """Class to manage fetching Linky Tariff data.""" def __init__(self, hass: HomeAssistant, ieee: str, poll_interval: int) -> None: """Initialize.""" super().__init__( hass, _LOGGER, name=DOMAIN, update_interval=poll_interval, ) self.ieee = ieee async def _async_update_data(self) -> dict[str, Any]: """Fetch data from ZHA device.""" try: if "zha" not in self.hass.data: raise ValueError("ZHA integration not loaded") zha_gateway = self.hass.data["zha"].get("zha_gateway") if not zha_gateway: raise ValueError("ZHA gateway not available") zha_device = zha_gateway.devices.get(self.ieee) if not zha_device: raise ValueError(f"Device {self.ieee} not found in ZHA") endpoint = zha_device.endpoints.get(ENDPOINT_ID) if not endpoint: raise ValueError(f"Endpoint {ENDPOINT_ID} not found") cluster = endpoint.in_clusters.get(CLUSTER_ID) if not cluster: raise ValueError(f"Cluster {CLUSTER_ID} not found") result = await cluster.read_attributes([ATTRIBUTE_ID]) value = result.get(ATTRIBUTE_ID, "unknown") return { "value": value, "last_update": self.hass.config.time_utc(), } except Exception as ex: _LOGGER.error("Error reading Linky tariff: %s", ex) return {"value": "error", "last_update": self.hass.config.time_utc()}