docker-thermostat/thermostat.py
2024-06-02 14:50:09 +02:00

435 lines
16 KiB
Python

from flask import Flask, request
from flask_restx import Api, Resource, fields
from functools import wraps
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
import json
import logging
import yaml
import os
import sys
import datetime
from datetime import timezone
from datetime import datetime
import time
import requests
import subprocess
from threading import Thread
# This code has been written for
# python3 3.11.2-1+b1
# python3-flask 2.2.2-3 all micro web framework based on Werkzeug and Jinja2 - Python 3.x
# python3-flask-migrate 4.0.4-1 all SQLAlchemy migrations for Flask using Alembic and Python 3
# python3-flask-sqlalchemy 3.0.3-1 all adds SQLAlchemy support to your Python 3 Flask application
# alembic 1.8.1-2 all lightweight database migration tool for SQLAlchemy
# python3-alembic 1.8.1-2 all lightweight database migration tool for SQLAlchemy - Python module
# python3-sqlalchemy 1.4.46+ds1-1 all SQL toolkit and Object Relational Mapper for Python 3
# python3-sqlalchemy-ext:i386 1.4.46+ds1-1+b1 i386 SQL toolkit and Object Relational Mapper for Python3 - C extension
# Flask-SQLAlchemy documentation: https://flask-sqlalchemy.palletsprojects.com/en/3.0.x/quickstart/
# We use SQLAlchemy ORM style 2.x: https://docs.sqlalchemy.org/en/20/tutorial/data_select.html
# Start at WARNING; the level is adjusted once FLASK_ENV has been inspected.
logging.basicConfig(level=logging.WARNING)

# Security scheme advertised in the generated API documentation: clients
# authenticate by sending their key in the X-API-KEY request header.
authorizations = {
    'apikey': {'type': 'apiKey', 'in': 'header', 'name': 'X-API-KEY'},
}
# Load the YAML configuration shared by the web API and the control loop.
with open('./conf.yml') as conf:
    yaml_conf = yaml.safe_load(conf)

flask_settings = yaml_conf.get("flask_settings")
api_key = yaml_conf.get("api_key")

# FLASK_ENV selects the environment-specific Flask settings and the log level.
# It is read with .get() so that an unset variable reaches the explicit error
# message below instead of aborting with a bare KeyError.
flask_env = os.environ.get('FLASK_ENV')
if flask_env == 'development':
    flask_settings_env = yaml_conf.get("flask_settings_dev")
    logging.getLogger().setLevel(logging.DEBUG)
elif flask_env == 'production':
    flask_settings_env = yaml_conf.get("flask_settings_prod")
    logging.getLogger().setLevel(logging.WARNING)
else:
    logging.error("FLASK_ENV must be set to development or production.")
    sys.exit(1)

# Remaining settings; each one is None when its key is absent from conf.yml.
targets = yaml_conf.get("targets")
modes = yaml_conf.get("modes")
http_port = yaml_conf.get("http_port")
shedding_order = yaml_conf.get("shedding_order")
rooms_settings = yaml_conf.get("rooms_settings")
default_target = yaml_conf.get("default_target")
relays_load = yaml_conf.get("relays_load")
awake_hour = yaml_conf.get("awake_hour")
sleep_hour = yaml_conf.get("sleep_hour")
forced_mode_duration = yaml_conf.get("forced_mode_duration")
load_shedder_interval = yaml_conf.get("load_shedder_interval")
relay_control_interval = yaml_conf.get("relay_control_interval")
hysteresis = yaml_conf.get("hysteresis")
max_load = yaml_conf.get("max_load")
load_margin = yaml_conf.get("load_margin")
def auth_required(func):
    """Decorator that protects an API handler with the configured API key.

    The handler is also tagged with the ``apikey`` security scheme through
    ``api.doc`` so the requirement appears in the API documentation.

    The request is aborted with HTTP 401 when the ``X-API-KEY`` header is
    missing ('API key required') or differs from ``api_key``
    ('Wrong API key'); otherwise the wrapped handler is called unchanged.
    """
    # Imported locally so this block does not depend on the file-level imports.
    import hmac

    func = api.doc(security='apikey')(func)

    @wraps(func)
    def check_auth(*args, **kwargs):
        key = request.headers.get('X-API-KEY')
        if key is None:
            api.abort(401, 'API key required')
        # Constant-time comparison: the response time must not reveal how
        # much of a guessed key is correct. A configured key that is not a
        # string can never equal a header value, so it is always rejected,
        # exactly as a plain != comparison would do.
        if not isinstance(api_key, str) or not hmac.compare_digest(
                key.encode('utf-8'), api_key.encode('utf-8')):
            api.abort(401, 'Wrong API key')
        return func(*args, **kwargs)

    return check_auth
