From 64e5ea69ca407df80fa566f465cd71d797b7283b Mon Sep 17 00:00:00 2001 From: Kyle Zeng Date: Mon, 22 Jan 2024 16:25:32 -0700 Subject: [PATCH] make pylint happy --- angrop/rop.py | 71 ++++++++++++++++++++++++++------------------------- 1 file changed, 36 insertions(+), 35 deletions(-) diff --git a/angrop/rop.py b/angrop/rop.py index f5f02db..2764231 100644 --- a/angrop/rop.py +++ b/angrop/rop.py @@ -1,24 +1,22 @@ +import pickle +import inspect +import logging +from multiprocessing import Pool + +import tqdm +from angr import Analysis, register_analysis from angr.errors import SimEngineError, SimMemoryError +from angr.misc.loggers import CuteFormatter from angr.analyses.bindiff import differing_constants from angr.analyses.bindiff import UnmatchedStatementsException -from angr.misc.loggers import CuteFormatter -from angr import Analysis, register_analysis from . import chain_builder from . import gadget_analyzer from . import common from .arch import get_arch - -import pickle -import inspect -import logging -import tqdm - from .errors import RopException from .rop_gadget import RopGadget, StackPivot -from multiprocessing import Pool - l = logging.getLogger('angrop.rop') logging.getLogger('pyvex.lifting').setLevel("ERROR") @@ -34,7 +32,7 @@ def _disable_loggers(): # global initializer for multiprocessing def _set_global_gadget_analyzer(rop_gadget_analyzer): - global _global_gadget_analyzer + global _global_gadget_analyzer # pylint: disable=global-statement _global_gadget_analyzer = rop_gadget_analyzer _disable_loggers() @@ -54,7 +52,8 @@ class ROP(Analysis): Additionally, all public methods from ChainBuilder are copied into ROP. """ - def __init__(self, only_check_near_rets=True, max_block_size=None, max_sym_mem_access=None, fast_mode=None, rebase=True, is_thumb=False, kernel_mode=False): + def __init__(self, only_check_near_rets=True, max_block_size=None, max_sym_mem_access=None, + fast_mode=None, rebase=True, is_thumb=False, kernel_mode=False): """ Initializes the rop gadget finder :param only_check_near_rets: If true we skip blocks that are not near rets @@ -85,6 +84,7 @@ def __init__(self, only_check_near_rets=True, max_block_size=None, max_sym_mem_a # get ret locations self._ret_locations = None + self._cache = {} # list of RopGadget's self._gadgets = [] @@ -137,7 +137,8 @@ def _initialize_gadget_analyzer(self): l.info("There are %d addresses within %d bytes of a ret", num_to_check, self.arch.max_block_size) - self._gadget_analyzer = gadget_analyzer.GadgetAnalyzer(self.project, self._fast_mode, arch=self.arch, kernel_mode=self.kernel_mode) + self._gadget_analyzer = gadget_analyzer.GadgetAnalyzer(self.project, self._fast_mode, arch=self.arch, + kernel_mode=self.kernel_mode) def find_gadgets(self, processes=4, show_progress=True): """ @@ -149,17 +150,17 @@ def find_gadgets(self, processes=4, show_progress=True): self._initialize_gadget_analyzer() self._gadgets = [] - pool = Pool(processes=processes, initializer=_set_global_gadget_analyzer, initargs=(self._gadget_analyzer,)) - it = pool.imap_unordered(run_worker, self._addresses_to_check_with_caching(show_progress), chunksize=5) - for gadget in it: - if gadget is not None: - if isinstance(gadget, RopGadget): - self._gadgets.append(gadget) - elif isinstance(gadget, StackPivot): - self.stack_pivots.append(gadget) + initargs = (self._gadget_analyzer) + with Pool(processes=processes, initializer=_set_global_gadget_analyzer, initargs=initargs) as pool: - pool.close() + it = pool.imap_unordered(run_worker, self._addresses_to_check_with_caching(show_progress), chunksize=5) + for gadget in it: + if gadget is not None: + if isinstance(gadget, RopGadget): + self._gadgets.append(gadget) + elif isinstance(gadget, StackPivot): + self.stack_pivots.append(gadget) # fix up gadgets from cache for g in self._gadgets: @@ -218,8 +219,9 @@ def load_gadgets(self, path): Loads gadgets from a file. :param path: A path for a file where the gadgets are loaded """ - cache_tuple = pickle.load(open(path, "rb")) - self._load_cache_tuple(cache_tuple) + with open(path, "rb") as f: + cache_tuple = pickle.load(f) + self._load_cache_tuple(cache_tuple) def set_badbytes(self, badbytes): """ @@ -277,7 +279,8 @@ def chain_builder(self): if self._chain_builder is not None: return self._chain_builder if len(self._gadgets) == 0: - l.warning("Could not find gadgets for %s, check your badbytes and make sure find_gadgets() or load_gadgets() was called.", self.project) + l.warning("Could not find gadgets for %s", self.project) + l.warning("check your badbytes and make sure find_gadgets() or load_gadgets() was called.") self._chain_builder = chain_builder.ChainBuilder(self.project, self.gadgets, self._duplicates, self.arch.reg_list, self.arch.base_pointer, self.badbytes, self.roparg_filler, rebase=self._rebase) @@ -306,7 +309,6 @@ def _block_has_ip_relative(self, addr, bl): def _addresses_to_check_with_caching(self, show_progress=True): num_addrs = self._num_addresses_to_check() - self._cache = {} seen = {} iterable = self._addresses_to_check() @@ -325,13 +327,12 @@ def _addresses_to_check_with_caching(self, show_progress=True): if block_data in seen: self._cache[seen[block_data]].add(a) continue - else: - if self._is_jumpkind_valid(bl.vex.jumpkind) and \ - len(bl.vex.constant_jump_targets) == 0 and \ - not self._block_has_ip_relative(a, bl): - seen[block_data] = a - self._cache[a] = set() - yield a + if self._is_jumpkind_valid(bl.vex.jumpkind) and \ + len(bl.vex.constant_jump_targets) == 0 and \ + not self._block_has_ip_relative(a, bl): + seen[block_data] = a + self._cache[a] = set() + yield a def _addresses_to_check(self): """ @@ -354,7 +355,7 @@ def _addresses_to_check(self): else: for segment in self.project.loader.main_object.segments: if segment.is_executable: - l.debug("Analyzing segment with address range: 0x%x, 0x%x" % (segment.min_addr, segment.max_addr)) + l.debug("Analyzing segment with address range: 0x%x, 0x%x", segment.min_addr, segment.max_addr) for addr in range(segment.min_addr, segment.max_addr): yield addr @@ -416,7 +417,7 @@ def _get_ret_locations_by_string(self): uses a string filter to find the return instructions :return: all the locations in the binary with a ret instruction """ - if self.project.arch.linux_name == "x86_64" or self.project.arch.linux_name == "i386": + if self.project.arch.linux_name in ("x86_64", "i386"): ret_instructions = {b"\xc2", b"\xc3", b"\xca", b"\xcb"} else: raise RopException("Only have ret strings for i386 and x86_64")