docker-thermostat/thermostat.py
2024-06-03 00:32:44 +02:00

488 lines
18 KiB
Python

from flask import Flask, request
from flask_restx import Api, Resource, fields
from functools import wraps
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
import json
import logging
import yaml
import os
import sys
import datetime
from datetime import timezone
from datetime import datetime
import time
import requests
import subprocess
from threading import Thread
from threading import Lock
import sqlalchemy.exc
from sqlalchemy.dialects.sqlite import insert as sqlite_upsert
# This code has been written for
# python3 3.11.2-1+b1
# python3-flask 2.2.2-3 all micro web framework based on Werkzeug and Jinja2 - Python 3.x
# python3-flask-migrate 4.0.4-1 all SQLAlchemy migrations for Flask using Alembic and Python 3
# python3-flask-sqlalchemy 3.0.3-1 all adds SQLAlchemy support to your Python 3 Flask application
# alembic 1.8.1-2 all lightweight database migration tool for SQLAlchemy
# python3-alembic 1.8.1-2 all lightweight database migration tool for SQLAlchemy - Python module
# python3-sqlalchemy 1.4.46+ds1-1 all SQL toolkit and Object Relational Mapper for Python 3
# python3-sqlalchemy-ext:i386 1.4.46+ds1-1+b1 i386 SQL toolkit and Object Relational Mapper for Python3 - C extension
# flask-restx 1.3.0
# Flask-SQLAlchemy documentation: https://flask-sqlalchemy.palletsprojects.com/en/3.0.x/quickstart/
# We use SQLAlchemy ORM style 2.x: https://docs.sqlalchemy.org/en/20/tutorial/data_select.html
# Lock held by xprint() while it writes to stdout, so that output coming from
# different threads is not interleaved.
xprint_lock = Lock()
# The root logger starts at WARNING; the level is set again further down from
# the FLASK_ENV environment variable.
logging.basicConfig(level=logging.WARNING)
def xprint(*args, **kwargs):
    """Thread safe print function.

    Accepts the same arguments as the built-in ``print``. The call to
    ``print`` and the flush of ``sys.stdout`` that follows are both made while
    holding ``xprint_lock``.
    """
    xprint_lock.acquire()
    try:
        print(*args, **kwargs)
        sys.stdout.flush()
    finally:
        # Always give the lock back, even when print() raises.
        xprint_lock.release()
# Security scheme handed to the flask-restx Api object: an API key carried in
# the X-API-KEY request header.
authorizations = {
    'apikey': {
        'type': 'apiKey',
        'in': 'header',
        'name': 'X-API-KEY'
    }
}
# Load the whole configuration once at import time from ./conf.yml.
with open('./conf.yml') as conf:
    yaml_conf = yaml.safe_load(conf)
flask_settings = yaml_conf.get("flask_settings")
api_key = yaml_conf.get("api_key")
# FLASK_ENV selects the environment-specific Flask settings and the log level.
# It is read with .get() so that an unset variable reaches the explicit error
# message below instead of dying on a bare KeyError.
flask_env = os.environ.get('FLASK_ENV')
if flask_env == 'development':
    flask_settings_env = yaml_conf.get("flask_settings_dev")
    logging.getLogger().setLevel(logging.DEBUG)
elif flask_env == 'production':
    flask_settings_env = yaml_conf.get("flask_settings_prod")
    logging.getLogger().setLevel(logging.WARNING)
else:
    logging.error("FLASK_ENV must be set to development or production.")
    sys.exit(1)
