ee257d6ade
Signed-off-by: olivier-cornelis <klduycea@mailer.me>
499 lines
14 KiB
Python
Executable file
499 lines
14 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
|
|
# SPDX-License-Identifier: LGPL-3.0-or-later
|
|
# Copyright (c) 2020 Daniel Thompson
|
|
|
|
import argparse
|
|
import binascii
|
|
import io
|
|
import random
|
|
import os
|
|
import pexpect
|
|
import time
|
|
import types
|
|
import string
|
|
import subprocess
|
|
import sys
|
|
|
|
def draw_pbar(percent, quiet=False, end='\r'):
    """Print a 50 character wide text progress bar to stdout.

    Args:
        percent: Completion in percent; anything above 100 is shown as 100.
        quiet: When true nothing is printed at all.
        end: Passed straight to ``print``. The default carriage return
            lets the next call redraw the bar on the same terminal line.
    """
    if quiet:
        return

    shown = 100 if percent > 100 else percent

    # Each bar cell represents two percent
    filled = int(shown) // 2
    done = '#' * filled
    todo = '.' * (50 - filled)
    print(f'[{done}{todo}] {round(shown, 1)}% ', end=end, flush=True)
|
|
|
|
def pbar(iterable, quiet=False):
    """Yield every item of ``iterable`` while drawing a progress bar.

    The bar is redrawn before each item is handed out and is completed
    (and the line terminated) once the iterable is exhausted.

    Args:
        iterable: A sized iterable (``len()`` must work on it). It may be
            empty, in which case nothing is yielded.
        quiet: When true no progress bar is drawn.

    Yields:
        The items of ``iterable``, in order.
    """
    total = len(iterable)

    # An empty iterable has no steps; avoid dividing by zero
    step = 100 / total if total else 0

    for i, v in enumerate(iterable):
        draw_pbar(step * i, quiet)
        yield v
    if not quiet:
        draw_pbar(100, quiet, None)
|
|
|
|
def run_command(c, cmd):
    """Run one command at the remote prompt and return what it printed.

    Unlike REPLWrapper this assumes the remote end echoes every character
    we type, so the echo has to be consumed before the real output can be
    collected.

    Args:
        c: The pexpect-style console connected to the device.
        cmd: The single line command to send.

    Returns:
        Everything received between the echoed command and the next
        ``>>> `` prompt, with ``\\r\\r\\n`` line endings converted to
        ``\\n`` and leading/trailing newlines removed.
    """
    c.sendline(cmd)

    # Eat the echo of what we just typed
    c.expect_exact(cmd)

    # Whatever arrives before the next prompt is the command's output
    c.expect('>>> ')
    raw = c.before
    return raw.replace('\r\r\n', '\n').strip('\n')
|
|
|
|
def sync(c):
    """Stop the watch and synchronize with the command prompt.

    Sending a random print ensures the final expect (of the prompt)
    does not accidentally match a previously issued prompt.

    When the console has no logfile attached (non-verbose operation) the
    traffic is captured in a temporary buffer so that it can be shown if
    synchronization fails.

    Args:
        c: The pexpect-style console connected to the device.

    Raises:
        SystemExit: With status 1 if the device closes the connection or
            does not answer in time.
    """
    verbose = bool(c.logfile)
    tag = ''.join(random.choice(string.ascii_uppercase) for _ in range(6))

    try:
        if not verbose:
            c.logfile = io.StringIO()

        # Ctrl-C
        c.send('\x03')
        c.expect('>>> ')

        # The tag is sent as two adjacent string literals so the echoed
        # command line never contains the tag itself; only the printed
        # result does.
        c.sendline(f'print("{tag[:3]}""{tag[3:]}")')
        c.expect(tag)
        c.expect('>>> ')

        if not verbose:
            c.logfile.close()
            c.logfile = None
    except (pexpect.exceptions.EOF, pexpect.exceptions.TIMEOUT):
        print("ERROR: Cannot sync with device")
        # In verbose mode the logfile is the caller's own stream: the
        # traffic has already been shown and there is no buffer to dump.
        if not verbose:
            print_log(c.logfile)
        sys.exit(1)
