1
0
Fork 0
wasp-os/tools/wasptool

336 lines
9.3 KiB
Text
Raw Normal View History

#!/usr/bin/env python3
# SPDX-License-Identifier: LGPL-3.0-or-later
# Copyright (c) 2020 Daniel Thompson
import argparse
import io
import random
import os.path
import pexpect
import time
import string
import sys
def pbar(iterable, quiet=False):
    """Yield the items of *iterable* while drawing a text progress bar.

    Args:
        iterable: A sized iterable (``len()`` must work on it).
        quiet: When true nothing is printed; the items are simply yielded.

    Yields:
        Each item of *iterable*, in order.

    An empty iterable is accepted: no items are yielded and (unless
    *quiet*) only the final 100% bar is drawn.
    """
    total = len(iterable)
    # Guard against division by zero when there is nothing to iterate over.
    step = 100 / total if total else 0

    for i, v in enumerate(iterable):
        if not quiet:
            percent = round(step * i, 1)
            # The bar is 50 characters wide, so each character is worth 2%.
            bar = int(percent) // 2
            print(f'[{"#"*bar}{"."*(50-bar)}] {percent}%', end='\r', flush=True)
        yield v

    if not quiet:
        print(f'[{"#"*50}] 100% ')
def sync(c):
    """Interrupt the watch and synchronize with the command prompt.

    After sending Ctrl-C, a print of a freshly generated random tag is
    issued. Waiting for that tag ensures the final expect (of the prompt)
    does not accidentally match a previously issued prompt.
    """
    letters = (random.choice(string.ascii_uppercase) for _ in range(6))
    tag = ''.join(letters)
    head, tail = tag[:3], tag[3:]

    c.send('\x03')
    c.expect('>>> ')

    # The tag is written as two adjacent string literals, so the command
    # text itself never contains the complete tag (presumably so an echo of
    # the command cannot satisfy the expect below).
    c.sendline(f'print("{head}""{tail}")')
    c.expect(tag)
    c.expect('>>> ')
def unsync(c):
    """Start the watch running again and send Ctrl-X to the console.

    The expect between the two sends is required (a sleep would also do):
    if the subordinate process is killed too early the sendline will not
    have completed.
    """
    running_banners = [
        'Watch is running, use Ctrl-C to stop',
        'Watch already running in the background',
    ]

    c.sendline('wasp.system.run()')
    # Either banner confirms the command was received and acted upon.
    c.expect(running_banners)
    c.send('\x18')
def paste(c, f, verbose=False, chunk=None):
    """Send the source text in *f* to the device one line at a time.

    Docstrings, comment-only lines and blank lines are dropped before
    anything is sent. After each line the device must answer with the
    ``=== `` prompt.

    Args:
        c: Console object providing ``sendline()``, ``expect()`` and
            ``match`` (a pexpect spawn in this tool).
        f: Open text file (or any iterable of lines) to send.
        verbose: True when console traffic is already being logged; the
            progress bar and the local echo of device errors are then
            suppressed.
        chunk: Optional callable invoked before each line that starts with
            ``class`` in column zero.

    If the device reports ``FATAL: uncaught exception`` the error text is
    shown and the process exits with status 16.
    """
    in_docstring = False
    tosend = []

    for ln in f:
        ln = ln.rstrip()
        stripped = ln.lstrip()

        # This is a bit loose (definitely not PEP-257 compliant) but
        # is enough for most code.
        if in_docstring:
            if stripped.endswith('"""'):
                in_docstring = False
            continue
        if stripped.startswith('"""'):
            # The docstring only ends on this same line if a second triple
            # quote follows the opening one. A lone '"""' opens a
            # multi-line docstring and must not be treated as its own
            # terminator (otherwise the docstring body is sent as code).
            if not stripped[3:].endswith('"""'):
                in_docstring = True
            continue

        if not stripped or stripped.startswith('#'):
            continue

        tosend.append(ln)

    for ln in pbar(tosend, verbose):
        if chunk and ln.startswith('class'):
            chunk()

        c.sendline(ln)
        choice = c.expect(['=== ', 'FATAL: uncaught exception [0-9a-f\r]*\n'])
        if choice == 1:
            # Capture and display the error message, then exit
            if not verbose:
                print('\n~~~')
            # Keep reading whole lines until the device has been silent for
            # two seconds (the TIMEOUT pattern is index 0 and ends the loop).
            while choice == 1:
                if not verbose:
                    print(c.match.group(0).rstrip(), file=sys.stderr)
                choice = c.expect([pexpect.TIMEOUT, '.*\n'], timeout=2)
            if not verbose:
                print('~~~')
            print('\nPlease reboot your device', file=sys.stderr)
            sys.exit(16)
