Add a run_command method for the "console". This allows running a command on the target and capturing the result. Normally this is handled using REPLWrapper but that doesn't work well with the NUS console because local echo gets in the way. Signed-off-by: Daniel Thompson <daniel@redfelineninja.org.uk>
431 lines
12 KiB
Python
Executable file
431 lines
12 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
|
|
# SPDX-License-Identifier: LGPL-3.0-or-later
|
|
# Copyright (c) 2020 Daniel Thompson
|
|
|
|
import argparse
|
|
import io
|
|
import random
|
|
import os.path
|
|
import pexpect
|
|
import time
|
|
import types
|
|
import string
|
|
import sys
|
|
|
|
def draw_pbar(percent, quiet=False, end='\r'):
    """Print a 50-character wide text progress bar to stdout.

    Nothing is printed when ``quiet`` is true. Values above 100 are
    clamped to 100. ``end`` is handed straight to print(); the default
    carriage return leaves the cursor on the same line so that the next
    call overwrites the bar.
    """
    if quiet:
        return

    percent = min(percent, 100)

    # One '#' per two percentage points, padded to 50 columns with '.'
    filled = int(percent) // 2
    hashes = '#' * filled
    dots = '.' * (50 - filled)
    print(f'[{hashes}{dots}] {percent}% ', end=end, flush=True)
|
|
|
|
def pbar(iterable, quiet=False):
    """Yield the items of a sized iterable while drawing a progress bar.

    The bar is redrawn (via draw_pbar) before each item is yielded and a
    final 100% bar, terminated with a newline, is drawn once the iterable
    is exhausted. Nothing is drawn when ``quiet`` is true.

    ``iterable`` must support len(). An empty iterable yields nothing and
    goes straight to the final 100% bar.
    """
    total = len(iterable)
    # An empty sequence has no per-item step; avoid dividing by zero and
    # fall through to the final 100% bar instead.
    step = 100 / total if total else 0

    for i, v in enumerate(iterable):
        draw_pbar(round(step * i, 1), quiet)
        yield v
    if not quiet:
        draw_pbar(100, quiet, None)
|
|
|
|
def run_command(c, cmd):
    """Run a command on the target and return what it printed.

    Unlike REPLWrapper this assumes the remote end echoes every character
    we send, so the echo has to be consumed before the output is handed
    back to the caller.

    Returns the text seen between the echoed command and the next '>>> '
    prompt, with '\\r\\r\\n' line endings folded to '\\n' and leading and
    trailing newlines removed.
    """
    c.sendline(cmd)

    # Eat the echo of the command itself
    c.expect_exact(cmd)

    # Everything up to the next prompt is the command's output
    c.expect('>>> ')
    output = c.before.replace('\r\r\n', '\n')
    return output.strip('\n')
|
|
|
|
def sync(c):
    """Stop the watch and synchronize with the command prompt.

    Sending a random print ensures the final expect (of the prompt)
    does not accidentally match a previously issued prompt.

    When no logfile is attached to ``c`` the exchange is captured in a
    temporary buffer so it can be shown if the connection drops. On EOF
    an error is printed and the process exits with status 1.
    """
    verbose = bool(c.logfile)
    tag = ''.join(random.choice(string.ascii_uppercase) for _ in range(6))

    try:
        if not verbose:
            c.logfile = io.StringIO()

        c.send('\x03')
        c.expect('>>> ')
        # The tag is split into two adjacent string literals so that the
        # echo of this command does not contain the tag itself; only the
        # output of the print can match.
        c.sendline(f'print("{tag[:3]}""{tag[3:]}")')
        c.expect(tag)
        c.expect('>>> ')

        if not verbose:
            c.logfile.close()
            c.logfile = None
    except pexpect.exceptions.EOF:
        print("ERROR: Cannot sync with device")
        # In verbose mode the logfile is a caller supplied stream (not a
        # StringIO) that has already shown the traffic; print_log() needs
        # getvalue() and would crash on it.
        if not verbose:
            print_log(c.logfile)
        sys.exit(1)
