docker-thermostat/thermostat.py
2024-06-07 00:50:05 +02:00

391 lines
14 KiB
Python
Executable File

from flask import Flask, request
from flask_restx import Api, Resource, fields
from functools import wraps
import json
import logging
import yaml
import os
import sys
import datetime
from datetime import timezone
from datetime import datetime
import time
import requests
import subprocess
from threading import Thread
from threading import Lock
from threading import Event
import signal
import sqlite3
# This code has been written for
# python3 3.11.2-1+b1
# python3-flask 2.2.2-3 all micro web framework based on Werkzeug and Jinja2 - Python 3.x
# flask-restx 1.3.0
# Flask-RESTX documentation: https://flask-restx.readthedocs.io/en/latest/
# Module-wide lock serialising every xprint() call.
xprint_lock = Lock()
logging.basicConfig(level=logging.WARNING)


def xprint(*args, **kwargs):
    """Print while holding ``xprint_lock``.

    Every positional and keyword argument is handed to the builtin
    ``print`` unchanged; ``sys.stdout`` is flushed before the lock is
    released.
    """
    xprint_lock.acquire()
    try:
        print(*args, **kwargs)
        sys.stdout.flush()
    finally:
        xprint_lock.release()
# Security scheme handed to flask-restx: the API key is expected in the
# "X-API-KEY" request header.
_api_key_scheme = {'type': 'apiKey', 'in': 'header', 'name': 'X-API-KEY'}
authorizations = {'apikey': _api_key_scheme}
# Load the YAML configuration once, at import time.  An empty file makes
# safe_load() return None, so fall back to an empty mapping to keep the
# .get() calls below working.
with open('./conf.yml') as conf:
    yaml_conf = yaml.safe_load(conf) or {}

flask_settings = yaml_conf.get("flask_settings")
api_key = yaml_conf.get("api_key")

# FLASK_ENV selects the environment-specific Flask settings and the log level.
# It is read with .get() so that an unset variable reaches the explicit error
# message below instead of dying on a bare KeyError.
flask_env = os.environ.get('FLASK_ENV')
if flask_env == 'development':
    flask_settings_env = yaml_conf.get("flask_settings_dev")
    logging.getLogger().setLevel(logging.DEBUG)
elif flask_env == 'production':
    flask_settings_env = yaml_conf.get("flask_settings_prod")
    logging.getLogger().setLevel(logging.INFO)
else:
    logging.error("FLASK_ENV must be set to development or production.")
    sys.exit(1)

# Remaining settings are exposed as module-level names; a key missing from
# conf.yml yields None.
targets = yaml_conf.get("targets")
modes = yaml_conf.get("modes")
http_port = yaml_conf.get("http_port")
shedding_order = yaml_conf.get("shedding_order")
rooms_settings = yaml_conf.get("rooms_settings")
default_target = yaml_conf.get("default_target")
relays_load = yaml_conf.get("relays_load")
awake_hour = yaml_conf.get("awake_hour")
sleep_hour = yaml_conf.get("sleep_hour")
forced_mode_duration = yaml_conf.get("forced_mode_duration")
load_shedder_interval = yaml_conf.get("load_shedder_interval")
relay_control_interval = yaml_conf.get("relay_control_interval")
hysteresis = yaml_conf.get("hysteresis")
max_load = yaml_conf.get("max_load")
load_margin = yaml_conf.get("load_margin")
def auth_required(func):
    """Decorator that only runs ``func`` when a valid API key is supplied.

    The wrapped function is tagged with the ``apikey`` security scheme for
    the API documentation.  On each call the ``X-API-KEY`` request header is
    compared with the configured ``api_key``; a missing or wrong key ends the
    request through ``api.abort(401, ...)``.
    """
    import hmac

    func = api.doc(security='apikey')(func)

    @wraps(func)
    def check_auth(*args, **kwargs):
        key = request.headers.get('X-API-KEY')
        if key is None:
            api.abort(401, 'API key required')
        # Compare in constant time so response timing does not reveal how
        # much of a guessed key is correct.  A configured key that is not a
        # string (including an unset one) can never equal a header value, so
        # it is rejected outright, as the plain inequality test did before.
        key_matches = isinstance(api_key, str) and hmac.compare_digest(
            key.encode("utf-8"), api_key.encode("utf-8")
        )
        if not key_matches:
            api.abort(401, 'Wrong API key')
        return func(*args, **kwargs)

    return check_auth