##
def now():
    """Return the current local date and time, formatted with ``%c`` and wrapped in square brackets."""
    stamp = datetime.now().strftime("%c")
    return f"[{stamp}]"
def enabled_rooms():
    """Return the names of the rooms whose ``enabled`` setting is truthy.

    Rooms are listed in the iteration order of ``rooms_settings``.
    """
    return [
        name
        for name, settings in rooms_settings.items()
        if settings["enabled"]
    ]
def status_as_text():
    """Return a multi-line, human-readable summary of the thermostat state.

    The text contains the active target name, the forced mode and, for every
    enabled room, its target temperature and its current temperature as
    returned by ``get_metric``.

    ``target_name`` and ``forced_mode`` are read from the module namespace
    when they exist there. When they do not, the function falls back to
    ``default_target`` and to ``get_forced_mode()`` instead of failing with
    ``NameError``.
    """
    state = globals()
    current_target = state.get('target_name', default_target)
    if 'forced_mode' in state:
        current_forced_mode = state['forced_mode']
    else:
        current_forced_mode = get_forced_mode()
    # Metric freshness is judged against the moment the status is requested.
    request_time = time.time()

    room_reports = []
    for room in enabled_rooms():
        settings = rooms_settings[room]
        temperature = get_metric(settings["metric"], request_time, relay_control_interval)
        room_reports.append(
            'Target temperature for '+room+': '+str(settings[current_target])+'\n'
            + 'Current temperature for '+room+': '+str(temperature)
        )

    return 'target: '+current_target+'\n'\
        + 'forced_mode: '+str(current_forced_mode)+'\n'\
        + '\n'.join(room_reports)
def relay_state(relay):
    """Ask the relay board for the state of ``relay``.

    Runs ``./relay.py <relay> status`` and returns its standard output,
    stripped of surrounding whitespace and decoded as UTF-8. On any failure
    the error is logged and the string ``"Failed"`` is returned instead.
    """
    command = ["./relay.py", relay, "status"]
    try:
        raw_status = subprocess.check_output(command)
        return raw_status.strip().decode("utf-8")
    except Exception as err:
        logging.error(err)
        message = "relay " + relay + " status: Failed to command relays board."
        logging.error(message)
        sys.stdout.flush()
        return "Failed"
def set_relay(relay, state):
    """Switch ``relay`` to ``state`` through ``./relay.py``.

    Args:
        relay: relay identifier passed to ``./relay.py``.
        state: requested state, passed as the second argument.

    Returns:
        ``"OK"`` when the command ran successfully, ``"KO"`` when it could
        not be run or exited with an error (the error is logged).
    """
    try:
        returned_output = subprocess.check_output(["./relay.py", relay, state])
    except Exception as e:
        logging.error(e)
        logging.error("set relay "+relay+" to "+state+": Failed to command relays board.")
        sys.stdout.flush()
        return "KO"

    # The relay has been switched at this point. Building the log line must
    # never turn that success into a "KO": tolerate output without a second
    # line and bytes that are not valid UTF-8.
    output_lines = returned_output.decode("utf-8", errors="replace").split('\n')
    if len(output_lines) > 1:
        # The second output line is the one logged as the global status.
        global_status = output_lines[1]
    else:
        global_status = output_lines[0]
    logging.info("set relay "+relay+" to "+state+", new global status: "+global_status)
    sys.stdout.flush()
    return "OK"
