import datetime
import json
import logging
import os
import sys
import time
from functools import wraps

import requests
import yaml
from flask import Flask, request
from flask_migrate import Migrate
from flask_restx import Api, Resource, fields
from flask_sqlalchemy import SQLAlchemy

# This code has been written for
# python3-flask 2.2.2-3 all micro web framework based on Werkzeug and Jinja2 - Python 3.x
# python3-flask-migrate 4.0.4-1 all SQLAlchemy migrations for Flask using Alembic and Python 3
# python3-flask-sqlalchemy 3.0.3-1 all adds SQLAlchemy support to your Python 3 Flask application
# alembic 1.8.1-2 all lightweight database migration tool for SQLAlchemy
# python3-alembic 1.8.1-2 all lightweight database migration tool for SQLAlchemy - Python module
# python3-sqlalchemy 1.4.46+ds1-1 all SQL toolkit and Object Relational Mapper for Python 3
# python3-sqlalchemy-ext:i386 1.4.46+ds1-1+b1 i386 SQL toolkit and Object Relational Mapper for Python3 - C extension
# Flask-SQLAlchemy documentation: https://flask-sqlalchemy.palletsprojects.com/en/3.0.x/quickstart/
# We use SQLAlchemy ORM style 2.x: https://docs.sqlalchemy.org/en/20/tutorial/data_select.html

logging.basicConfig(level=logging.WARNING)

# Security scheme handed to flask-restx: the API key travels in the
# X-API-KEY request header.
authorizations = {
    'apikey': {
        'type': 'apiKey',
        'in': 'header',
        'name': 'X-API-KEY'
    }
}

# All settings come from conf.yml in the current working directory.
with open('./conf.yml', encoding='utf-8') as conf:
    yaml_conf = yaml.safe_load(conf)

flask_settings = yaml_conf.get("flask_settings")
api_key = yaml_conf.get("api_key")

# Use .get() so that an *unset* FLASK_ENV reaches the explicit error below
# instead of dying with a bare KeyError.
flask_env = os.environ.get('FLASK_ENV')
if flask_env == 'development':
    flask_settings_env = yaml_conf.get("flask_settings_dev")
    logging.getLogger().setLevel(logging.DEBUG)
elif flask_env == 'production':
    flask_settings_env = yaml_conf.get("flask_settings_prod")
    logging.getLogger().setLevel(logging.WARNING)
else:
    logging.error("FLASK_ENV must be set to development or production.")
    sys.exit(1)

targets = yaml_conf.get("targets")
modes = yaml_conf.get("modes")
http_port = yaml_conf.get("http_port")
shedding_order = yaml_conf.get("shedding_order")
# Imported here rather than in the file header so that this section is
# self-contained: the relay helpers need subprocess and the timestamp
# helper needs timezone.
import subprocess
from datetime import timezone

rooms_settings = yaml_conf.get("rooms_settings")
default_target = yaml_conf.get("default_target")
relays_load = yaml_conf.get("relays_load")
awake_hour = yaml_conf.get("awake_hour")
sleep_hour = yaml_conf.get("sleep_hour")
forced_mode_duration = yaml_conf.get("forced_mode_duration")
load_shedder_interval = yaml_conf.get("load_shedder_interval")
relay_control_interval = yaml_conf.get("relay_control_interval")
hysteresis = yaml_conf.get("hysteresis")
max_load = yaml_conf.get("max_load")
load_margin = yaml_conf.get("load_margin")

# Seconds to wait for the metrics service; without a timeout a stalled
# service would freeze the whole control loop.
METRIC_REQUEST_TIMEOUT = 5


def auth_required(func):
    """Decorate a resource method so it requires a valid X-API-KEY header.

    The method is also tagged with the 'apikey' security scheme in the
    generated API documentation. Requests without the header, or with a key
    different from the configured ``api_key``, are aborted with HTTP 401.
    """
    func = api.doc(security='apikey')(func)

    @wraps(func)
    def check_auth(*args, **kwargs):
        if 'X-API-KEY' not in request.headers:
            api.abort(401, 'API key required')
        key = request.headers['X-API-KEY']
        # Check key validity
        if key != api_key:
            api.abort(401, 'Wrong API key')
        return func(*args, **kwargs)
    return check_auth


