#!/usr/bin/python3
"""Thermostat and load shedder service.

Loads its settings from ``./conf.yml``, exposes a Flask-RESTX API protected
by an ``X-API-KEY`` header, drives relays through the ``./relay.py`` helper
script and reads metrics from an HTTP service on ``localhost:3000``.
"""
import datetime  # NOTE: shadowed by ``from datetime import datetime`` below
import hmac
import json
import logging
import os
import signal
import sqlite3
import subprocess
import sys
import time
from datetime import timezone
from datetime import datetime
from functools import wraps
from threading import Thread
from threading import Lock
from threading import Event

import requests
import yaml
from flask import Flask, request
from flask_restx import Api, Resource, fields

# This code has been written for
# python3 3.11.2-1+b1
# python3-flask 2.2.2-3 all micro web framework based on Werkzeug and Jinja2 - Python 3.x
# flask-restx 1.3.0
# Flask-RESTX documentation: https://flask-restx.readthedocs.io/en/latest/

xprint_lock = Lock()
logging.basicConfig(level=logging.WARNING)

# Upper bound (seconds) on a single metric HTTP request, so that a stalled
# metrics server cannot block the caller indefinitely.
METRIC_REQUEST_TIMEOUT = 5


def xprint(*args, **kwargs):
    """Thread safe print function"""
    with xprint_lock:
        print(*args, **kwargs)
        sys.stdout.flush()


# Swagger security definition: the API key travels in the X-API-KEY header.
authorizations = {
    'apikey': {
        'type': 'apiKey',
        'in': 'header',
        'name': 'X-API-KEY'
    }
}

with open('./conf.yml') as conf:
    yaml_conf = yaml.safe_load(conf)

flask_settings = yaml_conf.get("flask_settings")
api_key = yaml_conf.get("api_key")

# Use .get() so that an unset FLASK_ENV reaches the explicit error below
# instead of dying with a bare KeyError.
flask_env = os.environ.get('FLASK_ENV')
if flask_env == 'development':
    flask_settings_env = yaml_conf.get("flask_settings_dev")
    logging.getLogger().setLevel(logging.DEBUG)
elif flask_env == 'production':
    flask_settings_env = yaml_conf.get("flask_settings_prod")
    logging.getLogger().setLevel(logging.INFO)
else:
    logging.error("FLASK_ENV must be set to development or production.")
    sys.exit(1)

targets = yaml_conf.get("targets")
modes = yaml_conf.get("modes")
http_port = yaml_conf.get("http_port")
shedding_order = yaml_conf.get("shedding_order")
rooms_settings = yaml_conf.get("rooms_settings")
default_target = yaml_conf.get("default_target")
relays_load = yaml_conf.get("relays_load")
awake_hour = yaml_conf.get("awake_hour")
sleep_hour = yaml_conf.get("sleep_hour")
forced_mode_duration = yaml_conf.get("forced_mode_duration")
load_shedder_interval = yaml_conf.get("load_shedder_interval")
relay_control_interval = yaml_conf.get("relay_control_interval")
hysteresis = yaml_conf.get("hysteresis")
max_load = yaml_conf.get("max_load")
load_margin = yaml_conf.get("load_margin")


def _api_key_matches(candidate):
    """Return True when *candidate* equals the configured API key.

    The comparison is constant-time. A configured key that is not a string
    (missing, or parsed by YAML as another type) never matches.
    """
    if not isinstance(api_key, str):
        return False
    return hmac.compare_digest(candidate.encode("utf-8"), api_key.encode("utf-8"))


def auth_required(func):
    """Decorator: require a valid ``X-API-KEY`` header before calling *func*.

    The wrapped endpoint is also tagged with the 'apikey' security scheme
    for the Swagger documentation. Requests without the header, or with a
    wrong key, are aborted with HTTP 401.
    """
    func = api.doc(security='apikey')(func)

    @wraps(func)
    def check_auth(*args, **kwargs):
        if 'X-API-KEY' not in request.headers:
            api.abort(401, 'API key required')
        key = request.headers['X-API-KEY']
        # Check key validity
        if not _api_key_matches(key):
            api.abort(401, 'Wrong API key')
        return func(*args, **kwargs)
    return check_auth

