#!/usr/bin/python3

# SPDX-License-Identifier: LGPL-3.0-or-later
# Copyright (c) 2020 Daniel Thompson

"""WASP command and control client.

Drives an interactive Python prompt on a WASP device through the ``pynus``
console helper: files and snippets are typed into the remote prompt line by
line and the replies are matched with ``expect`` calls.
"""

import argparse
import io
import random
import os.path
import time
import string
import sys


def pbar(iterable, quiet=False):
    """Yield every item of *iterable* while drawing a text progress bar.

    Args:
        iterable: A sized collection (``len()`` is called on it).
        quiet: When true nothing is printed; items are simply yielded.

    Yields:
        The items of *iterable*, in order.

    An empty collection yields nothing and, unless *quiet*, just prints the
    completed bar (previously it failed with ZeroDivisionError).
    """
    total = len(iterable)
    # Percentage represented by one item; zero when there is nothing to do.
    step = 100 / total if total else 0

    for i, v in enumerate(iterable):
        if not quiet:
            percent = round(step * i, 1)
            bar = int(percent) // 2  # the bar is 50 characters wide
            print(f'[{"#"*bar}{"."*(50-bar)}] {percent}%', end='\r', flush=True)
        yield v
    if not quiet:
        print(f'[{"#"*50}] 100% ')


def sync(c):
    """Stop the watch and synchronize with the command prompt.

    Sending a random print ensures the final expect (of the prompt)
    does not accidentally match a previously issued prompt.
    """
    tag = ''.join(random.choices(string.ascii_uppercase, k=6))

    c.send('\x03')  # Ctrl-C
    c.expect('>>> ')
    # The tag is sent as two adjacent string literals so that the text we
    # type never contains the full tag; only the printed result does.
    c.sendline(f'print("{tag[:3]}""{tag[3:]}")')
    c.expect(tag)
    c.expect('>>> ')


def unsync(c):
    """Set the watch running again.

    There must be an expect (or a sleep) since if we kill the subordinate
    process too early then the sendline will not have completed.
    """
    c.sendline('wasp.system.run()')
    c.expect('Watch is running, use Ctrl-C to stop')
    c.send('\x18')  # Ctrl-X exits the console


def paste(c, f, verbose=False, chunk=None):
    """Send the code in file object *f* to the console, one line at a time.

    Blank lines, ``#`` comment lines and docstrings are dropped before
    sending. After each line is sent the function waits for the ``'=== '``
    prompt.

    Args:
        c: Console object providing ``sendline()`` and ``expect()``.
        f: Open text file (or any iterable of lines) holding the code.
        verbose: When true the progress bar is suppressed.
        chunk: Optional callable invoked immediately before every line that
            starts with ``class`` is sent.
    """
    in_docstring = False
    tosend = []

    for ln in f:
        ln = ln.rstrip()
        stripped = ln.strip()

        # This is a bit loose (definitely not PEP-257 compliant) but
        # is enough for most code.
        if in_docstring:
            if stripped.endswith('"""'):
                in_docstring = False
            continue
        if stripped.startswith('"""'):
            # A one-line docstring needs both delimiters (six quotes or
            # more); anything shorter, e.g. a lone """, opens a multi-line
            # docstring that ends on a later line.
            if len(stripped) < 6 or not stripped.endswith('"""'):
                in_docstring = True
            continue

        if stripped.startswith('#'):
            continue

        if not stripped:
            continue

        tosend.append(ln)

    for ln in pbar(tosend, verbose):
        if chunk and ln.startswith('class'):
            chunk()

        c.sendline(ln)
        c.expect('=== ')


def print_log(logfile):
    """Print the captured console output held in *logfile*, if any.

    Args:
        logfile: An ``io.StringIO`` (anything with ``getvalue()``).

    Each line has ``'\\x04'`` and carriage returns stripped from both ends;
    empty lines and bare ``'>>> '`` prompts are discarded. Whatever remains
    is printed between two ``~~~`` marker lines.
    """
    lines = [line.strip('\x04\r') for line in logfile.getvalue().split('\n')]

    output = [line for line in lines if line and line != '>>> ']
    if output:
        print('~~~')
        print('\n'.join(output))
        print('~~~')


def handle_eval(c, cmd):
    """Run the Python string *cmd* on the device and print what it outputs.

    Args:
        c: Console object; a truthy ``c.logfile`` means verbose logging is
            already active and no separate capture is made.
        cmd: The text to send.
    """
    verbose = bool(c.logfile)

    c.send('\x05')  # Ctrl-E; the console answers with the '=== ' prompt
    c.expect('=== ')
    c.sendline(cmd)
    c.expect('=== ')

    # Only capture from here on so the log holds the command's output and
    # not the echo of the command itself.
    if not verbose:
        c.logfile = io.StringIO()
    c.send('\x04')  # Ctrl-D; wait for the normal prompt to come back
    c.expect('>>> ')
    if not verbose:
        print_log(c.logfile)
        c.logfile.close()
        c.logfile = None


