#!/usr/bin/python3

# SPDX-License-Identifier: LGPL-3.0-or-later
# Copyright (c) 2020 Daniel Thompson

"""WASP command and control client.

Drives a Python REPL on a WASP device through the ``pynus/pynus.py``
console, which is spawned as a child process and scripted with pexpect.
"""

import argparse
import random
import os.path
import time
import string
import sys

# Control characters sent to the console.
_CTRL_C = '\x03'  # interrupt the running watch (see the unsync() banner)
_CTRL_D = '\x04'  # finish the pasted/uploaded text
_CTRL_E = '\x05'  # start a paste; the REPL then prompts with '=== '
_CTRL_X = '\x18'  # leave the pynus console ("Exit console using Ctrl-X")


def pbar(iterable, quiet=False):
    """Yield every item of *iterable* while drawing a text progress bar.

    *iterable* must support ``len()``.  When *quiet* is true nothing is
    printed and the items are simply passed through.  An empty sequence
    yields nothing (and, unless quiet, just prints the completed bar).
    """
    total = len(iterable)
    # An empty sequence has no per-item step; avoid dividing by zero.
    step = 100 / total if total else 0

    for i, v in enumerate(iterable):
        if not quiet:
            percent = round(step * i, 1)
            bar = int(percent) // 2
            # '\r' keeps the bar on one line so each update overwrites
            # the previous one.
            print(f'[{"="*bar}{"-"*(50-bar)}] {percent}%',
                  end='\r', flush=True)
        yield v

    if not quiet:
        print(f'[{"="*50}] 100% ')


def sync(c):
    """Stop the watch and synchronize with the command prompt.

    Sending a random print ensures the final expect (of the prompt)
    does not accidentally match a previously issued prompt.
    """
    tag = ''.join([random.choice(string.ascii_uppercase) for _ in range(6)])

    c.send(_CTRL_C)
    c.expect('>>> ')
    # The tag is written as two adjacent string literals so that the text
    # of the command itself never contains the complete tag; only the
    # output of the print does.
    c.sendline(f'print("{tag[:3]}""{tag[3:]}")')
    c.expect(tag)
    c.expect('>>> ')


def unsync(c):
    """Set the watch running again.

    There must be an expect (or a sleep) since if we kill the subordinate
    process too early then the sendline will not have completed.
    """
    c.sendline('wasp.system.run()')
    c.expect('Watch is running, use Ctrl-C to stop')
    c.send(_CTRL_X)


def paste(c, f, verbose=False):
    """Send the lines of file object *f* to the console *c*.

    Docstrings and comment-only lines are dropped before sending.  Each
    remaining line is sent on its own and the function waits for the
    '=== ' prompt before sending the next one.  A progress bar is shown
    unless *verbose* is true.
    """
    docstring = False
    tosend = []

    for raw in f:
        ln = raw.rstrip()
        stripped = ln.lstrip()

        # This is a bit loose (definitely not PEP-257 compliant) but
        # is enough for most code.
        if docstring:
            # Inside a multi-line docstring: drop everything up to and
            # including the line that carries the closing quotes.
            if stripped.endswith('"""'):
                docstring = False
            continue

        if stripped.startswith('"""'):
            # The docstring also ends on this line only when a second
            # triple quote follows the opening one; a bare '"""' merely
            # opens it.
            if not (len(stripped) >= 6 and stripped.endswith('"""')):
                docstring = True
            continue

        if stripped.startswith('#'):
            continue

        tosend.append(ln)

    # pbar's second argument is "quiet": the bar is suppressed when the
    # interaction is being logged anyway.
    for ln in pbar(tosend, verbose):
        c.sendline(ln)
        c.expect('=== ')


def _end_paste(c, echo=False):
    """Send Ctrl-D to finish the pasted text and wait for the '>>> ' prompt.

    When *echo* is true ``c.logfile`` is pointed at ``sys.stdout`` while
    waiting, so that whatever the device prints is shown to the user.  The
    previous logfile is restored afterwards, even if the expect fails.
    """
    if not echo:
        c.send(_CTRL_D)
        c.expect('>>> ')
        return

    saved = c.logfile
    c.logfile = sys.stdout
    try:
        c.send(_CTRL_D)
        c.expect('>>> ')
    finally:
        c.logfile = saved


