Skip to content

Commit

Permalink
make pylint happy
Browse files Browse the repository at this point in the history
  • Loading branch information
Kyle-Kyle committed Jan 22, 2024
1 parent 4ed96da commit 64e5ea6
Showing 1 changed file with 36 additions and 35 deletions.
71 changes: 36 additions & 35 deletions angrop/rop.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,22 @@
import pickle
import inspect
import logging
from multiprocessing import Pool

import tqdm
from angr import Analysis, register_analysis
from angr.errors import SimEngineError, SimMemoryError
from angr.misc.loggers import CuteFormatter
from angr.analyses.bindiff import differing_constants
from angr.analyses.bindiff import UnmatchedStatementsException
from angr.misc.loggers import CuteFormatter
from angr import Analysis, register_analysis

from . import chain_builder
from . import gadget_analyzer
from . import common
from .arch import get_arch

import pickle
import inspect
import logging
import tqdm

from .errors import RopException
from .rop_gadget import RopGadget, StackPivot

from multiprocessing import Pool

l = logging.getLogger('angrop.rop')
logging.getLogger('pyvex.lifting').setLevel("ERROR")

Expand All @@ -34,7 +32,7 @@ def _disable_loggers():

# global initializer for multiprocessing
def _set_global_gadget_analyzer(rop_gadget_analyzer):
global _global_gadget_analyzer
global _global_gadget_analyzer # pylint: disable=global-statement
_global_gadget_analyzer = rop_gadget_analyzer
_disable_loggers()

Expand All @@ -54,7 +52,8 @@ class ROP(Analysis):
Additionally, all public methods from ChainBuilder are copied into ROP.
"""

def __init__(self, only_check_near_rets=True, max_block_size=None, max_sym_mem_access=None, fast_mode=None, rebase=True, is_thumb=False, kernel_mode=False):
def __init__(self, only_check_near_rets=True, max_block_size=None, max_sym_mem_access=None,
fast_mode=None, rebase=True, is_thumb=False, kernel_mode=False):
"""
Initializes the rop gadget finder
:param only_check_near_rets: If true we skip blocks that are not near rets
Expand Down Expand Up @@ -85,6 +84,7 @@ def __init__(self, only_check_near_rets=True, max_block_size=None, max_sym_mem_a

# get ret locations
self._ret_locations = None
self._cache = {}

# list of RopGadget's
self._gadgets = []
Expand Down Expand Up @@ -137,7 +137,8 @@ def _initialize_gadget_analyzer(self):
l.info("There are %d addresses within %d bytes of a ret",
num_to_check, self.arch.max_block_size)

self._gadget_analyzer = gadget_analyzer.GadgetAnalyzer(self.project, self._fast_mode, arch=self.arch, kernel_mode=self.kernel_mode)
self._gadget_analyzer = gadget_analyzer.GadgetAnalyzer(self.project, self._fast_mode, arch=self.arch,
kernel_mode=self.kernel_mode)

def find_gadgets(self, processes=4, show_progress=True):
"""
Expand All @@ -149,17 +150,17 @@ def find_gadgets(self, processes=4, show_progress=True):
self._initialize_gadget_analyzer()
self._gadgets = []

pool = Pool(processes=processes, initializer=_set_global_gadget_analyzer, initargs=(self._gadget_analyzer,))

it = pool.imap_unordered(run_worker, self._addresses_to_check_with_caching(show_progress), chunksize=5)
for gadget in it:
if gadget is not None:
if isinstance(gadget, RopGadget):
self._gadgets.append(gadget)
elif isinstance(gadget, StackPivot):
self.stack_pivots.append(gadget)
initargs = (self._gadget_analyzer)
with Pool(processes=processes, initializer=_set_global_gadget_analyzer, initargs=initargs) as pool:

pool.close()
it = pool.imap_unordered(run_worker, self._addresses_to_check_with_caching(show_progress), chunksize=5)
for gadget in it:
if gadget is not None:
if isinstance(gadget, RopGadget):
self._gadgets.append(gadget)
elif isinstance(gadget, StackPivot):
self.stack_pivots.append(gadget)

# fix up gadgets from cache
for g in self._gadgets:
Expand Down Expand Up @@ -218,8 +219,9 @@ def load_gadgets(self, path):
Loads gadgets from a file.
:param path: A path for a file where the gadgets are loaded
"""
cache_tuple = pickle.load(open(path, "rb"))
self._load_cache_tuple(cache_tuple)
with open(path, "rb") as f:
cache_tuple = pickle.load(f)
self._load_cache_tuple(cache_tuple)

def set_badbytes(self, badbytes):
"""
Expand Down Expand Up @@ -277,7 +279,8 @@ def chain_builder(self):
if self._chain_builder is not None:
return self._chain_builder
if len(self._gadgets) == 0:
l.warning("Could not find gadgets for %s, check your badbytes and make sure find_gadgets() or load_gadgets() was called.", self.project)
l.warning("Could not find gadgets for %s", self.project)
l.warning("check your badbytes and make sure find_gadgets() or load_gadgets() was called.")
self._chain_builder = chain_builder.ChainBuilder(self.project, self.gadgets, self._duplicates,
self.arch.reg_list, self.arch.base_pointer, self.badbytes,
self.roparg_filler, rebase=self._rebase)
Expand Down Expand Up @@ -306,7 +309,6 @@ def _block_has_ip_relative(self, addr, bl):

def _addresses_to_check_with_caching(self, show_progress=True):
num_addrs = self._num_addresses_to_check()
self._cache = {}
seen = {}

iterable = self._addresses_to_check()
Expand All @@ -325,13 +327,12 @@ def _addresses_to_check_with_caching(self, show_progress=True):
if block_data in seen:
self._cache[seen[block_data]].add(a)
continue
else:
if self._is_jumpkind_valid(bl.vex.jumpkind) and \
len(bl.vex.constant_jump_targets) == 0 and \
not self._block_has_ip_relative(a, bl):
seen[block_data] = a
self._cache[a] = set()
yield a
if self._is_jumpkind_valid(bl.vex.jumpkind) and \
len(bl.vex.constant_jump_targets) == 0 and \
not self._block_has_ip_relative(a, bl):
seen[block_data] = a
self._cache[a] = set()
yield a

def _addresses_to_check(self):
"""
Expand All @@ -354,7 +355,7 @@ def _addresses_to_check(self):
else:
for segment in self.project.loader.main_object.segments:
if segment.is_executable:
l.debug("Analyzing segment with address range: 0x%x, 0x%x" % (segment.min_addr, segment.max_addr))
l.debug("Analyzing segment with address range: 0x%x, 0x%x", segment.min_addr, segment.max_addr)
for addr in range(segment.min_addr, segment.max_addr):
yield addr

Expand Down Expand Up @@ -416,7 +417,7 @@ def _get_ret_locations_by_string(self):
uses a string filter to find the return instructions
:return: all the locations in the binary with a ret instruction
"""
if self.project.arch.linux_name == "x86_64" or self.project.arch.linux_name == "i386":
if self.project.arch.linux_name in ("x86_64", "i386"):
ret_instructions = {b"\xc2", b"\xc3", b"\xca", b"\xcb"}
else:
raise RopException("Only have ret strings for i386 and x86_64")
Expand Down

0 comments on commit 64e5ea6

Please sign in to comment.