def get_metric(metric, current_time, interval, timeout=10):
    """Fetch the latest value of ``metric`` from the local metrics service.

    Args:
        metric: path appended to ``http://localhost:3000/``.
        current_time: reference time, in seconds since the epoch.
        interval: sampling interval in seconds; a sample is accepted only
            when it is less than ``2 * interval`` older than ``current_time``.
        timeout: seconds to wait for the service before giving up, so that
            an unresponsive service cannot block the caller forever.

    Returns:
        The ``value`` field of the response, or ``None`` when the sample is
        too old or when anything goes wrong (the error is logged).
    """
    url = "http://localhost:3000/"+metric
    try:
        r = requests.get(url, timeout=timeout)
        data = json.loads(r.text)
        sample_time = datetime.fromisoformat(data['timestamp'])
        # A timestamp without offset is taken to be UTC; one that carries
        # its own offset is kept as is rather than being relabelled as UTC.
        if sample_time.tzinfo is None:
            sample_time = sample_time.replace(tzinfo=timezone.utc)
        timestamp = sample_time.timestamp()
        if current_time - timestamp < interval * 2:
            return data['value']
        logging.warning("WARNING: No recent load data available.")
    except Exception as e:
        logging.error(e)
    sys.stdout.flush()
    return None
def get_forced_mode():
    """Return the manually set mode stored in the database, if still valid.

    Only the row named ``'mode'`` is considered. A value listed in
    ``targets`` expires once it is older than ``forced_mode_duration``
    seconds; any other value (the absence modes) never expires.

    Returns:
        The stored value, or ``None`` when there is no row or when the
        stored target has expired.
    """
    with app.app_context():
        row = db.session.execute(
            db.select(Set_mode.value, Set_mode.timestamp)
            .where(Set_mode.name == 'mode')
        ).first()
    if row is None:
        return None
    #cur.execute("SELECT value, timestamp FROM set_mode WHERE name='mode'")
    #row = cur.fetchone()
    value, recorded_at = row
    # The DateTime column is returned as a datetime object, which
    # datetime.fromisoformat() rejects with TypeError; only parse the value
    # when it really is a string.
    if isinstance(recorded_at, str):
        recorded_at = datetime.fromisoformat(recorded_at)
    # Timestamps without offset are interpreted as UTC.
    if recorded_at.tzinfo is None:
        recorded_at = recorded_at.replace(tzinfo=timezone.utc)
    age = time.time() - recorded_at.timestamp()
    # We ignore old targets but never ignore absence modes
    if value in targets and age > forced_mode_duration:
        logging.debug("Ignoring old set mode.")
        return None
    return value
##
# Flask application. The common settings are applied first and the
# environment-specific ones second, so the latter win on conflicting keys.
app = Flask(__name__)
app.config.from_mapping(flask_settings)
app.config.from_mapping(flask_settings_env)

# Database handle bound to the application.
db = SQLAlchemy(app)

# REST API and the namespace that carries the thermostat routes.
api = Api(
    app,
    version='1.0',
    title='Thermostat',
    description='API to read and set thermostat.',
    authorizations=authorizations,
)
ns_thermostat = api.namespace('thermostat/', description='Thermostat API')
# we will only use name="mode" for set_mode table
# only modes that are set manually will be recorded in the database
class Set_mode(db.Model):
    """ORM model of the ``set_mode`` table, which stores manually set modes."""

    __tablename__ = "set_mode"

    # Primary key; the database supplies 'mode' when no name is given.
    name = db.Column(
        db.String,
        primary_key=True,
        nullable=False,
        server_default='mode',
    )
    # The mode or target that was set.
    value = db.Column(db.String, nullable=False)
    # When the value was recorded; filled in by the database on insert.
    timestamp = db.Column(
        db.DateTime,
        nullable=False,
        server_default=db.text("CURRENT_TIMESTAMP"),
    )
