diff --git a/sbysrc/sby_core.py b/sbysrc/sby_core.py index 4ec2ab36..a7a08331 100644 --- a/sbysrc/sby_core.py +++ b/sbysrc/sby_core.py @@ -20,6 +20,8 @@ if os.name == "posix": import resource, fcntl import subprocess +import asyncio +from functools import partial from shutil import copyfile, rmtree from select import select from time import time, localtime, sleep @@ -79,17 +81,12 @@ def __init__(self, job, info, deps, cmdline, logfile=None, logstderr=True, silen self.job.tasks_pending.append(self) for dep in self.deps: - dep.register_dep(self) + if not dep.finished: + dep.notify.append(self) self.output_callback = None self.exit_callback = None - def register_dep(self, next_task): - if self.finished: - next_task.poll() - else: - self.notify.append(next_task) - def log(self, line): if line is not None and (self.noprintregex is None or not self.noprintregex.match(line)): if self.logfile is not None: @@ -117,17 +114,39 @@ def terminate(self, timeout=False): if self.running: if not self.silent: self.job.log("{}: terminating process".format(self.info)) - if os.name == "posix": + if os.name != "posix": + # self.p.terminate does not actually terminate underlying + # processes on Windows, so send ctrl+break to the process + # group we created. This for some reason does + # not cause the associated future (self.fut) to complete + # until it is awaited on one last time. + + os.kill(self.p.pid, signal.CTRL_BREAK_EVENT) + else: try: os.killpg(self.p.pid, signal.SIGTERM) except PermissionError: pass - self.p.terminate() + self.p.terminate() self.job.tasks_running.remove(self) + self.job.tasks_retired.append(self) all_tasks_running.remove(self) self.terminated = True - def poll(self): + async def output(self): + while True: + outs = await self.p.stdout.readline() + await asyncio.sleep(0) # https://bugs.python.org/issue24532 + outs = outs.decode("utf-8") + if len(outs) == 0: break + if outs[-1] != '\n': + self.linebuffer += outs + break + outs = (self.linebuffer + outs).strip() + self.linebuffer = "" + self.handle_output(outs) + + async def maybe_spawn(self): if self.finished or self.terminated: return @@ -138,68 +157,54 @@ def poll(self): if not self.silent: self.job.log("{}: starting process \"{}\"".format(self.info, self.cmdline)) - if os.name == "posix": def preexec_fn(): signal.signal(signal.SIGINT, signal.SIG_IGN) os.setpgrp() - self.p = subprocess.Popen(["/usr/bin/env", "bash", "-c", self.cmdline], stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, - stderr=(subprocess.STDOUT if self.logstderr else None), preexec_fn=preexec_fn) - - fl = fcntl.fcntl(self.p.stdout, fcntl.F_GETFL) - fcntl.fcntl(self.p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK) - + subp_kwargs = { "preexec_fn" : preexec_fn } else: - self.p = subprocess.Popen(self.cmdline, shell=True, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, - stderr=(subprocess.STDOUT if self.logstderr else None)) - + subp_kwargs = { "creationflags" : subprocess.CREATE_NEW_PROCESS_GROUP } + self.p = await asyncio.create_subprocess_shell(self.cmdline, stdin=asyncio.subprocess.DEVNULL, + stdout=asyncio.subprocess.PIPE, + stderr=(asyncio.subprocess.STDOUT if self.logstderr else None), + **subp_kwargs) self.job.tasks_pending.remove(self) self.job.tasks_running.append(self) all_tasks_running.append(self) self.running = True - return + asyncio.ensure_future(self.output()) + self.fut = asyncio.ensure_future(self.p.wait()) + + async def shutdown_and_notify(self): + if not self.silent: + self.job.log("{}: finished (returncode={})".format(self.info, self.p.returncode)) + self.job.tasks_running.remove(self) + self.job.tasks_retired.append(self) + self.running = False - while True: - outs = self.p.stdout.readline().decode("utf-8") - if len(outs) == 0: break - if outs[-1] != '\n': - self.linebuffer += outs - break - outs = (self.linebuffer + outs).strip() - self.linebuffer = "" - self.handle_output(outs) + self.handle_exit(self.p.returncode) - if self.p.poll() is not None: + if self.p.returncode == 127: + self.job.status = "ERROR" if not self.silent: - self.job.log("{}: finished (returncode={})".format(self.info, self.p.returncode)) - self.job.tasks_running.remove(self) - all_tasks_running.remove(self) - self.running = False - - if self.p.returncode == 127: - self.job.status = "ERROR" - if not self.silent: - self.job.log("{}: COMMAND NOT FOUND. ERROR.".format(self.info)) - self.terminated = True - self.job.terminate() - return - - self.handle_exit(self.p.returncode) - - if self.checkretcode and self.p.returncode != 0: - self.job.status = "ERROR" - if not self.silent: - self.job.log("{}: job failed. ERROR.".format(self.info)) - self.terminated = True - self.job.terminate() - return - - self.finished = True - for next_task in self.notify: - next_task.poll() + self.job.log("{}: COMMAND NOT FOUND. ERROR.".format(self.info)) + self.terminated = True + self.job.terminate() return + if self.checkretcode and self.p.returncode != 0: + self.job.status = "ERROR" + if not self.silent: + self.job.log("{}: job failed. ERROR.".format(self.info)) + self.terminated = True + self.job.terminate() + return + + self.finished = True + for next_task in self.notify: + await next_task.maybe_spawn() + return class SbyAbort(BaseException): pass @@ -233,6 +238,7 @@ def __init__(self, sbyconfig, workdir, early_logs, reusedir): self.tasks_running = [] self.tasks_pending = [] + self.tasks_retired = [] self.start_clock_time = time() @@ -253,35 +259,59 @@ def __init__(self, sbyconfig, workdir, early_logs, reusedir): print(line, file=f) def taskloop(self): + if os.name != "posix": + loop = asyncio.ProactorEventLoop() + asyncio.set_event_loop(loop) + loop = asyncio.get_event_loop() + poll_fut = asyncio.ensure_future(self.task_poller()) + loop.run_until_complete(poll_fut) + + async def timekeeper(self): + total_clock_time = int(time() - self.start_clock_time) + + try: + while total_clock_time <= self.opt_timeout: + await asyncio.sleep(1) + total_clock_time = int(time() - self.start_clock_time) + except asyncio.CancelledError: + pass + + def timeout(self, fut): + self.log("Reached TIMEOUT ({} seconds). Terminating all tasks.".format(self.opt_timeout)) + self.status = "TIMEOUT" + self.terminate(timeout=True) + + async def task_poller(self): + if self.opt_timeout is not None: + timer_fut = asyncio.ensure_future(self.timekeeper()) + done_cb = partial(SbyJob.timeout, self) + timer_fut.add_done_callback(done_cb) + for task in self.tasks_pending: - task.poll() + await task.maybe_spawn() while len(self.tasks_running): - fds = [] + task_futs = [] for task in self.tasks_running: if task.running: - fds.append(task.p.stdout) - - if os.name == "posix": - try: - select(fds, [], [], 1.0) == ([], [], []) - except InterruptedError: - pass - else: - sleep(0.1) + task_futs.append(task.fut) + (done, pending) = await asyncio.wait(task_futs, return_when=asyncio.FIRST_COMPLETED) for task in self.tasks_running: - task.poll() + if task.fut in done: + await task.shutdown_and_notify() - for task in self.tasks_pending: - task.poll() + if self.opt_timeout is not None: + timer_fut.remove_done_callback(done_cb) + timer_fut.cancel() - if self.opt_timeout is not None: - total_clock_time = int(time() - self.start_clock_time) - if total_clock_time > self.opt_timeout: - self.log("Reached TIMEOUT ({} seconds). Terminating all tasks.".format(self.opt_timeout)) - self.status = "TIMEOUT" - self.terminate(timeout=True) + # Required on Windows. I am unsure why, but subprocesses that were + # terminated will not have their futures complete until awaited on + # one last time. + if os.name != "posix": + for t in self.tasks_retired: + if not t.fut.done(): + await t.fut def log(self, logmessage): tm = localtime() @@ -401,6 +431,7 @@ def make_model(self, model_name): print("opt -fast", file=f) print("abc", file=f) print("opt_clean", file=f) + print("dffunmap", file=f) print("stat", file=f) if "_stbv" in model_name: print("write_smt2 -stbv -wires design_{}.smt2".format(model_name), file=f) @@ -430,6 +461,7 @@ def make_model(self, model_name): else: print("opt -fast", file=f) print("delete -output", file=f) + print("dffunmap", file=f) print("stat", file=f) print("write_btor {}-i design_{m}.info design_{m}.btor".format("-c " if self.opt_mode == "cover" else "", m=model_name), file=f) @@ -450,6 +482,7 @@ def make_model(self, model_name): print("opt -full", file=f) print("techmap", file=f) print("opt -fast", file=f) + print("dffunmap", file=f) print("abc -g AND -fast", file=f) print("opt_clean", file=f) print("stat", file=f) @@ -634,19 +667,19 @@ def run(self, setupmode): if self.opt_mode == "bmc": import sby_mode_bmc - sby_mode_bmc.run(self) + sby_mode_bmc.init(self) elif self.opt_mode == "prove": import sby_mode_prove - sby_mode_prove.run(self) + sby_mode_prove.init(self) elif self.opt_mode == "live": import sby_mode_live - sby_mode_live.run(self) + sby_mode_live.init(self) elif self.opt_mode == "cover": import sby_mode_cover - sby_mode_cover.run(self) + sby_mode_cover.init(self) else: assert False diff --git a/sbysrc/sby_engine_abc.py b/sbysrc/sby_engine_abc.py index 49d044d0..900a1125 100644 --- a/sbysrc/sby_engine_abc.py +++ b/sbysrc/sby_engine_abc.py @@ -19,7 +19,7 @@ import re, os, getopt from sby_core import SbyTask -def run(mode, job, engine_idx, engine): +def init(mode, job, engine_idx, engine): abc_opts, abc_command = getopt.getopt(engine[1:], "", []) if len(abc_command) == 0: diff --git a/sbysrc/sby_engine_aiger.py b/sbysrc/sby_engine_aiger.py index dbe05c50..187c8570 100644 --- a/sbysrc/sby_engine_aiger.py +++ b/sbysrc/sby_engine_aiger.py @@ -19,7 +19,7 @@ import re, os, getopt from sby_core import SbyTask -def run(mode, job, engine_idx, engine): +def init(mode, job, engine_idx, engine): opts, solver_args = getopt.getopt(engine[1:], "", []) if len(solver_args) == 0: diff --git a/sbysrc/sby_engine_btor.py b/sbysrc/sby_engine_btor.py index 79c071ce..2d1e5beb 100644 --- a/sbysrc/sby_engine_btor.py +++ b/sbysrc/sby_engine_btor.py @@ -20,7 +20,7 @@ from types import SimpleNamespace from sby_core import SbyTask -def run(mode, job, engine_idx, engine): +def init(mode, job, engine_idx, engine): random_seed = None opts, solver_args = getopt.getopt(engine[1:], "", ["seed="]) @@ -190,7 +190,7 @@ def output_callback(line): def exit_callback(retcode): if solver_args[0] == "pono": - assert retcode in [1, 2] + assert retcode in [0, 1, 255] # UNKNOWN = -1, FALSE = 0, TRUE = 1, ERROR = 2 else: assert retcode == 0 if common_state.expected_cex != 0: diff --git a/sbysrc/sby_engine_smtbmc.py b/sbysrc/sby_engine_smtbmc.py index 6d2ffc47..0ce4286c 100644 --- a/sbysrc/sby_engine_smtbmc.py +++ b/sbysrc/sby_engine_smtbmc.py @@ -19,7 +19,7 @@ import re, os, getopt from sby_core import SbyTask -def run(mode, job, engine_idx, engine): +def init(mode, job, engine_idx, engine): smtbmc_opts = [] nomem_opt = False presat_opt = True @@ -102,9 +102,9 @@ def run(mode, job, engine_idx, engine): if mode == "prove": if not induction_only: - run("prove_basecase", job, engine_idx, engine) + init("prove_basecase", job, engine_idx, engine) if not basecase_only: - run("prove_induction", job, engine_idx, engine) + init("prove_induction", job, engine_idx, engine) return taskname = "engine_{}".format(engine_idx) diff --git a/sbysrc/sby_mode_bmc.py b/sbysrc/sby_mode_bmc.py index 20ffe237..b12756a1 100644 --- a/sbysrc/sby_mode_bmc.py +++ b/sbysrc/sby_mode_bmc.py @@ -19,7 +19,7 @@ import re, os, getopt from sby_core import SbyTask -def run(job): +def init(job): job.handle_int_option("depth", 20) job.handle_int_option("append", 0) job.handle_str_option("aigsmt", "yices") @@ -33,15 +33,15 @@ def run(job): if engine[0] == "smtbmc": import sby_engine_smtbmc - sby_engine_smtbmc.run("bmc", job, engine_idx, engine) + sby_engine_smtbmc.init("bmc", job, engine_idx, engine) elif engine[0] == "abc": import sby_engine_abc - sby_engine_abc.run("bmc", job, engine_idx, engine) + sby_engine_abc.init("bmc", job, engine_idx, engine) elif engine[0] == "btor": import sby_engine_btor - sby_engine_btor.run("bmc", job, engine_idx, engine) + sby_engine_btor.init("bmc", job, engine_idx, engine) else: job.error("Invalid engine '{}' for bmc mode.".format(engine[0])) diff --git a/sbysrc/sby_mode_cover.py b/sbysrc/sby_mode_cover.py index dbefac27..8781b5b6 100644 --- a/sbysrc/sby_mode_cover.py +++ b/sbysrc/sby_mode_cover.py @@ -19,7 +19,7 @@ import re, os, getopt from sby_core import SbyTask -def run(job): +def init(job): job.handle_int_option("depth", 20) job.handle_int_option("append", 0) @@ -32,7 +32,7 @@ def run(job): if engine[0] == "smtbmc": import sby_engine_smtbmc - sby_engine_smtbmc.run("cover", job, engine_idx, engine) + sby_engine_smtbmc.init("cover", job, engine_idx, engine) elif engine[0] == "btor": import sby_engine_btor diff --git a/sbysrc/sby_mode_live.py b/sbysrc/sby_mode_live.py index 9a046cbe..24c86077 100644 --- a/sbysrc/sby_mode_live.py +++ b/sbysrc/sby_mode_live.py @@ -19,7 +19,7 @@ import re, os, getopt from sby_core import SbyTask -def run(job): +def init(job): job.handle_str_option("aigsmt", "yices") job.status = "UNKNOWN" @@ -33,7 +33,7 @@ def run(job): if engine[0] == "aiger": import sby_engine_aiger - sby_engine_aiger.run("live", job, engine_idx, engine) + sby_engine_aiger.init("live", job, engine_idx, engine) else: job.error("Invalid engine '{}' for live mode.".format(engine[0])) diff --git a/sbysrc/sby_mode_prove.py b/sbysrc/sby_mode_prove.py index f3c3ac93..6346119d 100644 --- a/sbysrc/sby_mode_prove.py +++ b/sbysrc/sby_mode_prove.py @@ -19,7 +19,7 @@ import re, os, getopt from sby_core import SbyTask -def run(job): +def init(job): job.handle_int_option("depth", 20) job.handle_int_option("append", 0) job.handle_str_option("aigsmt", "yices") @@ -40,15 +40,15 @@ def run(job): if engine[0] == "smtbmc": import sby_engine_smtbmc - sby_engine_smtbmc.run("prove", job, engine_idx, engine) + sby_engine_smtbmc.init("prove", job, engine_idx, engine) elif engine[0] == "aiger": import sby_engine_aiger - sby_engine_aiger.run("prove", job, engine_idx, engine) + sby_engine_aiger.init("prove", job, engine_idx, engine) elif engine[0] == "abc": import sby_engine_abc - sby_engine_abc.run("prove", job, engine_idx, engine) + sby_engine_abc.init("prove", job, engine_idx, engine) else: job.error("Invalid engine '{}' for prove mode.".format(engine[0]))