From c64b6f62cd09af4690107f54c63bf3fa4c58068a Mon Sep 17 00:00:00 2001 From: Jouni Malinen Date: Fri, 27 Dec 2019 17:12:34 +0200 Subject: [PATCH] tests: Use python selector in the parallel-vm.py main loop This gets rid of the loop that was polling for things to do every 0.25 seconds and instead, reacts to any data from VMs as soon as it becomes available. This avoids unnecessary operations when no new data is available and avoids unnecessary waits when new data becomes available more quickly. Signed-off-by: Jouni Malinen --- tests/hwsim/vm/parallel-vm.py | 228 ++++++++++++++++++---------------- 1 file changed, 123 insertions(+), 105 deletions(-) diff --git a/tests/hwsim/vm/parallel-vm.py b/tests/hwsim/vm/parallel-vm.py index a2a60dd4f..07fa3d52e 100755 --- a/tests/hwsim/vm/parallel-vm.py +++ b/tests/hwsim/vm/parallel-vm.py @@ -11,6 +11,7 @@ import curses import fcntl import logging import os +import selectors import subprocess import sys import time @@ -87,7 +88,7 @@ def get_failed(vm): failed += vm[i]['failed'] return failed -def vm_read_stdout(vm, i, test_queue): +def vm_read_stdout(vm, test_queue): global total_started, total_passed, total_failed, total_skipped global rerun_failures global first_run_failures @@ -102,7 +103,7 @@ def vm_read_stdout(vm, i, test_queue): if e.errno == errno.EAGAIN: return False raise - logger.debug("VM[%d] stdout.read[%s]" % (i, out)) + logger.debug("VM[%d] stdout.read[%s]" % (vm['idx'], out)) pending = vm['pending'] + out lines = [] while True: @@ -111,7 +112,7 @@ def vm_read_stdout(vm, i, test_queue): break line = pending[0:pos].rstrip() pending = pending[(pos + 1):] - logger.debug("VM[%d] stdout full line[%s]" % (i, line)) + logger.debug("VM[%d] stdout full line[%s]" % (vm['idx'], line)) if line.startswith("READY"): vm['starting'] = False vm['started'] = True @@ -124,14 +125,15 @@ def vm_read_stdout(vm, i, test_queue): total_failed += 1 vals = line.split(' ') if len(vals) < 2: - logger.info("VM[%d] incomplete FAIL line: %s" % (i, line)) + logger.info("VM[%d] incomplete FAIL line: %s" % (vm['idx'], + line)) name = line else: name = vals[1] - logger.debug("VM[%d] test case failed: %s" % (i, name)) + logger.debug("VM[%d] test case failed: %s" % (vm['idx'], name)) vm['failed'].append(name) if name != vm['current_name']: - logger.info("VM[%d] test result mismatch: %s (expected %s)" % (i, name, vm['current_name'])) + logger.info("VM[%d] test result mismatch: %s (expected %s)" % (vm['idx'], name, vm['current_name'])) else: count = vm['current_count'] if count == 0: @@ -142,7 +144,7 @@ def vm_read_stdout(vm, i, test_queue): elif line.startswith("NOT-FOUND"): ready = True total_failed += 1 - logger.info("VM[%d] test case not found" % i) + logger.info("VM[%d] test case not found" % vm['idx']) elif line.startswith("SKIP"): ready = True total_skipped += 1 @@ -159,7 +161,8 @@ def vm_read_stdout(vm, i, test_queue): vm['pending'] = pending return ready -def start_vm(vm): +def start_vm(vm, sel): + logger.info("VM[%d] starting up" % (vm['idx'] + 1)) vm['starting'] = True vm['proc'] = subprocess.Popen(vm['cmd'], stdin=subprocess.PIPE, @@ -170,6 +173,7 @@ def start_vm(vm): fd = stream.fileno() fl = fcntl.fcntl(fd, fcntl.F_GETFL) fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) + sel.register(stream, selectors.EVENT_READ, vm) def num_vm_starting(): count = 0 @@ -178,6 +182,99 @@ def num_vm_starting(): count += 1 return count +def vm_read_stderr(vm): + try: + err = vm['proc'].stderr.read() + if err != None: + err = err.decode() + if len(err) > 0: + vm['err'] += err + logger.info("VM[%d] stderr.read[%s]" % (vm['idx'], err)) + except IOError as e: + if e.errno != errno.EAGAIN: + raise + +def vm_next_step(_vm, scr, test_queue): + scr.move(_vm['idx'] + 1, 10) + scr.clrtoeol() + if not test_queue: + _vm['proc'].stdin.write(b'\n') + _vm['proc'].stdin.flush() + scr.addstr("shutting down") + logger.info("VM[%d] shutting down" % _vm['idx']) + return + (name, count) = test_queue.pop(0) + _vm['current_name'] = name + _vm['current_count'] = count + _vm['proc'].stdin.write(name.encode() + b'\n') + _vm['proc'].stdin.flush() + scr.addstr(name) + logger.debug("VM[%d] start test %s" % (_vm['idx'], name)) + +def check_vm_start(scr, sel, test_queue): + running = False + updated = False + for i in range(num_servers): + if not vm[i]['proc']: + # Either not yet started or already stopped VM + if test_queue and vm[i]['cmd'] and num_vm_starting() < 2: + scr.move(i + 1, 10) + scr.clrtoeol() + scr.addstr(i + 1, 10, "starting VM") + updated = True + start_vm(vm[i], sel) + else: + continue + + running = True + return running, updated + +def vm_terminated(_vm, scr, sel, test_queue): + updated = False + for stream in [_vm['proc'].stdout, _vm['proc'].stderr]: + sel.unregister(stream) + _vm['proc'] = None + scr.move(_vm['idx'] + 1, 10) + scr.clrtoeol() + log = '{}/{}.srv.{}/console'.format(dir, timestamp, _vm['idx'] + 1) + with open(log, 'r') as f: + if "Kernel panic" in f.read(): + scr.addstr("kernel panic") + logger.info("VM[%d] kernel panic" % _vm['idx']) + updated = True + if test_queue: + num_vm = 0 + for i in range(num_servers): + if _vm['proc']: + num_vm += 1 + if len(test_queue) > num_vm: + scr.addstr("unexpected exit") + logger.info("VM[%d] unexpected exit" % i) + updated = True + return updated + +def update_screen(scr, total_tests): + scr.move(num_servers + 1, 10) + scr.clrtoeol() + scr.addstr("{} %".format(int(100.0 * (total_passed + total_failed + total_skipped) / total_tests))) + scr.addstr(num_servers + 1, 20, + "TOTAL={} STARTED={} PASS={} FAIL={} SKIP={}".format(total_tests, total_started, total_passed, total_failed, total_skipped)) + failed = get_failed(vm) + if len(failed) > 0: + scr.move(num_servers + 2, 0) + scr.clrtoeol() + scr.addstr("Failed test cases: ") + count = 0 + for f in failed: + count += 1 + if count > 30: + scr.addstr('...') + scr.clrtoeol() + break + scr.addstr(f) + scr.addstr(' ') + scr.refresh() + def show_progress(scr): global num_servers global vm @@ -188,10 +285,11 @@ def show_progress(scr): global total_started, total_passed, total_failed, total_skipped global rerun_failures + sel = selectors.DefaultSelector() total_tests = len(tests) logger.info("Total tests: %d" % total_tests) test_queue = [(t, 0) for t in tests] - start_vm(vm[0]) + start_vm(vm[0], sel) scr.leaveok(1) scr.addstr(0, 0, "Parallel test execution status", curses.A_BOLD) @@ -204,105 +302,27 @@ def show_progress(scr): scr.refresh() while True: - running = False updated = False - - for i in range(num_servers): - if not vm[i]['proc']: - if test_queue and vm[i]['cmd'] and num_vm_starting() < 2: - scr.move(i + 1, 10) - scr.clrtoeol() - scr.addstr(i + 1, 10, "starting VM") - updated = True - start_vm(vm[i]) + events = sel.select(timeout=1) + for key, mask in events: + _vm = key.data + if not _vm['proc']: continue - if vm[i]['proc'].poll() is not None: - vm[i]['proc'] = None - scr.move(i + 1, 10) - scr.clrtoeol() - log = '{}/{}.srv.{}/console'.format(dir, timestamp, i + 1) - with open(log, 'r') as f: - if "Kernel panic" in f.read(): - scr.addstr("kernel panic") - logger.info("VM[%d] kernel panic" % i) - updated = True - if test_queue: - num_vm = 0 - for i in range(num_servers): - if vm[i]['proc']: - num_vm += 1 - if len(test_queue) > num_vm: - scr.addstr("unexpected exit") - logger.info("VM[%d] unexpected exit" % i) - updated = True - continue - - running = True - try: - err = vm[i]['proc'].stderr.read() - if err != None: - err = err.decode() - vm[i]['err'] += err - logger.info("VM[%d] stderr.read[%s]" % (i, err)) - except IOError as e: - if e.errno != errno.EAGAIN: - raise - - if vm_read_stdout(vm[i], i, test_queue): - scr.move(i + 1, 10) - scr.clrtoeol() + vm_read_stderr(_vm) + if vm_read_stdout(_vm, test_queue): + vm_next_step(_vm, scr, test_queue) updated = True - if not test_queue: - vm[i]['proc'].stdin.write(b'\n') - vm[i]['proc'].stdin.flush() - scr.addstr("shutting down") - logger.info("VM[%d] shutting down" % i) - continue - else: - (name, count) = test_queue.pop(0) - vm[i]['current_name'] = name - vm[i]['current_count'] = count - vm[i]['proc'].stdin.write(name.encode() + b'\n') - vm[i]['proc'].stdin.flush() - scr.addstr(name) - logger.debug("VM[%d] start test %s" % (i, name)) - - try: - err = vm[i]['proc'].stderr.read() - if err != None: - err = err.decode() - vm[i]['err'] += err - logger.debug("VM[%d] stderr.read[%s]" % (i, err)) - except IOError as e: - if e.errno != errno.EAGAIN: - raise + vm_read_stderr(_vm) + if _vm['proc'].poll() is not None: + if vm_terminated(_vm, scr, sel, test_queue): + updated = True + running, run_update = check_vm_start(scr, sel, test_queue) + if updated or run_update: + update_screen(scr, total_tests) if not running: break - - if updated: - scr.move(num_servers + 1, 10) - scr.clrtoeol() - scr.addstr("{} %".format(int(100.0 * (total_passed + total_failed + total_skipped) / total_tests))) - scr.addstr(num_servers + 1, 20, "TOTAL={} STARTED={} PASS={} FAIL={} SKIP={}".format(total_tests, total_started, total_passed, total_failed, total_skipped)) - failed = get_failed(vm) - if len(failed) > 0: - scr.move(num_servers + 2, 0) - scr.clrtoeol() - scr.addstr("Failed test cases: ") - count = 0 - for f in failed: - count += 1 - if count > 30: - scr.addstr('...') - scr.clrtoeol() - break - scr.addstr(f) - scr.addstr(' ') - - scr.refresh() - - time.sleep(0.25) + sel.close() for i in range(num_servers): if not vm[i]['proc']: @@ -436,9 +456,6 @@ def main(): vm = {} for i in range(0, num_servers): - print("\rStarting virtual machine {}/{}".format(i + 1, num_servers), - end='') - logger.info("Starting virtual machine {}/{}".format(i + 1, num_servers)) cmd = [os.path.join(scriptsdir, 'vm-run.sh'), '--timestamp', str(timestamp), '--ext', 'srv.%d' % (i + 1), @@ -446,6 +463,7 @@ def main(): if args.telnet: cmd += ['--telnet', str(args.telnet + i)] vm[i] = {} + vm[i]['idx'] = i vm[i]['starting'] = False vm[i]['started'] = False vm[i]['cmd'] = cmd