# Remaining settings are exposed as module-level names, one per
# configuration key; a key missing from the file yields None.
targets = yaml_conf.get("targets")
modes = yaml_conf.get("modes")
http_port = yaml_conf.get("http_port")
shedding_order = yaml_conf.get("shedding_order")
rooms_settings = yaml_conf.get("rooms_settings")
default_target = yaml_conf.get("default_target")
relays_load = yaml_conf.get("relays_load")
awake_hour = yaml_conf.get("awake_hour")
sleep_hour = yaml_conf.get("sleep_hour")
forced_mode_duration = yaml_conf.get("forced_mode_duration")
load_shedder_interval = yaml_conf.get("load_shedder_interval")
relay_control_interval = yaml_conf.get("relay_control_interval")
hysteresis = yaml_conf.get("hysteresis")
max_load = yaml_conf.get("max_load")
load_margin = yaml_conf.get("load_margin")
def auth_required(func):
    """Decorator restricting a resource method to callers holding the API key.

    The wrapped method is documented in the API as using the 'apikey'
    security scheme. At request time the X-API-KEY header must be present and
    equal to the configured ``api_key``; otherwise the request is aborted
    with a 401 response.

    Args:
        func: the resource method to protect.

    Returns:
        The wrapping function, carrying the metadata of ``func``.
    """
    # Local import so that this block does not depend on the file's import list.
    import hmac

    func = api.doc(security='apikey')(func)

    @wraps(func)
    def check_auth(*args, **kwargs):
        if 'X-API-KEY' not in request.headers:
            api.abort(401, 'API key required')
        key = request.headers['X-API-KEY']
        # Check key validity. The comparison is constant-time so that the
        # response delay does not reveal how much of the key matched. A
        # missing or non-string configured key never matches, exactly as
        # with a plain inequality test.
        key_is_valid = isinstance(api_key, str) and hmac.compare_digest(
            key.encode('utf-8'), api_key.encode('utf-8'))
        if not key_is_valid:
            api.abort(401, 'Wrong API key')
        return func(*args, **kwargs)
    return check_auth
##
def now():
    """Return the current local date and time, ``%c``-formatted, in brackets."""
    stamp = datetime.now().strftime("%c")
    return f"[{stamp}]"
def enabled_rooms():
    """Return the rooms of ``rooms_settings`` whose "enabled" entry is truthy.

    The rooms come back as a list, in the iteration order of
    ``rooms_settings``.
    """
    return [name for name in rooms_settings if rooms_settings[name]["enabled"]]
def status_as_text():
    """Return the thermostat status as plain text.

    The text starts with the active target name and the forced mode, one per
    line, followed by two lines per enabled room: its target temperature and
    the temperature currently returned by get_metric().
    """
    header = 'target: '+target_name+'\n'+'forced_mode: '+str(forced_mode)+'\n'
    room_lines = []
    for room in enabled_rooms():
        settings = rooms_settings[room]
        target_temperature = str(settings[target_name])
        current_temperature = str(get_metric(settings["metric"], current_time, relay_control_interval))
        room_lines.append('Target temperature for '+room+': '+target_temperature)
        room_lines.append('Current temperature for '+room+': '+current_temperature)
    # The header keeps its trailing newline even when no room is enabled.
    return header+'\n'.join(room_lines)
def status_as_dict():
    """Return the thermostat status as a dictionary.

    Keys: "target" (active target name), "forced_mode" (as a string) and
    "enabled_rooms", which maps each enabled room to its target temperature
    and to the temperature currently returned by get_metric(), both as
    strings.
    """
    status = {
        "target": target_name,
        "forced_mode": str(forced_mode),
        "enabled_rooms": {},
    }
    for room in enabled_rooms():
        settings = rooms_settings[room]
        target_temperature = str(settings[target_name])
        current_temperature = str(get_metric(settings["metric"], current_time, relay_control_interval))
        status["enabled_rooms"][room] = {
            'Target temperature': target_temperature,
            'Current temperature': current_temperature,
        }
    return status
def relay_state(relay):
    """Ask ./relay.py for the status of *relay*.

    Args:
        relay: relay identifier, passed as an argument to ./relay.py.

    Returns:
        The output of the command, stripped and decoded as UTF-8, or the
        string "Failed" when the command could not be run or its output
        could not be decoded (the error is logged).
    """
    command = ["./relay.py", relay, "status"]
    try:
        raw_status = subprocess.check_output(command)
        status = raw_status.strip().decode("utf-8")
    except Exception as e:
        logging.error(e)
        logging.error("relay "+relay+" status: Failed to command relays board.")
        sys.stdout.flush()
        status = "Failed"
    return status
def set_relay(relay, state):
    """Switch *relay* to *state* through ./relay.py.

    Args:
        relay: relay identifier, passed as an argument to ./relay.py.
        state: requested state, passed as an argument to ./relay.py
            (callers in this file use "on" and "off").

    Returns:
        "OK" when the command ran successfully, "KO" otherwise.
    """
    try:
        returned_output = subprocess.check_output(["./relay.py", relay, state])
    except Exception as e:
        logging.error(e)
        # %-style arguments: formatting cannot fail here, whatever the
        # types of relay and state are.
        logging.error("set relay %s to %s: Failed to command relays board.", relay, state)
        sys.stdout.flush()
        return "KO"
    # The command succeeded: reporting its output is best effort only and must
    # never turn the result into "KO". The global status is read from the
    # second line of the output when there is one.
    output_lines = returned_output.decode("utf-8", errors="replace").split('\n')
    global_status = output_lines[1] if len(output_lines) > 1 else output_lines[0]
    logging.info("set relay %s to %s, new global status: %s", relay, state, global_status)
    sys.stdout.flush()
    return "OK"