##
def now():
    """Return the current local time, formatted with ``%c``, wrapped in square brackets."""
    stamp = datetime.now().strftime("%c")
    return f"[{stamp}]"
def enabled_rooms():
    """Return the names of the rooms whose ``enabled`` setting is truthy.

    Rooms are listed in the iteration order of ``rooms_settings``.
    """
    return [name for name in rooms_settings if rooms_settings[name]["enabled"]]
def status_as_text():
    """Build a plain-text status report.

    The report starts with the active target name and the forced mode, then
    lists, for every enabled room, the target temperature configured for the
    active target and the value currently returned by ``get_metric`` for the
    room's metric.
    """
    header = ''.join((
        'target: ', target_name, '\n',
        'forced_mode: ', str(forced_mode), '\n',
    ))
    room_reports = []
    for room in enabled_rooms():
        settings = rooms_settings[room]
        wanted = settings[target_name]
        measured = get_metric(settings["metric"], current_time, relay_control_interval)
        room_reports.append(''.join((
            'Target temperature for ', room, ': ', str(wanted), '\n',
            'Current temperature for ', room, ': ', str(measured),
        )))
    return header + '\n'.join(room_reports)
def status_as_dict():
    """Build the status report as a dictionary.

    Keys are ``target`` (active target name), ``forced_mode`` (stringified)
    and ``enabled_rooms``, which maps each enabled room to its stringified
    target temperature and the stringified result of ``get_metric`` for the
    room's metric.
    """
    per_room = {}
    for room in enabled_rooms():
        settings = rooms_settings[room]
        wanted = settings[target_name]
        measured = get_metric(settings["metric"], current_time, relay_control_interval)
        per_room[room] = {
            'Target temperature': str(wanted),
            'Current temperature': str(measured),
        }
    return {
        "target": target_name,
        "forced_mode": str(forced_mode),
        "enabled_rooms": per_room,
    }
def relay_state(relay):
    """Ask ``./relay.py`` for the status of ``relay``.

    Returns the command's output, stripped and decoded as UTF-8, or the
    string ``"Failed"`` when running the command or decoding its output
    raises; the error is logged in that case.
    """
    command = ["./relay.py", relay, "status"]
    try:
        raw = subprocess.check_output(command)
        state = raw.strip().decode("utf-8")
    except Exception as e:
        logging.error(e)
        logging.error("relay "+relay+" status: Failed to command relays board.")
        sys.stdout.flush()
        return "Failed"
    return state
def set_relay(relay, state):
    """Run ``./relay.py <relay> <state>`` and report whether it succeeded.

    Returns ``"OK"`` when the command ran successfully and ``"KO"`` when it
    could not be run or exited with an error (the error is logged).

    The second line of the command output is logged as the new global
    status.  Extracting that line is done outside the ``try`` and never
    raises, so an unexpectedly short or undecodable output can no longer
    turn a successful relay command into a ``"KO"`` result.
    """
    try:
        returned_output = subprocess.check_output(["./relay.py", relay, state])
    except Exception as e:
        logging.error(e)
        logging.error("set relay "+relay+" to "+state+": Failed to command relays board.")
        sys.stdout.flush()
        return "KO"

    output_lines = returned_output.decode("utf-8", errors="replace").split('\n')
    global_status = output_lines[1] if len(output_lines) > 1 else "unknown"
    logging.info("set relay "+relay+" to "+state+", new global status: "+global_status)
    sys.stdout.flush()
    return "OK"
def get_metric(metric, current_time, interval, timeout=5):
    """Fetch the latest value of ``metric`` from the local metrics service.

    Args:
        metric: name appended to ``http://localhost:3000/``.
        current_time: reference time as a POSIX timestamp (seconds).
        interval: the reading is accepted only if it is less than
            ``2 * interval`` seconds older than ``current_time``.
        timeout: seconds to wait for the HTTP request before giving up, so an
            unresponsive service cannot block the caller indefinitely.

    Returns:
        The ``value`` field of the JSON response, or ``None`` when the
        request fails, the response cannot be parsed, or the reading is too
        old.  Failures are logged, never raised.
    """
    url = "http://localhost:3000/"+metric
    try:
        r = requests.get(url, timeout=timeout)
        r.raise_for_status()
        data = json.loads(r.text)
        reading_time = datetime.fromisoformat(data['timestamp'])
        # A timestamp without offset is taken as UTC (as before); one that
        # carries its own offset is honoured instead of being overwritten.
        if reading_time.tzinfo is None:
            reading_time = reading_time.replace(tzinfo=timezone.utc)
        timestamp = reading_time.timestamp()
        if current_time - timestamp < interval * 2:
            return data['value']
        logging.warning("WARNING: No recent data available for "+metric+".")
    except Exception as e:
        logging.error(e)
    sys.stdout.flush()
    return None