|
|
|
|
def unsync(c):
    """Set the watch running again.

    The expect (a sleep would also do) is required: if the subordinate
    process is killed too early the sendline will not have completed.
    """
    # Either banner means the watch is running again
    running_banners = [
        'Watch is running, use Ctrl-C to stop',
        'Watch already running in the background',
    ]

    c.sendline('wasp.system.run()')
    c.expect(running_banners)

    # Ctrl-X; presumably this asks the console process to exit
    c.send('\x18')
|
|
|
|
def _lines_to_send(f):
    """Return the lines of ``f`` with docstrings, comments and blanks removed."""
    tosend = []
    in_docstring = False

    for ln in f:
        ln = ln.rstrip()
        stripped = ln.lstrip()

        # This is a bit loose (definitely not PEP-257 compliant) but
        # is enough for most code.
        if in_docstring:
            if stripped.endswith('"""'):
                in_docstring = False
            continue
        if stripped.startswith('"""'):
            # Only a second delimiter on the same line closes the
            # docstring. A bare opening delimiter must not be mistaken
            # for its own terminator, otherwise the docstring body
            # would be sent to the device as code.
            if len(stripped) < 6 or not stripped.endswith('"""'):
                in_docstring = True
            continue

        if not stripped or stripped.startswith('#'):
            continue

        tosend.append(ln)

    return tosend


def paste(c, f, verbose=False, chunk=None):
    """Send the code in file object ``f`` to the device, line by line.

    Docstrings, comment lines and blank lines are dropped before sending.
    After every line the device must answer with an '=== ' prompt. The
    progress bar is suppressed when ``verbose`` is true.

    If ``chunk`` is given it is called before every line that starts
    with 'class'.

    If the device reports 'FATAL: uncaught exception' the message is
    echoed to stderr (unless verbose) and the process exits with
    status 16.
    """
    for ln in pbar(_lines_to_send(f), verbose):
        if chunk and ln.startswith('class'):
            chunk()

        c.sendline(ln)

        choice = c.expect(['=== ', 'FATAL: uncaught exception [0-9a-f\r]*\n'])
        if choice == 1:
            # Capture and display the error message, then exit
            if not verbose:
                print('\n~~~')
            while choice == 1:
                if not verbose:
                    print(c.match.group(0).rstrip(), file=sys.stderr)
                # Keep reading lines until the device stays silent for 2s
                choice = c.expect([pexpect.TIMEOUT, '.*\n'], timeout=2)
            if not verbose:
                print('~~~')
            print('\nPlease reboot your device', file=sys.stderr)
            sys.exit(16)
|
|
|
|
def print_log(logfile):
    """Print the interesting lines captured in ``logfile`` between '~~~' rules.

    ``logfile`` must provide getvalue() (e.g. io.StringIO). Ctrl-D and
    carriage return characters are trimmed from both ends of every line;
    empty lines and bare '>>> ' prompts are dropped. Nothing at all is
    printed when no lines remain.
    """
    output = []
    for raw in logfile.getvalue().split('\n'):
        line = raw.strip('\x04\r')
        if line and line != '>>> ':
            output.append(line)

    if not output:
        return

    print('~~~')
    print('\n'.join(output))
    print('~~~')
|
|
|
|
def handle_eval(c, cmd):
    """Run the string ``cmd`` on the device and show what it prints.

    The command is entered after a Ctrl-E (the device answers with an
    '=== ' prompt) and executed with Ctrl-D. If no logfile is attached
    to ``c`` the output of the execution is captured in a temporary
    buffer and printed with print_log(); otherwise the attached logfile
    already shows it.
    """
    capture = not c.logfile

    c.send('\x05')
    c.expect('=== ')
    c.sendline(cmd)
    c.expect('=== ')

    # Only log from here on, so the echo of the command is not shown
    if capture:
        c.logfile = io.StringIO()
    c.send('\x04')
    c.expect('>>> ')
    if not capture:
        return

    print_log(c.logfile)
    c.logfile.close()
    c.logfile = None