def handle_eval(c, cmd):
    """Run the single line of Python *cmd* on the device and show its output."""
    c.send(_CTRL_E)
    c.expect('=== ')
    c.sendline(cmd)
    c.expect('=== ')

    _end_paste(c, echo=True)


def handle_exec(c, fname):
    """Run the contents of the local file *fname* on the device.

    The file is pasted line by line (see paste()) and the output produced
    while it runs is copied to stdout.
    """
    verbose = bool(c.logfile)

    with open(fname) as f:
        if not verbose:
            print(f'Preparing to run {fname}:')

        c.send(_CTRL_E)
        c.expect('=== ')

        paste(c, f, verbose)

        _end_paste(c, echo=True)


def handle_rtc(c):
    """Set the device clock from the host's local time."""
    # Wait for the clock to tick over to the next second
    now = then = time.localtime()
    while now.tm_sec == then.tm_sec:
        now = time.localtime()

    # Set the time; the first eight struct_time fields are sent as a tuple.
    fields = ', '.join(str(v) for v in now[:8])
    c.sendline(f'watch.rtc.set_localtime(({fields}))')
    c.expect('>>> ')


def handle_upload(c, fname):
    """Copy the local file *fname* to the device.

    The device-side ``upload`` helper (imported from its ``shell`` module)
    is started with the file's base name and then fed the file contents
    via paste().
    """
    verbose = bool(c.logfile)

    c.sendline('from shell import upload')
    c.expect('>>> ')

    with open(fname) as f:
        if not verbose:
            print(f'Uploading {fname}:')

        c.sendline(f'upload("{os.path.basename(fname)}")')
        c.expect('=== ')
        paste(c, f, verbose)

        _end_paste(c)


def main():
    """Parse the command line and carry out the requested actions."""
    # pexpect is only needed to drive the console, so it is imported here;
    # this keeps the helpers above importable (and --help usable) on hosts
    # where pexpect is not installed.
    import pexpect

    parser = argparse.ArgumentParser(
            description='WASP command and control client')
    parser.add_argument('--console', action='store_true',
            help='Launch a REPL session')
    # NOTE(review): --device is accepted but its value is not currently
    # passed on to pynus, so it has no effect - confirm the intended
    # pynus option before wiring it up.
    parser.add_argument('--device',
            help='Connect only to a specific named device')
    parser.add_argument('--exec',
            help='Execute the contents of a file')
    parser.add_argument('--eval',
            help='Execute the provided python string')
    parser.add_argument('--rtc', action='store_true',
            help='Set the time on the WASP device')
    parser.add_argument('--upload',
            help='Copy the specified file to the WASP device')
    parser.add_argument('--verbose', action='store_true',
            help='Log interaction with the WASP device')

    args = parser.parse_args()

    # pynus lives next to this script.  os.path.join() copes with an empty
    # dirname (e.g. "python3 wasptool"), where plain concatenation would
    # produce the bogus absolute path "/pynus/pynus.py".
    pynus = os.path.join(os.path.dirname(sys.argv[0]), 'pynus', 'pynus.py')
    console = pexpect.spawn(pynus, encoding='UTF-8')
    if args.verbose:
        console.logfile = sys.stdout

    console.expect('Connect')
    console.expect('Exit console using Ctrl-X')
    time.sleep(0.5)
    sync(console)

    if args.rtc:
        handle_rtc(console)

    if args.exec:
        handle_exec(console, args.exec)

    if args.eval:
        handle_eval(console, args.eval)

    if args.upload:
        handle_upload(console, args.upload)

    if args.console:
        # Hand the terminal over to an interactive pynus session; execl
        # replaces this process, so nothing below runs in that case.
        console.close()
        os.execl(pynus, pynus)

    unsync(console)


if __name__ == '__main__':
    main()