"""Thermostat and load shedder exposed through a small Flask-RESTX API.

The module reads its settings from ``./conf.yml``, declares a ``set_mode``
table holding the manually forced mode, serves ``/thermostat/set_mode`` and
``/thermostat/status``, and starts a daemon thread running
``thermostat_loop``.
"""
import datetime
import hmac
import json
import logging
import os
import subprocess
import sys
import time
# NOTE: the two imports below must stay after ``import datetime`` so that the
# name ``datetime`` designates the class (not the module) in the rest of the file.
from datetime import timezone
from datetime import datetime
from functools import wraps
from threading import Thread

import requests
import sqlalchemy.exc
import yaml
from flask import Flask, request
from flask_migrate import Migrate
from flask_restx import Api, Resource, fields
from flask_sqlalchemy import SQLAlchemy

# This code has been written for
# python3 3.11.2-1+b1
# python3-flask 2.2.2-3 all micro web framework based on Werkzeug and Jinja2 - Python 3.x
# python3-flask-migrate 4.0.4-1 all SQLAlchemy migrations for Flask using Alembic and Python 3
# python3-flask-sqlalchemy 3.0.3-1 all adds SQLAlchemy support to your Python 3 Flask application
# alembic 1.8.1-2 all lightweight database migration tool for SQLAlchemy
# python3-alembic 1.8.1-2 all lightweight database migration tool for SQLAlchemy - Python module
# python3-sqlalchemy 1.4.46+ds1-1 all SQL toolkit and Object Relational Mapper for Python 3
# python3-sqlalchemy-ext:i386 1.4.46+ds1-1+b1 i386 SQL toolkit and Object Relational Mapper for Python3 - C extension

# Flask-SQLAlchemy documentation: https://flask-sqlalchemy.palletsprojects.com/en/3.0.x/quickstart/
# We use SQLAlchemy ORM style 2.x: https://docs.sqlalchemy.org/en/20/tutorial/data_select.html

logging.basicConfig(level=logging.WARNING)

# Seconds to wait for the metrics server before giving up, so that a stalled
# server cannot block the control loop forever.
METRIC_REQUEST_TIMEOUT = 5

authorizations = {
    'apikey': {
        'type': 'apiKey',
        'in': 'header',
        'name': 'X-API-KEY'
    }
}

with open('./conf.yml') as conf:
    yaml_conf = yaml.safe_load(conf)

flask_settings = yaml_conf.get("flask_settings")
api_key = yaml_conf.get("api_key")

# Use .get() so that an unset FLASK_ENV reaches the explicit error message
# below instead of dying on a KeyError.
flask_env = os.environ.get('FLASK_ENV')
if flask_env == 'development':
    flask_settings_env = yaml_conf.get("flask_settings_dev")
    logging.getLogger().setLevel(logging.DEBUG)
elif flask_env == 'production':
    flask_settings_env = yaml_conf.get("flask_settings_prod")
    logging.getLogger().setLevel(logging.WARNING)
else:
    logging.error("FLASK_ENV must be set to development or production.")
    sys.exit(1)

targets = yaml_conf.get("targets")
modes = yaml_conf.get("modes")
http_port = yaml_conf.get("http_port")
shedding_order = yaml_conf.get("shedding_order")
rooms_settings = yaml_conf.get("rooms_settings")
default_target = yaml_conf.get("default_target")
relays_load = yaml_conf.get("relays_load")
awake_hour = yaml_conf.get("awake_hour")
sleep_hour = yaml_conf.get("sleep_hour")
forced_mode_duration = yaml_conf.get("forced_mode_duration")
load_shedder_interval = yaml_conf.get("load_shedder_interval")
relay_control_interval = yaml_conf.get("relay_control_interval")
hysteresis = yaml_conf.get("hysteresis")
max_load = yaml_conf.get("max_load")
load_margin = yaml_conf.get("load_margin")