|
|
|
|
def handle_exec(c, fname):
    """Execute the contents of the local file ``fname`` on the device.

    The file is sent with paste(). Before every line starting with
    'class' the code pasted so far is executed and a fresh paste is
    started. Unless a logfile is attached to ``c``, everything the
    device prints while executing is collected and shown with
    print_log() at the end.
    """
    verbose = bool(c.logfile)
    log = io.StringIO()

    def begin_paste():
        # Ctrl-E; the device answers with the '=== ' prompt
        c.send('\x05')
        c.expect('=== ')

    def execute():
        # Ctrl-D runs what was pasted; log only while it executes
        if not verbose:
            c.logfile = log
        c.send('\x04')
        c.expect('>>> ')
        if not verbose:
            c.logfile = None

    def chunk():
        execute()
        begin_paste()

    with open(fname) as f:
        if not verbose:
            print(f'Preparing to run {fname}:')

        begin_paste()
        paste(c, f, verbose, chunk)
        execute()

    print_log(log)
    log.close()
|
|
|
|
def handle_reset(c, ota=False):
    """Ask the device to call machine.reset() or machine.enter_ota_dfu().

    ``ota`` selects enter_ota_dfu() instead of reset(). No prompt is
    awaited after the final Ctrl-D.
    """
    cmd = 'enter_ota_dfu' if ota else 'reset'

    c.send('\x05')
    c.expect('=== ')
    for statement in ('import machine', f'machine.{cmd}()'):
        c.sendline(statement)
        c.expect('=== ')
    c.send('\x04')
|
|
|
|
def handle_rtc(c):
    """Set the watch clock from the host's local time.

    Busy-waits until the host clock ticks over to the next second so the
    time is sent right at the start of that second.
    """
    # Wait for the clock to tick over to the next second
    start = time.localtime()
    now = start
    while now.tm_sec == start.tm_sec:
        now = time.localtime()

    # Set the time: the first eight struct_time fields
    # (year, month, day, hour, minute, second, weekday, yearday)
    fields = ', '.join(str(v) for v in now[:8])
    c.sendline(f'watch.rtc.set_localtime(({fields}))')
    c.expect('>>> ')
|
|
|
|
def check_rtc(c):
    """Compare the host's local time of day with the watch and print both.

    The printed delta is host minus watch, in seconds. Only the time of
    day is compared, so the delta is wrapped into the range
    [-43200, 43200) to stay small when the two clocks sit on opposite
    sides of midnight.
    """
    c.sendline('print(watch.rtc.get_localtime())')
    # Raw string: '\(' is not a valid escape in a normal string literal
    c.expect(r'\(([0-9]+), ([0-9]+), ([0-9]+), ([0-9]+), ([0-9]+), ([0-9]+), ([0-9]+), ([0-9]+)\)')
    t = time.localtime()

    # Groups 4..6 of the tuple printed by the watch are hour, minute, second
    watch_hms = (int(c.match[4]), int(c.match[5]), int(c.match[6]))
    watch_str = f'{watch_hms[0]:02d}:{watch_hms[1]:02d}:{watch_hms[2]:02d}'
    host_hms = (t.tm_hour, t.tm_min, t.tm_sec)
    host_str = f'{host_hms[0]:02d}:{host_hms[1]:02d}:{host_hms[2]:02d}'
    delta = 3600 * (host_hms[0] - watch_hms[0]) + \
            60 * (host_hms[1] - watch_hms[1]) + \
            (host_hms[2] - watch_hms[2])
    # e.g. host 00:00:01 vs watch 23:59:59 is 2 seconds, not -86398
    delta = (delta + 43200) % 86400 - 43200
    # Print the times in the same order as the label: PC first, then watch
    print(f"PC <-> watch: {host_str} <-> {watch_str} (delta {delta})")

    c.expect('>>> ')