def print_log(logfile):
    """Print the text captured in *logfile* between ``~~~`` marker lines.

    Each line has leading/trailing ``\\x04`` and carriage-return characters
    removed. Empty lines and bare ``>>> `` prompts are discarded. Nothing at
    all is printed when no lines remain.

    Args:
        logfile: An object with ``getvalue()`` returning the captured text
            (an ``io.StringIO`` in this tool).
    """
    captured = []
    for raw in logfile.getvalue().split('\n'):
        text = raw.strip('\x04\r')
        if text and text != '>>> ':
            captured.append(text)

    if not captured:
        return

    print('~~~')
    print('\n'.join(captured))
    print('~~~')
def handle_eval(c, cmd):
    """Send the string *cmd* to the device and show what it prints.

    The command is sent between a ``\\x05`` (Ctrl-E) and a ``\\x04``
    (Ctrl-D), waiting for the ``=== `` prompt in between -- presumably the
    REPL's paste mode. When the console has no logfile attached, output
    produced after the Ctrl-D is captured and printed via print_log().

    Args:
        c: Console object (a pexpect spawn in this tool).
        cmd: Python source text to run on the device.
    """
    quiet = not c.logfile

    c.send('\x05')
    c.expect('=== ')
    c.sendline(cmd)
    c.expect('=== ')

    if quiet:
        # Capture only the output of the command itself.
        capture = io.StringIO()
        c.logfile = capture

    c.send('\x04')
    c.expect('>>> ')

    if quiet:
        print_log(capture)
        capture.close()
        c.logfile = None
def handle_exec(c, fname):
    """Run the contents of the file *fname* on the device.

    The file is sent with paste(). Before every top-level ``class`` line,
    what has been sent so far is executed and a new batch is started.
    When the console has no logfile attached, output produced while each
    batch executes is collected and printed once at the end.

    Args:
        c: Console object (a pexpect spawn in this tool).
        fname: Path of the local file to execute.
    """
    verbose = bool(c.logfile)
    log = io.StringIO()

    def begin_batch():
        c.send('\x05')
        c.expect('=== ')

    def run_batch():
        # Only the execution phase is logged, and only in non-verbose mode.
        if not verbose:
            c.logfile = log
        c.send('\x04')
        c.expect('>>> ')
        if not verbose:
            c.logfile = None

    def chunk():
        run_batch()
        begin_batch()

    with open(fname) as f:
        if not verbose:
            print(f'Preparing to run {fname}:')

        begin_batch()
        paste(c, f, verbose, chunk)
        run_batch()

    print_log(log)
    log.close()
def handle_reset(c, ota=False):
    """Ask the device to reboot.

    Args:
        c: Console object (a pexpect spawn in this tool).
        ota: When true ``machine.enter_ota_dfu()`` is called instead of
            ``machine.reset()``.

    No prompt is awaited after the final ``\\x04`` is sent.
    """
    cmd = 'enter_ota_dfu' if ota else 'reset'

    c.send('\x05')
    c.expect('=== ')
    for line in ('import machine', f'machine.{cmd}()'):
        c.sendline(line)
        c.expect('=== ')
    c.send('\x04')