def auth_required(func):
    """Decorate an API handler so that it requires a valid ``X-API-KEY`` header.

    The request is aborted with a 401 when the header is missing or when its
    value differs from the ``api_key`` read from the configuration file.
    """
    func = api.doc(security='apikey')(func)

    @wraps(func)
    def check_auth(*args, **kwargs):
        if 'X-API-KEY' not in request.headers:
            api.abort(401, 'API key required')
        key = request.headers['X-API-KEY']
        # Check key validity. The comparison is constant-time so that response
        # timing does not leak how much of the key matched. A configured key
        # that is not a string can never equal a header value, so it is
        # rejected outright (as a plain != comparison would do).
        if not isinstance(api_key, str) or not hmac.compare_digest(
                key.encode("utf-8"), api_key.encode("utf-8")):
            api.abort(401, 'Wrong API key')
        return func(*args, **kwargs)
    return check_auth

##


def now():
    """Return the current local date and time wrapped in square brackets."""
    return "["+datetime.now().strftime("%c")+"]"


def enabled_rooms():
    """Return the names of the rooms whose ``enabled`` setting is truthy."""
    return [room for room in rooms_settings if rooms_settings[room]["enabled"]]


def status_as_text():
    """Return the target, forced mode and per-room temperatures as plain text."""
    return 'target: '+target_name+'\n'\
        + 'forced_mode: '+str(forced_mode)+'\n'\
        + '\n'.join(['Target temperature for '+room+': '+str(rooms_settings[room][target_name])+'\n'
                     + 'Current temperature for '+room+': '
                     + str(get_metric(rooms_settings[room]["metric"], current_time, relay_control_interval))
                     for room in enabled_rooms()])


def status_as_dict():
    """Return the target, forced mode and per-room temperatures as a dict."""
    return {"target": target_name,
            "forced_mode": str(forced_mode),
            "enabled_rooms": {
                room: {
                    'Target temperature': str(rooms_settings[room][target_name]),
                    'Current temperature': str(get_metric(rooms_settings[room]["metric"],
                                                          current_time, relay_control_interval)),
                }
                for room in enabled_rooms()}}


def relay_state(relay):
    """Return the stripped output of ``./relay.py <relay> status``.

    Returns the string ``"Failed"`` when the command cannot be run or fails.
    """
    try:
        returned_output = subprocess.check_output(["./relay.py", relay, "status"])
        return returned_output.strip().decode("utf-8")
    except Exception as e:
        logging.error(e)
        logging.error("relay "+relay+" status: Failed to command relays board.")
        sys.stdout.flush()
        return "Failed"


def set_relay(relay, state):
    """Run ``./relay.py <relay> <state>`` and return ``"OK"`` or ``"KO"``.

    The second line of the command output is logged as the new global status.
    Any exception (including a missing second output line) yields ``"KO"``.
    """
    try:
        returned_output = subprocess.check_output(["./relay.py", relay, state])
        logging.info("set relay "+relay+" to "+state+", new global status: "
                     + returned_output.decode("utf-8").split('\n')[1])
        sys.stdout.flush()
        return "OK"
    except Exception as e:
        logging.error(e)
        logging.error("set relay "+relay+" to "+state+": Failed to command relays board.")
        sys.stdout.flush()
        return "KO"


def get_metric(metric, current_time, interval):
    """Fetch ``metric`` from the local metrics server.

    The server is expected to answer with a JSON object holding ``timestamp``
    (ISO format, interpreted as UTC) and ``value``.

    Returns the value when the sample is younger than ``2 * interval`` seconds
    relative to ``current_time`` (an epoch timestamp); returns ``None`` when
    the sample is too old or when anything goes wrong.
    """
    url = "http://localhost:3000/"+metric
    try:
        # A timeout is mandatory here: without one, a hung server would block
        # the caller (the thermostat loop or an API request) indefinitely.
        r = requests.get(url, timeout=METRIC_REQUEST_TIMEOUT)
        data = json.loads(r.text)
        timestamp = datetime.fromisoformat(data['timestamp']).replace(tzinfo=timezone.utc).timestamp()
        if current_time - timestamp < interval * 2:
            return data['value']
        else:
            logging.warning("WARNING: No recent load data available.")
    except Exception as e:
        logging.error(e)
    sys.stdout.flush()
    return None