def get_metric(metric, current_time, interval, timeout=10):
    """Fetch the latest value of *metric* from the local metrics server.

    The server is queried at http://localhost:3000/<metric> and is expected to
    answer with a JSON object holding a 'timestamp' (ISO format, read as UTC)
    and a 'value'.

    Args:
        metric: name of the metric, appended to the server URL.
        current_time: reference time, in seconds since the epoch.
        interval: expected refresh period of the metric in seconds; a value
            older than twice this period is considered stale.
        timeout: maximum number of seconds to wait for the server, so that an
            unresponsive server cannot block the caller indefinitely.

    Returns:
        The metric value, or None when the value is stale or when anything
        went wrong while fetching or decoding it (the error is logged).
    """
    url = "http://localhost:3000/"+metric
    try:
        r = requests.get(url, timeout=timeout)
        data = json.loads(r.text)
        timestamp = datetime.fromisoformat(data['timestamp']).replace(tzinfo=timezone.utc).timestamp()
        if current_time - timestamp < interval * 2:
            return data['value']
        # This function serves every metric, not only the electrical load.
        logging.warning("No recent data available for metric %s.", metric)
    except Exception as e:
        logging.error(e)
    sys.stdout.flush()
    return None
def get_forced_mode():
    """Return the manually forced mode stored in the database, if still valid.

    Returns:
        The stored value, or None when no row exists or when the stored value
        is one of the configured ``targets`` and is older than
        ``forced_mode_duration`` seconds. Values that are not targets (the
        absence modes) never expire.
    """
    with app.app_context():
        # Only the row named 'mode' is meaningful in the set_mode table.
        query = db.select(Set_mode.value, Set_mode.timestamp).where(Set_mode.name == "mode")
        row = db.session.execute(query).first()
    if row is None:
        return None
    # Logged at debug level: this runs on every iteration of the control loop.
    logging.debug("Stored forced mode: %s", row)
    value, stored_at = row
    # The stored datetime carries no timezone; it is read back as UTC.
    age = time.time() - stored_at.replace(tzinfo=timezone.utc).timestamp()
    # We ignore old targets but never ignore absence modes
    if value in targets and age > forced_mode_duration:
        logging.debug("Ignoring old set mode.")
        return None
    return value
##
# Flask application. The common settings are applied first, then the
# environment-specific ones, so the latter win on conflicting keys.
app = Flask(__name__)
app.config.from_mapping(flask_settings)
app.config.from_mapping(flask_settings_env)
# Flask-SQLAlchemy handle bound to the application; the model below derives
# from db.Model.
db = SQLAlchemy(app)
# flask-restx API object; `authorizations` declares the X-API-KEY scheme.
api = Api(app, version='1.0', title='Thermostat',
          description='API to read and set thermostat.', authorizations=authorizations)
# NOTE(review): the namespace name ends with '/' while the routes registered
# on it start with '/' -- confirm that the resulting URLs are the intended ones.
ns_thermostat = api.namespace('thermostat/', description='Thermostat API')
# we will only use name="mode" for set_mode table
# only modes that are set manually will be recorded in the database
class Set_mode(db.Model):
    """Row of the ``set_mode`` table holding a manually requested mode."""
    __tablename__ = "set_mode"
    # Primary key; defaults to 'mode' both on the Python side and in the
    # database.
    name = db.Column(db.String, primary_key=True, default='mode', server_default='mode', nullable=False)
    # Requested target or mode name (mandatory).
    value = db.Column(db.String, nullable=False)
    # Filled in by the database with CURRENT_TIMESTAMP when not provided.
    timestamp = db.Column(db.DateTime, server_default=db.text("CURRENT_TIMESTAMP"), nullable=False)