def handle_rtc(c):
    """Set the device clock from the host's local time.

    The call spins until the host's seconds field changes, so the value
    sent was read right at the start of a second.

    Args:
        c: Console object (a pexpect spawn in this tool).
    """
    # Wait for the clock to tick over to the next second
    start = time.localtime()
    now = start
    while now.tm_sec == start.tm_sec:
        now = time.localtime()

    # Set the time: the first eight struct_time fields, in order.
    fields = ', '.join(str(now[i]) for i in range(8))
    c.sendline(f'watch.rtc.set_localtime(({fields}))')
    c.expect('>>> ')
def check_rtc(c):
    """Print the host and device times of day and their difference.

    The device is asked to print ``watch.rtc.get_localtime()`` and the
    hour, minute and second fields of the reply are compared with the
    host's local time.

    Args:
        c: Console object (a pexpect spawn in this tool).

    The reported delta is host minus watch, in seconds.

    NOTE: only the time of day is compared, so readings taken either side
    of midnight report a delta close to +/-86400.
    """
    c.sendline('print(watch.rtc.get_localtime())')
    # Raw string: the backslashes are regex escapes, not string escapes.
    c.expect(r'\(([0-9]+), ([0-9]+), ([0-9]+), ([0-9]+), ([0-9]+), ([0-9]+), ([0-9]+), ([0-9]+)\)')
    t = time.localtime()

    # Groups 4-6 of the reply are hour, minute and second.
    watch_hms = (int(c.match[4]), int(c.match[5]), int(c.match[6]))
    watch_str = f'{watch_hms[0]:02d}:{watch_hms[1]:02d}:{watch_hms[2]:02d}'
    host_hms = (t.tm_hour, t.tm_min, t.tm_sec)
    host_str = f'{host_hms[0]:02d}:{host_hms[1]:02d}:{host_hms[2]:02d}'
    delta = 3600 * (host_hms[0] - watch_hms[0]) + \
            60 * (host_hms[1] - watch_hms[1]) + \
            (host_hms[2] - watch_hms[2])

    # Values are printed in the same order as the "PC <-> watch" label.
    print(f"PC <-> watch: {host_str} <-> {watch_str} (delta {delta})")

    c.expect('>>> ')
def handle_binary_upload(c, fname):
    """Copy the local file *fname* to the device byte-for-byte.

    The file is created on the device under its base name and filled by
    sending a series of ``f.write(b'...')`` statements, 24 bytes at a time.

    Args:
        c: Console object (a pexpect spawn in this tool).
        fname: Path of the local file to upload.
    """
    verbose = bool(c.logfile)

    # Absorb the file to be uploaded. This is done before touching the
    # device so that a local read error cannot leave a file open remotely.
    with open(fname, 'rb') as f:
        data = f.read()

    c.sendline(f'f = open("{os.path.basename(fname)}", "wb")')
    c.expect('>>> ')

    chunksz = 24
    # One offset per chunk; the final chunk may be shorter than chunksz.
    offsets = range(0, len(data), chunksz)

    if not verbose:
        print(f'Uploading {fname}:')

    # Send the data. An empty file has no chunks; skip the progress bar in
    # that case since there is nothing to iterate over.
    if offsets:
        for i in pbar(offsets, verbose):
            c.sendline(f'f.write({repr(data[i:i+chunksz])})')
            c.expect('>>> ')

    c.sendline('f.close()')
    c.expect('>>> ')
def handle_upload(c, fname):
    """Copy the text file *fname* to the device using ``shell.upload``.

    The file content is sent with paste(), so docstrings, comments and
    blank lines are not transferred.

    Args:
        c: Console object (a pexpect spawn in this tool).
        fname: Path of the local file; it is stored on the device under
            its base name.
    """
    verbose = bool(c.logfile)

    c.sendline('from shell import upload')
    c.expect('>>> ')

    with open(fname) as src:
        if not verbose:
            print(f'Uploading {fname}:')

        remote_name = os.path.basename(fname)
        c.sendline(f'upload("{remote_name}")')
        c.expect('=== ')
        paste(c, src, verbose)
        # End of input: wait for the normal prompt to return.
        c.send('\x04')
        c.expect('>>> ')