def now():
    """Return the current local date and time as a bracketed string."""
    return "["+datetime.datetime.now().strftime("%c")+"]"


def enabled_rooms():
    """Return the names of the rooms whose settings have ``enabled`` set."""
    return [room for room in rooms_settings if rooms_settings[room]["enabled"]]


def status_as_text():
    """Return a multi-line text summary of the thermostat state.

    Lists the active target, the forced mode and, for every enabled room,
    its target temperature and the temperature currently reported by the
    metrics service. Reads the module globals maintained by the control loop
    (``target_name``, ``forced_mode``, ``current_time``).
    """
    room_lines = []
    for room in enabled_rooms():
        settings = rooms_settings[room]
        temperature = get_metric(settings["metric"], current_time, relay_control_interval)
        room_lines.append(
            'Target temperature for '+room+': '+str(settings[target_name])+'\n'
            + 'Current temperature for '+room+': '+str(temperature)
        )
    return ('target: '+target_name+'\n'
            + 'forced_mode: '+str(forced_mode)+'\n'
            + '\n'.join(room_lines))


def relay_state(relay):
    """Return the output of ``./relay.py <relay> status``, or "Failed".

    Any error while running the command is logged and reported as the
    string "Failed" instead of being raised.
    """
    relay = str(relay)  # subprocess arguments must be strings
    try:
        returned_output = subprocess.check_output(["./relay.py", relay, "status"])
        return returned_output.strip().decode("utf-8")
    except Exception as e:
        logging.error(e)
        logging.error("relay "+relay+" status: Failed to command relays board.")
        sys.stdout.flush()
        return "Failed"


def set_relay(relay, state):
    """Run ``./relay.py <relay> <state>`` and return "OK" or "KO".

    Any error while running the command is logged and reported as "KO"
    instead of being raised.
    """
    relay = str(relay)  # subprocess arguments must be strings
    try:
        returned_output = subprocess.check_output(["./relay.py", relay, state])
    except Exception as e:
        logging.error(e)
        logging.error("set relay "+relay+" to "+state+": Failed to command relays board.")
        sys.stdout.flush()
        return "KO"
    # Only the command itself decides OK/KO: an unexpected output layout
    # must not turn a successful switch into a reported failure.
    decoded = returned_output.decode("utf-8", errors="replace")
    output_lines = decoded.split('\n')
    global_status = output_lines[1] if len(output_lines) > 1 else decoded.strip()
    logging.info("set relay "+relay+" to "+state+", new global status: "+global_status)
    sys.stdout.flush()
    return "OK"


def _utc_epoch(value):
    """Return the POSIX timestamp of *value*, read as a UTC date and time.

    *value* is either a ``datetime`` (what the database returns) or an
    ISO 8601 string (what the metrics service returns).
    """
    if isinstance(value, datetime.datetime):
        moment = value
    else:
        text = str(value).strip()
        # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11.
        if text.endswith(("Z", "z")):
            text = text[:-1]+"+00:00"
        moment = datetime.datetime.fromisoformat(text)
    # NOTE(review): as in the original code, any UTC offset carried by the
    # value is overridden and the wall-clock fields are taken as UTC -
    # confirm that every producer really emits UTC.
    return moment.replace(tzinfo=timezone.utc).timestamp()


def get_metric(metric, current_time, interval):
    """Fetch *metric* from the local metrics service.

    Returns the reported ``value`` when its timestamp is less than twice
    *interval* seconds older than *current_time*. Returns None when the data
    is older than that or when the request or parsing fails (the error is
    logged, never raised).
    """
    url = "http://localhost:3000/"+metric
    try:
        response = requests.get(url, timeout=METRIC_REQUEST_TIMEOUT)
        data = json.loads(response.text)
        timestamp = _utc_epoch(data['timestamp'])
        if current_time - timestamp < interval * 2:
            return data['value']
        logging.warning("WARNING: No recent load data available.")
    except Exception as e:
        logging.error(e)
    sys.stdout.flush()
    return None


