본문 바로가기
파이썬 스터디 과제/파이썬 해킹 프로그래밍

8장-퍼징_2 파일퍼저

by laoching 2015. 2. 15.
728x90
반응형

파일 퍼저

파일 포맷 취약점을 이용하는 공격 벡터는 매우 다양하고 빠르게 변하기 때문에 파일 포맷 파서 자체의 버그를 찾아내는 방향으로 관심을 기울여야 한다.

모든 종류의 다양한 파일 포맷에 대한 변형을 만들 수 있어야 한다.

또 에러를 분석해 그것이 공격 가능한 것인지 디버깅 기능이 필요하다.

에러가 발생할 때마다 이메일로 에러 정보를 보내주는 기능을 구현할 것이다.

첫 번째 단계는 클래스의 골격과 간단한 파일 선택기를 구현하는 것이다.

file_fuzzer.py를 짜자

<textarea name="code" class="brush:python;">

from pydbg import *
from pydbg.defines import *

import utils
import random
import sys
import struct
import threading
import os
import shutil
import time
import getopt

class file_fuzzer:

    def __init__(self, exe_path, ext, notify):

        self.exe_path        =exe_path
        self.ext            =ext
        self.notify_crash    =notify
        self.orig_file        =None
        self.mutated_file    =None
        self.iteration        =0
        self.exe_path         =exe_path
        self.orig_file         =None
        self.mutated_file     =None
        self.iteration         =0
        self.crash             =None
        self.send_notify     =False
        self.pid             =None
        self.in_accessv_handler =False
        self.dbg             =None
        self.running         =False
        self.ready            =False

        self.smtpserver = 'mail.nostarch.com'
        self.recipients = ['jms@bughunter.ca',]
        self.sender     = 'jms@bughunter.ca'
        self.test_cases = ["%s%n%s%n%s%n", "\xff", "\x00", "A"]

    def file_picker(self):

        file_list = os.listdir("examples/")
        list_length = len(file_list)
        file = file_list[random.randint(0, list_length-1)]
        shutil.copy("examples\\%s" % file, "test.%s" % self.ext)

        return file

</textarea>

file_picker 함수는 단순히 파이썬 함수 몇 개를 이용해 디렉토리 내의 파일 리스트를 구하고 그 중 하나의 파일을 임시로 선택한다.

이제는 퍼징을 수행할 애플리케이션을 로드한 수 그 곳에서 에러가 발생하는지 추적하는 슬드와 해당 애플리케이션의 문서 파싱 작업이 완료되면 그것을 종료하는 스레드를 만들어야 한다.

<textarea name="code" class="brush:python;">

def fuzz(self):

        while 1:
            if not self.running:

                self.test_file = self.file_picker()
                self.mutate_file()

                pydbg_thread = threading.Thread(target=self.start_debugger)
                pydbg_thread.setDaemon(0)
                pydbg_thread.start()

                while self.pid == None:
                    time.sleep(1)

                    monitor_thread = threading.Thread(target=self.monitor_debugger)
                    monitor_thread.setDaemon(0)
                    monitor_thread.start()

                    self.iteration += 1

                else :
                    time.sleep(1)


    def start_debugger(self):

        print "[*] starting debugger for iteration : %d" % self.iteration
        self.running = True
        self.dbg = pydbg()

        self.dbg.set_callback(EXCEPTION_ACCESS_VIOLATION, self.check_accessv)
        pid=self.dbg.load(self.exe_path, "test.%s" % self.ext)

        self.pid = self.dbg.pid
        self.dbg.run()

    def check_accessv(self,dbg):

        if dbg.dbg.u.Exception.dwFirstChance:

            return DBG_CONTINUE

        print"[*] woot! handling an access violation"
        self.in_accessv_handler = True
        crash_bin = utils.crash_binning.crash_binning()
        crash_bin.record_crash(dbg)
        self.crash = crash_bin.crash_synopsis()

        crash_fd = open("crashes\\crash-%d" % self.iteration, "w")
        crash_fd.write(self.crash)

        shutil.copy("test.%s" % self.ext, "crashes\\%d.%s" % (self.iteration, self.ext))
        shutil.copy("examples\\%s" % self.test_file, "crashes\\%d_orig.%s" % (self.iteration, self.ext))
        self.dbg.terminate_process()
        self.in_accessv_handler = False
        self.running = False

        return DBG_EXCEPTION_NOT_HANDLED

    def monitor_debugger(self):

        counter = 0

        print "[*] monitor_thread for pid : %d" % self.pid,
        while counter < 3:
            time.sleep(1)
            print counter,
            counter +=1

        if self.in_accessv_handler != True:
            time.sleep(1)
            self.dbg.terminate_process()
            self.pid = None
            self.running = False
        else :
            print"[*] The access violation handler is doing its business"
            while self.running:
                time.sleep(1)


    def notify(self):

        crash_message = "from : %s\r\n\r\nTo:\r\n\r\nIteration:%d\n\nOutput:\n\n %s" % (self.sender,self.iteration, self.crash)

        session = smtplib.SMTP(smtpserver)
        session.sendmail(sender, recipients, crash_message)
        session.quit()

        return
