docker-sensors-polling/read_teleinfo.py

103 lines
2.8 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
2024-02-04 20:19:01 +00:00
# # pip install teleinfo
# https://pypi.org/project/teleinfo/
# https://www.magdiblog.fr/gpio/teleinfo-edf-suivi-conso-de-votre-compteur-electrique/
import argparse
import json
from teleinfo import Parser
from teleinfo.hw_vendors import UTInfo2
2024-06-08 20:00:17 +00:00
import subprocess
2024-06-09 20:36:43 +00:00
import sys
import serial
import serial.tools.list_ports
2024-02-04 20:19:01 +00:00
parser = argparse.ArgumentParser(description='Téléinfo retriever.')
parser.add_argument("-f", "--format", help="Output format.",
type=str, choices=['human-readable', 'raw_json', 'custom_json'], default='human-readable')
args = parser.parse_args()
2024-06-09 20:36:43 +00:00
baudrate = 1200
port = None
def list_serial_ports():
ports = serial.tools.list_ports.comports()
for port, desc, hwid in sorted(ports):
print("{}: {} [{}]".format(port, desc, hwid))
2024-06-08 20:00:17 +00:00
2024-06-09 20:36:43 +00:00
def find_serial_ports():
ports = serial.tools.list_ports.comports()
for port, desc, hwid in sorted(ports):
if "Interface USB 1 TIC" in desc:
return port
2024-06-08 20:00:17 +00:00
2024-06-09 20:36:43 +00:00
port = find_serial_ports()
if port is None:
print("Unable to find serial port for Teleinfo please use one of the following")
list_serial_ports()
sys.exit(2)
tinfo = serial.Serial( port=port,
baudrate=baudrate,
parity=serial.PARITY_EVEN,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.SEVENBITS )
# This can be used to detect overload warning frames
#while True:
# c = tinfo.read(1)
# #print(c, end='');
# sys.stdout.write(c.decode('utf-8'))
for x in range(16):
tinfo.read(1)
while tinfo.read(1)[0]!=0x03:
pass
lines = []
fullframe = False
line = ""
# loop thru serial
while fullframe==False:
c = tinfo.read(1)
if c[0]==0x02:
# Startframe character
pass
elif c[0]==0x03:
# Endframe character
fullframe = True
elif c[0]==0x09:
line += " "
elif c[0]==0x0d:
# Endline character
lines.append(line)
elif c[0]==0x0a:
# Startline character
line = ""
else:
line += c.decode("ascii")
res = dict()
for line in lines:
l = line.split(' ')
res[l[0]] = l[1]
2024-02-04 20:19:01 +00:00
if args.format == 'human-readable':
print("Puissance apparente compteur : "+str(int(res['PAPP']))+"VA")
2024-02-04 20:19:01 +00:00
# moins précis car Intensité arrondie à l'entier
print("Puissance apparente calculée : "+str(int(res['IINST'])*230)+"VA")
print("Puissance souscrite : 6kVA")
print("Puissance max avant coupure (marge 30%) : 7,8kVA")
print("Intensité : "+str(int(res['IINST']))+"A")
print("Intensité abonnement : "+str(int(res['ISOUSC']))+"A")
print("Consommation : "+str(int(res['BASE']))+"Wh")
2024-02-04 20:19:01 +00:00
elif args.format == 'raw_json':
print(json.dumps(res))
2024-02-04 20:19:01 +00:00
elif args.format == 'custom_json':
data = {}
data['Modane_elec_main_power'] = int(res['PAPP'])
data['Modane_elec_energy_index'] = int(res['BASE'])
print(json.dumps(data))
2024-06-09 20:36:43 +00:00