angr之cle加载模块

 

最近看到关于angr这一框架的分析,但对于某些部分讲的比较模糊,于是就根据个人理解重新写入一篇分析,如果有理解错误的地方,希望大家指正.

在project模块中,如果初始化参数时,参数不是cle.loader类,则使用cle模块的loader来加载二进制模块

 cle.Loader(thing, **load_options)

在angr中首次调用loader类

 

loader的作用

使用loader后可以查看加载文件的属性 以及一些共享库

 

loader类的初始化

hasattr(),判断对象是否有某一属性

先解析传入参数的类型,路径或者文件对象

__init__(self, 
        main_binary, 
        auto_load_libs=True,
        concrete_target = None,
        force_load_libs=(), 
        skip_libs=(),
        main_opts=None,
        lib_opts=None, 
        ld_path=(), 
        use_system_libs=True,
        ignore_import_version_numbers=True,
        case_insensitive=False, 
        rebase_granularity=0x100000,
        except_missing_libs=False,
        aslr=False,
        perform_relocations=True,
        load_debug_info=False,
        page_size=0x1,
        preload_libs=(),
        arch=None):

初始化参数 在最后调用下面的方法 _internal_load 进行初始化

self._internal_load(main_binary, *preload_libs , *force_load_libs, preloading=(main_binary, *preload_libs))

定义如下

  def _internal_load(self, *args, preloading=()):
  • 递归的加载依赖,知道所有依赖满足
  • 解决符号依赖
  • 布局地址空间
  • 映射到内存
  • 实现重定位

首先遍历参数,加载文件,根据传入参数可得 分别是main_binary 等一系列需要处理的文件

for main_spec in args:  
    is_preloading = any(spec is main_spec for spec in preloading)
    if self.find_object(main_spec, extra_objects=objects) is not None:
        l.info("Skipping load request %s - already loaded", main_spec)
        continue

检查当前文件是否在已预加载目录中

find_object 如果给定的文件被加载 则返回obj本身否则返回none

def find_object(self, spec, extra_objects=()):
        """
        If the given library specification has been loaded, return its object, otherwise return None.
        """
        if isinstance(spec, Backend):
            for obj in self.all_objects:
                if obj is spec:
                    return obj
            return None

all_objects在初始化时定义 默认为空 如果没有进行预加载 则通过_load_object_isolated加载文件

         obj = self._load_object_isolated(main_spec)#加载文件

进入_load_object_isolated

def _load_object_isolated(self, spec):
     if isinstance(spec, Backend):
            return spec
     elif hasattr(spec, 'read') and hasattr(spec, 'seek'):
        .....
     elif type(spec) in (bytes, str):
        binary = self._search_load_path(spec) # this is allowed to cheat and do partial static loading
        l.debug("... using full path %s", binary)
        binary_stream = open(binary, 'rb')

分别对字符类型 Backend类型 文件流类型进行判断处理,最终将binary_stream作为结果输出

 try:
            # STEP 2: collect options 选项
            if self.main_object is None:
                options = dict(self._main_opts)
            else:
                for ident in self._possible_idents(binary_stream if binary is None else binary): # also allowed to cheat
                    if ident in self._lib_opts:
                        options = dict(self._lib_opts[ident])
                        break
                else:
                    options = {}

进入_possible_idents ,在这里尝试通过binary_stream获取backend.

elif hasattr(spec, 'read') and hasattr(spec, 'seek'):
            backend_cls = self._static_backend(spec, ignore_hints=True)
            if backend_cls is not None:
                soname = backend_cls.extract_soname(spec)
                if soname is not None:
                    yield soname
                    if self._ignore_import_version_numbers:
                        yield soname.rstrip('.0123456789')

_static_backend会返回 对于文件的正确加载器 如果传递参数是未知的类型 或者 blob类型 则返回none

实现类似于binwalk来处理blob数据

blob是一个储存二进制的文件类型

with stream_or_path(spec) as stream:
            for rear in ALL_BACKENDS.values():
                if rear.is_default and rear.is_compatible(stream):
                    return rear