# Schema used to document and validate the JSON body of the set_mode request.
Set_mode_resource_fields = dict(
    name=fields.String(description='mode type'),
    value=fields.String(required=True, description='value'),
    time=fields.DateTime(dt_format='iso8601'),
)
Set_mode_model = api.model('Set_mode_Model', Set_mode_resource_fields)
# Endpoint used to set the thermostat mode manually.
@ns_thermostat.route('/set_mode')
class Set_mode_thermostat(Resource):
    @auth_required
    @api.expect(Set_mode_model, validate=True)
    def post(self):
        # Stores the requested mode and answers ("OK", 201), or ("K0", 400)
        # when the request cannot be stored.
        try:
            payload = request.json
            # The API schema names the field 'time' while the column is
            # 'timestamp'. Timestamps are stored as naive UTC values; a
            # supplied time without offset is stored unchanged.
            supplied_time = payload.get('time')
            if supplied_time is None:
                recorded_at = datetime.now(timezone.utc)
            else:
                recorded_at = datetime.fromisoformat(supplied_time)
            if recorded_at.tzinfo is not None:
                recorded_at = recorded_at.astimezone(timezone.utc).replace(tzinfo=None)
            data = Set_mode(
                name=payload.get('name') or 'mode',
                value=payload['value'],
                timestamp=recorded_at,
            )
            # merge() updates the row that already has this name instead of
            # inserting a second one, which would violate the primary key
            # and make every request after the first one fail.
            db.session.merge(data)
            db.session.commit()
            return "OK", 201
        except Exception as e:
            logging.error(e)
            # Leave the session usable after a failed flush or commit.
            db.session.rollback()
            # NOTE(review): "K0" (with a zero) looks like a typo for "KO";
            # kept unchanged because clients may already rely on it.
            return "K0", 400
# Read-only endpoint exposing the thermostat status.
@ns_thermostat.route('/status')
class Status_thermostat(Resource):
    @auth_required
    def get(self):
        # The textual status is returned under the 'datas' key.
        payload = {'datas': status_as_text()}
        logging.debug(payload)
        return payload
# Register the thermostat namespace on the API.
api.add_namespace(ns_thermostat)

# Create the tables declared by the models; this needs an application context.
with app.app_context():
    db.create_all()

