mirror of
https://github.com/Tediore/battery2mqtt.git
synced 2025-04-03 23:13:44 +02:00
code refactor
This commit is contained in:
parent
5173160c2b
commit
8e8f832e67
2
.gitignore
vendored
2
.gitignore
vendored
@ -1,2 +0,0 @@
|
||||
battery2mqttv2.py
|
||||
battery2mqttAC.py
|
215
battery2mqtt.py
215
battery2mqtt.py
@ -3,13 +3,13 @@ import sys
|
||||
import json
|
||||
from time import sleep
|
||||
import paho.mqtt.client as mqtt
|
||||
from threading import Thread as t
|
||||
import logging
|
||||
|
||||
MQTT_HOST = os.getenv('MQTT_HOST')
|
||||
MQTT_PORT = int(os.getenv('MQTT_PORT', 1883))
|
||||
MQTT_USER = os.getenv('MQTT_USER')
|
||||
MQTT_PASSWORD = os.getenv('MQTT_PASSWORD')
|
||||
MQTT_CLIENT = os.getenv('MQTT_CLIENT', 'battery2mqtt')
|
||||
MQTT_QOS = int(os.getenv('MQTT_QOS', 1))
|
||||
MQTT_TOPIC = os.getenv('MQTT_TOPIC', 'server')
|
||||
INTERVAL = int(os.getenv('INTERVAL', 60))
|
||||
@ -20,126 +20,129 @@ TIME_REMAINING = int(os.getenv('TIME_REMAINING', 1))
|
||||
AC_ADAPTER = int(os.getenv('AC_ADAPTER', 0))
|
||||
LOG_LEVEL = os.getenv('LOG_LEVEL', 'INFO').upper()
|
||||
|
||||
if LOG_LEVEL.lower() not in ['debug', 'info', 'warning', 'error']:
|
||||
logging.basicConfig(level='INFO', format='%(asctime)s %(levelname)s: %(message)s')
|
||||
logging.warning(f'Selected log level "{LOG_LEVEL}" is not valid; using default')
|
||||
else:
|
||||
logging.basicConfig(level=LOG_LEVEL, format='%(asctime)s %(levelname)s: %(message)s')
|
||||
class Battery:
|
||||
def __init__(self):
|
||||
self.monitored_conditions = MONITORED_CONDITIONS.split(',')
|
||||
self.payload = {}
|
||||
self.health_calc = {}
|
||||
self.time_remaining = {}
|
||||
|
||||
client = mqtt.Client(MQTT_CLIENT)
|
||||
def check_conditions(self):
|
||||
# Check that the conditions the user requested are present on the system
|
||||
for dir in dirs:
|
||||
if not dir.startswith('AC'):
|
||||
for name in self.monitored_conditions:
|
||||
try:
|
||||
with open(path + dir + '/' + name, 'r') as file:
|
||||
file.read()
|
||||
if LOG_LEVEL == 'DEBUG':
|
||||
logging.debug(f'Condition "{name}" found.')
|
||||
except:
|
||||
logging.warning(f'Condition "{name}" not found.')
|
||||
|
||||
monitored_conditions = MONITORED_CONDITIONS.split(',')
|
||||
path = "/sys/class/power_supply/"
|
||||
dirs = os.listdir(path)
|
||||
def get_info(self):
|
||||
# Get requested conditions and generate/send MQTT payload
|
||||
while True:
|
||||
for dir in dirs:
|
||||
if AC_ADAPTER:
|
||||
if dir.startswith('AC'):
|
||||
try:
|
||||
with open(path + dir + '/online', 'r') as file:
|
||||
self.payload['ac_adapter'] = 'online' if int(file.read()) else 'offline'
|
||||
except:
|
||||
pass
|
||||
|
||||
payload = {}
|
||||
health_calc = {}
|
||||
time_remaining = {}
|
||||
mqtt_connected = False
|
||||
for name in self.monitored_conditions:
|
||||
try:
|
||||
with open(path + dir + '/' + name, 'r') as file:
|
||||
if name in ['alarm', 'capacity', 'cycle_count', 'online', 'present']:
|
||||
self.payload[name] = int(file.read().replace('\n',''))
|
||||
elif name.startswith('voltage') or name.startswith('energy') or name.startswith('power'):
|
||||
if name.startswith('voltage'):
|
||||
unit = ' V' if SHOW_UNITS else ''
|
||||
elif name.startswith('energy'):
|
||||
unit = ' Wh' if SHOW_UNITS else ''
|
||||
else:
|
||||
unit = ' W' if SHOW_UNITS else ''
|
||||
|
||||
if SHOW_UNITS:
|
||||
self.payload[name] = str(round(float(file.read().replace('\n','')) / 1000000,2)) + unit
|
||||
else:
|
||||
self.payload[name] = round(float(file.read().replace('\n','')) / 1000000,2)
|
||||
else:
|
||||
self.payload[name] = file.read().replace('\n','')
|
||||
except:
|
||||
pass
|
||||
|
||||
if BATTERY_HEALTH:
|
||||
unit = ' %' if SHOW_UNITS else ''
|
||||
try:
|
||||
for name in ['energy_full_design', 'energy_full']:
|
||||
with open(path + dir + '/' + name, 'r') as file:
|
||||
self.health_calc[name] = int(file.read().replace('\n',''))
|
||||
if SHOW_UNITS:
|
||||
self.payload['battery_health'] = str(round((self.health_calc['energy_full'] / self.health_calc['energy_full_design']) * 100,2)) + unit
|
||||
else:
|
||||
self.payload['battery_health'] = round((self.health_calc['energy_full'] / self.health_calc['energy_full_design']) * 100,2)
|
||||
except:
|
||||
pass
|
||||
|
||||
if TIME_REMAINING:
|
||||
unit = ' h' if SHOW_UNITS else ''
|
||||
try:
|
||||
for name in ['energy_now', 'power_now']:
|
||||
with open(path + dir + '/' + name, 'r') as file:
|
||||
self.time_remaining[name] = int(file.read().replace('\n',''))
|
||||
if SHOW_UNITS:
|
||||
self.payload['time_remaining'] = str(round((self.time_remaining['energy_now'] / self.time_remaining['power_now']),2) if round((self.time_remaining['energy_now'] / self.time_remaining['power_now']),2) < 24 else '> 24') + unit
|
||||
else:
|
||||
self.payload['time_remaining'] = round((self.time_remaining['energy_now'] / self.time_remaining['power_now']),2) if round((self.time_remaining['energy_now'] / self.time_remaining['power_now']),2) < 24 else '> 24'
|
||||
except:
|
||||
pass
|
||||
|
||||
try:
|
||||
if not dir.startswith('AC'):
|
||||
client.publish("battery2mqtt/" + MQTT_TOPIC + '/' + dir, json.dumps(self.payload), MQTT_QOS, False)
|
||||
if LOG_LEVEL == 'DEBUG':
|
||||
logging.debug('Sending MQTT payload: ' + str(self.payload))
|
||||
except Exception as e:
|
||||
logging.error(f'Message send failed: {e}')
|
||||
try:
|
||||
client.publish("battery2mqtt/" + MQTT_TOPIC + '/status', 'online', 0, True)
|
||||
except Exception as e:
|
||||
logging.error(f'Message send failed: {e}')
|
||||
sleep(INTERVAL)
|
||||
|
||||
def mqtt_connect():
|
||||
# Connect to MQTT broker, set LWT, and start loop
|
||||
global mqtt_connected
|
||||
# Connect to MQTT broker and set LWT
|
||||
try:
|
||||
client.username_pw_set(MQTT_USER, MQTT_PASSWORD)
|
||||
client.will_set("battery2mqtt/" + MQTT_TOPIC + '/status', 'offline', 0, True)
|
||||
client.connect(MQTT_HOST, MQTT_PORT)
|
||||
client.loop_start()
|
||||
client.publish("battery2mqtt/" + MQTT_TOPIC + '/status', 'online', 0, True)
|
||||
logging.info('Connected to MQTT broker.')
|
||||
mqtt_connected = True
|
||||
except Exception as e:
|
||||
logging.error(f'Unable to connect to MQTT broker: {e}')
|
||||
sys.exit()
|
||||
|
||||
def check_conditions():
|
||||
# Check that the conditions the user requested are present on the system
|
||||
for dir in dirs:
|
||||
if not dir.startswith('AC'):
|
||||
for name in monitored_conditions:
|
||||
try:
|
||||
with open(path + dir + '/' + name, 'r') as file:
|
||||
file.read()
|
||||
if LOG_LEVEL == 'DEBUG':
|
||||
logging.debug(f'Condition "{name}" found.')
|
||||
except:
|
||||
logging.warning(f'Condition "{name}" not found.')
|
||||
if __name__ == '__main__':
|
||||
if MQTT_HOST == None:
|
||||
logging.error('Please specify the IP address or hostname of your MQTT broker.')
|
||||
sys.exit()
|
||||
|
||||
def get_info():
|
||||
# Get requested conditions and generate/send MQTT payload
|
||||
global payload
|
||||
for dir in dirs:
|
||||
if AC_ADAPTER:
|
||||
if dir.startswith('AC'):
|
||||
try:
|
||||
with open(path + dir + '/online', 'r') as file:
|
||||
payload['ac_adapter'] = 'online' if int(file.read()) else 'offline'
|
||||
except:
|
||||
pass
|
||||
if LOG_LEVEL.lower() not in ['debug', 'info', 'warning', 'error']:
|
||||
logging.basicConfig(level='INFO', format='%(asctime)s %(levelname)s: %(message)s')
|
||||
logging.warning(f'Selected log level "{LOG_LEVEL}" is not valid; using default')
|
||||
else:
|
||||
logging.basicConfig(level=LOG_LEVEL, format='%(asctime)s %(levelname)s: %(message)s')
|
||||
|
||||
for name in monitored_conditions:
|
||||
try:
|
||||
with open(path + dir + '/' + name, 'r') as file:
|
||||
if name in ['alarm', 'capacity', 'cycle_count', 'online', 'present']:
|
||||
payload[name] = int(file.read().replace('\n',''))
|
||||
elif name.startswith('voltage') or name.startswith('energy') or name.startswith('power'):
|
||||
if name.startswith('voltage'):
|
||||
unit = ' V' if SHOW_UNITS else ''
|
||||
elif name.startswith('energy'):
|
||||
unit = ' Wh' if SHOW_UNITS else ''
|
||||
else:
|
||||
unit = ' W' if SHOW_UNITS else ''
|
||||
client = mqtt.Client(f'battery2mqtt_{MQTT_TOPIC}')
|
||||
path = "/sys/class/power_supply/"
|
||||
dirs = os.listdir(path)
|
||||
b = Battery()
|
||||
|
||||
if SHOW_UNITS:
|
||||
payload[name] = str(round(float(file.read().replace('\n','')) / 1000000,2)) + unit
|
||||
else:
|
||||
payload[name] = round(float(file.read().replace('\n','')) / 1000000,2)
|
||||
else:
|
||||
payload[name] = file.read().replace('\n','')
|
||||
except:
|
||||
pass
|
||||
|
||||
if BATTERY_HEALTH:
|
||||
unit = ' %' if SHOW_UNITS else ''
|
||||
try:
|
||||
for name in ['energy_full_design', 'energy_full']:
|
||||
with open(path + dir + '/' + name, 'r') as file:
|
||||
health_calc[name] = int(file.read().replace('\n',''))
|
||||
if SHOW_UNITS:
|
||||
payload['battery_health'] = str(round((health_calc['energy_full'] / health_calc['energy_full_design']) * 100,2)) + unit
|
||||
else:
|
||||
payload['battery_health'] = round((health_calc['energy_full'] / health_calc['energy_full_design']) * 100,2)
|
||||
except:
|
||||
pass
|
||||
|
||||
if TIME_REMAINING:
|
||||
unit = ' h' if SHOW_UNITS else ''
|
||||
try:
|
||||
for name in ['energy_now', 'power_now']:
|
||||
with open(path + dir + '/' + name, 'r') as file:
|
||||
time_remaining[name] = int(file.read().replace('\n',''))
|
||||
if SHOW_UNITS:
|
||||
payload['time_remaining'] = str(round((time_remaining['energy_now'] / time_remaining['power_now']),2) if round((time_remaining['energy_now'] / time_remaining['power_now']),2) < 24 else '> 24') + unit
|
||||
else:
|
||||
payload['time_remaining'] = round((time_remaining['energy_now'] / time_remaining['power_now']),2) if round((time_remaining['energy_now'] / time_remaining['power_now']),2) < 24 else '> 24'
|
||||
except:
|
||||
pass
|
||||
|
||||
try:
|
||||
if not dir.startswith('AC'):
|
||||
client.publish("battery2mqtt/" + MQTT_TOPIC + '/' + dir, json.dumps(payload), MQTT_QOS, False)
|
||||
if LOG_LEVEL == 'DEBUG':
|
||||
logging.debug('Sending MQTT payload: ' + str(payload))
|
||||
except Exception as e:
|
||||
logging.error(f'Message send failed: {e}')
|
||||
try:
|
||||
client.publish("battery2mqtt/" + MQTT_TOPIC + '/status', 'online', 0, True)
|
||||
except Exception as e:
|
||||
logging.error(f'Message send failed: {e}')
|
||||
|
||||
check_conditions()
|
||||
mqtt_connect()
|
||||
|
||||
while mqtt_connected:
|
||||
get_info()
|
||||
sleep(INTERVAL)
|
||||
b.check_conditions()
|
||||
mqtt_connect()
|
||||
polling_thread = t(target=b.get_info, daemon=True)
|
||||
polling_thread.start()
|
||||
client.loop_forever()
|
Loading…
x
Reference in New Issue
Block a user