# Payload schema of POST /set_mode, registered on the API as 'Set_mode_Model'.
# NOTE(review): the 'time' key has no counterpart on the Set_mode model, whose
# column is named 'timestamp' -- confirm which name is intended.
Set_mode_resource_fields = {
    'name': fields.String(description='mode type'),
    # Accepted values: the configured targets followed by the configured modes.
    'value': fields.String(required=True, description='value', enum=targets+modes),
    'time': fields.DateTime(dt_format='iso8601'),
}
Set_mode_model = api.model('Set_mode_Model', Set_mode_resource_fields)
# Resource behind POST /set_mode: hands a manually requested mode over to the
# thermostat loop. Documented with comments rather than docstrings so that the
# API documentation generated by flask-restx stays as it is.
@ns_thermostat.route('/set_mode')
class Set_mode_thermostat(Resource):
    @auth_required
    @api.expect(Set_mode_model, validate=True)
    def post(self):
        # The request is passed to the loop through the module-level
        # new_forced_mode variable; without this declaration the assignment
        # below would only bind a local name and the request would be lost.
        global new_forced_mode
        try:
            # Building the model rejects payload keys it does not know.
            data = Set_mode(**request.json)
        except Exception as e:
            logging.error(e)
            return "K0", 400
        new_forced_mode = data.value
        return "OK", 201
# Resource behind GET /status: returns the dictionary built by
# status_as_dict() and logs it at debug level.
@ns_thermostat.route('/status')
class Status_thermostat(Resource):
    @auth_required
    def get(self):
        snapshot = status_as_dict()
        logging.debug(snapshot)
        return snapshot
# Payload schema of PUT /set_verbosity, registered on the API as
# 'Set_verbosity_Model': a mandatory 'value' naming the requested log level.
Set_verbosity_resource_fields = {
    'value': fields.String(required=True, description='Verbosity', enum=["DEBUG", "INFO", "WARNING"])
}
Set_verbosity_model = api.model('Set_verbosity_Model', Set_verbosity_resource_fields)
# Resource behind PUT /set_verbosity: changes the level of the root logger.
# Documented with comments rather than docstrings so that the API
# documentation generated by flask-restx stays as it is.
@ns_thermostat.route('/set_verbosity')
class Set_verbosity_thermostat(Resource):
    @auth_required
    @api.expect(Set_verbosity_model, validate=True)
    def put(self):
        # api.payload is the decoded JSON body, i.e. a dictionary: the value
        # has to be read by key, attribute access raises AttributeError.
        requested_level = api.payload["value"]
        xprint(requested_level)
        levels = {
            'DEBUG': logging.DEBUG,
            'INFO': logging.INFO,
            'WARNING': logging.WARNING,
        }
        # Any other value leaves the current level untouched.
        if requested_level in levels:
            logging.getLogger().setLevel(levels[requested_level])
        return "OK", 201
# Register the thermostat namespace on the API.
# NOTE(review): api.namespace() may already have registered it when the
# namespace was created -- confirm that this second call is needed.
api.add_namespace(ns_thermostat)
# Create the tables declared on db.Model; this needs an application context.
with app.app_context():
    db.create_all()
# Attach Flask-Migrate to the application and its database handle.
migrate = Migrate(app, db, compare_type=True)
# TODO: Get Linky overload warning
#cursor.execute("SELECT * FROM set_mode")
#rows = cursor.fetchall()
#for row in rows:
# xprint(row)
#sys.stdout.flush()
# State shared between thermostat_loop() and the status helpers.
# Key of rooms_settings[room] naming the target temperature in use.
target_name = default_target
# Last value returned by get_forced_mode(); None until the loop has run.
forced_mode = None
# time.time() value refreshed by thermostat_loop() on each iteration.
current_time = time.time()
# Mode waiting to be taken into account by thermostat_loop(); None when
# nothing is pending.
new_forced_mode = None
def _persist_forced_mode(mode):
    """Store *mode* as the ``name='mode'`` row of the set_mode table.

    Returns:
        True when the row was committed, False when the database write failed
        (the error is logged and the session rolled back).
    """
    with app.app_context():
        try:
            # merge() updates the existing row or inserts it when missing; a
            # plain add() violates the primary key from the second change on.
            db.session.merge(Set_mode(
                name="mode",
                value=mode,
                # The database default only applies to inserts, so the
                # timestamp is set here. It is stored without timezone, in
                # UTC, which is how get_forced_mode() reads it back.
                timestamp=datetime.now(timezone.utc).replace(tzinfo=None),
            ))
            db.session.commit()
        except sqlalchemy.exc.SQLAlchemyError as e:
            db.session.rollback()
            logging.error(e)
            return False
    return True