def get_forced_mode():
    """Return the manually forced mode stored in the database, if still valid.

    Returns ``None`` when no mode is stored, or when the stored value is one of
    ``targets`` and is older than ``forced_mode_duration`` seconds. Values that
    are not in ``targets`` (the absence modes) never expire.
    """
    with app.app_context():
        # Legacy equivalent:
        # cur.execute("SELECT value, timestamp FROM set_mode WHERE name='mode'")
        # row = cur.fetchone()
        row = db.session.execute(
            db.select(Set_mode.value, Set_mode.timestamp).where(Set_mode.name == 'mode')
        ).first()
    if row is None:
        return None
    value, stamp = row
    # The DateTime column normally yields a datetime object; only parse when
    # the driver handed back a raw string.
    if isinstance(stamp, str):
        stamp = datetime.fromisoformat(stamp)
    timestamp = stamp.replace(tzinfo=timezone.utc).timestamp()
    # We ignore old targets but never ignore absence modes
    if value in targets and time.time() - timestamp > forced_mode_duration:
        logging.debug("Ignoring old set mode.")
        return None
    return value

##


app = Flask(__name__)
app.config.from_mapping(flask_settings)
app.config.from_mapping(flask_settings_env)
db = SQLAlchemy(app)
api = Api(app, version='1.0', title='Thermostat',
          description='API to read and set thermostat.',
          authorizations=authorizations)
ns_thermostat = api.namespace('thermostat/', description='Thermostat API')


# we will only use name="mode" for set_mode table
# only modes that are set manually will be recorded in the database
class Set_mode(db.Model):
    """Row of the ``set_mode`` table: one manually forced mode per ``name``."""
    __tablename__ = "set_mode"
    name = db.Column(db.String, primary_key=True, default='mode', server_default='mode', nullable=False)
    value = db.Column(db.String, nullable=False)
    timestamp = db.Column(db.DateTime, server_default=db.text("CURRENT_TIMESTAMP"), nullable=False)


def store_set_mode(record):
    """Insert ``record`` or replace the existing row with the same ``name``.

    ``name`` is the primary key and defaults to ``'mode'``, so a plain
    ``session.add`` fails with an IntegrityError as soon as a row exists.
    ``merge`` needs the primary key to be set to find that row, and the
    timestamp must be refreshed explicitly because the server default is only
    applied on INSERT.

    Must be called inside an application context. Raises whatever the
    database layer raises; the caller is responsible for rolling back.
    """
    if record.name is None:
        record.name = 'mode'
    if record.timestamp is None:
        # Naive UTC value, because get_forced_mode() reads the stored
        # timestamp back as UTC.
        record.timestamp = datetime.now(timezone.utc).replace(tzinfo=None)
    db.session.merge(record)
    db.session.commit()


Set_mode_resource_fields = {
    'name': fields.String(description='mode type'),
    'value': fields.String(required=True, description='value'),
    'time': fields.DateTime(dt_format='iso8601'),
}

Set_mode_model = api.model('Set_mode_Model', Set_mode_resource_fields)


@ns_thermostat.route('/set_mode')
class Set_mode_thermostat(Resource):
    """Endpoint recording a manually forced mode."""

    @auth_required
    @api.expect(Set_mode_model, validate=True)
    def post(self):
        """Store the posted mode; answer ``("OK", 201)`` or ``("K0", 400)``."""
        try:
            data = Set_mode(**request.json)
        except Exception as e:
            # Typically a key of the payload that is not a column of Set_mode.
            logging.error(e)
            return "K0", 400
        try:
            store_set_mode(data)
            return "OK", 201
        except sqlalchemy.exc.IntegrityError as e:
            db.session.rollback()
            logging.error(e)
            return "K0", 400
        except Exception as e:
            db.session.rollback()
            logging.error(e)
            return "K0", 400


@ns_thermostat.route('/status')
class Status_thermostat(Resource):
    """Endpoint reporting the current thermostat status."""

    @auth_required
    def get(self):
        """Return the status built by ``status_as_dict``."""
        result = status_as_dict()
        logging.debug(result)
        return result


api.add_namespace(ns_thermostat)

with app.app_context():
    db.create_all()

migrate = Migrate(app, db, compare_type=True)

# TODO: Get Linky overload warning

#cursor.execute("SELECT * FROM set_mode")
#rows = cursor.fetchall()
#for row in rows:
#    print(row)
#sys.stdout.flush()

# State shared between the thermostat thread (writer) and the status helpers.
target_name = default_target
forced_mode = None
current_time = time.time()


def _today_at(reference, hhmm):
    """Return ``reference`` with its time of day replaced by ``"HH:MM"``."""
    hour, minute = hhmm.split(':')[:2]
    return reference.replace(hour=int(hour), minute=int(minute), second=0, microsecond=0)