def get_forced_mode(cursor):
    """Read the manually set mode from the ``set_mode`` table.

    Returns ``None`` when no row named ``mode`` exists, or when the stored
    value is one of ``targets`` and was recorded more than
    ``forced_mode_duration`` seconds ago.  Any other stored value (the
    absence modes) is returned regardless of its age.
    """
    cursor.execute("SELECT value, timestamp FROM set_mode WHERE name='mode'")
    row = cursor.fetchone()
    if row is None:
        return None

    value, recorded_at = row
    # The stored timestamp is interpreted as UTC.
    recorded_ts = datetime.fromisoformat(recorded_at).replace(tzinfo=timezone.utc).timestamp()

    # We ignore old targets but never ignore absence modes
    expired = value in targets and time.time() - recorded_ts > forced_mode_duration
    if not expired:
        return value
    logging.debug("Ignoring old set mode.")
    return None
##
# Set by the signal handler; polled by thermostat_loop() to know when to exit.
stop = Event()


def handler(signum, frame):
    """Signal handler: log the signal number and set the ``stop`` event."""
    logging.info("Got interrupt: "+str(signum))
    stop.set()
    logging.info("Shutdown")


for _signum in (signal.SIGTERM, signal.SIGINT):
    signal.signal(_signum, handler)
# Flask application and flask-restx API; the environment-specific settings
# are applied after the common ones.
app = Flask(__name__)
app.config.from_mapping(flask_settings)
app.config.from_mapping(flask_settings_env)
api = Api(app, version='1.0', title='Thermostat and load shedder',
          description='API to read and set thermostat.', authorizations=authorizations)
ns_thermostat = api.namespace('thermostat/', description='Thermostat API')

# If the database file does not exist it will be created automatically, but
# its parent directory will not: create it first so connect() cannot fail on
# a fresh checkout.
os.makedirs("./instance", exist_ok=True)
dbconn = sqlite3.connect("./instance/thermostat.db")
cursor = dbconn.cursor()
# we will only use name="mode" for set_mode table
# only modes that are set manually will be recorded in the database
cursor.execute(
    "CREATE TABLE IF NOT EXISTS set_mode (name TEXT PRIMARY KEY DEFAULT 'mode' NOT NULL, "
    "value TEXT NOT NULL, "
    "timestamp DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL)"
)
# Commit explicitly so the table exists for other connections whatever
# transaction mode this connection is in.
dbconn.commit()
# Request parser for /set_mode: a required JSON field "value" restricted to
# the configured targets and modes.
Set_mode_parser = api.parser()
Set_mode_parser.add_argument(
    "value",
    location="json",
    type=str,
    required=True,
    choices=targets+modes,
    help="Thermostat mode",
)
# Matching model used to validate and document the request payload.
Set_mode_resource_fields = dict(
    value=fields.String(enum=targets+modes, required=True, description='Thermostat mode'),
)
Set_mode_model = api.model('Set_mode_Model', Set_mode_resource_fields)
#TODO: Add this to swagger documentation
#'Temporary forced modes (will stay in effect for '+str(int(forced_mode_duration))+' seconds):\n'\
#+'\n'.join(['# /'+target+'\n' for target in targets])\
#+'\n'\
#+'Permanent forced modes:\n'\
#+'\n'.join(['# /'+mode+'\n' for mode in modes])
@ns_thermostat.route('/set_mode')
class Set_mode_thermostat(Resource):
    """Endpoint used to request a thermostat mode change."""

    @auth_required
    @api.expect(Set_mode_model, validate=True)
    def post(self):
        """Store the requested mode in ``new_forced_mode``.

        The value is only recorded here; ``thermostat_loop`` picks it up and
        applies it.  Returns ``("OK", 201)`` on success and ``("KO", 400)``
        when the request arguments cannot be parsed.
        """
        global new_forced_mode
        try:
            args = Set_mode_parser.parse_args()
            new_forced_mode = args["value"]
        except Exception as e:
            logging.error(e)
            # "KO" (letter O), matching the "OK"/"KO" pair used elsewhere.
            return "KO", 400
        return "OK", 201
@ns_thermostat.route('/status')
class Status_thermostat(Resource):
    """Endpoint exposing the current thermostat status."""

    @auth_required
    def get(self):
        """Return the dictionary built by ``status_as_dict``."""
        return status_as_dict()
