docker-sensors-polling/sensors-polling.py

226 lines
8.2 KiB
Python
Executable File

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import logging
import time
import signal
import yaml
import requests
import subprocess
import argparse
import threading
import socketserver
import sys
from http.server import BaseHTTPRequestHandler
from threading import Event
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
from threading import Lock
xprint_lock = Lock()
def xprint(*args, **kwargs):
"""Thread safe print function"""
with xprint_lock:
print(*args, **kwargs)
sys.stdout.flush()
parser = argparse.ArgumentParser(description='Sensors polling and metrics recording.')
parser.add_argument("-v", "--verbosity", help="Increase output verbosity",
type=str, choices=['DEBUG', 'INFO', 'WARNING'], default='INFO')
args = parser.parse_args()
verbosity = args.verbosity
logger = logging.getLogger('sensors-polling')
if verbosity == 'DEBUG':
logger.setLevel(logging.DEBUG)
elif verbosity == 'INFO':
logger.setLevel(logging.INFO)
elif verbosity == 'WARNING':
logger.setLevel(logging.WARNING)
# create console handler
ch = logging.StreamHandler()
# create formatter
formatter = logging.Formatter('[%(levelname)s] %(name)s: [%(threadName)s] %(message)s')
# add formatter to ch
ch.setFormatter(formatter)
# add ch to logger
logger.addHandler(ch)
logger.info("====== Starting ======")
stop = Event()
last_data = {}
def handler(signum, frame):
global stop
logger.info("Got interrupt: "+str(signum))
stop.set()
logger.info("Shutdown")
signal.signal(signal.SIGTERM,handler)
signal.signal(signal.SIGINT,handler)
with open('./conf.yml') as conf:
yaml_conf = yaml.safe_load(conf)
polling_conf = yaml_conf.get("polling_conf")
http_port = yaml_conf.get("http_port")
default_polling_interval = yaml_conf.get("default_polling_interval")
default_recording_interval = yaml_conf.get("default_recording_interval")
max_threads = len(polling_conf)
recording_api_key = yaml_conf.get("recording_api_key")
post_url = yaml_conf.get("post_url")
def sensors_polling(poller_conf):
global stop
global last_data
s = requests.Session()
start_time=time.time()
last_polling_time=None
last_recording_time=None
if 'polling_interval' in poller_conf.keys():
polling_interval = poller_conf['polling_interval']
else:
polling_interval = default_polling_interval
if 'recording_interval' in poller_conf.keys():
recording_interval = poller_conf['recording_interval']
else:
recording_interval = default_recording_interval
while True:
if stop.is_set():
logger.info('Stopping thread '+poller_conf['name'])
break
logger.debug('New while loop for '+poller_conf['name'])
utc_now = datetime.utcnow()
now = datetime.now()
current_time=time.time()
logger.debug('current_time: '+str(current_time))
# Polling
try:
logger.debug('Getting data for '+poller_conf['name'])
command = [poller_conf['executable']] + poller_conf['arguments']
returned_output = subprocess.check_output(command)
data = json.loads(returned_output.decode("utf-8"))
logger.debug('Got: '+returned_output.decode("utf-8"))
for metric in poller_conf['metrics']:
last_data[metric['name']] = {'value': data[metric['name']], 'timestamp': utc_now.isoformat()}
last_polling_time=time.time()
logger.debug('last_polling_time: '+str(last_polling_time))
except Exception as e:
logger.error(e)
if last_polling_time is None:
polling_missed = int((current_time - start_time) // polling_interval)
else:
polling_missed = int((current_time - last_polling_time) // polling_interval)
if polling_missed > 0:
logger.warning("Missed "+str(polling_missed)+" polling iteration(s)")
# Recording
if last_polling_time is not None:
if last_recording_time is not None:
recording_interval_elapsed = (current_time - last_recording_time > recording_interval)
polling_recent_enough = (last_polling_time > last_recording_time + recording_interval/2)
logger.debug('recording_interval_elapsed: '+str(recording_interval_elapsed))
logger.debug('polling_recent_enough: '+str(polling_recent_enough))
if last_recording_time is None or (recording_interval_elapsed and polling_recent_enough):
try:
for metric in poller_conf['metrics']:
logger.debug('Posting data for '+metric['name'])
#r = s.post(post_url[metric['type']],
# headers={'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:75.0) Gecko/20100101 Firefox/75.0',
# 'X-API-KEY': recording_api_key},
# json={'metric': metric['name'],
# 'value': last_data[metric['name']]['value'],
# 'time': utc_now.isoformat()})
#if r.status_code != 201:
# logger.error(str(r.status_code)+" "+r.reason)
# It has to be current_time variable so the interval check works correctly
last_recording_time=current_time
logger.debug('last_recording_time: '+str(last_recording_time))
except Exception as e:
logger.error(e)
if last_recording_time is None:
recording_missed = int((current_time - start_time) // recording_interval)
else:
recording_missed = int((current_time - last_recording_time) // recording_interval)
if recording_missed > 0:
logger.warning("Missed "+str(recording_missed)+" recording iteration(s)")
# Sleeping
time_to_sleep = polling_interval - ((current_time - start_time) % polling_interval)
logger.debug('Sleeping '+str(time_to_sleep)+' seconds for '+poller_conf['name'])
stop.wait(timeout=time_to_sleep)
def metric_list():
metrics = []
for poller_conf in polling_conf:
for metric in poller_conf['metrics']:
metrics.append(metric['name'])
return metrics
class MyHandler(BaseHTTPRequestHandler):
def do_GET(self):
if self.path == '/':
self.send_response(200)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write(bytes(str(metric_list())+'\n', 'utf-8'))
if self.path[1:] in metric_list():
self.send_response(200)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write(bytes(json.dumps(last_data[self.path[1:]])+'\n', 'utf-8'))
else:
self.send_response(404)
# This rewrites the BaseHTTP logging function
def log_message(self, format, *args):
if verbosity == 'INFO':
xprint("%s - - [%s] %s" %
(self.address_string(),
self.log_date_time_string(),
format%args))
class WebThread(threading.Thread):
def run(self):
httpd.serve_forever()
httpd = socketserver.TCPServer(("", http_port), MyHandler, bind_and_activate=False)
httpd.allow_reuse_address = True
httpd.server_bind()
httpd.server_activate()
webserver_thread = WebThread()
webserver_thread.start()
executor = ThreadPoolExecutor(max_workers=max_threads)
threads = []
for poller_conf in polling_conf:
threads.append(executor.submit(sensors_polling, poller_conf))
logger.info("Polling "+str(metric_list()))
while True:
if stop.is_set():
executor.shutdown(wait=True)
httpd.shutdown()
httpd.server_close()
break
for thread in threads:
if not thread.running():
try:
res = thread.exception(timeout=1)
if res is not None:
logger.error(res)
except Exception as e:
logger.error(e)
stop.wait(timeout=0.5)
logger.info("====== Ended successfully ======")
# vim: set ts=4 sw=4 sts=4 et :