def _shed_load(current_load):
    """Switch heaters off, in shedding order, until the load margin is restored.

    Args:
        current_load: measured load that triggered the shedding.

    Returns:
        The total load of the relays that were successfully switched off.
    """
    total_shedded = 0
    for room in shedding_order:
        relay = rooms_settings[room]["relays"]
        current_state = relay_state(relay)
        if current_state != "Failed":
            logging.debug("Got relay_state: '"+current_state+"'")
        if current_state == "1" and set_relay(relay, "off") == "OK":
            total_shedded += relays_load[relay]
        # Shedding lowers the load, hence widens the margin below max_load:
        # stop as soon as the expected margin is large enough again.
        if max_load - (current_load - total_shedded) >= load_margin:
            logging.info("Load should be back to normal.")
            break
    return total_shedded


def _control_heaters(current_load, reference_time):
    """Compare each enabled room with its target temperature.

    Args:
        current_load: measured load, used to refuse starting heaters that
            would not leave ``load_margin`` below ``max_load``.
        reference_time: time passed to get_metric() to detect stale values.
    """
    for room in rooms_settings:
        settings = rooms_settings[room]
        if not settings["enabled"]:
            continue
        target = settings[target_name]
        logging.debug("Target: "+str(target))
        temperature = get_metric(settings["metric"], reference_time, relay_control_interval)
        if temperature is None:
            continue
        logging.debug(room+": "+str(temperature))
        relay = settings["relays"]
        current_state = relay_state(relay)
        if current_state == "Failed":
            # Nothing can be decided without knowing the relay state.
            continue
        logging.debug("Got relay_state: '"+current_state+"'")
        if temperature < target - hysteresis:
            if current_state == "0":
                logging.info(room+": Target temperature is "+str(target))
                logging.info(room+": Current temperature is "+str(temperature))
                if current_load + relays_load[relay] + load_margin > max_load:
                    logging.warning(room+": Load too high cannot start heaters.")
                else:
                    logging.info(room+": Starting heaters.")
                    # NOTE: relay switching is deliberately left disabled, as
                    # it was before this rewrite; only the decision is logged.
                    # set_relay(relay, "on")
            else:
                logging.debug("Relay already on.")
        elif temperature > target + hysteresis:
            if current_state == "1":
                logging.info(room+": Target temperature is "+str(target))
                logging.info(room+": Current temperature is "+str(temperature))
                logging.info(room+": Stopping heaters.")
                # NOTE: relay switching is deliberately left disabled, as it
                # was before this rewrite; only the decision is logged.
                # set_relay(relay, "off")
            else:
                logging.debug("Relay already off.")


def thermostat_loop():
    """Run the thermostat and load shedder forever.

    Each iteration, separated by ``load_shedder_interval`` seconds:

    1. takes a pending ``new_forced_mode`` into account and stores it in the
       database;
    2. selects ``target_name`` from the forced mode and the awake/sleep
       schedule;
    3. sheds load when the measured load leaves less than ``load_margin``
       below ``max_load``;
    4. at most every ``relay_control_interval`` seconds, compares the rooms
       with their target temperature.

    The function never returns. It updates the module-level ``target_name``,
    ``forced_mode``, ``new_forced_mode`` and ``current_time``.
    """
    global target_name
    global forced_mode
    global new_forced_mode
    global current_time

    last_control_time = None
    first_loop = True
    # The schedule comes from the configuration and does not change while
    # running, so it is parsed once ("HH:MM").
    awake_hh, awake_mm = (int(part) for part in awake_hour.split(':')[:2])
    sleep_hh, sleep_mm = (int(part) for part in sleep_hour.split(':')[:2])

    while True:
        requested_mode = new_forced_mode
        if requested_mode is not None:
            new_forced_mode = None
            # A failed write is logged by the helper; the loop keeps running
            # and falls back on whatever the database currently holds.
            _persist_forced_mode(requested_mode)
            logging.info("Switch to "+requested_mode)
            target_name = requested_mode
            # Force immediate action:
            last_control_time = None

        current_time = time.time()
        current_date = datetime.now()
        today_awake_time = current_date.replace(hour=awake_hh, minute=awake_mm, second=0, microsecond=0)
        today_sleep_time = current_date.replace(hour=sleep_hh, minute=sleep_mm, second=0, microsecond=0)

        forced_mode = get_forced_mode()
        if forced_mode is not None and forced_mode in targets:
            if target_name != forced_mode:
                target_name = forced_mode
                logging.info("Switch to "+forced_mode)
        elif forced_mode == "long_absence":
            if target_name != "target_frost_protection" or first_loop:
                target_name = "target_frost_protection"
                logging.info("Switch to "+target_name)
        # NOTE(review): because of "or first_loop" the first iteration selects
        # the sleep temperature whenever no target is forced and the mode is
        # not long_absence -- confirm this start-up behavior is intended.
        elif forced_mode == "short_absence" or first_loop:
            if target_name != "target_sleep_temperature":
                target_name = "target_sleep_temperature"
                logging.info("Switch to "+target_name)
        elif today_awake_time <= current_date < today_sleep_time:
            if target_name != "target_unconfirmed_awake_temperature" and target_name != "target_awake_temperature":
                target_name = "target_unconfirmed_awake_temperature"
                logging.info("Switch to unconfirmed awake mode.")
        else:
            # Outside the awake period, boundaries included, so that every
            # instant of the day falls in exactly one of the two branches.
            if target_name != "target_unconfirmed_sleep_temperature" and target_name != "target_sleep_temperature":
                target_name = "target_unconfirmed_sleep_temperature"
                logging.info("Switch to unconfirmed sleep mode.")
        first_loop = False

        # Load shedder
        current_load = get_metric("Modane_elec_main_power", current_time, load_shedder_interval)
        if current_load is None:
            # Without a load measurement nothing is switched at all.
            time.sleep(load_shedder_interval)
            continue
        if max_load - current_load < load_margin:
            logging.warning("Load too high: "+str(current_load)+"VA")
            _shed_load(current_load)

        # Thermostat
        if last_control_time is None or current_time - last_control_time > relay_control_interval:
            last_control_time = current_time
            # The load measured before shedding is used on purpose: it is the
            # more cautious figure when deciding whether heaters may start.
            _control_heaters(current_load, current_time)

        time.sleep(load_shedder_interval)