##


def now():
    """Return the current local date and time as a bracketed string."""
    return "["+datetime.now().strftime("%c")+"]"


def enabled_rooms():
    """Return the names of the rooms whose "enabled" setting is truthy."""
    return [room for room in rooms_settings if rooms_settings[room]["enabled"]]


def status_as_text():
    """Return the thermostat status as a multi-line human readable string.

    Each enabled room contributes its target temperature for the current
    target and its current temperature as returned by get_metric().
    """
    room_lines = []
    for room in enabled_rooms():
        settings = rooms_settings[room]
        current = get_metric(settings["metric"], current_time, relay_control_interval)
        room_lines.append(
            'Target temperature for '+room+': '+str(settings[target_name])+'\n'
            + 'Current temperature for '+room+': '+str(current)
        )
    return ('target: '+target_name+'\n'
            + 'forced_mode: '+str(forced_mode)+'\n'
            + '\n'.join(room_lines))


def status_as_dict():
    """Return the thermostat status as a JSON-serialisable dictionary."""
    rooms = {}
    for room in enabled_rooms():
        settings = rooms_settings[room]
        current = get_metric(settings["metric"], current_time, relay_control_interval)
        rooms[room] = {
            'Target temperature': str(settings[target_name]),
            'Current temperature': str(current),
        }
    return {
        "target": target_name,
        "forced_mode": str(forced_mode),
        "enabled_rooms": rooms,
    }


def relay_state(relay):
    """Return the stripped output of ``./relay.py <relay> status``.

    Returns the string "Failed" when the command cannot be run or exits
    with an error.
    """
    try:
        returned_output = subprocess.check_output(["./relay.py", relay, "status"])
        return returned_output.strip().decode("utf-8")
    except Exception as e:
        logging.error(e)
        logging.error("relay "+relay+" status: Failed to command relays board.")
        sys.stdout.flush()
        return "Failed"


def set_relay(relay, state):
    """Run ``./relay.py <relay> <state>`` and return "OK" or "KO".

    NOTE(review): the second line of the command output is logged as the
    new global status; output with fewer than two lines raises inside the
    ``try`` and is therefore reported as "KO" - confirm against relay.py.
    """
    try:
        returned_output = subprocess.check_output(["./relay.py", relay, state])
        logging.info("set relay "+relay+" to "+state+", new global status: "+returned_output.decode("utf-8").split('\n')[1])
        sys.stdout.flush()
        return "OK"
    except Exception as e:
        logging.error(e)
        logging.error("set relay "+relay+" to "+state+": Failed to command relays board.")
        sys.stdout.flush()
        return "KO"


def get_metric(metric, current_time, interval):
    """Fetch the latest value of *metric* from the local metrics service.

    The service is queried at ``http://localhost:3000/<metric>`` and is
    expected to answer with a JSON object holding 'timestamp' (ISO format,
    interpreted as UTC whatever offset it carries) and 'value'.

    Returns the value when the sample is younger than twice *interval*
    seconds relative to *current_time* (epoch seconds), otherwise None.
    Any error (network, timeout, malformed answer) is logged and also
    yields None.
    """
    url = "http://localhost:3000/"+metric
    try:
        # The timeout keeps a hung metrics server from freezing the caller.
        r = requests.get(url, timeout=METRIC_REQUEST_TIMEOUT)
        data = json.loads(r.text)
        timestamp = datetime.fromisoformat(data['timestamp']).replace(tzinfo=timezone.utc).timestamp()
        if current_time - timestamp < interval * 2:
            return data['value']
        else:
            logging.warning("WARNING: No recent load data available.")
    except Exception as e:
        logging.error(e)
    sys.stdout.flush()
    return None


