docker-sensors-polling/sensors-polling.py

226 lines
8.2 KiB
Python
Raw Normal View History

2024-02-04 20:19:01 +00:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import logging
import time
import signal
import yaml
import requests
import subprocess
import argparse
import threading
import socketserver
2024-02-05 01:09:25 +00:00
import sys
2024-02-04 20:19:01 +00:00
from http.server import BaseHTTPRequestHandler
from threading import Event
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
2024-02-05 01:09:25 +00:00
from threading import Lock
xprint_lock = Lock()
def xprint(*args, **kwargs):
"""Thread safe print function"""
with xprint_lock:
print(*args, **kwargs)
sys.stdout.flush()
2024-02-04 20:19:01 +00:00
parser = argparse.ArgumentParser(description='Sensors polling and metrics recording.')
parser.add_argument("-v", "--verbosity", help="Increase output verbosity",
type=str, choices=['DEBUG', 'INFO', 'WARNING'], default='INFO')
args = parser.parse_args()
2024-02-05 01:09:25 +00:00
verbosity = args.verbosity
2024-02-10 09:00:12 +00:00
logger = logging.getLogger('sensors-polling')
2024-02-05 01:09:25 +00:00
if verbosity == 'DEBUG':
2024-02-10 09:00:12 +00:00
logger.setLevel(logging.DEBUG)
2024-02-05 01:09:25 +00:00
elif verbosity == 'INFO':
2024-02-10 09:00:12 +00:00
logger.setLevel(logging.INFO)
2024-02-05 01:09:25 +00:00
elif verbosity == 'WARNING':
2024-02-10 09:00:12 +00:00
logger.setLevel(logging.WARNING)
# create console handler
ch = logging.StreamHandler()
# create formatter
formatter = logging.Formatter('[%(levelname)s] %(name)s: [%(threadName)s] %(message)s')
# add formatter to ch
ch.setFormatter(formatter)
# add ch to logger
logger.addHandler(ch)
2024-02-04 20:19:01 +00:00
2024-02-10 09:00:12 +00:00
logger.info("====== Starting ======")
2024-02-04 20:19:01 +00:00
stop = Event()
last_data = {}
def handler(signum, frame):
global stop
2024-02-10 09:00:12 +00:00
logger.info("Got interrupt: "+str(signum))
2024-02-04 20:19:01 +00:00
stop.set()
2024-02-10 09:00:12 +00:00
logger.info("Shutdown")
2024-02-04 20:19:01 +00:00
signal.signal(signal.SIGTERM,handler)
signal.signal(signal.SIGINT,handler)
with open('./conf.yml') as conf:
2024-06-08 18:59:23 +00:00
yaml_conf = yaml.safe_load(conf)
2024-02-04 20:19:01 +00:00
polling_conf = yaml_conf.get("polling_conf")
http_port = yaml_conf.get("http_port")
default_polling_interval = yaml_conf.get("default_polling_interval")
default_recording_interval = yaml_conf.get("default_recording_interval")
max_threads = len(polling_conf)
recording_api_key = yaml_conf.get("recording_api_key")
post_url = yaml_conf.get("post_url")
def sensors_polling(poller_conf):
global stop
global last_data
s = requests.Session()
start_time=time.time()
last_polling_time=None
last_recording_time=None
if 'polling_interval' in poller_conf.keys():
polling_interval = poller_conf['polling_interval']
else:
polling_interval = default_polling_interval
if 'recording_interval' in poller_conf.keys():
recording_interval = poller_conf['recording_interval']
else:
recording_interval = default_recording_interval
while True:
if stop.is_set():
2024-02-10 09:00:12 +00:00
logger.info('Stopping thread '+poller_conf['name'])
2024-02-04 20:19:01 +00:00
break
2024-02-10 09:00:12 +00:00
logger.debug('New while loop for '+poller_conf['name'])
2024-02-04 20:19:01 +00:00
utc_now = datetime.utcnow()
now = datetime.now()
current_time=time.time()
2024-02-10 09:00:12 +00:00
logger.debug('current_time: '+str(current_time))
2024-02-04 20:19:01 +00:00
# Polling
try:
2024-02-10 09:00:12 +00:00
logger.debug('Getting data for '+poller_conf['name'])
2024-02-04 23:21:47 +00:00
command = [poller_conf['executable']] + poller_conf['arguments']
2024-02-04 20:19:01 +00:00
returned_output = subprocess.check_output(command)
data = json.loads(returned_output.decode("utf-8"))
2024-02-10 09:00:12 +00:00
logger.debug('Got: '+returned_output.decode("utf-8"))
2024-02-04 20:19:01 +00:00
for metric in poller_conf['metrics']:
last_data[metric['name']] = {'value': data[metric['name']], 'timestamp': utc_now.isoformat()}
last_polling_time=time.time()
2024-02-10 09:00:12 +00:00
logger.debug('last_polling_time: '+str(last_polling_time))
2024-02-04 20:19:01 +00:00
except Exception as e:
2024-02-10 09:00:12 +00:00
logger.error(e)
2024-02-04 20:19:01 +00:00
if last_polling_time is None:
polling_missed = int((current_time - start_time) // polling_interval)
else:
polling_missed = int((current_time - last_polling_time) // polling_interval)
if polling_missed > 0:
2024-02-10 09:00:12 +00:00
logger.warning("Missed "+str(polling_missed)+" polling iteration(s)")
2024-02-04 20:19:01 +00:00
# Recording
2024-02-10 09:00:12 +00:00
if last_polling_time is not None:
if last_recording_time is not None:
recording_interval_elapsed = (current_time - last_recording_time > recording_interval)
polling_recent_enough = (last_polling_time > last_recording_time + recording_interval/2)
logger.debug('recording_interval_elapsed: '+str(recording_interval_elapsed))
logger.debug('polling_recent_enough: '+str(polling_recent_enough))
if last_recording_time is None or (recording_interval_elapsed and polling_recent_enough):
try:
for metric in poller_conf['metrics']:
logger.debug('Posting data for '+metric['name'])
r = s.post(post_url[metric['type']],
headers={'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:75.0) Gecko/20100101 Firefox/75.0',
'X-API-KEY': recording_api_key},
json={'metric': metric['name'],
'value': last_data[metric['name']]['value'],
'time': utc_now.isoformat()})
if r.status_code != 201:
logger.error(str(r.status_code)+" "+r.reason)
# It has to be current_time variable so the interval check works correctly
last_recording_time=current_time
logger.debug('last_recording_time: '+str(last_recording_time))
except Exception as e:
logger.error(e)
2024-02-04 20:19:01 +00:00
if last_recording_time is None:
recording_missed = int((current_time - start_time) // recording_interval)
else:
recording_missed = int((current_time - last_recording_time) // recording_interval)
if recording_missed > 0:
2024-02-10 09:00:12 +00:00
logger.warning("Missed "+str(recording_missed)+" recording iteration(s)")
2024-02-04 20:19:01 +00:00
# Sleeping
time_to_sleep = polling_interval - ((current_time - start_time) % polling_interval)
2024-02-10 09:00:12 +00:00
logger.debug('Sleeping '+str(time_to_sleep)+' seconds for '+poller_conf['name'])
2024-02-04 20:19:01 +00:00
stop.wait(timeout=time_to_sleep)
def metric_list():
2024-02-05 00:05:19 +00:00
metrics = []
for poller_conf in polling_conf:
for metric in poller_conf['metrics']:
metrics.append(metric['name'])
return metrics
2024-02-04 20:19:01 +00:00
class MyHandler(BaseHTTPRequestHandler):
def do_GET(self):
if self.path == '/':
self.send_response(200)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write(bytes(str(metric_list())+'\n', 'utf-8'))
if self.path[1:] in metric_list():
self.send_response(200)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write(bytes(json.dumps(last_data[self.path[1:]])+'\n', 'utf-8'))
else:
self.send_response(404)
2024-02-05 01:09:25 +00:00
# This rewrites the BaseHTTP logging function
def log_message(self, format, *args):
if verbosity == 'INFO':
xprint("%s - - [%s] %s" %
(self.address_string(),
self.log_date_time_string(),
format%args))
2024-02-04 20:19:01 +00:00
class WebThread(threading.Thread):
def run(self):
httpd.serve_forever()
httpd = socketserver.TCPServer(("", http_port), MyHandler, bind_and_activate=False)
httpd.allow_reuse_address = True
httpd.server_bind()
httpd.server_activate()
webserver_thread = WebThread()
webserver_thread.start()
executor = ThreadPoolExecutor(max_workers=max_threads)
threads = []
for poller_conf in polling_conf:
threads.append(executor.submit(sensors_polling, poller_conf))
2024-02-10 09:00:12 +00:00
logger.info("Polling "+str(metric_list()))
2024-02-04 20:19:01 +00:00
while True:
if stop.is_set():
executor.shutdown(wait=True)
httpd.shutdown()
httpd.server_close()
break
for thread in threads:
if not thread.running():
try:
res = thread.exception(timeout=1)
if res is not None:
2024-02-10 09:00:12 +00:00
logger.error(res)
2024-02-04 20:19:01 +00:00
except Exception as e:
2024-02-10 09:00:12 +00:00
logger.error(e)
2024-02-04 20:19:01 +00:00
stop.wait(timeout=0.5)
2024-02-10 09:00:12 +00:00
logger.info("====== Ended successfully ======")
2024-02-04 20:19:01 +00:00
# vim: set ts=4 sw=4 sts=4 et :