# Run the control loop in a background daemon thread, which does not keep the
# process alive on its own.
t1 = Thread(target=thermostat_loop, daemon=True)
t1.start()
logging.info("====== Ended successfully ======")
#import argparse
#import signal
#from threading import Event
#from http.server import BaseHTTPRequestHandler
#import threading
#import socketserver
#p = argparse.ArgumentParser(description='Thermostat and load shedder.')
#p.add_argument("-v", "--verbosity", help="Increase output verbosity",
# type=str, choices=['DEBUG', 'INFO', 'WARNING'], default='INFO')
#args = p.parse_args()
#
#verbosity = args.verbosity
#if verbosity == 'DEBUG':
# logging.basicConfig(level=logging.DEBUG)
#elif verbosity == 'INFO':
# logging.basicConfig(level=logging.INFO)
#elif verbosity == 'WARNING':
# logging.basicConfig(level=logging.WARNING)
#
#
#def api_help():
# return """
#### HTTP API description:
## /status
#
## /help : This message
#
#"""\
#+'Temporary forced modes (will stay in effect for '+str(int(forced_mode_duration))+' seconds):\n'\
#+'\n'.join(['# /'+target+'\n' for target in targets])\
#+'\n'\
#+'Permanent forced modes:\n'\
#+'\n'.join(['# /'+mode+'\n' for mode in modes])
#
#logging.info("====== Starting ======")
#stop = Event()
#last_data = {}
#
#def handler(signum, frame):
# global stop
# logging.info("Got interrupt: "+str(signum))
# stop.set()
# logging.info("Shutdown")
#
#signal.signal(signal.SIGTERM,handler)
#signal.signal(signal.SIGINT,handler)
#
#class MyHandler(BaseHTTPRequestHandler):
# def do_GET(self):
# global new_forced_mode
# request = self.path[1:]
# if request in targets or request in modes:
# self.send_response(200)
# new_forced_mode = request
# elif self.path == '/status':
# self.send_response(200)
# self.send_header('Content-type', 'text/plain')
# self.end_headers()
# self.wfile.write(status_as_text().encode("utf-8"))
# elif self.path == '/help':
# self.send_response(200)
# self.send_header('Content-type', 'text/plain')
# self.end_headers()
# self.wfile.write(api_help().encode("utf-8"))
# else:
# self.send_response(404)
# # This rewrites the BaseHTTP logging function
# def log_message(self, format, *args):
# if verbosity == 'INFO':
# xprint("%s - - [%s] %s" %
# (self.address_string(),
# self.log_date_time_string(),
# format%args))
#
#class WebThread(threading.Thread):
# def run(self):
# httpd.serve_forever()
#
#httpd = socketserver.TCPServer(("", http_port), MyHandler, bind_and_activate=False)
#httpd.allow_reuse_address = True
#httpd.server_bind()
#httpd.server_activate()
#webserver_thread = WebThread()
#webserver_thread.start()
#