|
|
|
|
def unsync(c):
    """Set the watch running again and leave the console.

    The expect (a sleep would also do) is required: if the subordinate
    process is killed too early the sendline will not have completed.

    Args:
        c: The pexpect-style console connected to the device.
    """
    running_banners = [
        'Watch is running, use Ctrl-C to stop',
        'Watch already running in the background',
    ]

    c.sendline('wasp.system.run()')
    c.expect(running_banners)

    # Ctrl-X
    c.send('\x18')
|
|
|
|
def _strip_source(lines):
    """Return ``lines`` without docstrings, comment-only lines and blanks."""
    kept = []
    in_docstring = False

    for ln in lines:
        ln = ln.rstrip()
        stripped = ln.lstrip()

        # This is a bit loose (definitely not PEP-257 compliant) but
        # is enough for most code.
        if in_docstring:
            if stripped.endswith('"""'):
                in_docstring = False
            continue

        if stripped.startswith('"""'):
            # The docstring only ends on its opening line when that line
            # holds a second, separate delimiter. A bare `"""` opens a
            # multi-line docstring rather than opening and closing it.
            if len(stripped) < 6 or not stripped.endswith('"""'):
                in_docstring = True
            continue

        if not stripped or stripped.startswith('#'):
            continue

        kept.append(ln)

    return kept


def paste(c, f, verbose=False, chunk=None):
    """Send the code in ``f`` to the device one line at a time.

    Docstrings, comment-only lines and blank lines are dropped before
    sending. After every line the device is expected to answer with the
    ``=== `` prompt.

    Args:
        c: The pexpect-style console connected to the device.
        f: An open text file (anything providing ``readlines()``).
        verbose: When true the progress bar and the locally printed error
            report are suppressed.
        chunk: Optional callable invoked before every line that starts
            with ``class``.

    Raises:
        SystemExit: With status 16 if the device reports
            ``FATAL: uncaught exception``.
    """
    tosend = _strip_source(f.readlines())
    if not tosend:
        # Nothing to transfer
        return

    for ln in pbar(tosend, verbose):
        if chunk and ln.startswith('class'):
            chunk()

        c.sendline(ln)

        choice = c.expect(['=== ', 'FATAL: uncaught exception [0-9a-f\r]*\n'])
        if choice == 1:
            # Capture and display the error message, then exit
            if not verbose:
                print('\n~~~')
            while choice == 1:
                if not verbose:
                    print(c.match.group(0).rstrip(), file=sys.stderr)
                # Keep reading lines until the device falls silent
                choice = c.expect([pexpect.TIMEOUT, '.*\n'], timeout=2)
            if not verbose:
                print('~~~')
            print('\nPlease reboot your device', file=sys.stderr)
            sys.exit(16)
|
|
|
|
def print_log(logfile):
    """Print captured console traffic between ``~~~`` marker lines.

    Each line has leading/trailing ``\\x04`` and ``\\r`` characters
    removed; empty lines and bare ``>>> `` prompts are dropped. Nothing
    is printed if no lines remain.

    Args:
        logfile: Either an object providing ``getvalue()`` (such as
            ``io.StringIO``) or a plain string.
    """
    try:
        text = logfile.getvalue()
    except AttributeError:
        # Not a buffer object; treat it as the text itself
        text = logfile

    cleaned = (ln.strip('\x04\r') for ln in text.split('\n'))
    output = [ln for ln in cleaned if ln and ln != '>>> ']
    if output:
        print('~~~')
        print('\n'.join(output))
        print('~~~')
|
|
|
|
def handle_eval(c, cmd):
    """Execute one line of Python on the device and show its output.

    The command is entered after Ctrl-E (presumably MicroPython paste
    mode; the device answers with ``=== ``) and executed with Ctrl-D.
    When the console has no logfile attached, the output produced by the
    execution is captured and printed using print_log().

    Args:
        c: The pexpect-style console connected to the device.
        cmd: The Python source line to execute.
    """
    capture = not bool(c.logfile)

    # Ctrl-E, then the command itself
    c.send('\x05')
    c.expect('=== ')
    c.sendline(cmd)
    c.expect('=== ')

    # Only the execution phase is captured, not the echo of the command
    if capture:
        c.logfile = io.StringIO()

    # Ctrl-D
    c.send('\x04')
    c.expect('>>> ')

    if capture:
        captured = c.logfile
        print_log(captured)
        captured.close()
        c.logfile = None