|
|
|
|
def handle_binary_download(c, tname, fname):
    """Copy the file ``tname`` from the device to the local file ``fname``.

    The size is read with os.stat() on the device and the contents are
    fetched 24 bytes at a time as repr() text. If the device reports an
    error for the stat a message is printed and nothing is downloaded.
    """
    # Local import: the literals sent by the device are parsed with
    # ast.literal_eval() rather than eval() so that text coming from the
    # device can never execute code on the host.
    import ast

    verbose = bool(c.logfile)

    c.sendline('import os')
    c.expect('>>>')
    c.sendline(f'os.stat("{tname}")[6]')
    err = c.expect(['>>> ', 'Error'])
    if err:
        print('Target error, cannot open file')
        c.expect('>>>')
        return

    # Line 0 is the echo of the command, line 1 its result
    lines = c.before.split('\n')
    sz = int(lines[1].strip())
    bytes_read = 0

    c.sendline(f'f = open("{tname}", "rb")')
    c.expect('>>>')

    print(f'Downloading {fname}:')

    with open(fname, 'wb') as f:
        while True:
            # A zero length file has nothing to measure progress against
            percent = round(100 * bytes_read / sz, 1) if sz else 100
            draw_pbar(percent, verbose)
            c.sendline('repr(f.read(24))')
            c.expect('>>>')
            lines = c.before.split('\n')
            # The REPL shows the repr of the string returned by repr():
            # the first literal_eval undoes the REPL quoting, the second
            # turns the bytes literal into bytes. This copes with chunks
            # containing quote characters or backslashes.
            data = ast.literal_eval(ast.literal_eval(lines[1].strip()))
            f.write(data)
            bytes_read += 24
            # An empty read means the end of the file has been reached
            if not data:
                break

    draw_pbar(100, verbose, end=None)

    c.sendline('f.close()')
    c.expect('>>> ')
    c.sendline('f = None')
    c.expect('>>> ')
|
|
|
|
def handle_binary_upload(c, fname, tname):
    """Copy the local file ``fname`` to ``tname`` on the device.

    The data is sent as a series of f.write() calls with bytes literals
    of at most 24 bytes each. If ``tname`` is empty or None the base
    name of ``fname`` is used.
    """
    verbose = bool(c.logfile)

    if not tname:
        tname = os.path.basename(fname)

    c.sendline(f'f = open("{tname}", "wb")')
    c.expect('>>> ')

    # Absorb the file to be uploaded
    with open(fname, 'rb') as f:
        data = f.read()
    chunksz = 24

    if not verbose:
        print(f'Uploading {fname}:')

    # Send the data. The range covers the final, possibly short, chunk
    # too (slicing clips at the end of the data). Nothing is iterated for
    # an empty file so the progress bar never sees an empty sequence.
    offsets = range(0, len(data), chunksz)
    if offsets:
        for i in pbar(offsets, verbose):
            c.sendline(f'f.write({repr(data[i:i+chunksz])})')
            c.expect('>>> ')

    c.sendline('f.close()')
    c.expect('>>> ')
|
|
|
|
def handle_upload(c, fname, tname):
    """Upload the text file ``fname`` using the device's shell.upload().

    The contents are sent with paste(), so docstrings, comments and
    blank lines do not reach the device. If ``tname`` is empty or None
    the base name of ``fname`` is used as the target file name.
    """
    verbose = bool(c.logfile)
    target = tname or os.path.basename(fname)

    c.sendline('from shell import upload')
    c.expect('>>> ')

    with open(fname) as f:
        if not verbose:
            print(f'Uploading {fname}:')

        # upload() answers with the '=== ' prompt and then takes the
        # file contents; Ctrl-D marks the end of the data.
        c.sendline(f'upload("{target}")')
        c.expect('=== ')
        paste(c, f, verbose)
        c.send('\x04')

    c.expect('>>> ')