</textarea>

fuzz함수 로직을 살펴보자

먼저 퍼징을 수행하고 있는지 확인한다.

변형을 가할 문서를 선택한뒤 그것을 mutate_file 함수에 전달한다.

변형한 문서에 대한 파싱을 수행할 애플리케이션을 실행시키는 디버거 스레드를 생성한다.

PID가 등록되면 적당한 시간이 흐른 후 해당 애플리케이션을 종료시키기 위한 모니터링 스레드를 생성한다.

파일에 변형을 가하는 mutate_file 함수를 추가하자.

<textarea name="code" class="brush:python;">

    def mutate_file(self):

        fd=open("test.%s" % self.ext, "rb")
        stream = fd.read()
        fd.close()

        test_case = self.test_cases[random.randint(0,len(self,test_cases)-1)]
        stream_length = len(stream)

        rand_offset        =random.randit(0, stream_length -1 )
        rand_len         =random.randit(1, 1000)

        test_case = test_case * rand_len

        fuzz_file = stream[0:rand_offset]
        fuzz_file += str(test_case)
        fuzz_file += stream[rand_offset:]

        fd=open("test.%s" % self.ext, "wb")
        fd.write( fuzz_file)
        fd.close()

        return

</textarea>

전역 변수인 test_case 리스트에서 임의의 test_case 하나를 선택한다.

임의의 파일 오프셋과 파일 데이터의 크기를 구한다.

오프셋과 크기 정보를 이용해 파일을 나눈 후 파일 데이터에 대한 변형작업을 수행한다.

마지막으로 버퍼의 내용을 파일에 써 넣으면 디버거 스레드는 곧바로 그것을 이용해 애플리케이션을 테스트한다.

해석 루틴을 추가해보자.

<textarea name="code" class="brush:python;">

    def print_usage():

        print "[*]"
        print "[*] file_fuzzer.py -e <Executable Path> -x <File Extension>"
        print "[*]"

        sys.exit(0)

        if __name__ == "__main__":
            print "[*] Generic File Fuzzer"

    try :
        opts, argo = getopt.geropt(sys.argv[1:], "e:x:n")
    except getopt.GetoptError:
        print_usage()

    exe_path     =None
    ext         =None
    notify         =False

    for o,a in opts:
        if o == "-e":
            exe_path = a
        elif o == "-x":
            ext = a
        elif o == "-n":
            notify = True        

    if exe_path is not None and ext is not None:
        fuzzer = file_fuzzer(exe_path, ext, notify)
        fuzzer.fuzz()
    else :
        print_usage()

</textarea>

file_fuzzer.py 스크립트는 커맨드라인 파라미터를 받아들일 수 있게 됬다.

이 스크립트는 메모장을 이용해 테스트한다.

file_fuzzer.py 스크립트가 있는 디렉토리에 examples 디렉토리와 crashes 디렉토리를 만들고 examples 디렉토리안에 더미 텍스트 파일을 몇 개 만든다.

그 다음

이 커맨드라인 명령을 통해 퍼저를 실행시킨다.

그러면 메모장 프로그램이 실행되면서 내가 만들 테스트 파일이 변형될것이라고 한다.


최종 소스

from pydbg import *
from pydbg.defines import *
import utils
import random
import sys
import struct
import threading
import os
import shutil
import time
import getopt

