From 118280191aea8eae53a808f9c82d87079fe65d1a Mon Sep 17 00:00:00 2001 From: "William D. Jones" Date: Wed, 27 Mar 2019 21:32:29 -0400 Subject: [PATCH 01/15] Begin converting taskloop to use asyncio module. Signed-off-by: William D. Jones --- sbysrc/sby_core.py | 90 +++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 89 insertions(+), 1 deletion(-) diff --git a/sbysrc/sby_core.py b/sbysrc/sby_core.py index 4ec2ab36..3c9c1f95 100644 --- a/sbysrc/sby_core.py +++ b/sbysrc/sby_core.py @@ -20,6 +20,7 @@ if os.name == "posix": import resource, fcntl import subprocess +import asyncio from shutil import copyfile, rmtree from select import select from time import time, localtime, sleep @@ -200,6 +201,56 @@ def preexec_fn(): next_task.poll() return + async def output_async(self): + while True: + outs = await self.p.stdout.readline() + await asyncio.sleep(0) # https://bugs.python.org/issue24532 + outs = outs.decode("utf-8") + if len(outs) == 0: break + if outs[-1] != '\n': + self.linebuffer += outs + break + outs = (self.linebuffer + outs).strip() + self.linebuffer = "" + self.handle_output(outs) + + async def maybe_spawn_async(self): + if self.finished or self.terminated: + return + + if not self.running: + for dep in self.deps: + if not dep.finished: + return + + self.job.log("%s: starting process \"%s\"" % (self.info, self.cmdline)) + self.p = await asyncio.create_subprocess_shell(self.cmdline, stdin=asyncio.subprocess.DEVNULL, + stdout=asyncio.subprocess.PIPE, + stderr=(asyncio.subprocess.STDOUT if self.logstderr else None)) + self.job.tasks_pending.remove(self) + self.job.tasks_running.append(self) + self.running = True + asyncio.ensure_future(self.output_async()) + self.fut = asyncio.ensure_future(self.p.wait()) + + async def shutdown_and_notify_async(self): + self.job.log("%s: finished (returncode=%d)" % (self.info, self.p.returncode)) + self.job.tasks_running.remove(self) + self.running = False + + self.handle_exit(self.p.returncode) + + if self.checkretcode and self.p.returncode != 0: + self.job.status = "ERROR" + self.job.log("%s: job failed. ERROR." % self.info) + self.terminated = True + self.job.terminate() + return + + self.finished = True + for next_task in self.notify: + await next_task.maybe_spawn_async() + return class SbyAbort(BaseException): pass @@ -283,6 +334,41 @@ def taskloop(self): self.status = "TIMEOUT" self.terminate(timeout=True) + def taskloop_async(self): + if os.name != "posix": + loop = asyncio.ProactorEventLoop() + asyncio.set_event_loop(loop) + loop = asyncio.get_event_loop() + loop.set_debug(enabled=True) + poll_fut = asyncio.ensure_future(self.task_poller()) + loop.run_until_complete(poll_fut) + + async def task_poller(self): + for task in self.tasks_pending: + await task.maybe_spawn_async() + + while len(self.tasks_running): + task_futs = [] + for task in self.tasks_running: + if task.running: + task_futs.append(task.fut) + (done, pending) = await asyncio.wait(task_futs, return_when=asyncio.FIRST_COMPLETED) + + for task in self.tasks_running: + if task.fut in done: + await task.shutdown_and_notify_async() + + # for task in self.tasks_pending: + # await task.poll_async() + + #if self.opt_timeout is not None: + #total_clock_time = int(time() - self.start_clock_time) + #if total_clock_time > self.opt_timeout: + #self.log("Reached TIMEOUT (%d seconds). Terminating all tasks." % self.opt_timeout) + #self.status = "TIMEOUT" + #self.terminate(timeout=True) + + def log(self, logmessage): tm = localtime() print("SBY {:2d}:{:02d}:{:02d} [{}] {}".format(tm.tm_hour, tm.tm_min, tm.tm_sec, self.workdir, logmessage), flush=True) @@ -655,7 +741,9 @@ def run(self, setupmode): if opt not in self.used_options: self.error("Unused option: {}".format(opt)) - self.taskloop() + # self.taskloop() + self.taskloop_async() + total_clock_time = int(time() - self.start_clock_time) From b5350a2eb908a61925ca44d104f33f23758791d0 Mon Sep 17 00:00:00 2001 From: "William D. Jones" Date: Fri, 29 Mar 2019 02:03:45 -0400 Subject: [PATCH 02/15] Add asynchronous timer to detect timeouts. Signed-off-by: William D. Jones --- sbysrc/sby_core.py | 32 ++++++++++++++++++++++---------- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/sbysrc/sby_core.py b/sbysrc/sby_core.py index 3c9c1f95..a57439eb 100644 --- a/sbysrc/sby_core.py +++ b/sbysrc/sby_core.py @@ -21,6 +21,7 @@ import resource, fcntl import subprocess import asyncio +from functools import partial from shutil import copyfile, rmtree from select import select from time import time, localtime, sleep @@ -343,7 +344,26 @@ def taskloop_async(self): poll_fut = asyncio.ensure_future(self.task_poller()) loop.run_until_complete(poll_fut) + async def timekeeper(self): + total_clock_time = int(time() - self.start_clock_time) + + try: + while total_clock_time <= self.opt_timeout: + await asyncio.sleep(1) + total_clock_time = int(time() - self.start_clock_time) + except asyncio.CancelledError: + pass + + def timeout(self, fut): + self.log("Reached TIMEOUT (%d seconds). Terminating all tasks." % self.opt_timeout) + self.status = "TIMEOUT" + self.terminate(timeout=True) + async def task_poller(self): + if self.opt_timeout is not None: + timer_fut = asyncio.ensure_future(self.timekeeper()) + timer_fut.add_done_callback(partial(SbyJob.timeout, self)) + for task in self.tasks_pending: await task.maybe_spawn_async() @@ -358,16 +378,8 @@ async def task_poller(self): if task.fut in done: await task.shutdown_and_notify_async() - # for task in self.tasks_pending: - # await task.poll_async() - - #if self.opt_timeout is not None: - #total_clock_time = int(time() - self.start_clock_time) - #if total_clock_time > self.opt_timeout: - #self.log("Reached TIMEOUT (%d seconds). Terminating all tasks." % self.opt_timeout) - #self.status = "TIMEOUT" - #self.terminate(timeout=True) - + if self.opt_timeout is not None: + timer_fut.cancel() def log(self, logmessage): tm = localtime() From e618a1591d40904fe5e8a5a2c336065e16e3253d Mon Sep 17 00:00:00 2001 From: "William D. Jones" Date: Fri, 29 Mar 2019 03:27:43 -0400 Subject: [PATCH 03/15] Do not spawn tasks in SbyTask constructor; rely on taskloop to do it. Signed-off-by: William D. Jones --- sbysrc/sby_core.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/sbysrc/sby_core.py b/sbysrc/sby_core.py index a57439eb..37bf5ba6 100644 --- a/sbysrc/sby_core.py +++ b/sbysrc/sby_core.py @@ -81,17 +81,12 @@ def __init__(self, job, info, deps, cmdline, logfile=None, logstderr=True, silen self.job.tasks_pending.append(self) for dep in self.deps: - dep.register_dep(self) + if not dep.finished: + dep.notify.append(self) self.output_callback = None self.exit_callback = None - def register_dep(self, next_task): - if self.finished: - next_task.poll() - else: - self.notify.append(next_task) - def log(self, line): if line is not None and (self.noprintregex is None or not self.noprintregex.match(line)): if self.logfile is not None: From 009a8194dd6f22e2681317370e2887af4024dea7 Mon Sep 17 00:00:00 2001 From: "William D. Jones" Date: Fri, 29 Mar 2019 03:46:21 -0400 Subject: [PATCH 04/15] Cleanup dead (non-async) code and rename functions. Signed-off-by: William D. Jones --- sbysrc/sby_core.py | 120 +++------------------------------------------ 1 file changed, 7 insertions(+), 113 deletions(-) diff --git a/sbysrc/sby_core.py b/sbysrc/sby_core.py index 37bf5ba6..1a9bd0bb 100644 --- a/sbysrc/sby_core.py +++ b/sbysrc/sby_core.py @@ -124,79 +124,6 @@ def terminate(self, timeout=False): all_tasks_running.remove(self) self.terminated = True - def poll(self): - if self.finished or self.terminated: - return - - if not self.running: - for dep in self.deps: - if not dep.finished: - return - - if not self.silent: - self.job.log("{}: starting process \"{}\"".format(self.info, self.cmdline)) - - if os.name == "posix": - def preexec_fn(): - signal.signal(signal.SIGINT, signal.SIG_IGN) - os.setpgrp() - - self.p = subprocess.Popen(["/usr/bin/env", "bash", "-c", self.cmdline], stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, - stderr=(subprocess.STDOUT if self.logstderr else None), preexec_fn=preexec_fn) - - fl = fcntl.fcntl(self.p.stdout, fcntl.F_GETFL) - fcntl.fcntl(self.p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK) - - else: - self.p = subprocess.Popen(self.cmdline, shell=True, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, - stderr=(subprocess.STDOUT if self.logstderr else None)) - - self.job.tasks_pending.remove(self) - self.job.tasks_running.append(self) - all_tasks_running.append(self) - self.running = True - return - - while True: - outs = self.p.stdout.readline().decode("utf-8") - if len(outs) == 0: break - if outs[-1] != '\n': - self.linebuffer += outs - break - outs = (self.linebuffer + outs).strip() - self.linebuffer = "" - self.handle_output(outs) - - if self.p.poll() is not None: - if not self.silent: - self.job.log("{}: finished (returncode={})".format(self.info, self.p.returncode)) - self.job.tasks_running.remove(self) - all_tasks_running.remove(self) - self.running = False - - if self.p.returncode == 127: - self.job.status = "ERROR" - if not self.silent: - self.job.log("{}: COMMAND NOT FOUND. ERROR.".format(self.info)) - self.terminated = True - self.job.terminate() - return - - self.handle_exit(self.p.returncode) - - if self.checkretcode and self.p.returncode != 0: - self.job.status = "ERROR" - if not self.silent: - self.job.log("{}: job failed. ERROR.".format(self.info)) - self.terminated = True - self.job.terminate() - return - - self.finished = True - for next_task in self.notify: - next_task.poll() - return - async def output_async(self): while True: outs = await self.p.stdout.readline() @@ -210,7 +137,7 @@ async def output_async(self): self.linebuffer = "" self.handle_output(outs) - async def maybe_spawn_async(self): + async def maybe_spawn(self): if self.finished or self.terminated: return @@ -226,10 +153,10 @@ async def maybe_spawn_async(self): self.job.tasks_pending.remove(self) self.job.tasks_running.append(self) self.running = True - asyncio.ensure_future(self.output_async()) + asyncio.ensure_future(self.output()) self.fut = asyncio.ensure_future(self.p.wait()) - async def shutdown_and_notify_async(self): + async def shutdown_and_notify(self): self.job.log("%s: finished (returncode=%d)" % (self.info, self.p.returncode)) self.job.tasks_running.remove(self) self.running = False @@ -245,7 +172,7 @@ async def shutdown_and_notify_async(self): self.finished = True for next_task in self.notify: - await next_task.maybe_spawn_async() + await next_task.maybe_spawn() return class SbyAbort(BaseException): @@ -299,37 +226,6 @@ def __init__(self, sbyconfig, workdir, early_logs, reusedir): for line in sbyconfig: print(line, file=f) - def taskloop(self): - for task in self.tasks_pending: - task.poll() - - while len(self.tasks_running): - fds = [] - for task in self.tasks_running: - if task.running: - fds.append(task.p.stdout) - - if os.name == "posix": - try: - select(fds, [], [], 1.0) == ([], [], []) - except InterruptedError: - pass - else: - sleep(0.1) - - for task in self.tasks_running: - task.poll() - - for task in self.tasks_pending: - task.poll() - - if self.opt_timeout is not None: - total_clock_time = int(time() - self.start_clock_time) - if total_clock_time > self.opt_timeout: - self.log("Reached TIMEOUT ({} seconds). Terminating all tasks.".format(self.opt_timeout)) - self.status = "TIMEOUT" - self.terminate(timeout=True) - def taskloop_async(self): if os.name != "posix": loop = asyncio.ProactorEventLoop() @@ -360,7 +256,7 @@ async def task_poller(self): timer_fut.add_done_callback(partial(SbyJob.timeout, self)) for task in self.tasks_pending: - await task.maybe_spawn_async() + await task.maybe_spawn() while len(self.tasks_running): task_futs = [] @@ -371,7 +267,7 @@ async def task_poller(self): for task in self.tasks_running: if task.fut in done: - await task.shutdown_and_notify_async() + await task.shutdown_and_notify() if self.opt_timeout is not None: timer_fut.cancel() @@ -748,9 +644,7 @@ def run(self, setupmode): if opt not in self.used_options: self.error("Unused option: {}".format(opt)) - # self.taskloop() - self.taskloop_async() - + self.taskloop() total_clock_time = int(time() - self.start_clock_time) From 826cd1a6f21df8fd9e9f01e6dc4349595ff08dca Mon Sep 17 00:00:00 2001 From: "William D. Jones" Date: Fri, 29 Mar 2019 03:57:47 -0400 Subject: [PATCH 05/15] run() -> init(). Signed-off-by: William D. Jones --- sbysrc/sby_core.py | 8 ++++---- sbysrc/sby_engine_abc.py | 2 +- sbysrc/sby_engine_aiger.py | 2 +- sbysrc/sby_engine_btor.py | 2 +- sbysrc/sby_engine_smtbmc.py | 6 +++--- sbysrc/sby_mode_bmc.py | 8 ++++---- sbysrc/sby_mode_cover.py | 4 ++-- sbysrc/sby_mode_live.py | 4 ++-- sbysrc/sby_mode_prove.py | 8 ++++---- 9 files changed, 22 insertions(+), 22 deletions(-) diff --git a/sbysrc/sby_core.py b/sbysrc/sby_core.py index 1a9bd0bb..fb21d177 100644 --- a/sbysrc/sby_core.py +++ b/sbysrc/sby_core.py @@ -623,19 +623,19 @@ def run(self, setupmode): if self.opt_mode == "bmc": import sby_mode_bmc - sby_mode_bmc.run(self) + sby_mode_bmc.init(self) elif self.opt_mode == "prove": import sby_mode_prove - sby_mode_prove.run(self) + sby_mode_prove.init(self) elif self.opt_mode == "live": import sby_mode_live - sby_mode_live.run(self) + sby_mode_live.init(self) elif self.opt_mode == "cover": import sby_mode_cover - sby_mode_cover.run(self) + sby_mode_cover.init(self) else: assert False diff --git a/sbysrc/sby_engine_abc.py b/sbysrc/sby_engine_abc.py index 49d044d0..900a1125 100644 --- a/sbysrc/sby_engine_abc.py +++ b/sbysrc/sby_engine_abc.py @@ -19,7 +19,7 @@ import re, os, getopt from sby_core import SbyTask -def run(mode, job, engine_idx, engine): +def init(mode, job, engine_idx, engine): abc_opts, abc_command = getopt.getopt(engine[1:], "", []) if len(abc_command) == 0: diff --git a/sbysrc/sby_engine_aiger.py b/sbysrc/sby_engine_aiger.py index dbe05c50..187c8570 100644 --- a/sbysrc/sby_engine_aiger.py +++ b/sbysrc/sby_engine_aiger.py @@ -19,7 +19,7 @@ import re, os, getopt from sby_core import SbyTask -def run(mode, job, engine_idx, engine): +def init(mode, job, engine_idx, engine): opts, solver_args = getopt.getopt(engine[1:], "", []) if len(solver_args) == 0: diff --git a/sbysrc/sby_engine_btor.py b/sbysrc/sby_engine_btor.py index 79c071ce..c971f544 100644 --- a/sbysrc/sby_engine_btor.py +++ b/sbysrc/sby_engine_btor.py @@ -20,7 +20,7 @@ from types import SimpleNamespace from sby_core import SbyTask -def run(mode, job, engine_idx, engine): +def init(mode, job, engine_idx, engine): random_seed = None opts, solver_args = getopt.getopt(engine[1:], "", ["seed="]) diff --git a/sbysrc/sby_engine_smtbmc.py b/sbysrc/sby_engine_smtbmc.py index 6d2ffc47..0ce4286c 100644 --- a/sbysrc/sby_engine_smtbmc.py +++ b/sbysrc/sby_engine_smtbmc.py @@ -19,7 +19,7 @@ import re, os, getopt from sby_core import SbyTask -def run(mode, job, engine_idx, engine): +def init(mode, job, engine_idx, engine): smtbmc_opts = [] nomem_opt = False presat_opt = True @@ -102,9 +102,9 @@ def run(mode, job, engine_idx, engine): if mode == "prove": if not induction_only: - run("prove_basecase", job, engine_idx, engine) + init("prove_basecase", job, engine_idx, engine) if not basecase_only: - run("prove_induction", job, engine_idx, engine) + init("prove_induction", job, engine_idx, engine) return taskname = "engine_{}".format(engine_idx) diff --git a/sbysrc/sby_mode_bmc.py b/sbysrc/sby_mode_bmc.py index 20ffe237..b12756a1 100644 --- a/sbysrc/sby_mode_bmc.py +++ b/sbysrc/sby_mode_bmc.py @@ -19,7 +19,7 @@ import re, os, getopt from sby_core import SbyTask -def run(job): +def init(job): job.handle_int_option("depth", 20) job.handle_int_option("append", 0) job.handle_str_option("aigsmt", "yices") @@ -33,15 +33,15 @@ def run(job): if engine[0] == "smtbmc": import sby_engine_smtbmc - sby_engine_smtbmc.run("bmc", job, engine_idx, engine) + sby_engine_smtbmc.init("bmc", job, engine_idx, engine) elif engine[0] == "abc": import sby_engine_abc - sby_engine_abc.run("bmc", job, engine_idx, engine) + sby_engine_abc.init("bmc", job, engine_idx, engine) elif engine[0] == "btor": import sby_engine_btor - sby_engine_btor.run("bmc", job, engine_idx, engine) + sby_engine_btor.init("bmc", job, engine_idx, engine) else: job.error("Invalid engine '{}' for bmc mode.".format(engine[0])) diff --git a/sbysrc/sby_mode_cover.py b/sbysrc/sby_mode_cover.py index dbefac27..8781b5b6 100644 --- a/sbysrc/sby_mode_cover.py +++ b/sbysrc/sby_mode_cover.py @@ -19,7 +19,7 @@ import re, os, getopt from sby_core import SbyTask -def run(job): +def init(job): job.handle_int_option("depth", 20) job.handle_int_option("append", 0) @@ -32,7 +32,7 @@ def run(job): if engine[0] == "smtbmc": import sby_engine_smtbmc - sby_engine_smtbmc.run("cover", job, engine_idx, engine) + sby_engine_smtbmc.init("cover", job, engine_idx, engine) elif engine[0] == "btor": import sby_engine_btor diff --git a/sbysrc/sby_mode_live.py b/sbysrc/sby_mode_live.py index 9a046cbe..24c86077 100644 --- a/sbysrc/sby_mode_live.py +++ b/sbysrc/sby_mode_live.py @@ -19,7 +19,7 @@ import re, os, getopt from sby_core import SbyTask -def run(job): +def init(job): job.handle_str_option("aigsmt", "yices") job.status = "UNKNOWN" @@ -33,7 +33,7 @@ def run(job): if engine[0] == "aiger": import sby_engine_aiger - sby_engine_aiger.run("live", job, engine_idx, engine) + sby_engine_aiger.init("live", job, engine_idx, engine) else: job.error("Invalid engine '{}' for live mode.".format(engine[0])) diff --git a/sbysrc/sby_mode_prove.py b/sbysrc/sby_mode_prove.py index f3c3ac93..6346119d 100644 --- a/sbysrc/sby_mode_prove.py +++ b/sbysrc/sby_mode_prove.py @@ -19,7 +19,7 @@ import re, os, getopt from sby_core import SbyTask -def run(job): +def init(job): job.handle_int_option("depth", 20) job.handle_int_option("append", 0) job.handle_str_option("aigsmt", "yices") @@ -40,15 +40,15 @@ def run(job): if engine[0] == "smtbmc": import sby_engine_smtbmc - sby_engine_smtbmc.run("prove", job, engine_idx, engine) + sby_engine_smtbmc.init("prove", job, engine_idx, engine) elif engine[0] == "aiger": import sby_engine_aiger - sby_engine_aiger.run("prove", job, engine_idx, engine) + sby_engine_aiger.init("prove", job, engine_idx, engine) elif engine[0] == "abc": import sby_engine_abc - sby_engine_abc.run("prove", job, engine_idx, engine) + sby_engine_abc.init("prove", job, engine_idx, engine) else: job.error("Invalid engine '{}' for prove mode.".format(engine[0])) From b6007aa68c5f759dd7fdee15a428297b54ecfd33 Mon Sep 17 00:00:00 2001 From: "William D. Jones" Date: Sat, 30 Mar 2019 08:13:37 -0400 Subject: [PATCH 06/15] Add Windows workaround for forceful subprocess termination. Signed-off-by: William D. Jones --- sbysrc/sby_core.py | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/sbysrc/sby_core.py b/sbysrc/sby_core.py index fb21d177..69d043c9 100644 --- a/sbysrc/sby_core.py +++ b/sbysrc/sby_core.py @@ -114,14 +114,22 @@ def terminate(self, timeout=False): if self.running: if not self.silent: self.job.log("{}: terminating process".format(self.info)) - if os.name == "posix": + if os.name != "posix": + # self.p.terminate does not actually terminate underlying + # processes on Windows, so use taskkill to kill the shell + # and children. This for some reason does not cause the + # associated future (self.fut) to complete until it is awaited + # on one last time. + subprocess.Popen("taskkill /T /F /PID {}".format(self.p.pid), stdin=subprocess.DEVNULL, + stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + else: try: os.killpg(self.p.pid, signal.SIGTERM) except PermissionError: pass self.p.terminate() self.job.tasks_running.remove(self) - all_tasks_running.remove(self) + self.job.tasks_retired.append(self) self.terminated = True async def output_async(self): @@ -159,6 +167,7 @@ async def maybe_spawn(self): async def shutdown_and_notify(self): self.job.log("%s: finished (returncode=%d)" % (self.info, self.p.returncode)) self.job.tasks_running.remove(self) + self.job.tasks_retired.append(self) self.running = False self.handle_exit(self.p.returncode) @@ -207,6 +216,7 @@ def __init__(self, sbyconfig, workdir, early_logs, reusedir): self.tasks_running = [] self.tasks_pending = [] + self.tasks_retired = [] self.start_clock_time = time() @@ -272,6 +282,14 @@ async def task_poller(self): if self.opt_timeout is not None: timer_fut.cancel() + # Required on Windows. I am unsure why, but subprocesses that were + # terminated will not have their futures complete until awaited on + # one last time. + if os.name != "posix": + for t in self.tasks_retired: + if not t.fut.done(): + await t.fut + def log(self, logmessage): tm = localtime() print("SBY {:2d}:{:02d}:{:02d} [{}] {}".format(tm.tm_hour, tm.tm_min, tm.tm_sec, self.workdir, logmessage), flush=True) From a3204629766daf14eb3f7f29562a45719f9a973b Mon Sep 17 00:00:00 2001 From: "William D. Jones" Date: Tue, 2 Apr 2019 21:20:30 -0400 Subject: [PATCH 07/15] Prevent the timeout callback from running if all tasks completed. Signed-off-by: William D. Jones --- sbysrc/sby_core.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sbysrc/sby_core.py b/sbysrc/sby_core.py index 69d043c9..4555e80b 100644 --- a/sbysrc/sby_core.py +++ b/sbysrc/sby_core.py @@ -263,7 +263,8 @@ def timeout(self, fut): async def task_poller(self): if self.opt_timeout is not None: timer_fut = asyncio.ensure_future(self.timekeeper()) - timer_fut.add_done_callback(partial(SbyJob.timeout, self)) + done_cb = partial(SbyJob.timeout, self) + timer_fut.add_done_callback(done_cb) for task in self.tasks_pending: await task.maybe_spawn() @@ -280,6 +281,7 @@ async def task_poller(self): await task.shutdown_and_notify() if self.opt_timeout is not None: + timer_fut.remove_done_callback(done_cb) timer_fut.cancel() # Required on Windows. I am unsure why, but subprocesses that were From fc563941d00b610cff75d0cd93544d3b29231c15 Mon Sep 17 00:00:00 2001 From: "William D. Jones" Date: Tue, 2 Apr 2019 23:30:06 -0400 Subject: [PATCH 08/15] Remove debug mode from event loop. Signed-off-by: William D. Jones --- sbysrc/sby_core.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sbysrc/sby_core.py b/sbysrc/sby_core.py index 4555e80b..65f3dbb3 100644 --- a/sbysrc/sby_core.py +++ b/sbysrc/sby_core.py @@ -241,7 +241,6 @@ def taskloop_async(self): loop = asyncio.ProactorEventLoop() asyncio.set_event_loop(loop) loop = asyncio.get_event_loop() - loop.set_debug(enabled=True) poll_fut = asyncio.ensure_future(self.task_poller()) loop.run_until_complete(poll_fut) From 021b6a70933d2aeb651872ec67a6ef8789fe9370 Mon Sep 17 00:00:00 2001 From: Ed Bordin Date: Sat, 1 Aug 2020 14:09:33 +1000 Subject: [PATCH 09/15] manually improve merge --- sbysrc/sby_core.py | 41 +++++++++++++++++++++++++++++++-------- sbysrc/sby_engine_btor.py | 2 +- 2 files changed, 34 insertions(+), 9 deletions(-) diff --git a/sbysrc/sby_core.py b/sbysrc/sby_core.py index 65f3dbb3..90e22c2b 100644 --- a/sbysrc/sby_core.py +++ b/sbysrc/sby_core.py @@ -127,12 +127,13 @@ def terminate(self, timeout=False): os.killpg(self.p.pid, signal.SIGTERM) except PermissionError: pass - self.p.terminate() + self.p.terminate() self.job.tasks_running.remove(self) self.job.tasks_retired.append(self) + all_tasks_running.remove(self) self.terminated = True - async def output_async(self): + async def output(self): while True: outs = await self.p.stdout.readline() await asyncio.sleep(0) # https://bugs.python.org/issue24532 @@ -154,27 +155,48 @@ async def maybe_spawn(self): if not dep.finished: return - self.job.log("%s: starting process \"%s\"" % (self.info, self.cmdline)) + if not self.silent: + self.job.log("{}: starting process \"{}\"".format(self.info, self.cmdline)) + if os.name == "posix": + def preexec_fn(): + signal.signal(signal.SIGINT, signal.SIG_IGN) + os.setpgrp() + + subp_kwargs = { "preexec_fn" : preexec_fn } + else: + subp_kwargs = {} self.p = await asyncio.create_subprocess_shell(self.cmdline, stdin=asyncio.subprocess.DEVNULL, stdout=asyncio.subprocess.PIPE, - stderr=(asyncio.subprocess.STDOUT if self.logstderr else None)) + stderr=(asyncio.subprocess.STDOUT if self.logstderr else None), + **subp_kwargs) self.job.tasks_pending.remove(self) self.job.tasks_running.append(self) + all_tasks_running.append(self) self.running = True asyncio.ensure_future(self.output()) self.fut = asyncio.ensure_future(self.p.wait()) async def shutdown_and_notify(self): - self.job.log("%s: finished (returncode=%d)" % (self.info, self.p.returncode)) + if not self.silent: + self.job.log("{}: finished (returncode={})".format(self.info, self.p.returncode)) self.job.tasks_running.remove(self) self.job.tasks_retired.append(self) self.running = False self.handle_exit(self.p.returncode) + if self.p.returncode == 127: + self.job.status = "ERROR" + if not self.silent: + self.job.log("{}: COMMAND NOT FOUND. ERROR.".format(self.info)) + self.terminated = True + self.job.terminate() + return + if self.checkretcode and self.p.returncode != 0: self.job.status = "ERROR" - self.job.log("%s: job failed. ERROR." % self.info) + if not self.silent: + self.job.log("{}: job failed. ERROR.".format(self.info)) self.terminated = True self.job.terminate() return @@ -236,7 +258,7 @@ def __init__(self, sbyconfig, workdir, early_logs, reusedir): for line in sbyconfig: print(line, file=f) - def taskloop_async(self): + def taskloop(self): if os.name != "posix": loop = asyncio.ProactorEventLoop() asyncio.set_event_loop(loop) @@ -255,7 +277,7 @@ async def timekeeper(self): pass def timeout(self, fut): - self.log("Reached TIMEOUT (%d seconds). Terminating all tasks." % self.opt_timeout) + self.log("Reached TIMEOUT ({} seconds). Terminating all tasks.".format(self.opt_timeout)) self.status = "TIMEOUT" self.terminate(timeout=True) @@ -409,6 +431,7 @@ def make_model(self, model_name): print("opt -fast", file=f) print("abc", file=f) print("opt_clean", file=f) + print("dffunmap", file=f) print("stat", file=f) if "_stbv" in model_name: print("write_smt2 -stbv -wires design_{}.smt2".format(model_name), file=f) @@ -438,6 +461,7 @@ def make_model(self, model_name): else: print("opt -fast", file=f) print("delete -output", file=f) + print("dffunmap", file=f) print("stat", file=f) print("write_btor {}-i design_{m}.info design_{m}.btor".format("-c " if self.opt_mode == "cover" else "", m=model_name), file=f) @@ -458,6 +482,7 @@ def make_model(self, model_name): print("opt -full", file=f) print("techmap", file=f) print("opt -fast", file=f) + print("dffunmap", file=f) print("abc -g AND -fast", file=f) print("opt_clean", file=f) print("stat", file=f) diff --git a/sbysrc/sby_engine_btor.py b/sbysrc/sby_engine_btor.py index c971f544..2d1e5beb 100644 --- a/sbysrc/sby_engine_btor.py +++ b/sbysrc/sby_engine_btor.py @@ -190,7 +190,7 @@ def output_callback(line): def exit_callback(retcode): if solver_args[0] == "pono": - assert retcode in [1, 2] + assert retcode in [0, 1, 255] # UNKNOWN = -1, FALSE = 0, TRUE = 1, ERROR = 2 else: assert retcode == 0 if common_state.expected_cex != 0: From 20ec63899a54670863198ba157b368559957364c Mon Sep 17 00:00:00 2001 From: Ed Bordin Date: Sat, 8 Aug 2020 15:12:17 +1000 Subject: [PATCH 10/15] replace taskkill with ctypes/WinAPI implementation --- sbysrc/sby_core.py | 13 ++++--- sbysrc/win_killpg.py | 88 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 95 insertions(+), 6 deletions(-) create mode 100644 sbysrc/win_killpg.py diff --git a/sbysrc/sby_core.py b/sbysrc/sby_core.py index 90e22c2b..77378d97 100644 --- a/sbysrc/sby_core.py +++ b/sbysrc/sby_core.py @@ -19,6 +19,8 @@ import os, re, sys, signal if os.name == "posix": import resource, fcntl +else: + import win_killpg import subprocess import asyncio from functools import partial @@ -116,12 +118,11 @@ def terminate(self, timeout=False): self.job.log("{}: terminating process".format(self.info)) if os.name != "posix": # self.p.terminate does not actually terminate underlying - # processes on Windows, so use taskkill to kill the shell - # and children. This for some reason does not cause the - # associated future (self.fut) to complete until it is awaited - # on one last time. - subprocess.Popen("taskkill /T /F /PID {}".format(self.p.pid), stdin=subprocess.DEVNULL, - stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + # processes on Windows, so we resort to using the WinAPI to + # kill the shell and children. This for some reason does + # not cause the associated future (self.fut) to complete + # until it is awaited on one last time. + win_killpg.win_killpg(self.p.pid) else: try: os.killpg(self.p.pid, signal.SIGTERM) diff --git a/sbysrc/win_killpg.py b/sbysrc/win_killpg.py new file mode 100644 index 00000000..cfd7c3a9 --- /dev/null +++ b/sbysrc/win_killpg.py @@ -0,0 +1,88 @@ +from ctypes import sizeof, windll +from ctypes.wintypes import DWORD, LONG, ULONG +from ctypes import Structure, c_char, pointer, POINTER +import os + +# relevant WinAPI references: +# https://docs.microsoft.com/en-us/windows/win32/api/tlhelp32/nf-tlhelp32-createtoolhelp32snapshot +# https://docs.microsoft.com/en-us/windows/win32/api/tlhelp32/ns-tlhelp32-processentry32 +# https://docs.microsoft.com/en-us/windows/win32/api/tlhelp32/nf-tlhelp32-process32first +# https://docs.microsoft.com/en-us/windows/win32/api/tlhelp32/nf-tlhelp32-process32next + +TH32CS_SNAPPROCESS = 0x00000002 +INVALID_HANDLE_VALUE = -1 + + +class PROCESSENTRY32(Structure): + _fields_ = [ + ("dwSize", DWORD), + ("cntUsage", DWORD), + ("th32ProcessID", DWORD), + ("th32DefaultHeapID", POINTER(ULONG)), + ("th32ModuleID", DWORD), + ("cntThreads", DWORD), + ("th32ParentProcessID", DWORD), + ("pcPriClassBase", LONG), + ("dwFlags", DWORD), + ("szExeFile", c_char * 260), + ] + + +def _all_children(pid, lookup, visited): + if pid in lookup and pid not in visited: + visited.add(pid) + for c in lookup[pid]: + if c not in visited: + visited.add(c) + visited |= _all_children(c, lookup, visited) + return visited + else: + return set() + + +def _update_lookup(pid_lookup, pe): + cpid = pe.contents.th32ProcessID + ppid = pe.contents.th32ParentProcessID + # pid 0 should be the only one that is its own parent + assert ( + cpid == 0 or cpid != ppid + ), "Internal error listing windows processes - process is its own parent" + if ppid not in pid_lookup: + pid_lookup[ppid] = [] + pid_lookup[ppid].append(cpid) + + +def _create_process_lookup(): + handle = windll.kernel32.CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0) + if handle == INVALID_HANDLE_VALUE: + raise RuntimeError( + "Could not snapshot running processes - invalid handle returned" + ) + pe = pointer(PROCESSENTRY32()) + pe.contents.dwSize = sizeof(PROCESSENTRY32) + pid_lookup = {} + if windll.kernel32. (handle, pe) != 0: + _update_lookup(pid_lookup, pe) + while windll.kernel32.Process32Next(handle, pe) != 0: + _update_lookup(pid_lookup, pe) + + return pid_lookup + + +def win_killpg(pid): + """ + Approximate the behaviour of os.killpg(pid, signal.SIGKILL) on Windows. + + Windows processes appear to keep track of their parent rather than their + children, so it is necessary to build a graph of all running processes + first and use this to derive a list of child processes to be killed. + """ + pid_lookup = _create_process_lookup() + to_kill = _all_children(pid, pid_lookup, set()) + for p in to_kill: + try: + # "Any other value for sig will cause the process to be + # unconditionally killed by the TerminateProcess API" + os.kill(p, sig=-1) + except PermissionError as pe: + print("WARNING: error while killing pid {}: {}".format(p, pe)) From daa8329baaa9b3adc2178efa74971729cf88bad1 Mon Sep 17 00:00:00 2001 From: Ed Bordin Date: Sat, 8 Aug 2020 15:35:51 +1000 Subject: [PATCH 11/15] minor fixes after testing --- sbysrc/win_killpg.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sbysrc/win_killpg.py b/sbysrc/win_killpg.py index cfd7c3a9..d6b96b58 100644 --- a/sbysrc/win_killpg.py +++ b/sbysrc/win_killpg.py @@ -61,7 +61,7 @@ def _create_process_lookup(): pe = pointer(PROCESSENTRY32()) pe.contents.dwSize = sizeof(PROCESSENTRY32) pid_lookup = {} - if windll.kernel32. (handle, pe) != 0: + if windll.kernel32.Process32First(handle, pe) != 0: _update_lookup(pid_lookup, pe) while windll.kernel32.Process32Next(handle, pe) != 0: _update_lookup(pid_lookup, pe) @@ -83,6 +83,6 @@ def win_killpg(pid): try: # "Any other value for sig will cause the process to be # unconditionally killed by the TerminateProcess API" - os.kill(p, sig=-1) - except PermissionError as pe: - print("WARNING: error while killing pid {}: {}".format(p, pe)) + os.kill(p, -1) + except (PermissionError, OSError) as e: + print("WARNING: error while killing pid {}: {}".format(p, e)) From 7ac54c6a2e2559b7fb5b46a89f7b83b77d42f1dc Mon Sep 17 00:00:00 2001 From: Ed Bordin Date: Sun, 9 Aug 2020 14:22:31 +1000 Subject: [PATCH 12/15] simplify graph traversal code --- sbysrc/win_killpg.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/sbysrc/win_killpg.py b/sbysrc/win_killpg.py index d6b96b58..364e1756 100644 --- a/sbysrc/win_killpg.py +++ b/sbysrc/win_killpg.py @@ -28,24 +28,23 @@ class PROCESSENTRY32(Structure): ] -def _all_children(pid, lookup, visited): - if pid in lookup and pid not in visited: +def _visit_all_children(pid, lookup, visited): + if pid not in visited: visited.add(pid) - for c in lookup[pid]: - if c not in visited: - visited.add(c) - visited |= _all_children(c, lookup, visited) - return visited - else: - return set() + if pid in lookup: + for c in lookup[pid]: + _visit_all_children(c, lookup, visited) def _update_lookup(pid_lookup, pe): cpid = pe.contents.th32ProcessID ppid = pe.contents.th32ParentProcessID - # pid 0 should be the only one that is its own parent + # pid 0 should be the only one that is its own parent. + # don't add this entry to the graph to avoid an unwanted cycle + if ppid == 0 and cpid == 0: + return assert ( - cpid == 0 or cpid != ppid + cpid != ppid ), "Internal error listing windows processes - process is its own parent" if ppid not in pid_lookup: pid_lookup[ppid] = [] @@ -78,7 +77,8 @@ def win_killpg(pid): first and use this to derive a list of child processes to be killed. """ pid_lookup = _create_process_lookup() - to_kill = _all_children(pid, pid_lookup, set()) + to_kill = set() + _visit_all_children(pid, pid_lookup, to_kill) for p in to_kill: try: # "Any other value for sig will cause the process to be From 031cdb8df38efd3feef806b77137b13642e1e5e9 Mon Sep 17 00:00:00 2001 From: Ed Bordin Date: Sun, 9 Aug 2020 16:44:22 +1000 Subject: [PATCH 13/15] use a console process group and send ctrl+break signal instead --- sbysrc/sby_core.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/sbysrc/sby_core.py b/sbysrc/sby_core.py index 77378d97..33f57139 100644 --- a/sbysrc/sby_core.py +++ b/sbysrc/sby_core.py @@ -118,11 +118,12 @@ def terminate(self, timeout=False): self.job.log("{}: terminating process".format(self.info)) if os.name != "posix": # self.p.terminate does not actually terminate underlying - # processes on Windows, so we resort to using the WinAPI to - # kill the shell and children. This for some reason does + # processes on Windows, so send ctrl+break to the process + # group we created. This for some reason does # not cause the associated future (self.fut) to complete # until it is awaited on one last time. - win_killpg.win_killpg(self.p.pid) + + os.kill(self.p.pid, signal.CTRL_BREAK_EVENT) else: try: os.killpg(self.p.pid, signal.SIGTERM) @@ -165,7 +166,7 @@ def preexec_fn(): subp_kwargs = { "preexec_fn" : preexec_fn } else: - subp_kwargs = {} + subp_kwargs = { "creationflags" : subprocess.CREATE_NEW_PROCESS_GROUP } self.p = await asyncio.create_subprocess_shell(self.cmdline, stdin=asyncio.subprocess.DEVNULL, stdout=asyncio.subprocess.PIPE, stderr=(asyncio.subprocess.STDOUT if self.logstderr else None), From 3a776c329f38e9eada3cc14c3d32437d86e0955e Mon Sep 17 00:00:00 2001 From: Ed Bordin Date: Sun, 9 Aug 2020 18:16:32 +1000 Subject: [PATCH 14/15] rm ctypes impl as it is no longer needed --- sbysrc/win_killpg.py | 88 -------------------------------------------- 1 file changed, 88 deletions(-) delete mode 100644 sbysrc/win_killpg.py diff --git a/sbysrc/win_killpg.py b/sbysrc/win_killpg.py deleted file mode 100644 index 364e1756..00000000 --- a/sbysrc/win_killpg.py +++ /dev/null @@ -1,88 +0,0 @@ -from ctypes import sizeof, windll -from ctypes.wintypes import DWORD, LONG, ULONG -from ctypes import Structure, c_char, pointer, POINTER -import os - -# relevant WinAPI references: -# https://docs.microsoft.com/en-us/windows/win32/api/tlhelp32/nf-tlhelp32-createtoolhelp32snapshot -# https://docs.microsoft.com/en-us/windows/win32/api/tlhelp32/ns-tlhelp32-processentry32 -# https://docs.microsoft.com/en-us/windows/win32/api/tlhelp32/nf-tlhelp32-process32first -# https://docs.microsoft.com/en-us/windows/win32/api/tlhelp32/nf-tlhelp32-process32next - -TH32CS_SNAPPROCESS = 0x00000002 -INVALID_HANDLE_VALUE = -1 - - -class PROCESSENTRY32(Structure): - _fields_ = [ - ("dwSize", DWORD), - ("cntUsage", DWORD), - ("th32ProcessID", DWORD), - ("th32DefaultHeapID", POINTER(ULONG)), - ("th32ModuleID", DWORD), - ("cntThreads", DWORD), - ("th32ParentProcessID", DWORD), - ("pcPriClassBase", LONG), - ("dwFlags", DWORD), - ("szExeFile", c_char * 260), - ] - - -def _visit_all_children(pid, lookup, visited): - if pid not in visited: - visited.add(pid) - if pid in lookup: - for c in lookup[pid]: - _visit_all_children(c, lookup, visited) - - -def _update_lookup(pid_lookup, pe): - cpid = pe.contents.th32ProcessID - ppid = pe.contents.th32ParentProcessID - # pid 0 should be the only one that is its own parent. - # don't add this entry to the graph to avoid an unwanted cycle - if ppid == 0 and cpid == 0: - return - assert ( - cpid != ppid - ), "Internal error listing windows processes - process is its own parent" - if ppid not in pid_lookup: - pid_lookup[ppid] = [] - pid_lookup[ppid].append(cpid) - - -def _create_process_lookup(): - handle = windll.kernel32.CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0) - if handle == INVALID_HANDLE_VALUE: - raise RuntimeError( - "Could not snapshot running processes - invalid handle returned" - ) - pe = pointer(PROCESSENTRY32()) - pe.contents.dwSize = sizeof(PROCESSENTRY32) - pid_lookup = {} - if windll.kernel32.Process32First(handle, pe) != 0: - _update_lookup(pid_lookup, pe) - while windll.kernel32.Process32Next(handle, pe) != 0: - _update_lookup(pid_lookup, pe) - - return pid_lookup - - -def win_killpg(pid): - """ - Approximate the behaviour of os.killpg(pid, signal.SIGKILL) on Windows. - - Windows processes appear to keep track of their parent rather than their - children, so it is necessary to build a graph of all running processes - first and use this to derive a list of child processes to be killed. - """ - pid_lookup = _create_process_lookup() - to_kill = set() - _visit_all_children(pid, pid_lookup, to_kill) - for p in to_kill: - try: - # "Any other value for sig will cause the process to be - # unconditionally killed by the TerminateProcess API" - os.kill(p, -1) - except (PermissionError, OSError) as e: - print("WARNING: error while killing pid {}: {}".format(p, e)) From 34a2139a213eab9d502bbf34713519e35d111fc1 Mon Sep 17 00:00:00 2001 From: Ed Bordin Date: Sun, 9 Aug 2020 18:21:38 +1000 Subject: [PATCH 15/15] rm old import --- sbysrc/sby_core.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/sbysrc/sby_core.py b/sbysrc/sby_core.py index 33f57139..a7a08331 100644 --- a/sbysrc/sby_core.py +++ b/sbysrc/sby_core.py @@ -19,8 +19,6 @@ import os, re, sys, signal if os.name == "posix": import resource, fcntl -else: - import win_killpg import subprocess import asyncio from functools import partial