def get_forced_mode():
    """Return the manually set mode stored in the database, or None.

    A stored value that is one of ``targets`` expires after
    ``forced_mode_duration`` seconds; any other stored value never expires.
    """
    with app.app_context():
        # Most recent row first, so the result is well defined even if the
        # table holds more than one row.
        row = db.session.execute(
            db.select(Set_mode.value, Set_mode.timestamp)
            .order_by(Set_mode.timestamp.desc())
        ).first()
    if row is None:
        return None
    value, stored_at = row
    # We ignore old targets but never ignore absence modes
    if value in targets and time.time() - _utc_epoch(stored_at) > forced_mode_duration:
        logging.debug("Ignoring old set mode.")
        return None
    return value


##
app = Flask(__name__)
app.config.from_mapping(flask_settings)
app.config.from_mapping(flask_settings_env)
db = SQLAlchemy(app)
api = Api(app, version='1.0', title='Thermostat',
          description='API to read and set thermostat.',
          authorizations=authorizations)
ns_thermostat = api.namespace('thermostat/', description='Thermostat API')


# we will only use name="mode" for set_mode table
# only modes that are set manually will be recorded in the database
class Set_mode(db.Model):
    """Row of the ``set_mode`` table: one manually set mode and when it was set."""
    __tablename__ = "set_mode"
    name = db.Column(db.String, primary_key=True, server_default='mode', nullable=False)
    value = db.Column(db.String, nullable=False)
    timestamp = db.Column(db.DateTime, server_default=db.text("CURRENT_TIMESTAMP"), nullable=False)


def _store_forced_mode(value, name="mode"):
    """Insert or replace the ``set_mode`` row called *name*.

    Must be called inside an application context. On failure the session is
    rolled back, so it stays usable, and the exception is re-raised.
    """
    record = Set_mode(
        name=name,
        value=value,
        # Naive UTC, because get_forced_mode() reads stored timestamps as UTC.
        # Set explicitly so that replacing an existing row refreshes it.
        timestamp=datetime.datetime.now(timezone.utc).replace(tzinfo=None),
    )
    try:
        # merge() updates the row with this primary key if it exists and
        # inserts it otherwise (the former SQL was INSERT OR REPLACE).
        db.session.merge(record)
        db.session.commit()
    except Exception:
        db.session.rollback()
        raise


Set_mode_resource_fields = {
    'name': fields.String(description='mode type'),
    'value': fields.String(required=True, description='value'),
    'time': fields.DateTime(dt_format='iso8601'),
}
Set_mode_model = api.model('Set_mode_Model', Set_mode_resource_fields)


@ns_thermostat.route('/set_mode')
class Set_mode_thermostat(Resource):
    """Endpoint recording a manually set mode."""

    @auth_required
    @api.expect(Set_mode_model, validate=True)
    def post(self):
        """Store the posted mode; returns 201 on success and 400 on failure.

        ``name`` defaults to "mode". ``time`` is accepted by the schema but
        not stored: the row is always stamped with the server time.
        """
        payload = request.json or {}
        try:
            _store_forced_mode(payload["value"], payload.get("name") or "mode")
            return "OK", 201
        except Exception as e:
            logging.error(e)
            # NOTE(review): "K0" (with a zero) is kept as is because clients
            # may match on it - confirm whether "KO" was intended.
            return "K0", 400


@ns_thermostat.route('/status')
class Status_thermostat(Resource):
    """Endpoint returning the textual thermostat status."""

    @auth_required
    def get(self):
        """Return ``{"datas": <status text>}``."""
        result = dict(datas=status_as_text())
        logging.debug(result)
        return result


api.add_namespace(ns_thermostat)

with app.app_context():
    db.create_all()
migrate = Migrate(app, db, compare_type=True)


def _today_at(moment, hhmm):
    """Return *moment* moved to the "HH:MM" time *hhmm* on the same day."""
    parts = hhmm.split(':')
    return moment.replace(hour=int(parts[0]), minute=int(parts[1]),
                          second=0, microsecond=0)