# Database migrations, with column type changes taken into account.
migrate = Migrate(app, db, compare_type=True)
# TODO: Get Linky overload warning
#cursor.execute("SELECT * FROM set_mode")
#rows = cursor.fetchall()
#for row in rows:
# print(row)
#sys.stdout.flush()
def thermostat_loop():
    """Background control loop of the thermostat; never returns.

    Each iteration stores a pending forced mode (if any) in the database,
    refreshes the loop state, then sleeps for ``load_shedder_interval``
    seconds. The mode selection, load shedding and relay control steps are
    currently disabled and kept below as comments.

    ``target_name``, ``forced_mode`` and ``current_time`` are module-level
    names because ``status_as_text()`` reads them; as locals of this function
    they were invisible to it and the status request failed with NameError.
    """
    global target_name, forced_mode, current_time

    last_control_time = None
    new_forced_mode = None
    target_name = default_target
    forced_mode = None
    current_time = time.time()
    first_loop = True

    # awake_hour and sleep_hour are colon-separated hour and minute values.
    # They do not change while the loop runs, so they are parsed only once.
    awake_fields = awake_hour.split(':')
    awake_hh, awake_mm = int(awake_fields[0]), int(awake_fields[1])
    sleep_fields = sleep_hour.split(':')
    sleep_hh, sleep_mm = int(sleep_fields[0]), int(sleep_fields[1])

    while True:
        # if stop.is_set():
        # httpd.shutdown()
        # httpd.server_close()
        # dbconn.close()
        # break
        if new_forced_mode is not None:
            with app.app_context():
                # The model constructor only accepts keyword arguments.
                # merge() replaces the existing 'mode' row, as the legacy
                # INSERT OR REPLACE below did; the timestamp is refreshed
                # explicitly because the column default applies on insert only.
                data = Set_mode(
                    name='mode',
                    value=new_forced_mode,
                    timestamp=datetime.now(timezone.utc).replace(tzinfo=None),
                )
                db.session.merge(data)
                db.session.commit()
            # #cursor.execute("INSERT OR REPLACE INTO set_mode (value) VALUES ('"+new_forced_mode+"')")
            # #dbconn.commit()
            logging.info("Switch to "+new_forced_mode)
            target_name = new_forced_mode
            new_forced_mode = None
            # Force immediate action:
            last_control_time = None
        current_time = time.time()
        current_date = datetime.now()
        today_awake_time = current_date.replace(hour=awake_hh, minute=awake_mm, second=0, microsecond=0)
        today_sleep_time = current_date.replace(hour=sleep_hh, minute=sleep_mm, second=0, microsecond=0)
        forced_mode = get_forced_mode()
        # NOTE(review): the nesting of the disabled code below was
        # reconstructed during review - verify it before re-enabling.
        # if forced_mode is not None and forced_mode in targets:
        #     if target_name != forced_mode:
        #         target_name = forced_mode
        #         logging.info("Switch to "+forced_mode)
        # else:
        #     if forced_mode == "long_absence":
        #         if target_name != "target_frost_protection" or first_loop:
        #             target_name = "target_frost_protection"
        #             logging.info("Switch to "+target_name)
        #     elif forced_mode == "short_absence" or first_loop:
        #         if target_name != "target_sleep_temperature":
        #             target_name = "target_sleep_temperature"
        #             logging.info("Switch to "+target_name)
        #     elif current_date > today_awake_time and current_date < today_sleep_time:
        #         if target_name != "target_unconfirmed_awake_temperature" and target_name != "target_awake_temperature":
        #             target_name = "target_unconfirmed_awake_temperature"
        #             logging.info("Switch to unconfirmed awake mode.")
        #     elif current_date < today_awake_time or current_date > today_sleep_time:
        #         if target_name != "target_unconfirmed_sleep_temperature" and target_name != "target_sleep_temperature":
        #             target_name = "target_unconfirmed_sleep_temperature"
        #             logging.info("Switch to unconfirmed sleep mode.")
        #
        # first_loop = False
        #
        # # Load shedder
        # current_load = get_metric("Modane_elec_main_power", current_time, load_shedder_interval)
        # if current_load is None:
        #     time.sleep(load_shedder_interval)
        #     continue
        # elif max_load - current_load < load_margin:
        #     logging.warning("Load too high: "+str(current_load)+"VA")
        #     total_shedded = 0
        #     for room in shedding_order:
        #         current_state = relay_state(rooms_settings[room]["relays"])
        #         if current_state != "Failed":
        #             logging.debug("Got relay_state: '"+current_state+"'")
        #         if current_state == "1":
        #             result = set_relay(rooms_settings[room]["relays"], "off")
        #             if result == "OK":
        #                 total_shedded += relays_load[rooms_settings[room]["relays"]]
        #                 if max_load - current_load - total_shedded < load_margin:
        #                     logging.info("Load should be back to normal.")
        #                     break
        #
        # # Thermostat
        # if last_control_time is None or current_time - last_control_time > relay_control_interval:
        #     last_control_time = current_time
        #     for room in rooms_settings:
        #         if not rooms_settings[room]["enabled"]:
        #             continue
        #         target = rooms_settings[room][target_name]
        #         logging.debug("Target: "+str(target))
        #         temperature = get_metric(rooms_settings[room]["metric"], current_time, relay_control_interval)
        #         if temperature is None:
        #             continue
        #         logging.debug(room+": "+str(temperature))
        #         current_state = relay_state(rooms_settings[room]["relays"])
        #         if current_state != "Failed":
        #             logging.debug("Got relay_state: '"+current_state+"'")
        #         if temperature < target - hysteresis:
        #             if current_state == "0":
        #                 logging.info(room+": Target temperature is "+str(target))
        #                 logging.info(room+": Current temperature is "+str(temperature))
        #                 if current_load + relays_load[rooms_settings[room]["relays"]] + load_margin > max_load:
        #                     logging.warning(room+": Load too high cannot start heaters.")
        #                 else:
        #                     logging.info(room+": Starting heaters.")
        #                     set_relay(rooms_settings[room]["relays"], "on")
        #                     sys.stdout.flush()
        #             else:
        #                 logging.debug("Relay already on.")
        #
        #         elif temperature > target + hysteresis:
        #             if current_state == "1":
        #                 logging.info(room+": Target temperature is "+str(target))
        #                 logging.info(room+": Current temperature is "+str(temperature))
        #                 logging.info(room+": Stopping heaters.")
        #                 sys.stdout.flush()
        #                 set_relay(rooms_settings[room]["relays"], "off")
        #             else:
        #                 logging.debug("Relay already off.")
        # Wait one load-shedder interval before the next iteration.
        time.sleep(load_shedder_interval)