# Request parser for /set_verbosity: a required JSON field "value" naming one
# of the supported log levels.
Set_verbosity_parser = api.parser()
Set_verbosity_parser.add_argument(
    "value",
    location="json",
    type=str,
    required=True,
    choices=["DEBUG", "INFO", "WARNING"],
    help="Verbosity",
)
# Matching model used to validate and document the request payload.
Set_verbosity_resource_fields = dict(
    value=fields.String(enum=["DEBUG", "INFO", "WARNING"], required=True, description='Verbosity'),
)
Set_verbosity_model = api.model('Set_verbosity_Model', Set_verbosity_resource_fields)
@ns_thermostat.route('/set_verbosity')
class Set_verbosity_thermostat(Resource):
    """Endpoint used to change the root logger's level at runtime."""

    @auth_required
    @api.expect(Set_verbosity_model, validate=True)
    def put(self):
        """Set the root logger level to the requested verbosity.

        Returns ``("OK", 201)`` on success, ``("Bad request", 400)`` for a
        value that is not DEBUG, INFO or WARNING, and ``("KO", 400)`` when
        the request arguments cannot be parsed.
        """
        levels = {
            "DEBUG": logging.DEBUG,
            "INFO": logging.INFO,
            "WARNING": logging.WARNING,
        }
        try:
            args = Set_verbosity_parser.parse_args()
            level = levels.get(args["value"])
            if level is None:
                return "Bad request", 400
            logging.getLogger().setLevel(level)
            return "OK", 201
        except Exception as e:
            logging.error(e)
            # "KO" (letter O), matching the "OK"/"KO" pair used elsewhere.
            return "KO", 400
api.add_namespace(ns_thermostat)

# TODO: Get Linky overload warning
#cursor.execute("SELECT * FROM set_mode")
#rows = cursor.fetchall()
#for row in rows:
#    xprint(row)
#sys.stdout.flush()