|
|
|
|
def handle_exec(c, fname):
    """Execute the contents of a local Python file on the device.

    The file is pasted line by line using paste(). The pasted code is
    executed before every line starting with ``class`` and once more at
    the end. When the console has no logfile attached, output produced
    while executing is collected and printed at the end using
    print_log().

    Args:
        c: The pexpect-style console connected to the device.
        fname: Path of the local file to execute.
    """
    capture = not bool(c.logfile)
    log = io.StringIO()

    def enter_paste():
        # Ctrl-E; the device answers with the '=== ' prompt
        c.send('\x05')
        c.expect('=== ')

    def execute():
        # Ctrl-D runs what has been pasted; capture only this phase
        if capture:
            c.logfile = log
        c.send('\x04')
        c.expect('>>> ')
        if capture:
            c.logfile = None

    def chunk():
        # Run what we have so far, then carry on pasting
        execute()
        enter_paste()

    with open(fname) as f:
        if capture:
            print(f'Preparing to run {fname}:')

        enter_paste()
        paste(c, f, not capture, chunk)
        execute()

    print_log(log)
    log.close()
|
|
|
|
def handle_battery_level(c):
    """Query the battery level from the device and print it as a percentage.

    Args:
        c: Console object providing ``run_command()``.
    """
    level = c.run_command("watch.battery.level()")
    print(f'Battery: {level}%')
|
|
|
|
def handle_memory_free(c):
    """Print one CSV line with the free memory before and after a GC run.

    The line has the form ``"<version>",<before>,<after>`` where
    ``<version>`` is the output of ``git describe --tags`` run on the
    host in the current working directory.

    Args:
        c: Console object providing ``run_command()``.
    """
    mem_query = "wasp.gc.mem_free()"

    before_gc = c.run_command(mem_query)
    c.run_command('wasp.gc.collect()')
    after_gc = c.run_command(mem_query)

    describe = subprocess.run(["git", "describe", "--tags"],
                              capture_output=True)
    version = describe.stdout.decode('UTF-8').strip()

    print('"{}",{},{}'.format(version, before_gc, after_gc))
|
|
|
|
def handle_reset(c, ota=False):
    """Reboot the device.

    Sends ``import machine`` followed by a call to ``machine.reset()``
    (or ``machine.enter_ota_dfu()`` when ``ota`` is true) and triggers
    execution. No prompt is awaited afterwards.

    Args:
        c: The pexpect-style console connected to the device.
        ota: Call ``enter_ota_dfu`` instead of ``reset``.
    """
    cmd = 'enter_ota_dfu' if ota else 'reset'

    # Ctrl-E; the device answers with the '=== ' prompt
    c.send('\x05')
    c.expect('=== ')

    for line in ('import machine', f'machine.{cmd}()'):
        c.sendline(line)
        c.expect('=== ')

    # Ctrl-D
    c.send('\x04')
|
|
|
|
def handle_rtc(c):
    """Set the device clock from the host's local time.

    Busy-waits until the host clock ticks over to a new second so that
    the time is sent right at the start of that second.

    Args:
        c: The pexpect-style console connected to the device.
    """
    # Wait for the clock to tick over to the next second
    start = time.localtime()
    now = start
    while now[5] == start[5]:
        now = time.localtime()

    # Set the time (the first eight fields of the time tuple)
    fields = ', '.join(str(now[i]) for i in range(8))
    c.sendline(f'watch.rtc.set_localtime(({fields}))')
    c.expect('>>> ')