|
|
|
|
if __name__ == '__main__':
    # Command line entry point: connect to the device through pynus,
    # synchronize with the REPL, run every requested action in a fixed
    # order and finally set the watch running again.
    parser = argparse.ArgumentParser(
            description='Wasp-os command and control client')
    parser.add_argument('--as', dest='upload_as', default=None,
            help="Filename to use on the target (e.g. wasptool --upload docs/main/chrono.py --as main.py")
    parser.add_argument('--bootloader', action='store_true',
            help="Reboot into the bootloader mode for OTA update")
    parser.add_argument('--binary', action='store_true',
            help="Enable non-ASCII mode for suitable commands (such as upload)")
    parser.add_argument('--console', action='store_true',
            help='Launch a REPL session')
    parser.add_argument('--check-rtc', action='store_true',
            help='Compare workstation and watch times')
    parser.add_argument('--device',
            help='Connect only to a specific named device (or MAC address)')
    parser.add_argument('--exec',
            help='Execute the contents of a file')
    parser.add_argument('--eval',
            help='Execute the provided python string')
    parser.add_argument('--pull',
            help='Fetch a file from the target')
    parser.add_argument('--push',
            help='Push a file to the target')
    parser.add_argument('--reset', action='store_true',
            help="Reboot the device (and don't stay in bootloader mode)")
    parser.add_argument('--ota',
            help="Deliver an OTA update to the device")
    parser.add_argument('--rtc', action='store_true',
            help='Set the time on the wasp-os device')
    parser.add_argument('--upload',
            help='Copy the specified file to the wasp-os device')
    parser.add_argument('--verbose', action='store_true',
            help='Log interaction with the wasp-os device')

    args = parser.parse_args()

    # Anything containing a ':' is treated as a MAC address
    if args.device:
        if ':' in args.device:
            device_args = ' --address ' + args.device
        else:
            device_args = ' --device ' + args.device
    else:
        device_args = ''

    # The helper scripts live next to this one. dirname() is '' when the
    # script is started by bare name, which would turn the paths below
    # into absolute paths under '/'; fall back to the current directory.
    tooldir = os.path.dirname(sys.argv[0]) or '.'

    pynus = tooldir + '/pynus/pynus.py' + device_args
    console = pexpect.spawn(pynus, encoding='UTF-8')
    console.run_command = types.MethodType(run_command, console)
    console.sync = types.MethodType(sync, console)
    console.unsync = types.MethodType(unsync, console)
    if args.verbose:
        console.logfile = sys.stdout
    else:
        # Capture the connection chatter so it can be shown on failure
        console.logfile = io.StringIO()

    try:
        # Raw string: '\(' is not a valid escape in a normal string literal
        console.expect(r'Connect.*\(([0-9A-F:]*)\)')
    except pexpect.exceptions.TIMEOUT:
        print('ERROR: Cannot find suitable wasp-os device')
        if not args.verbose:
            print_log(console.logfile)
        sys.exit(1)
    if not args.verbose:
        console.logfile.close()
        console.logfile = None

    # The address in parentheses is needed later for the OTA update
    macaddr = console.match.group(1)
    console.expect('Exit console using Ctrl-X')
    time.sleep(0.5)
    console.sync()

    if args.rtc:
        handle_rtc(console)

    if args.check_rtc:
        check_rtc(console)

    if args.exec:
        handle_exec(console, args.exec)

    if args.eval:
        handle_eval(console, args.eval)

    if args.pull:
        handle_binary_download(console, args.pull, args.pull)

    if args.push:
        handle_binary_upload(console, args.push, args.push)

    if args.upload:
        if args.binary:
            handle_binary_upload(console, args.upload, args.upload_as)
        else:
            handle_upload(console, args.upload, args.upload_as)

    if args.console:
        # Replace this process with an interactive pynus session
        console.close()
        argv = pynus.split()
        os.execv(argv[0], argv)

    if args.ota:
        # Reboot into the bootloader, then replace this process with
        # the DFU tool
        handle_reset(console, ota=True)
        time.sleep(1.0)
        dfu = tooldir + '/ota-dfu/dfu.py'
        os.execl(dfu, dfu, '-z', args.ota, '-a', macaddr, '--legacy')

    if args.reset:
        handle_reset(console)
        sys.exit(0)

    if args.bootloader:
        handle_reset(console, ota=True)
        sys.exit(0)

    console.unsync()
|