# Run the control loop in a daemon thread, so it never prevents the process
# from exiting.
t1 = Thread(target=thermostat_loop, daemon=True)
t1.start()
logging.info("====== Ended successfully ======")
#import argparse
#import signal
#from threading import Event
#from http.server import BaseHTTPRequestHandler
#import threading
#import socketserver
#from threading import Lock
#
#xprint_lock = Lock()
#
#def xprint(*args, **kwargs):
# """Thread safe print function"""
# with xprint_lock:
# print(*args, **kwargs)
# sys.stdout.flush()
#
#p = argparse.ArgumentParser(description='Thermostat and load shedder.')
#p.add_argument("-v", "--verbosity", help="Increase output verbosity",
# type=str, choices=['DEBUG', 'INFO', 'WARNING'], default='INFO')
#args = p.parse_args()
#
#verbosity = args.verbosity
#if verbosity == 'DEBUG':
# logging.basicConfig(level=logging.DEBUG)
#elif verbosity == 'INFO':
# logging.basicConfig(level=logging.INFO)
#elif verbosity == 'WARNING':
# logging.basicConfig(level=logging.WARNING)
#
#
#def api_help():
# return """
#### HTTP API description:
## /status
#
## /help : This message
#
#"""\
#+'Temporary forced modes (will stay in effect for '+str(int(forced_mode_duration))+' seconds):\n'\
#+'\n'.join(['# /'+target+'\n' for target in targets])\
#+'\n'\
#+'Permanent forced modes:\n'\
#+'\n'.join(['# /'+mode+'\n' for mode in modes])
#
#logging.info("====== Starting ======")
#stop = Event()
#last_data = {}
#
#def handler(signum, frame):
# global stop
# logging.info("Got interrupt: "+str(signum))
# stop.set()
# logging.info("Shutdown")
#
#signal.signal(signal.SIGTERM,handler)
#signal.signal(signal.SIGINT,handler)
#
#class MyHandler(BaseHTTPRequestHandler):
# def do_GET(self):
# global new_forced_mode
# request = self.path[1:]
# if request in targets or request in modes:
# self.send_response(200)
# new_forced_mode = request
# elif self.path == '/status':
# self.send_response(200)
# self.send_header('Content-type', 'text/plain')
# self.end_headers()
# self.wfile.write(status_as_text().encode("utf-8"))
# elif self.path == '/help':
# self.send_response(200)
# self.send_header('Content-type', 'text/plain')
# self.end_headers()
# self.wfile.write(api_help().encode("utf-8"))
# else:
# self.send_response(404)
# # This rewrites the BaseHTTP logging function
# def log_message(self, format, *args):
# if verbosity == 'INFO':
# xprint("%s - - [%s] %s" %
# (self.address_string(),
# self.log_date_time_string(),
# format%args))
#
#class WebThread(threading.Thread):
# def run(self):
# httpd.serve_forever()
#
#httpd = socketserver.TCPServer(("", http_port), MyHandler, bind_and_activate=False)
#httpd.allow_reuse_address = True
#httpd.server_bind()
#httpd.server_activate()
#webserver_thread = WebThread()
#webserver_thread.start()
#