def _select_target(current_target, forced, first_pass, moment):
    """Return the target name to apply, logging every switch.

    Priority: a forced target, then the absence modes, then the daily
    awake/sleep schedule. On the first pass without a forced target or a
    long absence, the sleep temperature is selected.
    """
    today_awake_time = _today_at(moment, awake_hour)
    today_sleep_time = _today_at(moment, sleep_hour)

    if forced is not None and forced in targets:
        if current_target != forced:
            logging.info("Switch to "+forced)
        return forced

    if forced == "long_absence":
        if current_target != "target_frost_protection" or first_pass:
            logging.info("Switch to "+"target_frost_protection")
        return "target_frost_protection"

    if forced == "short_absence" or first_pass:
        if current_target != "target_sleep_temperature":
            logging.info("Switch to "+"target_sleep_temperature")
        return "target_sleep_temperature"

    if today_awake_time < moment < today_sleep_time:
        # A confirmed awake target is left alone.
        if current_target not in ("target_unconfirmed_awake_temperature",
                                  "target_awake_temperature"):
            logging.info("Switch to unconfirmed awake mode.")
            return "target_unconfirmed_awake_temperature"
    elif moment < today_awake_time or moment > today_sleep_time:
        # A confirmed sleep target is left alone.
        if current_target not in ("target_unconfirmed_sleep_temperature",
                                  "target_sleep_temperature"):
            logging.info("Switch to unconfirmed sleep mode.")
            return "target_unconfirmed_sleep_temperature"
    return current_target


def _shed_load(current_load):
    """Switch rooms off, in ``shedding_order``, until the margin is restored."""
    logging.warning("Load too high: "+str(current_load)+"VA")
    total_shedded = 0
    for room in shedding_order:
        relay = rooms_settings[room]["relays"]
        current_state = relay_state(relay)
        if current_state != "Failed":
            logging.debug("Got relay_state: '"+current_state+"'")
        if current_state != "1":
            continue
        if set_relay(relay, "off") != "OK":
            continue
        total_shedded += relays_load[relay]
        # Shedding LOWERS the load. Stop only once the estimated remaining
        # load leaves the required margin (the former test subtracted the
        # shed load from the headroom and so was always true).
        if max_load - (current_load - total_shedded) >= load_margin:
            logging.info("Load should be back to normal.")
            break


def _control_room(room, current_load, current_time):
    """Apply the hysteresis thermostat to *room*.

    Returns the load added by heaters that were started (0 when nothing was
    started), so the caller can account for it when handling further rooms.
    """
    settings = rooms_settings[room]
    relay = settings["relays"]
    target = settings[target_name]
    logging.debug("Target: "+str(target))
    temperature = get_metric(settings["metric"], current_time, relay_control_interval)
    if temperature is None:
        return 0
    logging.debug(room+": "+str(temperature))
    current_state = relay_state(relay)
    if current_state != "Failed":
        logging.debug("Got relay_state: '"+current_state+"'")

    if temperature < target - hysteresis:
        if current_state != "0":
            logging.debug("Relay already on.")
            return 0
        logging.info(room+": Target temperature is "+str(target))
        logging.info(room+": Current temperature is "+str(temperature))
        if current_load + relays_load[relay] + load_margin > max_load:
            logging.warning(room+": Load too high cannot start heaters.")
            return 0
        logging.info(room+": Starting heaters.")
        started = set_relay(relay, "on") == "OK"
        sys.stdout.flush()
        return relays_load[relay] if started else 0

    if temperature > target + hysteresis:
        if current_state == "1":
            logging.info(room+": Target temperature is "+str(target))
            logging.info(room+": Current temperature is "+str(temperature))
            logging.info(room+": Stopping heaters.")
            sys.stdout.flush()
            set_relay(relay, "off")
        else:
            logging.debug("Relay already off.")
    return 0


start_time = time.time()
last_control_time = None
new_forced_mode = None
target_name = default_target
first_loop = True
# Defined up front so status_as_text() never meets an undefined global.
forced_mode = None
current_time = start_time

# TODO: Get Linky overload warning