|
|
|
|
def check_rtc(c):
    """Compare the device clock with the host clock and print the result.

    Only hours, minutes and seconds are compared. The printed delta is
    host time minus watch time in seconds, normalized to the range
    [-43200, 43200) so that a comparison made around midnight does not
    report a difference of almost a whole day.

    Args:
        c: The pexpect-style console connected to the device.
    """
    c.sendline('print(watch.rtc.get_localtime())')
    c.expect(r'\(([0-9]+), ([0-9]+), ([0-9]+), ([0-9]+), ([0-9]+), ([0-9]+), ([0-9]+), ([0-9]+)\)')
    t = time.localtime()

    # Groups 4..6 of the printed tuple are hour, minute and second
    watch_hms = (int(c.match[4]), int(c.match[5]), int(c.match[6]))
    watch_str = f'{watch_hms[0]:02d}:{watch_hms[1]:02d}:{watch_hms[2]:02d}'
    host_hms = (t.tm_hour, t.tm_min, t.tm_sec)
    host_str = f'{host_hms[0]:02d}:{host_hms[1]:02d}:{host_hms[2]:02d}'

    delta = 3600 * (host_hms[0] - watch_hms[0]) + \
            60 * (host_hms[1] - watch_hms[1]) + \
            (host_hms[2] - watch_hms[2])
    # Wrap around midnight: 23:59:59 vs 00:00:01 is 2 seconds, not a day
    half_day = 12 * 3600
    delta = (delta + half_day) % (2 * half_day) - half_day

    # Values are printed in the same order as the labels
    print(f"PC <-> watch: {host_str} <-> {watch_str} (delta {delta})")

    c.expect('>>> ')
|
|
|
|
def handle_binary_download(c, tname, fname):
    """Fetch a file from the device and store it locally.

    The file is read on the device in 24 byte pieces whose ``repr`` is
    transferred over the console and decoded on the host.

    Args:
        c: Console object providing ``run_command()`` and ``logfile``.
        tname: Path of the file on the device.
        fname: Local path to write to. If both ``tname`` and ``fname`` are
            absolute, ``fname`` is re-rooted below the current directory.

    Raises:
        Exception: If the local file already exists and the module level
            ``args.force_pull`` option is not set.
    """
    # Local import so this function does not depend on the file's import
    # block being edited.
    import ast

    verbose = bool(c.logfile)

    # Only re-root the local name when it really is an absolute path; a
    # relative name (e.g. from --as) must not be turned into a dot-file.
    if tname.startswith("/") and fname.startswith("/"):
        fname = "." + fname

    c.run_command('import os')
    stat = c.run_command(f'os.stat("{tname}")[6]')
    if 'Error' in stat:
        print('Watch reported error:')
        print(stat)
        c.run_command('del os')
        return

    print(f'Downloading {fname}:')

    # The device answered with the file size; parse it as a number rather
    # than evaluating arbitrary text received from the device.
    sz = int(stat)
    bytes_read = 0

    # Check the local side before opening anything on the device so that
    # a refusal does not leave a file open remotely.
    if os.path.exists(fname):
        if args.force_pull:
            print(f"Will overwrite local file '{fname}'")
        else:
            raise Exception(f"Local file '{fname}' already exists, use --force argument to overwrite it")

    # Create local directories if needed
    local_dir = os.path.dirname(fname)
    if local_dir and not os.path.isdir(local_dir):
        os.makedirs(local_dir, exist_ok=True)
        print(f"Created local dir {local_dir}/")

    c.run_command(f'f = open("{tname}", "rb")')

    with open(fname, 'wb') as f:
        while True:
            # A zero length file is complete from the start
            draw_pbar(100 * bytes_read / sz if sz else 100, verbose)
            reply = c.run_command('repr(f.read(24))')
            reply = reply.replace('\\\\', '\\')
            if reply.startswith('"'):
                # "b'..CONTENT..'"
                reply = reply[1:-1]
            elif reply.startswith("'"):
                # 'b\'..CONTENT..\''
                reply = reply[1:-1].replace("\\'", "'")
            # literal_eval only accepts literals, unlike eval()
            data = ast.literal_eval(reply)
            if len(data) == 0:
                break
            bytes_read += len(data)
            f.write(data)

    draw_pbar(100, verbose, end=None)
    c.run_command('f.close()')

    # Release as much memory as possible
    c.run_command('del f')
    c.run_command('del os')