ALL_BACKENDS_init_.py中 初始为空字典.

backends是cle里面的子项目

支持多种文件类型如cgc elf java macho minidump pe tls 对应每一个文件类型都会通过register_backend来更新

def register_backend(name, cls):
    ALL_BACKENDS.update({name: cls})

因此 ALL_BACKENDS 被更新为cle支持的文件类型后台字典

    def is_compatible(stream):
        stream.seek(0)
        identstring = stream.read(0x1000)
        stream.seek(0)
        if identstring.startswith(b'\x7fELF'):
            if elftools.elf.elffile.ELFFile(stream).header['e_type'] == 'ET_CORE':
                return False
            return True
        return False

每个类型都有一个类似的检查,通过文件头判断文件类型,从而选择对应的backend.

这样就很好理解了,

with stream_or_path(spec) as stream:
            for rear in ALL_BACKENDS.values():
                if rear.is_default and rear.is_compatible(stream):
                    return rear

对于传入的对应文件,遍历支持的backend 并通过is_compatlble检查,从而返回正确的backend ,

返回到_load_object_islated

  # STEP 4: LOAD!
            l.debug("... loading with %s", backend_cls)

            result = backend_cls(binary, binary_stream, is_main_bin=self.main_object is None, loader=self, **options)
            result.close()
            return result#加载

backend_cls 由backend_reslover取得 返回值为 ALL_BACKEND 字典中的值 即文件对应的后台 即对应类 这些类定义在对应backends的文件中

            result = backend_cls(binary, binary_stream, is_main_bin=self.main_object is None, loader=self, **options)

初始化对应后台类 并返回到_internal_load

            obj = self._load_object_isolated(main_spec)#加载文件
            objects.append(obj)
            objects.extend(obj.child_objects)
            dependencies.extend(obj.deps)

obj获取到正确的类 对于不同的文件类型有不同的处理方式 另一种描述方法是 继承自backend类的对应类

obj 和它的子类 添加到objects

以及它的依赖 添加到 dependencies

接下来开始基于正确的后台初始化 main_object为获取到的类

if self.main_object is None:
                # this is technically the first place we can start to initialize things based on platform
                self.main_object = obj
                self.memory = Clemory(obj.arch, root=True)#分配内存

获取对应后台处理线程

                chk_obj = self.main_object if isinstance(self.main_object, ELFCore) or not self.main_object.child_objects else self.main_object.child_objects[0]
                if isinstance(chk_obj, ELFCore):
                    self.tls = ELFCoreThreadManager(self, obj.arch)
                elif isinstance(obj, Minidump):
                    self.tls = MinidumpThreadManager(self, obj.arch)
                elif isinstance(chk_obj, MetaELF):
                    self.tls = ELFThreadManager(self, obj.arch)
                elif isinstance(chk_obj, PE):
                    self.tls = PEThreadManager(self, obj.arch)
                else:
                    self.tls = ThreadManager(self, obj.arch)

跟进一个ELFCoreThreadManager__init__

def __init__(self, loader, arch, **kwargs):  # pylint: disable=unused-argument
        self.loader = loader
        self.arch = arch
        self.threads = [ELFCoreThread(loader, arch, threadinfo) for threadinfo in loader.main_object._threads]

loader.main_object._threads同样在backend处理 这里猜测它是对于传输信息tls的处理 在深入backend后进行进一步处理

接下来处理以同样的方式处理依赖

在对于依赖这一部分的处理暂时不做深入考虑

然后映射到内存

        for obj in objects:
            self._map_object(obj)

处理重定位

      if self._perform_relocations:
            for obj in ordered_objects:
                obj.relocate()

通过名称插入合适的映射

   for obj in objects:
            self.requested_names.update(obj.deps)
            for ident in self._possible_idents(obj):
                self._satisfied_deps[ident] = obj

            if obj.provides is not None:
                self.shared_objects[obj.provides] = obj

        return objects

处理后返回对应object

(完)