파일 퍼저
파일 포맷 취약점을 이용하는 공격 벡터는 매우 다양하고 빠르게 변하기 때문에 파일 포맷 파서 자체의 버그를 찾아내는 방향으로 관심을 기울여야 한다.
모든 종류의 다양한 파일 포맷에 대한 변형을 만들 수 있어야 한다.
또 에러를 분석해 그것이 공격 가능한 것인지 디버깅 기능이 필요하다.
에러가 발생할 때마다 이메일로 에러 정보를 보내주는 기능을 구현할 것이다.
첫 번째 단계는 클래스의 골격과 간단한 파일 선택기를 구현하는 것이다.
file_fuzzer.py를 짜자
<textarea name="code" class="brush:python;">
from pydbg import *
from pydbg.defines import *
import utils
import random
import sys
import struct
import threading
import os
import shutil
import time
import getopt
class file_fuzzer:
def __init__(self, exe_path, ext, notify):
self.exe_path =exe_path
self.ext =ext
self.notify_crash =notify
self.orig_file =None
self.mutated_file =None
self.iteration =0
self.exe_path =exe_path
self.orig_file =None
self.mutated_file =None
self.iteration =0
self.crash =None
self.send_notify =False
self.pid =None
self.in_accessv_handler =False
self.dbg =None
self.running =False
self.ready =False
self.smtpserver = 'mail.nostarch.com'
self.recipients = ['jms@bughunter.ca',]
self.sender = 'jms@bughunter.ca'
self.test_cases = ["%s%n%s%n%s%n", "\xff", "\x00", "A"]
def file_picker(self):
file_list = os.listdir("examples/")
list_length = len(file_list)
file = file_list[random.randint(0, list_length-1)]
shutil.copy("examples\\%s" % file, "test.%s" % self.ext)
return file
</textarea>
file_picker 함수는 단순히 파이썬 함수 몇 개를 이용해 디렉토리 내의 파일 리스트를 구하고 그 중 하나의 파일을 임시로 선택한다.
이제는 퍼징을 수행할 애플리케이션을 로드한 수 그 곳에서 에러가 발생하는지 추적하는 슬드와 해당 애플리케이션의 문서 파싱 작업이 완료되면 그것을 종료하는 스레드를 만들어야 한다.
<textarea name="code" class="brush:python;">
def fuzz(self):
while 1:
if not self.running:
self.test_file = self.file_picker()
self.mutate_file()
pydbg_thread = threading.Thread(target=self.start_debugger)
pydbg_thread.setDaemon(0)
pydbg_thread.start()
while self.pid == None:
time.sleep(1)
monitor_thread = threading.Thread(target=self.monitor_debugger)
monitor_thread.setDaemon(0)
monitor_thread.start()
self.iteration += 1
else :
time.sleep(1)
def start_debugger(self):
print "[*] starting debugger for iteration : %d" % self.iteration
self.running = True
self.dbg = pydbg()
self.dbg.set_callback(EXCEPTION_ACCESS_VIOLATION, self.check_accessv)
pid=self.dbg.load(self.exe_path, "test.%s" % self.ext)
self.pid = self.dbg.pid
self.dbg.run()
def check_accessv(self,dbg):
if dbg.dbg.u.Exception.dwFirstChance:
return DBG_CONTINUE
print"[*] woot! handling an access violation"
self.in_accessv_handler = True
crash_bin = utils.crash_binning.crash_binning()
crash_bin.record_crash(dbg)
self.crash = crash_bin.crash_synopsis()
crash_fd = open("crashes\\crash-%d" % self.iteration, "w")
crash_fd.write(self.crash)
shutil.copy("test.%s" % self.ext, "crashes\\%d.%s" % (self.iteration, self.ext))
shutil.copy("examples\\%s" % self.test_file, "crashes\\%d_orig.%s" % (self.iteration, self.ext))
self.dbg.terminate_process()
self.in_accessv_handler = False
self.running = False
return DBG_EXCEPTION_NOT_HANDLED
def monitor_debugger(self):
counter = 0
print "[*] monitor_thread for pid : %d" % self.pid,
while counter < 3:
time.sleep(1)
print counter,
counter +=1
if self.in_accessv_handler != True:
time.sleep(1)
self.dbg.terminate_process()
self.pid = None
self.running = False
else :
print"[*] The access violation handler is doing its business"
while self.running:
time.sleep(1)
def notify(self):
crash_message = "from : %s\r\n\r\nTo:\r\n\r\nIteration:%d\n\nOutput:\n\n %s" % (self.sender,self.iteration, self.crash)
session = smtplib.SMTP(smtpserver)
session.sendmail(sender, recipients, crash_message)
session.quit()
return
</textarea>
fuzz함수 로직을 살펴보자
먼저 퍼징을 수행하고 있는지 확인한다.
변형을 가할 문서를 선택한뒤 그것을 mutate_file 함수에 전달한다.
변형한 문서에 대한 파싱을 수행할 애플리케이션을 실행시키는 디버거 스레드를 생성한다.
PID가 등록되면 적당한 시간이 흐른 후 해당 애플리케이션을 종료시키기 위한 모니터링 스레드를 생성한다.
파일에 변형을 가하는 mutate_file 함수를 추가하자.
<textarea name="code" class="brush:python;">
def mutate_file(self):
fd=open("test.%s" % self.ext, "rb")
stream = fd.read()
fd.close()
test_case = self.test_cases[random.randint(0,len(self,test_cases)-1)]
stream_length = len(stream)
rand_offset =random.randit(0, stream_length -1 )
rand_len =random.randit(1, 1000)
test_case = test_case * rand_len
fuzz_file = stream[0:rand_offset]
fuzz_file += str(test_case)
fuzz_file += stream[rand_offset:]
fd=open("test.%s" % self.ext, "wb")
fd.write( fuzz_file)
fd.close()
return
</textarea>
전역 변수인 test_case 리스트에서 임의의 test_case 하나를 선택한다.
임의의 파일 오프셋과 파일 데이터의 크기를 구한다.
오프셋과 크기 정보를 이용해 파일을 나눈 후 파일 데이터에 대한 변형작업을 수행한다.
마지막으로 버퍼의 내용을 파일에 써 넣으면 디버거 스레드는 곧바로 그것을 이용해 애플리케이션을 테스트한다.
해석 루틴을 추가해보자.
<textarea name="code" class="brush:python;">
def print_usage():
print "[*]"
print "[*] file_fuzzer.py -e <Executable Path> -x <File Extension>"
print "[*]"
sys.exit(0)
if __name__ == "__main__":
print "[*] Generic File Fuzzer"
try :
opts, argo = getopt.geropt(sys.argv[1:], "e:x:n")
except getopt.GetoptError:
print_usage()
exe_path =None
ext =None
notify =False
for o,a in opts:
if o == "-e":
exe_path = a
elif o == "-x":
ext = a
elif o == "-n":
notify = True
if exe_path is not None and ext is not None:
fuzzer = file_fuzzer(exe_path, ext, notify)
fuzzer.fuzz()
else :
print_usage()
</textarea>
file_fuzzer.py 스크립트는 커맨드라인 파라미터를 받아들일 수 있게 됬다.
이 스크립트는 메모장을 이용해 테스트한다.
file_fuzzer.py 스크립트가 있는 디렉토리에 examples 디렉토리와 crashes 디렉토리를 만들고 examples 디렉토리안에 더미 텍스트 파일을 몇 개 만든다.
그 다음
이 커맨드라인 명령을 통해 퍼저를 실행시킨다.
그러면 메모장 프로그램이 실행되면서 내가 만들 테스트 파일이 변형될것이라고 한다.
최종 소스
from pydbg import *
from pydbg.defines import *
import utils
import random
import sys
import struct
import threading
import os
import shutil
import time
import getopt
class file_fuzzer:
def __init__(self, exe_path, ext, notify):
self.exe_path = exe_path
self.ext = ext
self.notify_crash = notify
self.orig_file = None
self.mutated_file = None
self.iteration = 0
self.exe_path = exe_path
self.orig_file = None
self.mutated_file = None
self.iteration = 0
self.crash = None
self.send_notify = False
self.pid = None
self.in_accessv_handler = False
self.dbg = None
self.running = False
self.ready = False
self.smtpserver = 'mail.nostarch.com'
self.recipients = ['jms@bughunter.ca',]
self.sender = 'jms@bughunter.ca'
self.test_cases = ["%s%n%s%n%s%n", "\xff", "\x00", "A"]
def file_picker(self):
file_list = os.listdir("examples/")
list_length = len(file_list)
file = file_list[random.randint(0, list_length - 1)]
shutil.copy("examples\\%s" % file, "test.%s" % self.ext)
return file
def fuzz(self):
while 1:
if not self.running:
self.test_file = self.file_picker()
self.mutate_file()
pydbg_thread = threading.Thread(target = self.start_debugger)
pydbg_thread.setDaemon(0)
pydbg_thread.start()
while self.pid == None:
time.sleep(1)
monitor_thread = threading.Thread(target = self.monitor_debugger)
monitor_thread.setDaemon(0)
monitor_thread.start()
self.iteration += 1
else :
time.sleep(1)
def start_debugger(self):
print "[*] starting debugger for iteration : %d" % self.iteration
self.running = True
self.dbg = pydbg()
self.dbg.set_callback(EXCEPTION_ACCESS_VIOLATION, self.check_accessv)
pid = self.dbg.load(self.exe_path, "test.%s" % self.ext)
self.pid = self.dbg.pid
self.dbg.run()
def check_accessv(self,dbg):
if dbg.dbg.u.Exception.dwFirstChance:
return DBG_CONTINUE
print"[*] woot! handling an access violation"
self.in_accessv_handler = True
crash_bin = utils.crash_binning.crash_binning()
crash_bin.record_crash(dbg)
self.crash = crash_bin.crash_synopsis()
crash_fd = open("crashes\\crash-%d" % self.iteration, "w")
crash_fd.write(self.crash)
shutil.copy("test.%s" % self.ext, \
"crashes\\%d.%s" % (self.iteration, self.ext))
shutil.copy("examples\\%s" % self.test_file, \
"crashes\\%d_orig.%s" % (self.iteration, self.ext))
self.dbg.terminate_process()
self.in_accessv_handler = False
self.running = False
return DBG_EXCEPTION_NOT_HANDLED
def monitor_debugger(self):
counter = 0
print "[*] monitor_thread for pid : %d" % self.pid,
while counter < 3:
time.sleep(1)
print counter,
counter +=1
if self.in_accessv_handler != True:
time.sleep(1)
self.dbg.terminate_process()
self.pid = None
self.running = False
else :
print"[*] The access violation handler is doing its business"
while self.running:
time.sleep(1)
def notify(self):
crash_message = "from : %s\r\n\r\nTo:\r\n\r\nIteration:%d\n\nOutput:\n\n %s" % \
(self.sender, self.iteration, self.crash)
session = smtplib.SMTP(smtpserver)
session.sendmail(sender, recipients, crash_message)
session.quit()
return
def mutate_file(self):
fd = open("test.%s" % self.ext, "rb")
stream = fd.read()
fd.close()
test_case = self.test_cases[random.randint(0, len(self.test_cases) - 1)]
stream_length = len(stream)
rand_offset = random.randint(0, stream_length - 1 )
rand_len = random.randint(1, 1000)
test_case = test_case * rand_len
fuzz_file = stream[0 : rand_offset]
fuzz_file += str(test_case)
fuzz_file += stream[rand_offset : ]
fd = open("test.%s" % self.ext, "wb")
fd.write(fuzz_file)
fd.close()
return
def print_usage():
print "[*]"
print "[*] file_fuzzer.py -e <Executable Path> -x <File Extension>"
print "[*]"
sys.exit(0)
if __name__ == "__main__":
print "[*] Generic File Fuzzer"
try :
opts, argo = getopt.getopt(sys.argv[1:], "e:x:n")
except getopt.GetoptError:
print_usage()
exe_path = None
ext = None
notify = False
for o, a in opts:
if o == "-e":
exe_path = a
elif o == "-x":
ext = a
elif o == "-n":
notify = True
if exe_path is not None and ext is not None:
fuzzer = file_fuzzer(exe_path, ext, notify)
fuzzer.fuzz()
else :
print_usage()
'파이썬 스터디 과제 > 파이썬 해킹 프로그래밍' 카테고리의 다른 글
10장-윈도우 드라이버 퍼징 (0) | 2015.02.17 |
---|---|
9장-sulley (0) | 2015.02.16 |
8장-퍼징 (0) | 2015.02.10 |
7장-DLL과 코드 인젝션-3 (백도어 제작) (0) | 2015.01.31 |
7장-DLL과 코드 인젝션-2 (코드 인젝션) (0) | 2015.01.31 |
댓글