def thermostat_loop():
    """Run the load shedder and the thermostat forever.

    Each iteration, every ``load_shedder_interval`` seconds:
      1. selects ``target_name`` from the forced mode or the time of day;
      2. switches relays off, following ``shedding_order``, when the measured
         load leaves less than ``load_margin`` below ``max_load``;
      3. at most every ``relay_control_interval`` seconds, compares each
         enabled room's temperature with its target (heater switching is
         currently only logged, the ``set_relay`` calls being commented out).
    """
    start_time = time.time()
    last_control_time = None
    new_forced_mode = None
    first_loop = True
    global target_name
    global forced_mode
    global current_time
    while True:
        # if stop.is_set():
        #     httpd.shutdown()
        #     httpd.server_close()
        #     dbconn.close()
        #     break
        if new_forced_mode is not None:
            with app.app_context():
                # Declarative models only accept keyword arguments.
                store_set_mode(Set_mode(value=new_forced_mode))
            #cursor.execute("INSERT OR REPLACE INTO set_mode (value) VALUES ('"+new_forced_mode+"')")
            #dbconn.commit()
            logging.info("Switch to "+new_forced_mode)
            target_name = new_forced_mode
            new_forced_mode = None
            # Force immediate action:
            last_control_time = None
        current_time = time.time()
        current_date = datetime.now()
        today_awake_time = _today_at(current_date, awake_hour)
        today_sleep_time = _today_at(current_date, sleep_hour)
        forced_mode = get_forced_mode()
        if forced_mode is not None and forced_mode in targets:
            if target_name != forced_mode:
                target_name = forced_mode
                logging.info("Switch to "+forced_mode)
        else:
            if forced_mode == "long_absence":
                if target_name != "target_frost_protection" or first_loop:
                    target_name = "target_frost_protection"
                    logging.info("Switch to "+target_name)
            elif forced_mode == "short_absence" or first_loop:
                if target_name != "target_sleep_temperature":
                    target_name = "target_sleep_temperature"
                    logging.info("Switch to "+target_name)
            elif current_date > today_awake_time and current_date < today_sleep_time:
                if target_name != "target_unconfirmed_awake_temperature" and target_name != "target_awake_temperature":
                    target_name = "target_unconfirmed_awake_temperature"
                    logging.info("Switch to unconfirmed awake mode.")
            elif current_date < today_awake_time or current_date > today_sleep_time:
                if target_name != "target_unconfirmed_sleep_temperature" and target_name != "target_sleep_temperature":
                    target_name = "target_unconfirmed_sleep_temperature"
                    logging.info("Switch to unconfirmed sleep mode.")
        first_loop = False

        # Load shedder
        current_load = get_metric("Modane_elec_main_power", current_time, load_shedder_interval)
        if current_load is None:
            time.sleep(load_shedder_interval)
            continue
        elif max_load - current_load < load_margin:
            logging.warning("Load too high: "+str(current_load)+"VA")
            total_shedded = 0
            for room in shedding_order:
                current_state = relay_state(rooms_settings[room]["relays"])
                if current_state != "Failed":
                    logging.debug("Got relay_state: '"+current_state+"'")
                if current_state == "1":
                    result = set_relay(rooms_settings[room]["relays"], "off")
                    if result == "OK":
                        total_shedded += relays_load[rooms_settings[room]["relays"]]
                # Shedding lowers the load, so the expected load is
                # current_load - total_shedded. Stop only once the margin is
                # restored; otherwise keep walking the shedding order.
                if max_load - (current_load - total_shedded) >= load_margin:
                    logging.info("Load should be back to normal.")
                    break

        # Thermostat
        if last_control_time is None or current_time - last_control_time > relay_control_interval:
            last_control_time = current_time
            for room in rooms_settings:
                if not rooms_settings[room]["enabled"]:
                    continue
                target = rooms_settings[room][target_name]
                logging.debug("Target: "+str(target))
                temperature = get_metric(rooms_settings[room]["metric"], current_time, relay_control_interval)
                if temperature is None:
                    continue
                logging.debug(room+": "+str(temperature))
                current_state = relay_state(rooms_settings[room]["relays"])
                if current_state == "Failed":
                    # Relay state unknown (already logged by relay_state):
                    # do not draw any conclusion for this room.
                    continue
                logging.debug("Got relay_state: '"+current_state+"'")
                if temperature < target - hysteresis:
                    if current_state == "0":
                        logging.info(room+": Target temperature is "+str(target))
                        logging.info(room+": Current temperature is "+str(temperature))
                        if current_load + relays_load[rooms_settings[room]["relays"]] + load_margin > max_load:
                            logging.warning(room+": Load too high cannot start heaters.")
                        else:
                            logging.info(room+": Starting heaters.")
                            # set_relay(rooms_settings[room]["relays"], "on")
                        sys.stdout.flush()
                    else:
                        logging.debug("Relay already on.")
                elif temperature > target + hysteresis:
                    if current_state == "1":
                        logging.info(room+": Target temperature is "+str(target))
                        logging.info(room+": Current temperature is "+str(temperature))
                        logging.info(room+": Stopping heaters.")
                        sys.stdout.flush()
                        # set_relay(rooms_settings[room]["relays"], "off")
                    else:
                        logging.debug("Relay already off.")

        time.sleep(load_shedder_interval)