# NOTE(review): this loop runs at module import and never exits, so any
# server or CLI that imports this module blocks here - confirm how the API
# is meant to be served alongside it.
# TODO: graceful shutdown (the previous version checked a stop Event here).
while True:
    if new_forced_mode is not None:
        with app.app_context():
            _store_forced_mode(new_forced_mode)
        logging.info("Switch to "+new_forced_mode)
        target_name = new_forced_mode
        new_forced_mode = None
        # Force immediate action:
        last_control_time = None

    current_time = time.time()
    current_date = datetime.datetime.now()
    forced_mode = get_forced_mode()
    target_name = _select_target(target_name, forced_mode, first_loop, current_date)
    first_loop = False

    # Load shedder
    current_load = get_metric("Modane_elec_main_power", current_time, load_shedder_interval)
    if current_load is None:
        # Without a fresh load reading neither shedding nor heating is safe.
        time.sleep(load_shedder_interval)
        continue
    if max_load - current_load < load_margin:
        _shed_load(current_load)

    # Thermostat
    if last_control_time is None or current_time - last_control_time > relay_control_interval:
        last_control_time = current_time
        for room in enabled_rooms():
            # Count heaters started during this pass, so that several rooms
            # are not all started against the same stale load reading.
            current_load += _control_room(room, current_load, current_time)
    time.sleep(load_shedder_interval)

logging.info("====== Ended successfully ======")

#import subprocess
#from datetime import timezone
#import argparse
#import signal
#from threading import Event
#from http.server import BaseHTTPRequestHandler
#import threading
#import socketserver
#
## As of Python 3.7 there is a method datetime.fromisoformat() which is exactly the reverse for isoformat().
## So this will no longer be necessary.
#from dateutil import parser #def getDateTimeFromISO8601String(s): # d = parser.parse(s) # return d # #from threading import Lock # #xprint_lock = Lock() # #def xprint(*args, **kwargs): # """Thread safe print function""" # with xprint_lock: # print(*args, **kwargs) # sys.stdout.flush() # #p = argparse.ArgumentParser(description='Thermostat and load shedder.') #p.add_argument("-v", "--verbosity", help="Increase output verbosity", # type=str, choices=['DEBUG', 'INFO', 'WARNING'], default='INFO') #args = p.parse_args() # #verbosity = args.verbosity #if verbosity == 'DEBUG': # logging.basicConfig(level=logging.DEBUG) #elif verbosity == 'INFO': # logging.basicConfig(level=logging.INFO) #elif verbosity == 'WARNING': # logging.basicConfig(level=logging.WARNING) # # #def api_help(): # return """ #### HTTP API description: ## /status # ## /help : This message # #"""\ #+'Temporary forced modes (will stay in effect for '+str(int(forced_mode_duration))+' seconds):\n'\ #+'\n'.join(['# /'+target+'\n' for target in targets])\ #+'\n'\ #+'Permanent forced modes:\n'\ #+'\n'.join(['# /'+mode+'\n' for mode in modes]) # #logging.info("====== Starting ======") #stop = Event() #last_data = {} # #def handler(signum, frame): # global stop # logging.info("Got interrupt: "+str(signum)) # stop.set() # logging.info("Shutdown") # #signal.signal(signal.SIGTERM,handler) #signal.signal(signal.SIGINT,handler) # #class MyHandler(BaseHTTPRequestHandler): # def do_GET(self): # global new_forced_mode # request = self.path[1:] # if request in targets or request in modes: # self.send_response(200) # new_forced_mode = request # elif self.path == '/status': # self.send_response(200) # self.send_header('Content-type', 'text/plain') # self.end_headers() # self.wfile.write(status_as_text().encode("utf-8")) # elif self.path == '/help': # self.send_response(200) # self.send_header('Content-type', 'text/plain') # self.end_headers() # self.wfile.write(api_help().encode("utf-8")) # else: # self.send_response(404) 
# # This rewrites the BaseHTTP logging function # def log_message(self, format, *args): # if verbosity == 'INFO': # xprint("%s - - [%s] %s" % # (self.address_string(), # self.log_date_time_string(), # format%args)) # #class WebThread(threading.Thread): # def run(self): # httpd.serve_forever() # #httpd = socketserver.TCPServer(("", http_port), MyHandler, bind_and_activate=False) #httpd.allow_reuse_address = True #httpd.server_bind() #httpd.server_activate() #webserver_thread = WebThread() #webserver_thread.start() #