Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactor(r2): add addr wrap and move wrap to utils
@wrap_arg_addr makes function accept name/R2Data as addr and return same func when args is empty

rename: get_fcn_at -> get_fcn
rename: get_bb_at -> get_bb
  • Loading branch information
chinggg committed Dec 30, 2022
commit dd679e945368b4f11cb0b981d9e1e07cdc55eeb5
2 changes: 1 addition & 1 deletion examples/extensions/r2/deflat_r2.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
ql = R2Qiling(['rootfs/x86_linux/bin/test_fla_argv', '1'], 'rootfs/x86_linux', verbose=QL_VERBOSE.DEFAULT)
r2 = ql.r2
# now we can use r2 parsed symbol name instead of address
fcn = r2.get_fcn_at(r2.where('target_function'))
fcn = r2.get_fcn('target_function')
print(fcn)
r2.deflat(fcn)
ql.run()
Expand Down
4 changes: 2 additions & 2 deletions qiling/extensions/r2/deflat.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def _guide_hook(self, ql: Qiling, addr: int, size: int):
ql.emu_stop()
self.hook_data['result'] = False
return
cur_bb = self.r2.get_bb_at(addr)
cur_bb = self.r2.get_bb(addr)
if "force" in self.hook_data and addr in self.hook_data['force']:
if self.hook_data['force'][addr]: # is True
ql.log.info(f"Force execution at cond branch {hex(addr)}")
Expand Down Expand Up @@ -133,7 +133,7 @@ def _search_path(self):
braddr = self._find_branch_in_block(bb)
self.hook_data = {
"startbb": bb,
"func": self.r2.get_fcn_at(self.first_block.addr),
"func": self.r2.get_fcn(self.first_block),
"result": True,
}
ql_bb_start_ea = bb.addr
Expand Down
51 changes: 23 additions & 28 deletions qiling/extensions/r2/r2.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
import re
import libr
from dataclasses import dataclass, field, fields
from functools import cached_property, wraps
from functools import cached_property
from typing import TYPE_CHECKING, Dict, List, Literal, Optional, Pattern, Tuple, Union
from qiling.const import QL_ARCH
from qiling.extensions import trace
from unicorn import UC_PROT_NONE, UC_PROT_READ, UC_PROT_WRITE, UC_PROT_EXEC, UC_PROT_ALL
from .callstack import CallStack
from .deflat import R2Deflator
from .utils import wrap_aaa, wrap_arg_addr

if TYPE_CHECKING:
from qiling.extensions.r2 import R2Qiling
Expand Down Expand Up @@ -279,15 +280,6 @@ def _cmdj(self, cmd: str, r2c = None) -> Union[Dict, List[Dict]]:
def offset(self) -> int:
return self._r2c.contents.offset

def aaa(fun):
@wraps(fun)
def wrapper(self, *args, **kwargs):
if self.analyzed is False:
self._cmd("aaa")
self.analyzed = True
return fun(self, *args, **kwargs)
return wrapper

@cached_property
def binfo(self) -> Dict[str, str]:
return self._cmdj("iIj")
Expand Down Expand Up @@ -316,46 +308,50 @@ def symbols(self) -> Dict[str, Symbol]:
return {dic['name']: Symbol(**dic).vaddr for dic in sym_lst}

@cached_property
@aaa
@wrap_aaa
def functions(self) -> Dict[str, Function]:
fcn_lst = self._cmdj("aflj")
return {dic['name']: Function(**dic) for dic in fcn_lst}

@cached_property
@aaa
@wrap_aaa
def flags(self) -> List[Flag]:
return [Flag(**dic) for dic in self._cmdj("fj")]

@cached_property
@aaa
@wrap_aaa
def xrefs(self) -> List[Xref]:
return [Xref(**dic) for dic in self._cmdj("axj")]

@aaa
@wrap_aaa
@wrap_arg_addr
def get_fcn_bbs(self, addr: int):
'''list basic blocks of function'''
return [BasicBlock(**dic) for dic in self._cmdj(f"afbj @ {addr}")]

@aaa
def get_bb_at(self, addr: int):
@wrap_aaa
@wrap_arg_addr
def get_bb(self, addr: int):
'''get basic block at address'''
try:
dic = self._cmdj(f"afbj. {addr}")[0]
return BasicBlock(**dic)
except IndexError:
pass

@aaa
def get_fcn_at(self, addr: int):
@wrap_aaa
@wrap_arg_addr
def get_fcn(self, addr: int):
try:
dic = self._cmdj(f"afij {addr}")[0] # afi show function information
return Function(**dic)
except IndexError:
pass

@aaa
def anal_op(self, target: Union[int, Instruction]):
addr = target.offset if isinstance(target, Instruction) else target
@wrap_aaa
@wrap_arg_addr
def anal_op(self, addr: int):
'''r2 opcode analysis (detail about an instruction) at address'''
dic = self._cmdj(f"aoj @ {addr}")[0]
return AnalOp(**dic)

Expand Down Expand Up @@ -427,13 +423,12 @@ def _backtrace_fuzzy(self, at: int = None, depth: int = 128) -> Optional[CallSta
cursp += wordsize
return frame

def set_backtrace(self, target: Union[int, str]):
@wrap_arg_addr
def set_backtrace(self, addr: int):
'''Set backtrace at target address before executing'''
if isinstance(target, str):
target = self.where(target)
def bt_hook(__ql: 'R2Qiling', *args):
print(self._backtrace_fuzzy())
self.ql.hook_address(bt_hook, target)
self.ql.hook_address(bt_hook, addr)

def disassembler(self, ql: 'R2Qiling', addr: int, size: int, filt: Pattern[str]=None) -> int:
'''A human-friendly monkey patch of QlArchUtils.disassembler powered by r2, can be used for hook_code
Expand Down Expand Up @@ -468,9 +463,9 @@ def enable_trace(self, mode='full'):
elif mode == 'history':
trace.enable_history_trace(self.ql)

def deflat(self, target: Union[int, R2Data]):
'''Create deflator with self r2 instance, will patch ql code'''
addr = target if isinstance(target, int) else target.start_ea
@wrap_arg_addr
def deflat(self, addr: int):
'''Deflat function at given address, will patch ql code'''
deflator = R2Deflator(self)
deflator.parse_blocks_for_deobf(addr)
deflator._search_path()
Expand Down
28 changes: 28 additions & 0 deletions qiling/extensions/r2/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from functools import wraps


def wrap_aaa(fun):
@wraps(fun)
def wrapper(self, *args, **kwargs):
if self.analyzed is False:
self._cmd("aaa")
self.analyzed = True
return fun(self, *args, **kwargs)
return wrapper

def wrap_arg_addr(fun):
@wraps(fun)
def wrapper(self, *args, **kwargs):
if not args: # just return same func if not args
return fun(self, *args, **kwargs)
# parse first argument to address
target = args[0]
if isinstance(target, int): # first arg is address
addr = target
elif isinstance(target, str): # first arg is name
addr = self.where(args[0])
else: # isinstance(target, R2Data)
addr = target.start_ea
newargs = (addr,) + args[1:]
return fun(self, *newargs, **kwargs)
return wrapper