def get_forced_mode(cursor):
    """Return the manually set mode stored in the database, or None.

    Reads the single 'mode' row of the set_mode table through *cursor*.
    A stored value that belongs to ``targets`` expires after
    ``forced_mode_duration`` seconds; any other stored value never expires.
    """
    cursor.execute("SELECT value, timestamp FROM set_mode WHERE name='mode'")
    row = cursor.fetchone()
    if row is None:
        return None
    data = dict(zip(['value', 'timestamp'], row))
    # The stored timestamp is interpreted as UTC.
    timestamp = datetime.fromisoformat(data['timestamp']).replace(tzinfo=timezone.utc).timestamp()
    # We ignore old targets but never ignore absence modes
    if data['value'] in targets and time.time() - timestamp > forced_mode_duration:
        logging.debug("Ignoring old set mode.")
        return None
    else:
        return data['value']

##


# Set by the signal handler to request shutdown.
stop = Event()


def handler(signum, frame):
    """Signal handler: log the signal and set the ``stop`` event."""
    logging.info("Got interrupt: "+str(signum))
    stop.set()
    logging.info("Shutdown")


signal.signal(signal.SIGTERM, handler)
signal.signal(signal.SIGINT, handler)

app = Flask(__name__)
app.config.from_mapping(flask_settings)
app.config.from_mapping(flask_settings_env)
api = Api(app, version='1.0', title='Thermostat and load shedder',
          description='API to read and set thermostat.',
          authorizations=authorizations)

ns_thermostat = api.namespace('thermostat/', description='Thermostat API')

# if the database file does not exist it will be created automatically
dbconn = sqlite3.connect("./instance/thermostat.db")
cursor = dbconn.cursor()

# we will only use name="mode" for set_mode table
# only modes that are set manually will be recorded in the database
cursor.execute("CREATE TABLE IF NOT EXISTS set_mode (name TEXT PRIMARY KEY DEFAULT 'mode' NOT NULL, \
        value TEXT NOT NULL, \
        timestamp DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL)")
Set_mode_parser = api.parser()
Set_mode_parser.add_argument(
    "value", type=str, choices=targets+modes, required=True, help="Thermostat mode", location="json"
)
Set_mode_resource_fields = {
    'value': fields.String(required=True, description='Thermostat mode', enum=targets+modes),
}
Set_mode_model = api.model('Set_mode_Model', Set_mode_resource_fields)

#TODO: Add this to swagger documentation
#'Temporary forced modes (will stay in effect for '+str(int(forced_mode_duration))+' seconds):\n'\
#+'\n'.join(['# /'+target+'\n' for target in targets])\
#+'\n'\
#+'Permanent forced modes:\n'\
#+'\n'.join(['# /'+mode+'\n' for mode in modes])


@ns_thermostat.route('/set_mode')
class Set_mode_thermostat(Resource):
    """Endpoint used to manually force a thermostat target or mode."""

    @auth_required
    @api.expect(Set_mode_model, validate=True)
    def post(self):
        """Record the requested mode; thermostat_loop() reads and applies it."""
        global new_forced_mode
        try:
            args = Set_mode_parser.parse_args()
            new_forced_mode = args["value"]
        except Exception as e:
            logging.error(e)
            # NOTE(review): "K0" (digit zero) looks like a typo for "KO";
            # kept unchanged so existing API clients see the same body.
            return "K0", 400
        return "OK", 201


@ns_thermostat.route('/status')
class Status_thermostat(Resource):
    """Endpoint reporting the current target, forced mode and temperatures."""

    @auth_required
    def get(self):
        """Return the status dictionary built by status_as_dict()."""
        result = status_as_dict()
        return result


# Verbosity names accepted by /set_verbosity and their logging levels.
_VERBOSITY_LEVELS = {
    "DEBUG": logging.DEBUG,
    "INFO": logging.INFO,
    "WARNING": logging.WARNING,
}

