2024-06-07 20:03:40 +00:00
|
|
|
#!/usr/bin/env python3
|
|
|
|
# Pilotage GCE USB 8 Relay Board
|
|
|
|
import time
|
|
|
|
import serial
|
2024-06-10 18:42:55 +00:00
|
|
|
import serial.tools.list_ports
|
2024-06-07 20:03:40 +00:00
|
|
|
import argparse
|
|
|
|
import sys
|
|
|
|
import subprocess
|
|
|
|
|
|
|
|
parser = argparse.ArgumentParser(description='Control relays.')
|
|
|
|
parser.add_argument('relay_list', type=str,
|
|
|
|
help='list of relays in ["1", ..., "8"] separated by comma, no spaces. "all" means all relays.')
|
|
|
|
parser.add_argument('action', type=str, choices=['on', 'off', 'status'],
|
|
|
|
help='Action on the list of relays.')
|
|
|
|
|
|
|
|
args = parser.parse_args()
|
|
|
|
|
|
|
|
if args.relay_list == 'all':
|
|
|
|
relays = [ 'RLY'+str(i) for i in range(1,9)]
|
|
|
|
else:
|
|
|
|
relays = []
|
|
|
|
for relay in args.relay_list.split(','):
|
|
|
|
if relay in [ str(i) for i in range(1,9)]:
|
|
|
|
relays.append('RLY'+relay)
|
|
|
|
else:
|
|
|
|
print("ERROR: Relay '"+relay+"' does not exist.")
|
|
|
|
sys.exit(1)
|
|
|
|
|
|
|
|
if args.action == 'on':
|
|
|
|
action = '1'
|
|
|
|
elif args.action == 'off':
|
|
|
|
action = '0'
|
|
|
|
else:
|
|
|
|
action = 'status'
|
|
|
|
|
|
|
|
#print(relays)
|
|
|
|
|
2024-06-10 18:42:55 +00:00
|
|
|
port = None
|
|
|
|
|
|
|
|
def list_serial_ports():
|
|
|
|
ports = serial.tools.list_ports.comports()
|
|
|
|
for port, desc, hwid in sorted(ports):
|
|
|
|
print("{}: {} [{}]".format(port, desc, hwid))
|
2024-06-07 20:03:40 +00:00
|
|
|
|
2024-06-10 18:42:55 +00:00
|
|
|
def find_serial_ports():
|
|
|
|
ports = serial.tools.list_ports.comports()
|
|
|
|
for port, desc, hwid in sorted(ports):
|
|
|
|
if "FT232R USB UART" in desc:
|
|
|
|
return port
|
2024-06-07 20:03:40 +00:00
|
|
|
|
2024-06-10 18:42:55 +00:00
|
|
|
port = find_serial_ports()
|
|
|
|
if port is None:
|
|
|
|
print("Unable to find serial port for relay board please use one of the following")
|
|
|
|
list_serial_ports()
|
|
|
|
sys.exit(2)
|
2024-06-07 20:03:40 +00:00
|
|
|
|
|
|
|
def init_serial(pPort):
|
|
|
|
global ser
|
|
|
|
ser = serial.Serial()
|
|
|
|
ser.baudrate = 9600
|
|
|
|
ser.port = pPort
|
|
|
|
ser.bytesize = 8
|
|
|
|
ser.parity = 'N'
|
|
|
|
ser.timeout = None
|
|
|
|
ser.xonxoff = False
|
|
|
|
ser.rtscts=False
|
|
|
|
ser.dsrdtr=False
|
|
|
|
ser.timeout = 1
|
|
|
|
ser.open()
|
|
|
|
|
|
|
|
if ser.isOpen():
|
|
|
|
pass
|
|
|
|
#print('Connected to ' + ser.portstr)
|
|
|
|
else:
|
|
|
|
print('Could not connect to ' + ser.portstr)
|
|
|
|
sys.exit(1)
|
|
|
|
|
2024-06-10 18:42:55 +00:00
|
|
|
init_serial(port)
|
2024-06-07 20:03:40 +00:00
|
|
|
if action != 'status':
|
|
|
|
for relay in relays:
|
2024-06-08 09:17:21 +00:00
|
|
|
command = relay+action+'\r\n'
|
|
|
|
ser.write(command.encode('ascii'))
|
2024-06-07 20:03:40 +00:00
|
|
|
bytes = ser.readline()
|
|
|
|
#print ('Renvoie :\r\n' + bytes)
|
|
|
|
|
2024-06-08 07:53:53 +00:00
|
|
|
ser.write('?RLY\r\n'.encode('ascii'))
|
2024-06-08 08:17:56 +00:00
|
|
|
answer = ser.readline().decode('ascii')
|
2024-06-07 20:03:40 +00:00
|
|
|
status = ''
|
|
|
|
for char in answer:
|
|
|
|
if char in ['0', '1']:
|
|
|
|
status+= char
|
|
|
|
if len(status) != 8:
|
|
|
|
print(len(status))
|
|
|
|
print("ERROR: status cannot be parsed.")
|
|
|
|
print('status:\r\n' + status)
|
|
|
|
sys.exit(1)
|
|
|
|
elif action != 'status':
|
|
|
|
print ('status:\r\n' + status)
|
|
|
|
|
|
|
|
if action != 'status':
|
|
|
|
for i in range(1,9):
|
|
|
|
relay = status[i-1]
|
|
|
|
if relay not in ['0', '1']:
|
|
|
|
print("ERROR: unrecognized value '"+relay+"' for relay "+str(i))
|
|
|
|
elif relay in relays and relay != action:
|
|
|
|
print("ERROR: wrong status '"+relay+"' for relay "+str(i))
|
|
|
|
else:
|
|
|
|
for i in [int(relay[3]) for relay in relays]:
|
|
|
|
relay = status[i-1]
|
|
|
|
if relay not in ['0', '1']:
|
|
|
|
print("ERROR: unrecognized value '"+relay+"' for relay "+str(i))
|
|
|
|
else:
|
|
|
|
print(relay, end='')
|
|
|
|
|
|
|
|
ser.close()
|