99 lines
2.7 KiB
Python
Executable File
99 lines
2.7 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# https://www.magdiblog.fr/gpio/teleinfo-edf-suivi-conso-de-votre-compteur-electrique/
|
|
|
|
import argparse
|
|
import json
|
|
import subprocess
|
|
import sys
|
|
import serial
|
|
import serial.tools.list_ports
|
|
|
|
# Command-line interface: the only option selects how the decoded frame is
# printed (plain text, the raw label/value mapping as JSON, or a reduced JSON
# document with two selected values).
parser = argparse.ArgumentParser(description='Téléinfo retriever.')
parser.add_argument("-f", "--format", help="Output format.",
                    type=str,
                    choices=['human-readable', 'raw_json', 'custom_json'],
                    default='human-readable')
args = parser.parse_args()

# Serial line speed used when opening the Teleinfo port.
baudrate = 1200
# Device name of the serial port; filled in once the port has been detected.
port = None
|
|
|
|
def list_serial_ports():
    """Print every serial port reported by pyserial, one per line.

    Ports are listed in sorted order, each formatted as
    ``<device>: <description> [<hardware id>]``.
    """
    # The loop variable is deliberately not called ``port`` so it cannot be
    # confused with the module-level ``port`` setting.
    for device, desc, hwid in sorted(serial.tools.list_ports.comports()):
        print("{}: {} [{}]".format(device, desc, hwid))
|
|
|
|
def find_serial_ports(description="Interface USB 1 TIC"):
    """Return the device name of the first serial port matching *description*.

    Ports reported by pyserial are examined in sorted order.

    Args:
        description: Substring searched for in each port's description.
            Defaults to the marker this script has always looked for.

    Returns:
        The device name of the first matching port, or ``None`` when no
        port description contains *description*.
    """
    for device, desc, _hwid in sorted(serial.tools.list_ports.comports()):
        if description in desc:
            return device
    return None
|
|
|
|
port = find_serial_ports()
if port is None:
    print("Unable to find serial port for Teleinfo please use one of the following")
    list_serial_ports()
    sys.exit(2)

# Open the link at `baudrate` with 7 data bits, even parity and one stop bit.
# Using the port as a context manager guarantees it is closed again, even if
# reading or decoding the frame raises.
with serial.Serial(port=port,
                   baudrate=baudrate,
                   parity=serial.PARITY_EVEN,
                   stopbits=serial.STOPBITS_ONE,
                   bytesize=serial.SEVENBITS) as tinfo:

    # This can be used to detect overload warning frames
    #while True:
    #    c = tinfo.read(1)
    #    #print(c, end='');
    #    sys.stdout.write(c.decode('utf-8'))

    # Throw away the first 16 bytes read from the port.
    # NOTE(review): presumably this flushes stale/partial data received right
    # after opening the port - confirm.
    for _ in range(16):
        tinfo.read(1)

    # Resynchronise: skip everything up to and including the next end-of-frame
    # byte (0x03) so the loop below starts on a frame boundary.
    while tinfo.read(1)[0] != 0x03:
        pass

    lines = []
    fullframe = False
    line = ""
    # Read exactly one frame, byte by byte, collecting its lines.
    while not fullframe:
        c = tinfo.read(1)
        if c[0] == 0x02:
            # Startframe character
            pass
        elif c[0] == 0x03:
            # Endframe character
            fullframe = True
        elif c[0] == 0x09:
            # Tab separators are normalised to spaces so that every line can
            # be split on a single separator below.
            line += " "
        elif c[0] == 0x0d:
            # Endline character
            lines.append(line)
        elif c[0] == 0x0a:
            # Startline character
            line = ""
        else:
            line += c.decode("ascii")

# Map each label (first field of a line) to its value (second field).
res = {}
for entry in lines:
    fields = entry.split(' ')
    if len(fields) < 2:
        # Truncated or corrupted line: there is no value to record.
        continue
    res[fields[0]] = fields[1]

if args.format == 'human-readable':
    print("Puissance apparente compteur : {}VA".format(int(res['PAPP'])))
    # Less accurate because the current is rounded to an integer.
    print("Puissance apparente calculée : {}VA".format(int(res['IINST']) * 230))
    print("Puissance souscrite : 6kVA")
    print("Puissance max avant coupure (marge 30%) : 7,8kVA")
    print("Intensité : {}A".format(int(res['IINST'])))
    print("Intensité abonnement : {}A".format(int(res['ISOUSC'])))
    print("Consommation : {}Wh".format(int(res['BASE'])))
elif args.format == 'raw_json':
    print(json.dumps(res))
elif args.format == 'custom_json':
    data = {
        'Modane_elec_main_power': int(res['PAPP']),
        'Modane_elec_energy_index': int(res['BASE']),
    }
    print(json.dumps(data))
|
|
|