Set_verbosity_parser = api.parser()
Set_verbosity_parser.add_argument(
    "value", type=str, choices=list(_VERBOSITY_LEVELS), required=True, help="Verbosity", location="json"
)
Set_verbosity_resource_fields = {
    'value': fields.String(required=True, description='Verbosity', enum=list(_VERBOSITY_LEVELS))
}
Set_verbosity_model = api.model('Set_verbosity_Model', Set_verbosity_resource_fields)


@ns_thermostat.route('/set_verbosity')
class Set_verbosity_thermostat(Resource):
    """Endpoint changing the root logger level at runtime."""

    @auth_required
    @api.expect(Set_verbosity_model, validate=True)
    def put(self):
        """Set the root logger level to the requested verbosity."""
        try:
            args = Set_verbosity_parser.parse_args()
            level = _VERBOSITY_LEVELS.get(args["value"])
            if level is None:
                return "Bad request", 400
            logging.getLogger().setLevel(level)
            return "OK", 201
        except Exception as e:
            logging.error(e)
            # NOTE(review): "K0" (digit zero) looks like a typo for "KO";
            # kept unchanged so existing API clients see the same body.
            return "K0", 400


api.add_namespace(ns_thermostat)

# TODO: Get Linky overload warning

#cursor.execute("SELECT * FROM set_mode")
#rows = cursor.fetchall()
#for row in rows:
#    xprint(row)
#sys.stdout.flush()

# State written by thermostat_loop() and read by the status helpers.
# new_forced_mode is written by the /set_mode endpoint and consumed by the loop.
target_name = default_target
forced_mode = None
current_time = time.time()
new_forced_mode = None


def _time_today(reference, hhmm):
    """Return *reference* with its time of day replaced by the "HH:MM" string *hhmm*."""
    parts = hhmm.split(':')
    return reference.replace(hour=int(parts[0]), minute=int(parts[1]), second=0, microsecond=0)


def _shed_load(current_load):
    """Switch relays off, in shedding order, until the load margin is restored."""
    logging.warning("Load too high: "+str(current_load)+"VA")
    total_shedded = 0
    for room in shedding_order:
        relay = rooms_settings[room]["relays"]
        current_state = relay_state(relay)
        if current_state != "Failed":
            logging.debug("Got relay_state: '"+current_state+"'")
        if current_state != "1":
            continue
        if set_relay(relay, "off") != "OK":
            continue
        total_shedded += relays_load[relay]
        # Shedding lowers the load, so the projected load is
        # current_load - total_shedded. Subtracting total_shedded from the
        # remaining headroom instead made this test pass on the first relay
        # every time and stopped shedding too early.
        if max_load - (current_load - total_shedded) >= load_margin:
            logging.info("Load should be back to normal.")
            break


def _control_rooms(control_time, current_load):
    """Compare each enabled room with its target and log the heater decision."""
    for room in rooms_settings:
        settings = rooms_settings[room]
        if not settings["enabled"]:
            continue
        target = settings[target_name]
        logging.debug("Target: "+str(target))
        temperature = get_metric(settings["metric"], control_time, relay_control_interval)
        if temperature is None:
            continue
        logging.debug(room+": "+str(temperature))
        relay = settings["relays"]
        current_state = relay_state(relay)
        if current_state == "Failed":
            # Relay state unknown: do not take a decision for this room.
            continue
        logging.debug("Got relay_state: '"+current_state+"'")
        if temperature < target - hysteresis:
            if current_state == "0":
                logging.info(room+": Target temperature is "+str(target))
                logging.info(room+": Current temperature is "+str(temperature))
                if current_load + relays_load[relay] + load_margin > max_load:
                    logging.warning(room+": Load too high cannot start heaters.")
                else:
                    logging.info(room+": Starting heaters.")
                    # Relay switching is currently disabled here (log only):
                    # set_relay(rooms_settings[room]["relays"], "on")
                sys.stdout.flush()
            else:
                logging.debug("Relay already on.")
        elif temperature > target + hysteresis:
            if current_state == "1":
                logging.info(room+": Target temperature is "+str(target))
                logging.info(room+": Current temperature is "+str(temperature))
                logging.info(room+": Stopping heaters.")
                sys.stdout.flush()
                # Relay switching is currently disabled here (log only):
                # set_relay(rooms_settings[room]["relays"], "off")
            else:
                logging.debug("Relay already off.")