class file_fuzzer:
   def __init__(self, exe_path, ext, notify):
      self.exe_path           = exe_path
      self.ext                = ext
      self.notify_crash       = notify
      self.orig_file          = None
      self.mutated_file       = None
      self.iteration          = 0
      self.exe_path           = exe_path
      self.orig_file          = None
      self.mutated_file       = None
      self.iteration          = 0
      self.crash              = None
      self.send_notify        = False
      self.pid                = None
      self.in_accessv_handler = False
      self.dbg                = None
      self.running            = False
      self.ready              = False

      self.smtpserver = 'mail.nostarch.com'
      self.recipients = ['jms@bughunter.ca',]
      self.sender     = 'jms@bughunter.ca'
      self.test_cases = ["%s%n%s%n%s%n", "\xff", "\x00", "A"]

   def file_picker(self):
      file_list   = os.listdir("examples/")
      list_length = len(file_list)
      file        = file_list[random.randint(0, list_length - 1)]
      shutil.copy("examples\\%s" % file, "test.%s" % self.ext)

      return file

   def fuzz(self):
      while 1:
         if not self.running:
            self.test_file = self.file_picker()
            self.mutate_file()

            pydbg_thread = threading.Thread(target = self.start_debugger)
            pydbg_thread.setDaemon(0)
            pydbg_thread.start()

            while self.pid == None:
               time.sleep(1)

               monitor_thread = threading.Thread(target = self.monitor_debugger)
               monitor_thread.setDaemon(0)
               monitor_thread.start()

               self.iteration += 1

            else :
               time.sleep(1)


   def start_debugger(self):
      print "[*] starting debugger for iteration : %d" % self.iteration
      self.running = True
      self.dbg     = pydbg()

      self.dbg.set_callback(EXCEPTION_ACCESS_VIOLATION, self.check_accessv)
      pid = self.dbg.load(self.exe_path, "test.%s" % self.ext)

      self.pid = self.dbg.pid
      self.dbg.run()

   def check_accessv(self,dbg):
      if dbg.dbg.u.Exception.dwFirstChance:
         return DBG_CONTINUE

      print"[*] woot! handling an access violation"

      self.in_accessv_handler = True
      crash_bin               = utils.crash_binning.crash_binning()
      crash_bin.record_crash(dbg)
      self.crash              = crash_bin.crash_synopsis()

      crash_fd = open("crashes\\crash-%d" % self.iteration, "w")
      crash_fd.write(self.crash)

      shutil.copy("test.%s" % self.ext, \
               "crashes\\%d.%s" % (self.iteration, self.ext))
      shutil.copy("examples\\%s" % self.test_file, \
               "crashes\\%d_orig.%s" % (self.iteration, self.ext))
      self.dbg.terminate_process()
      self.in_accessv_handler = False
      self.running = False

      return DBG_EXCEPTION_NOT_HANDLED

   def monitor_debugger(self):
      counter = 0

      print "[*] monitor_thread for pid : %d" % self.pid,

      while counter < 3:
         time.sleep(1)

         print counter,

         counter +=1

      if self.in_accessv_handler != True:
         time.sleep(1)
         self.dbg.terminate_process()
         self.pid     = None
         self.running = False
      else :
         print"[*] The access violation handler is doing its business"

         while self.running:
            time.sleep(1)


   def notify(self):
      crash_message = "from : %s\r\n\r\nTo:\r\n\r\nIteration:%d\n\nOutput:\n\n %s" % \
                                                   (self.sender, self.iteration, self.crash)

      session = smtplib.SMTP(smtpserver)
      session.sendmail(sender, recipients, crash_message)
      session.quit()

      return


   def mutate_file(self):
      fd     = open("test.%s" % self.ext, "rb")
      stream = fd.read()
      fd.close()

      test_case     = self.test_cases[random.randint(0, len(self.test_cases) - 1)]
      stream_length = len(stream)

      rand_offset   = random.randint(0, stream_length - 1 )
      rand_len      = random.randint(1, 1000)

      test_case = test_case * rand_len

      fuzz_file  = stream[0 : rand_offset]
      fuzz_file += str(test_case)
      fuzz_file += stream[rand_offset : ]

      fd = open("test.%s" % self.ext, "wb")
      fd.write(fuzz_file)
      fd.close()

      return

   def print_usage():
      print "[*]"
      print "[*] file_fuzzer.py -e <Executable Path> -x <File Extension>"
      print "[*]"

      sys.exit(0)

      if __name__ == "__main__":
         print "[*] Generic File Fuzzer"

try :
   opts, argo = getopt.getopt(sys.argv[1:], "e:x:n")
except getopt.GetoptError:
   print_usage()

exe_path = None
ext      = None
notify   = False

for o, a in opts:
   if o == "-e":
      exe_path = a
   elif o == "-x":
      ext = a
   elif o == "-n":
      notify = True      

if exe_path is not None and ext is not None:
   fuzzer = file_fuzzer(exe_path, ext, notify)
   fuzzer.fuzz()
else :
   print_usage()

728x90
반응형

댓글