|
|
|
|
def handle_binary_upload(c, fname, tname):
    """Copy a local file to the device as base64 encoded chunks.

    Args:
        c: Console object providing ``run_command()``, ``sendline()`` and
            ``logfile``.
        fname: Path of the local file to upload. It may be empty or
            shorter than one chunk.
        tname: Path to write on the device. When empty the basename of
            ``fname`` is used; when it contains a directory part an
            ``os.mkdir`` for that directory is issued on the device first.
    """
    verbose = bool(c.logfile)

    # Absorb the file to be uploaded. This is done before touching the
    # device so an unreadable local file cannot leave a truncated, open
    # file behind on the target.
    with open(fname, 'rb') as f:
        data = f.read()

    if not tname:
        tname = os.path.basename(fname)
    else:
        dname = os.path.dirname(tname)
        if dname:
            c.run_command('import os')
            # Sent without waiting for the result; the reply is skipped by
            # the echo matching of the next run_command().
            c.sendline(f'os.mkdir("{dname}")')
            c.run_command('del os')

    c.run_command('import ubinascii')
    c.run_command(f'f = open("{tname}", "wb")')
    # We define a function with a short name to reduce the constant per-chunk
    # overhead.
    # We use a lambda to avoid the "..." prompt triggered by def.
    c.run_command('w = lambda d: f.write(ubinascii.a2b_base64(d))')

    chunksz = 64
    chunks = [data[i:i + chunksz] for i in range(0, len(data), chunksz)]

    if not verbose:
        print(f'Uploading {fname}:')

    # Send the data (nothing to send, and no progress bar, for empty files)
    if chunks:
        for piece in pbar(chunks, verbose):
            c.run_command(f'w({repr(binascii.b2a_base64(piece))})')

    c.run_command('f.close()')

    # Release as much memory as possible
    c.run_command('del f')
    c.run_command('del w')
    c.run_command('del ubinascii')
|
|
|
|
def handle_upload(c, fname, tname):
    """Copy a local text file to the device using the device's ``upload``.

    The file content is pasted with paste(), so docstrings, comments and
    blank lines are not transferred.

    Args:
        c: Console object providing ``run_command()`` plus the usual
            pexpect-style ``send``/``sendline``/``expect`` methods.
        fname: Path of the local file to upload.
        tname: Path to write on the device. When empty the basename of
            ``fname`` is used; when it contains a directory part an
            ``os.mkdir`` for that directory is issued on the device first.
    """
    quiet = bool(c.logfile)

    if tname:
        remote_dir = os.path.dirname(tname)
        if remote_dir:
            c.run_command('import os')
            c.sendline(f'os.mkdir("{remote_dir}")')
            c.run_command('del os')
    else:
        tname = os.path.basename(fname)

    c.run_command('from shell import upload')

    with open(fname) as f:
        if not quiet:
            print(f'Uploading {fname}:')

        c.sendline(f'upload("{tname}")')
        c.expect('=== ')
        paste(c, f, quiet)

        # Ctrl-D marks the end of the pasted content
        c.send('\x04')

    c.expect('>>> ')
