From 8e8f832e67df4079e819caf398533503bf17e092 Mon Sep 17 00:00:00 2001 From: Tediore Date: Tue, 15 Feb 2022 21:12:15 -0600 Subject: [PATCH] code refactor --- .gitignore | 2 - battery2mqtt.py | 215 ++++++++++++++++++++++++------------------------ 2 files changed, 109 insertions(+), 108 deletions(-) delete mode 100644 .gitignore diff --git a/.gitignore b/.gitignore deleted file mode 100644 index f4a5388..0000000 --- a/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -battery2mqttv2.py -battery2mqttAC.py diff --git a/battery2mqtt.py b/battery2mqtt.py index 27cd217..d45013f 100644 --- a/battery2mqtt.py +++ b/battery2mqtt.py @@ -3,13 +3,13 @@ import sys import json from time import sleep import paho.mqtt.client as mqtt +from threading import Thread as t import logging MQTT_HOST = os.getenv('MQTT_HOST') MQTT_PORT = int(os.getenv('MQTT_PORT', 1883)) MQTT_USER = os.getenv('MQTT_USER') MQTT_PASSWORD = os.getenv('MQTT_PASSWORD') -MQTT_CLIENT = os.getenv('MQTT_CLIENT', 'battery2mqtt') MQTT_QOS = int(os.getenv('MQTT_QOS', 1)) MQTT_TOPIC = os.getenv('MQTT_TOPIC', 'server') INTERVAL = int(os.getenv('INTERVAL', 60)) @@ -20,126 +20,129 @@ TIME_REMAINING = int(os.getenv('TIME_REMAINING', 1)) AC_ADAPTER = int(os.getenv('AC_ADAPTER', 0)) LOG_LEVEL = os.getenv('LOG_LEVEL', 'INFO').upper() -if LOG_LEVEL.lower() not in ['debug', 'info', 'warning', 'error']: - logging.basicConfig(level='INFO', format='%(asctime)s %(levelname)s: %(message)s') - logging.warning(f'Selected log level "{LOG_LEVEL}" is not valid; using default') -else: - logging.basicConfig(level=LOG_LEVEL, format='%(asctime)s %(levelname)s: %(message)s') +class Battery: + def __init__(self): + self.monitored_conditions = MONITORED_CONDITIONS.split(',') + self.payload = {} + self.health_calc = {} + self.time_remaining = {} -client = mqtt.Client(MQTT_CLIENT) + def check_conditions(self): + # Check that the conditions the user requested are present on the system + for dir in dirs: + if not dir.startswith('AC'): + for name in self.monitored_conditions: + try: + with open(path + dir + '/' + name, 'r') as file: + file.read() + if LOG_LEVEL == 'DEBUG': + logging.debug(f'Condition "{name}" found.') + except: + logging.warning(f'Condition "{name}" not found.') -monitored_conditions = MONITORED_CONDITIONS.split(',') -path = "/sys/class/power_supply/" -dirs = os.listdir(path) + def get_info(self): + # Get requested conditions and generate/send MQTT payload + while True: + for dir in dirs: + if AC_ADAPTER: + if dir.startswith('AC'): + try: + with open(path + dir + '/online', 'r') as file: + self.payload['ac_adapter'] = 'online' if int(file.read()) else 'offline' + except: + pass -payload = {} -health_calc = {} -time_remaining = {} -mqtt_connected = False + for name in self.monitored_conditions: + try: + with open(path + dir + '/' + name, 'r') as file: + if name in ['alarm', 'capacity', 'cycle_count', 'online', 'present']: + self.payload[name] = int(file.read().replace('\n','')) + elif name.startswith('voltage') or name.startswith('energy') or name.startswith('power'): + if name.startswith('voltage'): + unit = ' V' if SHOW_UNITS else '' + elif name.startswith('energy'): + unit = ' Wh' if SHOW_UNITS else '' + else: + unit = ' W' if SHOW_UNITS else '' + + if SHOW_UNITS: + self.payload[name] = str(round(float(file.read().replace('\n','')) / 1000000,2)) + unit + else: + self.payload[name] = round(float(file.read().replace('\n','')) / 1000000,2) + else: + self.payload[name] = file.read().replace('\n','') + except: + pass + + if BATTERY_HEALTH: + unit = ' %' if SHOW_UNITS else '' + try: + for name in ['energy_full_design', 'energy_full']: + with open(path + dir + '/' + name, 'r') as file: + self.health_calc[name] = int(file.read().replace('\n','')) + if SHOW_UNITS: + self.payload['battery_health'] = str(round((self.health_calc['energy_full'] / self.health_calc['energy_full_design']) * 100,2)) + unit + else: + self.payload['battery_health'] = round((self.health_calc['energy_full'] / self.health_calc['energy_full_design']) * 100,2) + except: + pass + + if TIME_REMAINING: + unit = ' h' if SHOW_UNITS else '' + try: + for name in ['energy_now', 'power_now']: + with open(path + dir + '/' + name, 'r') as file: + self.time_remaining[name] = int(file.read().replace('\n','')) + if SHOW_UNITS: + self.payload['time_remaining'] = str(round((self.time_remaining['energy_now'] / self.time_remaining['power_now']),2) if round((self.time_remaining['energy_now'] / self.time_remaining['power_now']),2) < 24 else '> 24') + unit + else: + self.payload['time_remaining'] = round((self.time_remaining['energy_now'] / self.time_remaining['power_now']),2) if round((self.time_remaining['energy_now'] / self.time_remaining['power_now']),2) < 24 else '> 24' + except: + pass + + try: + if not dir.startswith('AC'): + client.publish("battery2mqtt/" + MQTT_TOPIC + '/' + dir, json.dumps(self.payload), MQTT_QOS, False) + if LOG_LEVEL == 'DEBUG': + logging.debug('Sending MQTT payload: ' + str(self.payload)) + except Exception as e: + logging.error(f'Message send failed: {e}') + try: + client.publish("battery2mqtt/" + MQTT_TOPIC + '/status', 'online', 0, True) + except Exception as e: + logging.error(f'Message send failed: {e}') + sleep(INTERVAL) def mqtt_connect(): - # Connect to MQTT broker, set LWT, and start loop - global mqtt_connected + # Connect to MQTT broker and set LWT try: client.username_pw_set(MQTT_USER, MQTT_PASSWORD) client.will_set("battery2mqtt/" + MQTT_TOPIC + '/status', 'offline', 0, True) client.connect(MQTT_HOST, MQTT_PORT) - client.loop_start() client.publish("battery2mqtt/" + MQTT_TOPIC + '/status', 'online', 0, True) logging.info('Connected to MQTT broker.') - mqtt_connected = True except Exception as e: logging.error(f'Unable to connect to MQTT broker: {e}') sys.exit() -def check_conditions(): - # Check that the conditions the user requested are present on the system - for dir in dirs: - if not dir.startswith('AC'): - for name in monitored_conditions: - try: - with open(path + dir + '/' + name, 'r') as file: - file.read() - if LOG_LEVEL == 'DEBUG': - logging.debug(f'Condition "{name}" found.') - except: - logging.warning(f'Condition "{name}" not found.') +if __name__ == '__main__': + if MQTT_HOST == None: + logging.error('Please specify the IP address or hostname of your MQTT broker.') + sys.exit() -def get_info(): - # Get requested conditions and generate/send MQTT payload - global payload - for dir in dirs: - if AC_ADAPTER: - if dir.startswith('AC'): - try: - with open(path + dir + '/online', 'r') as file: - payload['ac_adapter'] = 'online' if int(file.read()) else 'offline' - except: - pass + if LOG_LEVEL.lower() not in ['debug', 'info', 'warning', 'error']: + logging.basicConfig(level='INFO', format='%(asctime)s %(levelname)s: %(message)s') + logging.warning(f'Selected log level "{LOG_LEVEL}" is not valid; using default') + else: + logging.basicConfig(level=LOG_LEVEL, format='%(asctime)s %(levelname)s: %(message)s') - for name in monitored_conditions: - try: - with open(path + dir + '/' + name, 'r') as file: - if name in ['alarm', 'capacity', 'cycle_count', 'online', 'present']: - payload[name] = int(file.read().replace('\n','')) - elif name.startswith('voltage') or name.startswith('energy') or name.startswith('power'): - if name.startswith('voltage'): - unit = ' V' if SHOW_UNITS else '' - elif name.startswith('energy'): - unit = ' Wh' if SHOW_UNITS else '' - else: - unit = ' W' if SHOW_UNITS else '' + client = mqtt.Client(f'battery2mqtt_{MQTT_TOPIC}') + path = "/sys/class/power_supply/" + dirs = os.listdir(path) + b = Battery() - if SHOW_UNITS: - payload[name] = str(round(float(file.read().replace('\n','')) / 1000000,2)) + unit - else: - payload[name] = round(float(file.read().replace('\n','')) / 1000000,2) - else: - payload[name] = file.read().replace('\n','') - except: - pass - - if BATTERY_HEALTH: - unit = ' %' if SHOW_UNITS else '' - try: - for name in ['energy_full_design', 'energy_full']: - with open(path + dir + '/' + name, 'r') as file: - health_calc[name] = int(file.read().replace('\n','')) - if SHOW_UNITS: - payload['battery_health'] = str(round((health_calc['energy_full'] / health_calc['energy_full_design']) * 100,2)) + unit - else: - payload['battery_health'] = round((health_calc['energy_full'] / health_calc['energy_full_design']) * 100,2) - except: - pass - - if TIME_REMAINING: - unit = ' h' if SHOW_UNITS else '' - try: - for name in ['energy_now', 'power_now']: - with open(path + dir + '/' + name, 'r') as file: - time_remaining[name] = int(file.read().replace('\n','')) - if SHOW_UNITS: - payload['time_remaining'] = str(round((time_remaining['energy_now'] / time_remaining['power_now']),2) if round((time_remaining['energy_now'] / time_remaining['power_now']),2) < 24 else '> 24') + unit - else: - payload['time_remaining'] = round((time_remaining['energy_now'] / time_remaining['power_now']),2) if round((time_remaining['energy_now'] / time_remaining['power_now']),2) < 24 else '> 24' - except: - pass - - try: - if not dir.startswith('AC'): - client.publish("battery2mqtt/" + MQTT_TOPIC + '/' + dir, json.dumps(payload), MQTT_QOS, False) - if LOG_LEVEL == 'DEBUG': - logging.debug('Sending MQTT payload: ' + str(payload)) - except Exception as e: - logging.error(f'Message send failed: {e}') - try: - client.publish("battery2mqtt/" + MQTT_TOPIC + '/status', 'online', 0, True) - except Exception as e: - logging.error(f'Message send failed: {e}') - -check_conditions() -mqtt_connect() - -while mqtt_connected: - get_info() - sleep(INTERVAL) + b.check_conditions() + mqtt_connect() + polling_thread = t(target=b.get_info, daemon=True) + polling_thread.start() + client.loop_forever() \ No newline at end of file