226 lines
8.2 KiB
Python
Executable File
226 lines
8.2 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import json
|
|
import logging
|
|
import time
|
|
import signal
|
|
import yaml
|
|
import requests
|
|
import subprocess
|
|
import argparse
|
|
import threading
|
|
import socketserver
|
|
import sys
|
|
from http.server import BaseHTTPRequestHandler
|
|
from threading import Event
|
|
from datetime import datetime
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from threading import Lock
|
|
|
|
xprint_lock = Lock()
|
|
|
|
def xprint(*args, **kwargs):
|
|
"""Thread safe print function"""
|
|
with xprint_lock:
|
|
print(*args, **kwargs)
|
|
sys.stdout.flush()
|
|
|
|
parser = argparse.ArgumentParser(description='Sensors polling and metrics recording.')
|
|
parser.add_argument("-v", "--verbosity", help="Increase output verbosity",
|
|
type=str, choices=['DEBUG', 'INFO', 'WARNING'], default='INFO')
|
|
args = parser.parse_args()
|
|
|
|
verbosity = args.verbosity
|
|
logger = logging.getLogger('sensors-polling')
|
|
|
|
if verbosity == 'DEBUG':
|
|
logger.setLevel(logging.DEBUG)
|
|
elif verbosity == 'INFO':
|
|
logger.setLevel(logging.INFO)
|
|
elif verbosity == 'WARNING':
|
|
logger.setLevel(logging.WARNING)
|
|
|
|
# create console handler
|
|
ch = logging.StreamHandler()
|
|
# create formatter
|
|
formatter = logging.Formatter('[%(levelname)s] %(name)s: [%(threadName)s] %(message)s')
|
|
# add formatter to ch
|
|
ch.setFormatter(formatter)
|
|
# add ch to logger
|
|
logger.addHandler(ch)
|
|
|
|
logger.info("====== Starting ======")
|
|
|
|
stop = Event()
|
|
last_data = {}
|
|
|
|
def handler(signum, frame):
|
|
global stop
|
|
logger.info("Got interrupt: "+str(signum))
|
|
stop.set()
|
|
logger.info("Shutdown")
|
|
|
|
signal.signal(signal.SIGTERM,handler)
|
|
signal.signal(signal.SIGINT,handler)
|
|
|
|
with open('./conf.yml') as conf:
|
|
yaml_conf = yaml.safe_load(conf)
|
|
polling_conf = yaml_conf.get("polling_conf")
|
|
http_port = yaml_conf.get("http_port")
|
|
default_polling_interval = yaml_conf.get("default_polling_interval")
|
|
default_recording_interval = yaml_conf.get("default_recording_interval")
|
|
max_threads = len(polling_conf)
|
|
recording_api_key = yaml_conf.get("recording_api_key")
|
|
post_url = yaml_conf.get("post_url")
|
|
|
|
def sensors_polling(poller_conf):
|
|
global stop
|
|
global last_data
|
|
s = requests.Session()
|
|
start_time=time.time()
|
|
last_polling_time=None
|
|
last_recording_time=None
|
|
if 'polling_interval' in poller_conf.keys():
|
|
polling_interval = poller_conf['polling_interval']
|
|
else:
|
|
polling_interval = default_polling_interval
|
|
|
|
if 'recording_interval' in poller_conf.keys():
|
|
recording_interval = poller_conf['recording_interval']
|
|
else:
|
|
recording_interval = default_recording_interval
|
|
|
|
while True:
|
|
if stop.is_set():
|
|
logger.info('Stopping thread '+poller_conf['name'])
|
|
break
|
|
logger.debug('New while loop for '+poller_conf['name'])
|
|
utc_now = datetime.utcnow()
|
|
now = datetime.now()
|
|
current_time=time.time()
|
|
logger.debug('current_time: '+str(current_time))
|
|
|
|
# Polling
|
|
try:
|
|
logger.debug('Getting data for '+poller_conf['name'])
|
|
command = [poller_conf['executable']] + poller_conf['arguments']
|
|
returned_output = subprocess.check_output(command)
|
|
data = json.loads(returned_output.decode("utf-8"))
|
|
logger.debug('Got: '+returned_output.decode("utf-8"))
|
|
for metric in poller_conf['metrics']:
|
|
last_data[metric['name']] = {'value': data[metric['name']], 'timestamp': utc_now.isoformat()}
|
|
last_polling_time=time.time()
|
|
logger.debug('last_polling_time: '+str(last_polling_time))
|
|
except Exception as e:
|
|
logger.error(e)
|
|
if last_polling_time is None:
|
|
polling_missed = int((current_time - start_time) // polling_interval)
|
|
else:
|
|
polling_missed = int((current_time - last_polling_time) // polling_interval)
|
|
if polling_missed > 0:
|
|
logger.warning("Missed "+str(polling_missed)+" polling iteration(s)")
|
|
|
|
# Recording
|
|
if last_polling_time is not None:
|
|
if last_recording_time is not None:
|
|
recording_interval_elapsed = (current_time - last_recording_time > recording_interval)
|
|
polling_recent_enough = (last_polling_time > last_recording_time + recording_interval/2)
|
|
logger.debug('recording_interval_elapsed: '+str(recording_interval_elapsed))
|
|
logger.debug('polling_recent_enough: '+str(polling_recent_enough))
|
|
if last_recording_time is None or (recording_interval_elapsed and polling_recent_enough):
|
|
try:
|
|
for metric in poller_conf['metrics']:
|
|
logger.debug('Posting data for '+metric['name'])
|
|
r = s.post(post_url[metric['type']],
|
|
headers={'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:75.0) Gecko/20100101 Firefox/75.0',
|
|
'X-API-KEY': recording_api_key},
|
|
json={'metric': metric['name'],
|
|
'value': last_data[metric['name']]['value'],
|
|
'time': utc_now.isoformat()})
|
|
if r.status_code != 201:
|
|
logger.error(str(r.status_code)+" "+r.reason)
|
|
# It has to be current_time variable so the interval check works correctly
|
|
last_recording_time=current_time
|
|
logger.debug('last_recording_time: '+str(last_recording_time))
|
|
except Exception as e:
|
|
logger.error(e)
|
|
if last_recording_time is None:
|
|
recording_missed = int((current_time - start_time) // recording_interval)
|
|
else:
|
|
recording_missed = int((current_time - last_recording_time) // recording_interval)
|
|
if recording_missed > 0:
|
|
logger.warning("Missed "+str(recording_missed)+" recording iteration(s)")
|
|
|
|
# Sleeping
|
|
time_to_sleep = polling_interval - ((current_time - start_time) % polling_interval)
|
|
logger.debug('Sleeping '+str(time_to_sleep)+' seconds for '+poller_conf['name'])
|
|
stop.wait(timeout=time_to_sleep)
|
|
|
|
def metric_list():
|
|
metrics = []
|
|
for poller_conf in polling_conf:
|
|
for metric in poller_conf['metrics']:
|
|
metrics.append(metric['name'])
|
|
return metrics
|
|
|
|
class MyHandler(BaseHTTPRequestHandler):
|
|
def do_GET(self):
|
|
if self.path == '/':
|
|
self.send_response(200)
|
|
self.send_header('Content-type', 'text/plain')
|
|
self.end_headers()
|
|
self.wfile.write(bytes(str(metric_list())+'\n', 'utf-8'))
|
|
if self.path[1:] in metric_list():
|
|
self.send_response(200)
|
|
self.send_header('Content-type', 'text/plain')
|
|
self.end_headers()
|
|
self.wfile.write(bytes(json.dumps(last_data[self.path[1:]])+'\n', 'utf-8'))
|
|
else:
|
|
self.send_response(404)
|
|
# This rewrites the BaseHTTP logging function
|
|
def log_message(self, format, *args):
|
|
if verbosity == 'INFO':
|
|
xprint("%s - - [%s] %s" %
|
|
(self.address_string(),
|
|
self.log_date_time_string(),
|
|
format%args))
|
|
|
|
class WebThread(threading.Thread):
|
|
def run(self):
|
|
httpd.serve_forever()
|
|
|
|
httpd = socketserver.TCPServer(("", http_port), MyHandler, bind_and_activate=False)
|
|
httpd.allow_reuse_address = True
|
|
httpd.server_bind()
|
|
httpd.server_activate()
|
|
webserver_thread = WebThread()
|
|
webserver_thread.start()
|
|
|
|
executor = ThreadPoolExecutor(max_workers=max_threads)
|
|
threads = []
|
|
for poller_conf in polling_conf:
|
|
threads.append(executor.submit(sensors_polling, poller_conf))
|
|
|
|
logger.info("Polling "+str(metric_list()))
|
|
|
|
while True:
|
|
if stop.is_set():
|
|
executor.shutdown(wait=True)
|
|
httpd.shutdown()
|
|
httpd.server_close()
|
|
break
|
|
for thread in threads:
|
|
if not thread.running():
|
|
try:
|
|
res = thread.exception(timeout=1)
|
|
if res is not None:
|
|
logger.error(res)
|
|
except Exception as e:
|
|
logger.error(e)
|
|
stop.wait(timeout=0.5)
|
|
|
|
logger.info("====== Ended successfully ======")
|
|
|
|
# vim: set ts=4 sw=4 sts=4 et :
|