def thermostat_loop():
    """Run the thermostat and load shedder until the ``stop`` event is set.

    Each iteration:
      1. stores a mode newly requested through the API in the database;
      2. selects the active target from the forced mode or the time of day;
      3. sheds load when the measured load leaves less than ``load_margin``
         below ``max_load``;
      4. every ``relay_control_interval`` seconds, evaluates each enabled
         room against its target temperature;
      5. waits ``load_shedder_interval`` seconds, or less if a stop is
         requested in the meantime.

    The database connection is closed when the loop exits, including when
    it exits because of an exception.
    """
    global target_name
    global forced_mode
    global new_forced_mode
    global current_time

    last_control_time = None
    first_loop = True

    dbconn = sqlite3.connect("./instance/thermostat.db")
    try:
        cursor = dbconn.cursor()
        logging.info("====== Starting ======")
        while not stop.is_set():
            requested_mode = new_forced_mode
            if requested_mode is not None:
                # Parameterized query: the value originates from an HTTP request.
                cursor.execute("INSERT OR REPLACE INTO set_mode (value) VALUES (?)", (requested_mode,))
                dbconn.commit()
                logging.info("Switch to "+requested_mode)
                target_name = requested_mode
                new_forced_mode = None
                # Force immediate action:
                last_control_time = None

            current_time = time.time()
            current_date = datetime.now()
            today_awake_time = _time_today(current_date, awake_hour)
            today_sleep_time = _time_today(current_date, sleep_hour)

            forced_mode = get_forced_mode(cursor)
            if forced_mode is not None and forced_mode in targets:
                if target_name != forced_mode:
                    target_name = forced_mode
                    logging.info("Switch to "+forced_mode)
            else:
                if forced_mode == "long_absence":
                    if target_name != "target_frost_protection" or first_loop:
                        target_name = "target_frost_protection"
                        logging.info("Switch to "+target_name)
                elif forced_mode == "short_absence" or first_loop:
                    if target_name != "target_sleep_temperature":
                        target_name = "target_sleep_temperature"
                        logging.info("Switch to "+target_name)
                elif current_date > today_awake_time and current_date < today_sleep_time:
                    if target_name != "target_unconfirmed_awake_temperature" and target_name != "target_awake_temperature":
                        target_name = "target_unconfirmed_awake_temperature"
                        logging.info("Switch to unconfirmed awake mode.")
                elif current_date < today_awake_time or current_date > today_sleep_time:
                    if target_name != "target_unconfirmed_sleep_temperature" and target_name != "target_sleep_temperature":
                        target_name = "target_unconfirmed_sleep_temperature"
                        logging.info("Switch to unconfirmed sleep mode.")
            first_loop = False

            # Load shedder
            current_load = get_metric("Modane_elec_main_power", current_time, load_shedder_interval)
            if current_load is None:
                # No usable load measurement: skip this cycle entirely.
                stop.wait(load_shedder_interval)
                continue
            elif max_load - current_load < load_margin:
                _shed_load(current_load)

            # Thermostat
            if last_control_time is None or current_time - last_control_time > relay_control_interval:
                last_control_time = current_time
                _control_rooms(current_time, current_load)

            # Event.wait() returns as soon as a stop is requested, where
            # time.sleep() would always block for the whole interval.
            stop.wait(load_shedder_interval)
    finally:
        dbconn.close()
    logging.info("====== Ended successfully ======")


if __name__ == '__main__':
    # NOTE(review): the control loop thread is not started here; only the
    # HTTP API runs. Confirm whether this is intentional.
    #t1 = Thread(target=thermostat_loop)
    #t1.daemon = True
    #t1.start()
    # NOTE(review): http_port is read from conf.yml but not passed to
    # app.run(); confirm whether port=http_port is intended.
    app.run(host='0.0.0.0')