if __name__ == '__main__':
    # Command line entry point: connect to the device through pynus, run
    # the requested actions in a fixed order, then set the watch running
    # again.
    parser = argparse.ArgumentParser(
            description='Wasp-os command and control client')
    parser.add_argument('--bootloader', action='store_true',
            help="Reboot into the bootloader mode for OTA update")
    parser.add_argument('--binary', action='store_true',
            help="Enable non-ASCII mode for suitable commands (such as upload)")
    parser.add_argument('--console', action='store_true',
            help='Launch a REPL session')
    parser.add_argument('--check-rtc', action='store_true',
            help='Compare workstation and watch times')
    parser.add_argument('--device',
            help='Connect only to a specific named device (or MAC address)')
    parser.add_argument('--exec',
            help='Execute the contents of a file')
    parser.add_argument('--eval',
            help='Execute the provided python string')
    parser.add_argument('--reset', action='store_true',
            help="Reboot the device (and don't stay in bootloader mode)")
    parser.add_argument('--ota',
            help="Deliver an OTA update to the device")
    parser.add_argument('--rtc', action='store_true',
            help='Set the time on the wasp-os device')
    parser.add_argument('--upload',
            help='Copy the specified file to the wasp-os device')
    parser.add_argument('--verbose', action='store_true',
            help='Log interaction with the wasp-os device')

    args = parser.parse_args()

    # A colon in the device argument means it is a MAC address, not a name.
    if args.device:
        if ':' in args.device:
            device_args = ' --address ' + args.device
        else:
            device_args = ' --device ' + args.device
    else:
        device_args = ''

    # dirname() is empty when the tool is started from its own directory;
    # fall back to '.' so the helper paths do not become absolute
    # ('/pynus/pynus.py').
    tooldir = os.path.dirname(sys.argv[0]) or '.'

    pynus = tooldir + '/pynus/pynus.py' + device_args
    console = pexpect.spawn(pynus, encoding='UTF-8')
    if args.verbose:
        console.logfile = sys.stdout
    else:
        # Keep the connection chatter so it can be shown if connecting fails.
        console.logfile = io.StringIO()

    try:
        console.expect(r'Connect.*\(([0-9A-F:]*)\)')
    except pexpect.exceptions.TIMEOUT:
        print('ERROR: Cannot find suitable wasp-os device')
        if not args.verbose:
            print_log(console.logfile)
        sys.exit(1)

    # The handlers treat a truthy console.logfile as "verbose mode" and an
    # io.StringIO object is always truthy, so the connection log must be
    # detached now or every handler would suppress its normal output.
    if not args.verbose:
        console.logfile.close()
        console.logfile = None

    macaddr = console.match.group(1)
    console.expect('Exit console using Ctrl-X')
    time.sleep(0.5)
    sync(console)

    if args.rtc:
        handle_rtc(console)

    if args.check_rtc:
        check_rtc(console)

    if args.exec:
        handle_exec(console, args.exec)

    if args.eval:
        handle_eval(console, args.eval)

    if args.upload:
        if args.binary:
            handle_binary_upload(console, args.upload)
        else:
            handle_upload(console, args.upload)

    if args.console:
        # Replace this process with an interactive pynus session.
        console.close()
        argv = pynus.split()
        os.execv(argv[0], argv)

    if args.ota:
        handle_reset(console, ota=True)
        time.sleep(1.0)
        # Replace this process with the DFU tool, targeting the same device.
        dfu = tooldir + '/ota-dfu/dfu.py'
        os.execl(dfu, dfu, '-z', args.ota, '-a', macaddr, '--legacy')

    if args.reset:
        handle_reset(console)
        sys.exit(0)

    if args.bootloader:
        handle_reset(console, ota=True)
        sys.exit(0)

    unsync(console)