def handle_exec(c, fname):
    """Send the contents of file *fname* to the device and run it.

    The file is submitted in pieces: the pending text is submitted (Ctrl-D)
    and a new piece started (Ctrl-E) before every line starting with
    ``class``. Output produced while each piece runs is collected and
    printed once at the end, unless verbose logging is already active.

    Args:
        c: Console object; a truthy ``c.logfile`` selects verbose mode.
        fname: Path of the file to run.
    """
    verbose = bool(c.logfile)

    log = io.StringIO()

    def chunk():
        # Submit what has been sent so far, capturing only its output,
        # then start the next piece.
        if not verbose:
            c.logfile = log
        c.send('\x04')
        c.expect('>>> ')
        if not verbose:
            c.logfile = None
        c.send('\x05')
        c.expect('=== ')

    with open(fname) as f:
        if not verbose:
            print(f'Preparing to run {fname}:')

        c.send('\x05')
        c.expect('=== ')

        paste(c, f, verbose, chunk)

        if not verbose:
            c.logfile = log
        c.send('\x04')
        c.expect('>>> ')
        if not verbose:
            c.logfile = None

    print_log(log)
    log.close()


def handle_reset(c):
    """Ask the device to reboot by running ``machine.reset()`` on it.

    No prompt is awaited after the final Ctrl-D.
    """
    c.send('\x05')
    c.expect('=== ')
    c.sendline('import machine')
    c.sendline('machine.reset()')
    c.expect('=== ')
    c.send('\x04')


def handle_rtc(c):
    """Set the device clock from the host's local time.

    Busy-waits until the host clock ticks over to a new second so that the
    time is sent right at the start of that second.
    """
    # Wait for the clock to tick over to the next second
    now = then = time.localtime()
    while now[5] == then[5]:
        now = time.localtime()

    # Set the time (the first eight fields of the struct_time)
    fields = ', '.join(str(field) for field in now[:8])
    c.sendline(f'watch.rtc.set_localtime(({fields}))')
    c.expect('>>> ')


def handle_upload(c, fname):
    """Copy file *fname* to the device using the device-side ``upload``.

    The remote file is named after the basename of *fname*. The contents go
    through ``paste()``, so comments, docstrings and blank lines are not
    transferred.

    Args:
        c: Console object; a truthy ``c.logfile`` selects verbose mode.
        fname: Path of the local file to copy.
    """
    verbose = bool(c.logfile)

    c.sendline('from shell import upload')
    c.expect('>>> ')

    with open(fname) as f:
        if not verbose:
            print(f'Uploading {fname}:')

        c.sendline(f'upload("{os.path.basename(fname)}")')
        c.expect('=== ')
        paste(c, f, verbose)
        c.send('\x04')
        c.expect('>>> ')


def pynus_path(argv0):
    """Return the path of ``pynus/pynus.py`` located beside script *argv0*.

    ``os.path.join`` is used so that an *argv0* without a directory part
    (e.g. ``wasptool``) gives the relative path ``pynus/pynus.py`` instead
    of the absolute ``/pynus/pynus.py``.
    """
    return os.path.join(os.path.dirname(argv0), 'pynus', 'pynus.py')


def main():
    """Parse the command line and carry out the requested actions."""
    # Imported lazily: pexpect is only needed to spawn the console, and
    # deferring it keeps the helpers above importable without the
    # third-party dependency.
    import pexpect

    parser = argparse.ArgumentParser(
            description='WASP command and control client')
    parser.add_argument('--console', action='store_true',
            help='Launch a REPL session')
    # NOTE(review): --device is accepted but nothing in this file acts on it.
    parser.add_argument('--device',
            help='Connect only to a specific named device')
    parser.add_argument('--exec',
            help='Execute the contents of a file')
    parser.add_argument('--eval',
            help='Execute the provided python string')
    parser.add_argument('--reset', action='store_true',
            help="Reboot the device (and don't stay in bootloader mode)")
    parser.add_argument('--rtc', action='store_true',
            help='Set the time on the WASP device')
    parser.add_argument('--upload',
            help='Copy the specified file to the WASP device')
    parser.add_argument('--verbose', action='store_true',
            help='Log interaction with the WASP device')

    args = parser.parse_args()

    pynus = pynus_path(sys.argv[0])
    console = pexpect.spawn(pynus, encoding='UTF-8')
    if args.verbose:
        console.logfile = sys.stdout

    console.expect('Connect')
    console.expect('Exit console using Ctrl-X')
    time.sleep(0.5)
    sync(console)

    if args.rtc:
        handle_rtc(console)

    if args.exec:
        handle_exec(console, args.exec)

    if args.eval:
        handle_eval(console, args.eval)

    if args.upload:
        handle_upload(console, args.upload)

    if args.console:
        # Hand the terminal over to a fresh interactive pynus session;
        # execl replaces this process and does not return.
        console.close()
        os.execl(pynus, pynus)

    if args.reset:
        # The device is rebooting, so there is no prompt to resume from.
        handle_reset(console)
        sys.exit(0)

    unsync(console)


if __name__ == '__main__':
    main()