cabe6f143c
Currently if the binary file being downloaded contains single quote characters then it gets wrapped differently by repr() so we have to add additional cases to strip the wrapper. Fix this the "obvious" way... where by "obvious" I mean almost anything but. Signed-off-by: Daniel Thompson <daniel@redfelineninja.org.uk>
445 lines
12 KiB
Python
Executable file
445 lines
12 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
|
|
# SPDX-License-Identifier: LGPL-3.0-or-later
|
|
# Copyright (c) 2020 Daniel Thompson
|
|
|
|
import argparse
|
|
import io
|
|
import random
|
|
import os.path
|
|
import pexpect
|
|
import time
|
|
import types
|
|
import string
|
|
import sys
|
|
|
|
def draw_pbar(percent, quiet=False, end='\r'):
|
|
if not quiet:
|
|
if percent > 100:
|
|
percent = 100
|
|
bar = int(percent) // 2
|
|
print(f'[{"#"*bar}{"."*(50-bar)}] {percent}% ', end=end, flush=True)
|
|
|
|
def pbar(iterable, quiet=False):
|
|
step = 100 / len(iterable)
|
|
|
|
for i, v in enumerate(iterable):
|
|
draw_pbar(round(step * i, 1), quiet)
|
|
yield v
|
|
if not quiet:
|
|
draw_pbar(100, quiet, None)
|
|
|
|
def run_command(c, cmd):
|
|
"""Cheap and cheerful command wrapper.
|
|
|
|
This differs from REPLWrapper because it assumes the remote end will
|
|
echo the characters... and that we must eat them before handing over
|
|
the results to the caller.
|
|
"""
|
|
c.sendline(cmd)
|
|
c.expect_exact(cmd)
|
|
c.expect('>>> ')
|
|
return c.before.replace('\r\r\n', '\n').strip('\n')
|
|
|
|
def sync(c):
|
|
"""Stop the watch and synchronize with the command prompt.
|
|
|
|
Sending a random print ensure the final export (of the prompt)
|
|
does not accidentally match a previously issued prompt.
|
|
"""
|
|
verbose = bool(c.logfile)
|
|
tag = ''.join([random.choice(string.ascii_uppercase) for i in range(6)])
|
|
|
|
|
|
try:
|
|
if not verbose:
|
|
c.logfile = io.StringIO()
|
|
|
|
c.send('\x03')
|
|
c.expect('>>> ')
|
|
c.sendline(f'print("{tag[:3]}""{tag[3:]}")')
|
|
c.expect(tag)
|
|
c.expect('>>> ')
|
|
|
|
if not verbose:
|
|
c.logfile.close()
|
|
c.logfile = None
|
|
except pexpect.exceptions.EOF:
|
|
print("ERROR: Cannot sync with device")
|
|
print_log(c.logfile)
|
|
sys.exit(1)
|
|
|
|
def unsync(c):
|
|
"""Set the watch running again.
|
|
|
|
There must be an expect (or a sleep) since if we kill the subordinate
|
|
process too early then the sendline will not have completed.
|
|
"""
|
|
c.sendline('wasp.system.run()')
|
|
c.expect(['Watch is running, use Ctrl-C to stop',
|
|
'Watch already running in the background'])
|
|
c.send('\x18')
|
|
|
|
def paste(c, f, verbose=False, chunk=None):
|
|
docstring = False
|
|
|
|
tosend = []
|
|
|
|
for ln in f.readlines():
|
|
ln = ln.rstrip()
|
|
|
|
# This is a bit loose (definitely not PEP-257 compliant) but
|
|
# is enough for most code.
|
|
if ln.lstrip().startswith('"""'):
|
|
docstring = True
|
|
if docstring:
|
|
if ln.rstrip().endswith('"""'):
|
|
docstring = False
|
|
continue
|
|
|
|
if ln.lstrip().startswith('#'):
|
|
continue
|
|
|
|
if ln.strip() == '':
|
|
continue
|
|
|
|
tosend.append(ln)
|
|
|
|
for ln in pbar(tosend, verbose):
|
|
if chunk and ln.startswith('class'):
|
|
chunk()
|
|
|
|
c.sendline(ln)
|
|
|
|
choice = c.expect(['=== ', 'FATAL: uncaught exception [0-9a-f\r]*\n'])
|
|
if choice == 1:
|
|
# Capture and display the error message, then exit
|
|
if not verbose:
|
|
print('\n~~~')
|
|
while choice == 1:
|
|
if not verbose:
|
|
print(c.match.group(0).rstrip(), file=sys.stderr)
|
|
choice = c.expect([pexpect.TIMEOUT, '.*\n'], timeout=2)
|
|
if not verbose:
|
|
print('~~~')
|
|
print('\nPlease reboot your device', file=sys.stderr)
|
|
sys.exit(16)
|
|
|
|
def print_log(logfile):
|
|
try:
|
|
lines = logfile.getvalue().split('\n')
|
|
except:
|
|
lines = logfile.split('\n')
|
|
lines = [ l.strip('\x04\r') for l in lines ]
|
|
|
|
output = [ l for l in lines if l and l != '>>> ' ]
|
|
if output:
|
|
print('~~~')
|
|
print('\n'.join(output))
|
|
print('~~~')
|
|
|
|
def handle_eval(c, cmd):
|
|
verbose = bool(c.logfile)
|
|
|
|
c.send('\x05')
|
|
c.expect('=== ')
|
|
c.sendline(cmd)
|
|
c.expect('=== ')
|
|
|
|
if not verbose:
|
|
c.logfile = io.StringIO()
|
|
c.send('\x04')
|
|
c.expect('>>> ')
|
|
if not verbose:
|
|
print_log(c.logfile)
|
|
c.logfile.close()
|
|
c.logfile = None
|
|
|
|
def handle_exec(c, fname):
|
|
verbose = bool(c.logfile)
|
|
|
|
log = io.StringIO()
|
|
|
|
def chunk():
|
|
if not verbose:
|
|
c.logfile = log
|
|
c.send('\x04')
|
|
c.expect('>>> ')
|
|
if not verbose:
|
|
c.logfile = None
|
|
c.send('\x05')
|
|
c.expect('=== ')
|
|
|
|
with open(fname) as f:
|
|
if not verbose:
|
|
print(f'Preparing to run {fname}:')
|
|
|
|
c.send('\x05')
|
|
c.expect('=== ')
|
|
|
|
paste(c, f, verbose, chunk)
|
|
|
|
if not verbose:
|
|
c.logfile = log
|
|
c.send('\x04')
|
|
c.expect('>>> ')
|
|
if not verbose:
|
|
c.logfile = None
|
|
|
|
print_log(log)
|
|
log.close()
|
|
|
|
def handle_memory_free(c):
|
|
# wasp namespace to avoid having to import into the global space
|
|
print(f'Before GC: {c.run_command("wasp.gc.mem_free()"):7}')
|
|
c.run_command('wasp.gc.collect()')
|
|
print(f'After GC: {c.run_command("wasp.gc.mem_free()"):7}')
|
|
|
|
def handle_reset(c, ota=False):
|
|
cmd = 'reset'
|
|
if ota:
|
|
cmd = 'enter_ota_dfu'
|
|
|
|
c.send('\x05')
|
|
c.expect('=== ')
|
|
c.sendline('import machine')
|
|
c.expect('=== ')
|
|
c.sendline(f'machine.{cmd}()')
|
|
c.expect('=== ')
|
|
c.send('\x04')
|
|
|
|
def handle_rtc(c):
|
|
# Wait for the clock to tick over to the next second
|
|
now = then = time.localtime()
|
|
while now[5] == then[5]:
|
|
now = time.localtime()
|
|
|
|
# Set the time
|
|
c.sendline(f'watch.rtc.set_localtime(({now[0]}, {now[1]}, {now[2]}, {now[3]}, {now[4]}, {now[5]}, {now[6]}, {now[7]}))')
|
|
c.expect('>>> ')
|
|
|
|
def check_rtc(c):
|
|
c.sendline('print(watch.rtc.get_localtime())')
|
|
c.expect('\(([0-9]+), ([0-9]+), ([0-9]+), ([0-9]+), ([0-9]+), ([0-9]+), ([0-9]+), ([0-9]+)\)')
|
|
t = time.localtime()
|
|
|
|
watch_hms = (int(c.match[4]), int(c.match[5]), int(c.match[6]))
|
|
watch_str = f'{watch_hms[0]:02d}:{watch_hms[1]:02d}:{watch_hms[2]:02d}'
|
|
host_hms = (t.tm_hour, t.tm_min, t.tm_sec)
|
|
host_str = f'{host_hms[0]:02d}:{host_hms[1]:02d}:{host_hms[2]:02d}'
|
|
delta = 3600 * (host_hms[0] - watch_hms[0]) + \
|
|
60 * (host_hms[1] - watch_hms[1]) + \
|
|
(host_hms[2] - watch_hms[2])
|
|
print(f"PC <-> watch: {watch_str} <-> {host_str} (delta {delta})")
|
|
|
|
c.expect('>>> ')
|
|
|
|
def handle_binary_download(c, tname, fname):
|
|
verbose = bool(c.logfile)
|
|
|
|
c.run_command('import os')
|
|
stat = c.run_command(f'os.stat("{tname}")[6]')
|
|
if 'Error' in stat:
|
|
print('Watch reported error:')
|
|
print(stat)
|
|
return
|
|
|
|
print(f'Downloading {fname}:')
|
|
|
|
c.run_command(f'f = open("{tname}", "rb")')
|
|
sz = eval(stat)
|
|
bytes_read = 0
|
|
|
|
with open(fname, 'wb') as f:
|
|
while True:
|
|
draw_pbar(100 * bytes_read / sz, verbose)
|
|
reply = c.run_command('repr(f.read(24))')
|
|
reply = reply.replace('\\\\', '\\')
|
|
if reply.startswith('"'):
|
|
# "b'..CONTENT..'"
|
|
reply = reply[1:-1]
|
|
elif reply.startswith("'"):
|
|
# 'b\'..CONTENT..\''
|
|
reply = reply[1:-1].replace("\\'", "'")
|
|
data = print(reply)
|
|
data = eval(reply)
|
|
if len(data) == 0:
|
|
break
|
|
bytes_read += len(data)
|
|
f.write(data)
|
|
|
|
draw_pbar(100, verbose, end=None)
|
|
c.run_command('f.close()')
|
|
|
|
# Release as much memory as possible
|
|
c.run_command('del f')
|
|
c.run_command('del os')
|
|
|
|
def handle_binary_upload(c, fname, tname):
|
|
verbose = bool(c.logfile)
|
|
|
|
if not tname:
|
|
tname = os.path.basename(fname)
|
|
|
|
c.sendline(f'f = open("{tname}", "wb")')
|
|
c.expect('>>> ')
|
|
|
|
# Absorb the file to be uploaded
|
|
with open(fname, 'rb') as f:
|
|
data = f.read()
|
|
chunksz = 24
|
|
nchunks = len(data) // chunksz
|
|
lastchunk = len(data) % chunksz
|
|
|
|
if not verbose:
|
|
print(f'Uploading {fname}:')
|
|
|
|
# Send the data
|
|
for i in pbar(range(0, chunksz*nchunks, chunksz), verbose):
|
|
c.sendline(f'f.write({repr(data[i:i+chunksz])})')
|
|
c.expect('>>> ')
|
|
if lastchunk:
|
|
c.sendline(f'f.write({repr(data[-lastchunk:])})')
|
|
c.expect('>>> ')
|
|
|
|
c.sendline('f.close()')
|
|
c.expect('>>> ')
|
|
|
|
def handle_upload(c, fname, tname):
|
|
verbose = bool(c.logfile)
|
|
|
|
if not tname:
|
|
tname = os.path.basename(fname)
|
|
|
|
c.run_command('from shell import upload')
|
|
|
|
with open(fname) as f:
|
|
if not verbose:
|
|
print(f'Uploading {fname}:')
|
|
|
|
c.sendline(f'upload("{tname}")')
|
|
c.expect('=== ')
|
|
paste(c, f, verbose)
|
|
c.send('\x04')
|
|
|
|
c.expect('>>> ')
|
|
|
|
if __name__ == '__main__':
|
|
parser = argparse.ArgumentParser(
|
|
description='Wasp-os command and control client')
|
|
parser.add_argument('--as', dest='upload_as', default=None,
|
|
help="Filename to use on the target (e.g. wasptool --upload docs/main/chrono.py --as main.py")
|
|
parser.add_argument('--bootloader', action='store_true',
|
|
help="Reboot into the bootloader mode for OTA update")
|
|
parser.add_argument('--binary', action='store_true',
|
|
help="Enable non-ASCII mode for suitable commands (such as upload)")
|
|
parser.add_argument('--console', action='store_true',
|
|
help='Launch a REPL session')
|
|
parser.add_argument('--check-rtc', action='store_true',
|
|
help='Compare workstation and watch times')
|
|
parser.add_argument('--device',
|
|
help='Connect only to a specific named device (or MAC address)')
|
|
parser.add_argument('--exec',
|
|
help='Execute the contents of a file')
|
|
parser.add_argument('--eval',
|
|
help='Execute the provided python string')
|
|
parser.add_argument('--memfree', action='store_true',
|
|
help='Report on the current memory usage.')
|
|
parser.add_argument('--pull',
|
|
help='Fetch a file from the target')
|
|
parser.add_argument('--push',
|
|
help='Push a file to the target')
|
|
parser.add_argument('--reset', action='store_true',
|
|
help="Reboot the device (and don't stay in bootloader mode)")
|
|
parser.add_argument('--ota',
|
|
help="Deliver an OTA update to the device")
|
|
parser.add_argument('--rtc', action='store_true',
|
|
help='Set the time on the wasp-os device')
|
|
parser.add_argument('--upload',
|
|
help='Copy the specified file to the wasp-os device')
|
|
parser.add_argument('--verbose', action='store_true',
|
|
help='Log interaction with the wasp-os device')
|
|
|
|
args = parser.parse_args()
|
|
|
|
if args.device:
|
|
if ':' in args.device:
|
|
device_args = ' --address ' + args.device
|
|
else:
|
|
device_args = ' --device ' + args.device
|
|
else:
|
|
device_args = ''
|
|
|
|
pynus = os.path.dirname(sys.argv[0]) + '/pynus/pynus.py' + device_args
|
|
console = pexpect.spawn(pynus, encoding='UTF-8')
|
|
console.run_command = types.MethodType(run_command, console)
|
|
console.sync = types.MethodType(sync, console)
|
|
console.unsync = types.MethodType(unsync, console)
|
|
if args.verbose:
|
|
console.logfile = sys.stdout
|
|
else:
|
|
console.logfile = io.StringIO()
|
|
|
|
try:
|
|
console.expect('Connect.*\(([0-9A-F:]*)\)')
|
|
except pexpect.exceptions.TIMEOUT:
|
|
print('ERROR: Cannot find suitable wasp-os device')
|
|
if not args.verbose:
|
|
print_log(console.logfile)
|
|
sys.exit(1)
|
|
if not args.verbose:
|
|
console.logfile.close()
|
|
console.logfile = None
|
|
|
|
macaddr = console.match.group(1)
|
|
console.expect('Exit console using Ctrl-X')
|
|
time.sleep(0.5)
|
|
console.sync()
|
|
|
|
if args.rtc:
|
|
handle_rtc(console)
|
|
|
|
if args.check_rtc:
|
|
check_rtc(console)
|
|
|
|
if args.exec:
|
|
handle_exec(console, args.exec)
|
|
|
|
if args.eval:
|
|
handle_eval(console, args.eval)
|
|
|
|
if args.pull:
|
|
handle_binary_download(console, args.pull, args.pull)
|
|
|
|
if args.push:
|
|
handle_binary_upload(console, args.push, args.push)
|
|
|
|
if args.upload:
|
|
if args.binary:
|
|
handle_binary_upload(console, args.upload, args.upload_as)
|
|
else:
|
|
handle_upload(console, args.upload, args.upload_as)
|
|
|
|
if args.memfree:
|
|
handle_memory_free(console)
|
|
|
|
if args.console:
|
|
console.close()
|
|
argv = pynus.split()
|
|
os.execv(argv[0], argv)
|
|
|
|
if args.ota:
|
|
handle_reset(console, ota=True)
|
|
time.sleep(1.0)
|
|
dfu = os.path.dirname(sys.argv[0]) + '/ota-dfu/dfu.py'
|
|
os.execl(dfu, dfu, '-z', args.ota, '-a', macaddr, '--legacy')
|
|
|
|
if args.reset:
|
|
handle_reset(console)
|
|
sys.exit(0)
|
|
|
|
if args.bootloader:
|
|
handle_reset(console, ota=True)
|
|
sys.exit(0)
|
|
|
|
console.unsync()
|