|
|
|
|
if __name__ == '__main__':
    # Command line entry point: connect to the device through pynus,
    # synchronize with its prompt, run the requested actions in a fixed
    # order and finally set the watch running again.
    parser = argparse.ArgumentParser(
            description='Wasp-os command and control client')
    parser.add_argument('--as', dest='file_as', default=None,
            help="Filename to use on the target (e.g. wasptool --upload docs/main/chrono.py --as main.py). Also works with --pull")
    parser.add_argument('--battery', action='store_true',
            help="Report remaining battery charge")
    parser.add_argument('--bootloader', action='store_true',
            help="Reboot into the bootloader mode for OTA update")
    parser.add_argument('--binary', action='store_true',
            help="Enable non-ASCII mode for upload command")
    parser.add_argument('--console', action='store_true',
            help='Launch a REPL session')
    parser.add_argument('--check-rtc', action='store_true',
            help='Compare workstation and watch times')
    parser.add_argument('--device',
            help='Connect only to a specific named device (or MAC address)')
    parser.add_argument('--exec',
            help='Execute the contents of a file')
    parser.add_argument('--eval',
            help='Execute the provided python string')
    parser.add_argument('--memfree', action='store_true',
            help='Report on the current memory usage.')
    parser.add_argument('--pull',
            help='Fetch a file from the target')
    parser.add_argument('--force', action='store_true', dest='force_pull',
            help='Force pull to overwrite local file if a file with the same name already exists')
    parser.add_argument('--push',
            help='Push a file to the target')
    parser.add_argument('--reset', action='store_true',
            help="Reboot the device (and don't stay in bootloader mode)")
    parser.add_argument('--ota',
            help="Deliver an OTA update to the device")
    parser.add_argument('--rtc', action='store_true',
            help='Set the time on the wasp-os device')
    parser.add_argument('--upload',
            help='Copy the specified file to the wasp-os device')
    parser.add_argument('--verbose', action='store_true',
            help='Log interaction with the wasp-os device')

    # NOTE: handle_binary_download() reads this module level name
    args = parser.parse_args()

    # Anything containing a colon is treated as a MAC address
    if args.device:
        if ':' in args.device:
            device_args = ['--address', args.device]
        else:
            device_args = ['--device', args.device]
    else:
        device_args = []

    # Resolve helper scripts relative to this script. abspath() keeps this
    # working when the script is started without a directory component,
    # where a bare dirname() would be '' and yield '/pynus/pynus.py'.
    tools_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
    pynus = os.path.join(tools_dir, 'pynus', 'pynus.py')

    # Arguments are passed as a list (not one whitespace-split string) so
    # device names and paths containing spaces survive intact.
    pynus_argv = [pynus] + device_args
    console = pexpect.spawn(pynus, device_args, encoding='UTF-8')
    console.run_command = types.MethodType(run_command, console)
    console.sync = types.MethodType(sync, console)
    console.unsync = types.MethodType(unsync, console)
    if args.verbose:
        console.logfile = sys.stdout
    else:
        # Capture the connection phase so it can be shown on failure
        console.logfile = io.StringIO()

    try:
        console.expect(r'Connect.*\(([0-9A-F:]*)\)')
    except pexpect.exceptions.TIMEOUT:
        print('ERROR: Cannot find suitable wasp-os device')
        if not args.verbose:
            print_log(console.logfile)
        sys.exit(1)
    if not args.verbose:
        console.logfile.close()
        console.logfile = None

    # The address in the connect banner is needed for the OTA update
    macaddr = console.match.group(1)
    console.expect('Exit console using Ctrl-X')
    time.sleep(0.5)
    console.sync()

    if args.rtc:
        handle_rtc(console)

    if args.check_rtc:
        check_rtc(console)

    if args.exec:
        handle_exec(console, args.exec)

    if args.eval:
        handle_eval(console, args.eval)

    if args.pull:
        if args.file_as is None:
            handle_binary_download(console, args.pull, args.pull)
        else:
            handle_binary_download(console, args.pull, args.file_as)

    if args.push:
        handle_binary_upload(console, args.push, args.push)

    if args.upload:
        if args.binary:
            handle_binary_upload(console, args.upload, args.file_as)
        else:
            handle_upload(console, args.upload, args.file_as)

    if args.memfree:
        handle_memory_free(console)

    if args.console:
        # Hand the terminal over to an interactive pynus session; execv
        # replaces this process and does not return.
        console.close()
        os.execv(pynus_argv[0], pynus_argv)

    if args.ota:
        handle_reset(console, ota=True)
        time.sleep(2.0)
        dfu = os.path.join(tools_dir, 'ota-dfu', 'dfu.py')
        os.execl(dfu, dfu, '-z', args.ota, '-a', macaddr, '--legacy')

    if args.reset:
        handle_reset(console)
        sys.exit(0)

    if args.bootloader:
        handle_reset(console, ota=True)
        sys.exit(0)

    if args.battery:
        handle_battery_level(console)

    console.unsync()
|