# State shared between the API handlers and thermostat_loop().
# new_forced_mode is written by the /set_mode handler and consumed by the loop.
target_name = default_target
forced_mode = new_forced_mode = None
current_time = time.time()
def thermostat_loop():
    """Run the mode selection, load shedder and thermostat until ``stop`` is set.

    Each iteration:

    1. persists a mode requested through the API (``new_forced_mode``) in the
       ``set_mode`` table;
    2. chooses ``target_name`` from the stored forced mode or, without one,
       from the configured awake/sleep hours;
    3. reads the main power load and, when less than ``load_margin`` remains
       below ``max_load``, switches off relays following ``shedding_order``
       until the estimated load is back within the margin;
    4. at most every ``relay_control_interval`` seconds, compares each
       enabled room's temperature with its target (with ``hysteresis``) and
       logs the heater action it would take.

    The loop sleeps ``load_shedder_interval`` seconds between iterations.
    The database connection is closed when the loop ends, including when an
    exception propagates out of it.
    """
    global target_name
    global forced_mode
    global new_forced_mode
    global current_time

    last_control_time = None
    first_loop = True

    # awake_hour / sleep_hour are "HH:MM" strings; they do not change while
    # the loop runs, so parse them once.
    awake_h, awake_m = (int(part) for part in awake_hour.split(':')[:2])
    sleep_h, sleep_m = (int(part) for part in sleep_hour.split(':')[:2])

    # A connection of its own: this function is meant to run in its own thread.
    dbconn = sqlite3.connect("./instance/thermostat.db")
    cursor = dbconn.cursor()
    logging.info("====== Starting ======")
    try:
        while not stop.is_set():
            requested_mode = new_forced_mode
            if requested_mode is not None:
                # The value comes from the API: bind it as a parameter rather
                # than concatenating it into the SQL text.
                cursor.execute(
                    "INSERT OR REPLACE INTO set_mode (value) VALUES (?)",
                    (requested_mode,),
                )
                dbconn.commit()
                logging.info("Switch to "+requested_mode)
                target_name = requested_mode
                new_forced_mode = None
                # Force immediate action:
                last_control_time = None

            current_time = time.time()
            current_date = datetime.now()
            today_awake_time = current_date.replace(hour=awake_h, minute=awake_m, second=0, microsecond=0)
            today_sleep_time = current_date.replace(hour=sleep_h, minute=sleep_m, second=0, microsecond=0)

            forced_mode = get_forced_mode(cursor)
            if forced_mode is not None and forced_mode in targets:
                if target_name != forced_mode:
                    target_name = forced_mode
                    logging.info("Switch to "+forced_mode)
            else:
                if forced_mode == "long_absence":
                    if target_name != "target_frost_protection" or first_loop:
                        target_name = "target_frost_protection"
                        logging.info("Switch to "+target_name)
                # NOTE(review): "or first_loop" sits on the branch condition
                # here but inside the branch for long_absence, so the first
                # iteration without a forced mode also selects the sleep
                # target. Kept as is; confirm this is intended.
                elif forced_mode == "short_absence" or first_loop:
                    if target_name != "target_sleep_temperature":
                        target_name = "target_sleep_temperature"
                        logging.info("Switch to "+target_name)
                elif current_date > today_awake_time and current_date < today_sleep_time:
                    if target_name != "target_unconfirmed_awake_temperature" and target_name != "target_awake_temperature":
                        target_name = "target_unconfirmed_awake_temperature"
                        logging.info("Switch to unconfirmed awake mode.")
                elif current_date < today_awake_time or current_date > today_sleep_time:
                    if target_name != "target_unconfirmed_sleep_temperature" and target_name != "target_sleep_temperature":
                        target_name = "target_unconfirmed_sleep_temperature"
                        logging.info("Switch to unconfirmed sleep mode.")
            first_loop = False

            # Load shedder
            current_load = get_metric("Modane_elec_main_power", current_time, load_shedder_interval)
            if current_load is None:
                # Without a recent load reading neither shedding nor heater
                # control is attempted.
                time.sleep(load_shedder_interval)
                continue
            if max_load - current_load < load_margin:
                logging.warning("Load too high: "+str(current_load)+"VA")
                total_shedded = 0
                for room in shedding_order:
                    relay = rooms_settings[room]["relays"]
                    current_state = relay_state(relay)
                    if current_state != "Failed":
                        logging.debug("Got relay_state: '"+current_state+"'")
                    if current_state == "1":
                        if set_relay(relay, "off") == "OK":
                            total_shedded += relays_load[relay]
                    # Estimated load after shedding is current_load minus what
                    # was switched off; stop once the margin is restored.  The
                    # previous test subtracted total_shedded from the headroom,
                    # which made it true from the first room onwards.
                    if max_load - (current_load - total_shedded) >= load_margin:
                        logging.info("Load should be back to normal.")
                        break

            # Thermostat
            if last_control_time is None or current_time - last_control_time > relay_control_interval:
                last_control_time = current_time
                for room in rooms_settings:
                    settings = rooms_settings[room]
                    if not settings["enabled"]:
                        continue
                    target = settings[target_name]
                    logging.debug("Target: "+str(target))
                    temperature = get_metric(settings["metric"], current_time, relay_control_interval)
                    if temperature is None:
                        continue
                    logging.debug(room+": "+str(temperature))
                    current_state = relay_state(settings["relays"])
                    if current_state != "Failed":
                        logging.debug("Got relay_state: '"+current_state+"'")
                    if temperature < target - hysteresis:
                        if current_state == "0":
                            logging.info(room+": Target temperature is "+str(target))
                            logging.info(room+": Current temperature is "+str(temperature))
                            if current_load + relays_load[settings["relays"]] + load_margin > max_load:
                                logging.warning(room+": Load too high cannot start heaters.")
                            else:
                                logging.info(room+": Starting heaters.")
                                # NOTE(review): the relay command is disabled
                                # here, so the decision is only logged.
                                # set_relay(rooms_settings[room]["relays"], "on")
                            sys.stdout.flush()
                        else:
                            logging.debug("Relay already on.")
                    elif temperature > target + hysteresis:
                        if current_state == "1":
                            logging.info(room+": Target temperature is "+str(target))
                            logging.info(room+": Current temperature is "+str(temperature))
                            logging.info(room+": Stopping heaters.")
                            sys.stdout.flush()
                            # NOTE(review): the relay command is disabled
                            # here, so the decision is only logged.
                            # set_relay(rooms_settings[room]["relays"], "off")
                        else:
                            logging.debug("Relay already off.")
            time.sleep(load_shedder_interval)
    finally:
        dbconn.close()
    logging.info("====== Ended successfully ======")
def _run_api_server():
    """Start Flask's built-in server on all interfaces, port 3002."""
    app.run(host='0.0.0.0', port=3002)


if __name__ == '__main__':
    # The background thermostat thread is currently disabled; only the API
    # server is started.
    #t1 = Thread(target=thermostat_loop)
    #t1.daemon = True
    #t1.start()
    _run_api_server()