#!/usr/bin/env python3 # SPDX-License-Identifier: LGPL-3.0-or-later # Copyright (c) 2020 Daniel Thompson import argparse import io import random import os.path import pexpect import time import string import sys def pbar(iterable, quiet=False): step = 100 / len(iterable) for i, v in enumerate(iterable): if not quiet: percent = round(step * i, 1) bar = int(percent) // 2 print(f'[{"#"*bar}{"."*(50-bar)}] {percent}%', end='\r', flush=True) yield v if not quiet: print(f'[{"#"*50}] 100% ') def sync(c): """Stop the watch and synchronize with the command prompt. Sending a random print ensure the final export (of the prompt) does not accidentally match a previously issued prompt. """ tag = ''.join([random.choice(string.ascii_uppercase) for i in range(6)]) c.send('\x03') c.expect('>>> ') c.sendline(f'print("{tag[:3]}""{tag[3:]}")') c.expect(tag) c.expect('>>> ') def unsync(c): """Set the watch running again. There must be an expect (or a sleep) since if we kill the subordinate process too early then the sendline will not have completed. """ c.sendline('wasp.system.run()') c.expect(['Watch is running, use Ctrl-C to stop', 'Watch already running in the background']) c.send('\x18') def paste(c, f, verbose=False, chunk=None): docstring = False tosend = [] for ln in f.readlines(): ln = ln.rstrip() # This is a bit loose (definitely not PEP-257 compliant) but # is enough for most code. if ln.lstrip().startswith('"""'): docstring = True if docstring: if ln.rstrip().endswith('"""'): docstring = False continue if ln.lstrip().startswith('#'): continue if ln.strip() == '': continue tosend.append(ln) for ln in pbar(tosend, verbose): if chunk and ln.startswith('class'): chunk() c.sendline(ln) choice = c.expect(['=== ', 'FATAL: uncaught exception [0-9a-f\r]*\n']) if choice == 1: # Capture and display the error message, then exit if not verbose: print('\n~~~') while choice == 1: if not verbose: print(c.match.group(0).rstrip(), file=sys.stderr) choice = c.expect([pexpect.TIMEOUT, '.*\n'], timeout=2) if not verbose: print('~~~') print('\nPlease reboot your device', file=sys.stderr) sys.exit(16) def print_log(logfile): lines = logfile.getvalue().split('\n') lines = [ l.strip('\x04\r') for l in lines ] output = [ l for l in lines if l and l != '>>> ' ] if output: print('~~~') print('\n'.join(output)) print('~~~') def handle_eval(c, cmd): verbose = bool(c.logfile) c.send('\x05') c.expect('=== ') c.sendline(cmd) c.expect('=== ') if not verbose: c.logfile = io.StringIO() c.send('\x04') c.expect('>>> ') if not verbose: print_log(c.logfile) c.logfile.close() c.logfile = None def handle_exec(c, fname): verbose = bool(c.logfile) log = io.StringIO() def chunk(): if not verbose: c.logfile = log c.send('\x04') c.expect('>>> ') if not verbose: c.logfile = None c.send('\x05') c.expect('=== ') with open(fname) as f: if not verbose: print(f'Preparing to run {fname}:') c.send('\x05') c.expect('=== ') paste(c, f, verbose, chunk) if not verbose: c.logfile = log c.send('\x04') c.expect('>>> ') if not verbose: c.logfile = None print_log(log) log.close() def handle_reset(c, ota=False): cmd = 'reset' if ota: cmd = 'enter_ota_dfu' c.send('\x05') c.expect('=== ') c.sendline('import machine') c.expect('=== ') c.sendline(f'machine.{cmd}()') c.expect('=== ') c.send('\x04') def handle_rtc(c): # Wait for the clock to tick over to the next second now = then = time.localtime() while now[5] == then[5]: now = time.localtime() # Set the time c.sendline(f'watch.rtc.set_localtime(({now[0]}, {now[1]}, {now[2]}, {now[3]}, {now[4]}, {now[5]}, {now[6]}, {now[7]}))') c.expect('>>> ') def check_rtc(c): c.sendline('print(watch.rtc.get_localtime())') c.expect('\(([0-9]+), ([0-9]+), ([0-9]+), ([0-9]+), ([0-9]+), ([0-9]+), ([0-9]+), ([0-9]+)\)') t = time.localtime() watch_hms = (int(c.match[4]), int(c.match[5]), int(c.match[6])) watch_str = f'{watch_hms[0]:02d}:{watch_hms[1]:02d}:{watch_hms[2]:02d}' host_hms = (t.tm_hour, t.tm_min, t.tm_sec) host_str = f'{host_hms[0]:02d}:{host_hms[1]:02d}:{host_hms[2]:02d}' delta = 3600 * (host_hms[0] - watch_hms[0]) + \ 60 * (host_hms[1] - watch_hms[1]) + \ (host_hms[2] - watch_hms[2]) print(f"PC <-> watch: {watch_str} <-> {host_str} (delta {delta})") c.expect('>>> ') def handle_binary_upload(c, fname): verbose = bool(c.logfile) c.sendline(f'f = open("{os.path.basename(fname)}", "wb")') c.expect('>>> ') # Absorb the file to be uploaded with open(fname, 'rb') as f: data = f.read() chunksz = 24 nchunks = len(data) // chunksz lastchunk = len(data) % chunksz if not verbose: print(f'Uploading {fname}:') # Send the data for i in pbar(range(0, chunksz*nchunks, chunksz), verbose): c.sendline(f'f.write({repr(data[i:i+chunksz])})') c.expect('>>> ') if lastchunk: c.sendline(f'f.write({repr(data[-lastchunk:])})') c.expect('>>> ') c.sendline('f.close()') c.expect('>>> ') def handle_upload(c, fname): verbose = bool(c.logfile) c.sendline('from shell import upload') c.expect('>>> ') with open(fname) as f: if not verbose: print(f'Uploading {fname}:') c.sendline(f'upload("{os.path.basename(fname)}")') c.expect('=== ') paste(c, f, verbose) c.send('\x04') c.expect('>>> ') if __name__ == '__main__': parser = argparse.ArgumentParser( description='Wasp-os command and control client') parser.add_argument('--bootloader', action='store_true', help="Reboot into the bootloader mode for OTA update") parser.add_argument('--binary', action='store_true', help="Enable non-ASCII mode for suitable commands (such as upload)") parser.add_argument('--console', action='store_true', help='Launch a REPL session') parser.add_argument('--check-rtc', action='store_true', help='Compare workstation and watch times') parser.add_argument('--device', help='Connect only to a specific named device (or MAC address)') parser.add_argument('--exec', help='Execute the contents of a file') parser.add_argument('--eval', help='Execute the provided python string') parser.add_argument('--reset', action='store_true', help="Reboot the device (and don't stay in bootloader mode)") parser.add_argument('--ota', help="Deliver an OTA update to the device") parser.add_argument('--rtc', action='store_true', help='Set the time on the wasp-os device') parser.add_argument('--upload', help='Copy the specified file to the wasp-os device') parser.add_argument('--verbose', action='store_true', help='Log interaction with the wasp-os device') args = parser.parse_args() if args.device: if ':' in args.device: device_args = ' --address ' + args.device else: device_args = ' --device ' + args.device else: device_args = '' pynus = os.path.dirname(sys.argv[0]) + '/pynus/pynus.py' + device_args console = pexpect.spawn(pynus, encoding='UTF-8') if args.verbose: console.logfile = sys.stdout console.expect('Connect.*\(([0-9A-F:]*)\)') macaddr = console.match.group(1) console.expect('Exit console using Ctrl-X') time.sleep(0.5) sync(console) if args.rtc: handle_rtc(console) if args.check_rtc: check_rtc(console) if args.exec: handle_exec(console, args.exec) if args.eval: handle_eval(console, args.eval) if args.upload: if args.binary: handle_binary_upload(console, args.upload) else: handle_upload(console, args.upload) if args.console: console.close() argv = pynus.split() os.execv(argv[0], argv) if args.ota: handle_reset(console, ota=True) time.sleep(1.0) dfu = os.path.dirname(sys.argv[0]) + '/ota-dfu/dfu.py' os.execl(dfu, dfu, '-z', args.ota, '-a', macaddr, '--legacy') if args.reset: handle_reset(console) sys.exit(0) if args.bootloader: handle_reset(console, ota=True) sys.exit(0) unsync(console)