t1 = Thread(target=thermostat_loop)
t1.daemon = True
t1.start()

logging.info("====== Ended successfully ======")

# Legacy standalone HTTP server implementation, kept for reference.
#import argparse
#import signal
#from threading import Event
#from http.server import BaseHTTPRequestHandler
#import threading
#import socketserver
#from threading import Lock
#
#xprint_lock = Lock()
#
#def xprint(*args, **kwargs):
#    """Thread safe print function"""
#    with xprint_lock:
#        print(*args, **kwargs)
#    sys.stdout.flush()
#
#p = argparse.ArgumentParser(description='Thermostat and load shedder.')
#p.add_argument("-v", "--verbosity", help="Increase output verbosity",
#               type=str, choices=['DEBUG', 'INFO', 'WARNING'], default='INFO')
#args = p.parse_args()
#
#verbosity = args.verbosity
#if verbosity == 'DEBUG':
#    logging.basicConfig(level=logging.DEBUG)
#elif verbosity == 'INFO':
#    logging.basicConfig(level=logging.INFO)
#elif verbosity == 'WARNING':
#    logging.basicConfig(level=logging.WARNING)
#
#
#def api_help():
#    return """
#### HTTP API description:
## /status
#
## /help : This message
#
#"""\
#+'Temporary forced modes (will stay in effect for '+str(int(forced_mode_duration))+' seconds):\n'\
#+'\n'.join(['# /'+target+'\n' for target in targets])\
#+'\n'\
#+'Permanent forced modes:\n'\
#+'\n'.join(['# /'+mode+'\n' for mode in modes])
#
#logging.info("====== Starting ======")
#stop = Event()
#last_data = {}
#
#def handler(signum, frame):
#    global stop
#    logging.info("Got interrupt: "+str(signum))
#    stop.set()
#    logging.info("Shutdown")
#
#signal.signal(signal.SIGTERM,handler)
#signal.signal(signal.SIGINT,handler)
#
#class MyHandler(BaseHTTPRequestHandler):
#    def do_GET(self):
#        global new_forced_mode
#        request = self.path[1:]
#        if request in targets or request in modes:
#            self.send_response(200)
#            new_forced_mode = request
#        elif self.path == '/status':
#            self.send_response(200)
#            self.send_header('Content-type', 'text/plain')
#            self.end_headers()
#            self.wfile.write(status_as_text().encode("utf-8"))
#        elif self.path == '/help':
#            self.send_response(200)
#            self.send_header('Content-type', 'text/plain')
#            self.end_headers()
#            self.wfile.write(api_help().encode("utf-8"))
#        else:
#            self.send_response(404)
#    # This rewrites the BaseHTTP logging function
#    def log_message(self, format, *args):
#        if verbosity == 'INFO':
#            xprint("%s - - [%s] %s" %
#                   (self.address_string(),
#                    self.log_date_time_string(),
#                    format%args))
#
#class WebThread(threading.Thread):
#    def run(self):
#        httpd.serve_forever()
#
#httpd = socketserver.TCPServer(("", http_port), MyHandler, bind_and_activate=False)
#httpd.allow_reuse_address = True
#httpd.server_bind()
#httpd.server_activate()
#webserver_thread = WebThread()
#webserver_thread.start()
#