Source code for pitop.battery.battery

import atexit
import logging

from pitop.common.ptdm import Message, PTDMRequestClient, PTDMSubscribeClient

# Module-level logger; the Battery state-change handler reports through it.
logger = logging.getLogger(__name__)


class Battery:
    """Query the battery and react to battery events from the pi-top device manager.

    Instances expose optional callback attributes; assign a callable to any of
    them to be notified of the matching event:

    * ``when_low`` / ``when_critical`` - invoked with no arguments on the
      low / critical battery warning messages.
    * ``when_discharging`` / ``when_charging`` / ``when_full`` - invoked with
      no arguments when the reported charging state changes to 0 / 1 / 2.
    * ``on_capacity_change`` - invoked with the new capacity (``int``) when the
      reported capacity differs from the previously seen one.

    Creating an instance queries the current battery state, starts a
    subscribe client and registers an ``atexit`` hook that stops it again.
    """

    def __init__(self):
        # Battery capacity change events from ptdm
        self.when_low = None
        self.when_critical = None

        # Battery full capacity as charging state change event from ptdm
        self.when_full = None

        # Internal battery charging state change events
        self.when_charging = None
        self.when_discharging = None

        self.on_capacity_change = None

        # Seed the "previous" values so the first published state is only
        # reported if it actually differs from the current one.
        (
            self.__previous_charging_state,
            self.__previous_capacity,
        ) = [int(p) for p in Battery.get_full_state()[:2]]

        self.__ptdm_subscribe_client = None
        self.__setup_subscribe_client()

        atexit.register(self.__clean_up)

    def __setup_subscribe_client(self):
        """Create the subscribe client, register the handlers and start listening."""

        def invoke(callback, *args):
            # Callback attributes default to None; only call what the user set.
            if callable(callback):
                callback(*args)

        def on_state_changed(parameters):
            charging_state, capacity = [int(p) for p in parameters[:2]]

            # Capacity is tracked (and reported) independently of the
            # charging state, so handle it before validating the state.
            if self.__previous_capacity != capacity:
                invoke(self.on_capacity_change, capacity)
                self.__previous_capacity = capacity

            if charging_state not in range(0, 3):
                logger.warning("Invalid charging state from pi-top device manager")
                return

            if charging_state == self.__previous_charging_state:
                logger.debug("Charging state has not changed - doing nothing...")
                return

            self.__previous_charging_state = charging_state

            funcs_to_invoke = {
                2: self.when_full,
                1: self.when_charging,
                0: self.when_discharging,
            }
            invoke(funcs_to_invoke[charging_state])

        self.__ptdm_subscribe_client = PTDMSubscribeClient()
        self.__ptdm_subscribe_client.initialise(
            {
                # The callback attribute is looked up when the event arrives,
                # so handlers assigned after construction are honoured, and it
                # is actually called rather than merely returned.
                Message.PUB_LOW_BATTERY_WARNING: lambda: invoke(self.when_low),
                Message.PUB_CRITICAL_BATTERY_WARNING: lambda: invoke(
                    self.when_critical
                ),
                Message.PUB_BATTERY_STATE_CHANGED: on_state_changed,
            }
        )
        self.__ptdm_subscribe_client.start_listening()

    def __clean_up(self):
        """Stop the subscribe client; best-effort, never raises."""
        client = self.__ptdm_subscribe_client
        if client is None:
            return
        try:
            client.stop_listening()
        except Exception:
            # Runs from atexit: a failure here must not disturb interpreter
            # shutdown, but leave a trace for debugging.
            logger.debug("Failed to stop battery subscribe client", exc_info=True)

    @classmethod
    def get_full_state(cls):
        """Request the battery state and return the response parameters.

        The properties of this class unpack the result as four values, in
        order: charging state, capacity, time remaining and wattage.
        """
        message = Message.from_parts(Message.REQ_GET_BATTERY_STATE, [])

        with PTDMRequestClient() as request_client:
            response = request_client.send_message(message)

        return response.parameters

    @property
    def is_charging(self):
        """``True`` unless the reported charging state is ``"0"``."""
        charging_state, _, _, _ = Battery.get_full_state()
        return charging_state != "0"

    @property
    def is_full(self):
        """``True`` when the reported charging state is ``"2"``."""
        charging_state, _, _, _ = Battery.get_full_state()
        return charging_state == "2"

    @property
    def capacity(self):
        """The reported battery capacity, as an ``int``."""
        _, capacity, _, _ = Battery.get_full_state()
        return int(capacity)

    @property
    def time_remaining(self):
        """The reported time remaining, as an ``int``."""
        _, _, time_remaining, _ = Battery.get_full_state()
        return int(time_remaining)

    @property
    def wattage(self):
        """The reported wattage, as an ``int``."""
        _, _, _, wattage = Battery.get_full_state()
        return int(wattage)