Created
February 25, 2025 09:45
-
-
Save 1ndahous3/cb07befbcbc8cb3df45dd0dda828d05d to your computer and use it in GitHub Desktop.
Python 3 script that extracts PE modules from Windows minidump (MDMP) memory dumps
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import sys | |
| import struct | |
# --- Minidump (MDMP) layout constants -------------------------------------

# Number of bytes read from the start of the file to obtain the header;
# only the first 16 bytes of that read are actually decoded.
MDMP_HEADER_SIZE = 4096
# Magic bytes expected at offset 0 of a minidump file.
MDMP_SIGNATURE = b'MDMP'
# Size of one stream directory entry (MINIDUMP_DIRECTORY): three 32-bit fields.
MDMP_DIRECTORY_SIZE = 12

# Stream type identifiers of the two streams that describe captured memory.
MDMP_STREAM_TYPE_MEMORY_LIST = 0x5      # MemoryListStream
MDMP_STREAM_TYPE_MEMORY_64_LIST = 0x9   # Memory64ListStream

# Memory64ListStream: 16-byte list header (two 64-bit fields) followed by
# 16-byte descriptors (two 64-bit fields each).
MDMP_STREAM_MEMORY_64_LIST_HEADER_SIZE = 0x10
MDMP_MEMORY_DESCRIPTOR_64_SIZE = 0x10

# MemoryListStream: 4-byte list header (one 32-bit count) followed by
# 16-byte descriptors (one 64-bit and two 32-bit fields each).
MDMP_STREAM_MEMORY_LIST_HEADER_SIZE = 0x4
MDMP_MEMORY_DESCRIPTOR_SIZE = 0x10
def _read_exact(f, size):
    """Read exactly *size* bytes from *f*; return None if the file ends first."""
    data = f.read(size)
    return data if len(data) == size else None


def _parse_memory_list(f, stream_rva, memory_map):
    """Add the ranges of a MemoryListStream located at *stream_rva* to *memory_map*."""
    f.seek(stream_rva)
    raw = _read_exact(f, MDMP_STREAM_MEMORY_LIST_HEADER_SIZE)  # MINIDUMP_MEMORY_LIST
    if raw is None:
        return
    (ranges_count,) = struct.unpack('<I', raw)
    for _ in range(ranges_count):
        raw = _read_exact(f, MDMP_MEMORY_DESCRIPTOR_SIZE)  # MINIDUMP_MEMORY_DESCRIPTOR
        if raw is None:
            return  # descriptor table is truncated; keep what was parsed so far
        start, size, rva = struct.unpack('<QII', raw)
        memory_map[start] = (rva, size)


def _parse_memory64_list(f, stream_rva, memory_map):
    """Add the ranges of a Memory64ListStream located at *stream_rva* to *memory_map*."""
    f.seek(stream_rva)
    raw = _read_exact(f, MDMP_STREAM_MEMORY_64_LIST_HEADER_SIZE)  # MINIDUMP_MEMORY64_LIST
    if raw is None:
        return
    ranges_count, base_rva = struct.unpack('<2Q', raw)
    # The descriptors carry no file offset of their own: the data of all
    # ranges is stored back to back starting at base_rva, so the offset of
    # each range is the running sum of the preceding sizes.
    item_offset = 0
    for _ in range(ranges_count):
        raw = _read_exact(f, MDMP_MEMORY_DESCRIPTOR_64_SIZE)  # MINIDUMP_MEMORY_DESCRIPTOR64
        if raw is None:
            return  # descriptor table is truncated; keep what was parsed so far
        start, size = struct.unpack('<2Q', raw)
        memory_map[start] = (base_rva + item_offset, size)
        item_offset += size


def parse_minidump(file_path):
    """Build a map of the memory ranges stored in a minidump file.

    Walks the stream directory and decodes every MemoryListStream and
    Memory64ListStream it finds.

    Args:
        file_path: path of the minidump file.

    Returns:
        A dict mapping the start virtual address of each captured range to a
        ``(file_offset, size)`` tuple, or ``None`` when the file does not
        start with a complete header carrying the ``MDMP`` signature.
        A dump whose directory or descriptor tables are cut short yields the
        ranges that could be decoded instead of raising ``struct.error``.
    """
    header_format = '<4sIII'  # signature, version, stream count, directory RVA
    memory_map = {}
    with open(file_path, 'rb') as f:
        header = f.read(MDMP_HEADER_SIZE)
        if len(header) < struct.calcsize(header_format):
            return None
        signature, _, stream_count, directory_rva = struct.unpack_from(header_format, header)
        if signature != MDMP_SIGNATURE:
            return None

        # Entries are read one at a time (instead of one stream_count-sized
        # read) so a bogus stream count cannot trigger a huge allocation.
        for index in range(stream_count):
            f.seek(directory_rva + index * MDMP_DIRECTORY_SIZE)
            entry = _read_exact(f, MDMP_DIRECTORY_SIZE)  # MINIDUMP_DIRECTORY
            if entry is None:
                break  # directory is truncated
            stream_type, _stream_size, stream_data_rva = struct.unpack('<3I', entry)
            if stream_type == MDMP_STREAM_TYPE_MEMORY_LIST:  # MemoryListStream
                _parse_memory_list(f, stream_data_rva, memory_map)
            elif stream_type == MDMP_STREAM_TYPE_MEMORY_64_LIST:  # Memory64ListStream
                _parse_memory64_list(f, stream_data_rva, memory_map)
    return memory_map
| from ctypes import * | |
class IMAGE_DOS_HEADER(Structure):
    """ctypes layout of the 64-byte DOS header found at the start of a PE image.

    Every field is a 16-bit word except ``e_lfanew``, a signed 32-bit offset
    (relative to the start of the image) at which the NT headers are read.
    """

    _fields_ = [
        ("e_magic", c_uint16),
        ("e_cblp", c_uint16),
        ("e_cp", c_uint16),
        ("e_crlc", c_uint16),
        ("e_cparhdr", c_uint16),
        ("e_minalloc", c_uint16),
        ("e_maxalloc", c_uint16),
        ("e_ss", c_uint16),
        ("e_sp", c_uint16),
        ("e_csum", c_uint16),
        ("e_ip", c_uint16),
        ("e_cs", c_uint16),
        ("e_lfarlc", c_uint16),
        ("e_ovno", c_uint16),
        ("e_res", c_uint16 * 4),
        ("e_oemid", c_uint16),
        ("e_oeminfo", c_uint16),
        ("e_res2", c_uint16 * 10),
        ("e_lfanew", c_int32),  # signed, so a corrupt negative offset can be detected
    ]
class IMAGE_FILE_HEADER(Structure):
    """ctypes layout of the 20-byte COFF file header inside the NT headers.

    ``NumberOfSections`` gives the number of section headers to walk and
    ``SizeOfOptionalHeader`` the distance from the optional header to the
    first section header.
    """

    _fields_ = [
        ("Machine", c_uint16),
        ("NumberOfSections", c_uint16),
        ("TimeDateStamp", c_uint32),
        ("PointerToSymbolTable", c_uint32),
        ("NumberOfSymbols", c_uint32),
        ("SizeOfOptionalHeader", c_uint16),
        ("Characteristics", c_uint16),
    ]
class IMAGE_OPTIONAL_HEADER(Structure):
    """ctypes layout of the 32-bit (PE32) optional header, 224 bytes.

    NOTE(review): this is the PE32 field layout only. The PE32+ (64-bit)
    layout differs after ``BaseOfCode`` (no ``BaseOfData``, 64-bit
    ``ImageBase`` and stack/heap sizes); per the PE format specification
    ``SizeOfHeaders`` sits at offset 60 in both layouts, so reading that one
    field through this structure is valid for either kind of image.
    """

    _fields_ = [
        ("Magic", c_uint16),
        ("MajorLinkerVersion", c_uint8),
        ("MinorLinkerVersion", c_uint8),
        ("SizeOfCode", c_uint32),
        ("SizeOfInitializedData", c_uint32),
        ("SizeOfUninitializedData", c_uint32),
        ("AddressOfEntryPoint", c_uint32),
        ("BaseOfCode", c_uint32),
        ("BaseOfData", c_uint32),
        ("ImageBase", c_uint32),
        ("SectionAlignment", c_uint32),
        ("FileAlignment", c_uint32),
        ("MajorOperatingSystemVersion", c_uint16),
        ("MinorOperatingSystemVersion", c_uint16),
        ("MajorImageVersion", c_uint16),
        ("MinorImageVersion", c_uint16),
        ("MajorSubsystemVersion", c_uint16),
        ("MinorSubsystemVersion", c_uint16),
        ("Win32VersionValue", c_uint32),
        ("SizeOfImage", c_uint32),
        ("SizeOfHeaders", c_uint32),
        ("CheckSum", c_uint32),
        ("Subsystem", c_uint16),
        ("DllCharacteristics", c_uint16),
        ("SizeOfStackReserve", c_uint32),
        ("SizeOfStackCommit", c_uint32),
        ("SizeOfHeapReserve", c_uint32),
        ("SizeOfHeapCommit", c_uint32),
        ("LoaderFlags", c_uint32),
        ("NumberOfRvaAndSizes", c_uint32),
        # 16 data directory entries of two 32-bit words each (VirtualAddress,
        # Size), kept as a flat array of 32 words: entry i occupies indices
        # 2*i and 2*i + 1. The previous ``c_uint32 * 16`` covered only half
        # of the table and made the structure 64 bytes too short.
        ("DataDirectory", c_uint32 * (16 * 2)),
    ]
class IMAGE_NT_HEADERS(Structure):
    """ctypes layout of the NT headers located at ``e_lfanew`` inside a PE image.

    Consists of the 32-bit signature (compared against ``0x00004550`` by the
    PE parser), the COFF file header and the optional header.
    """

    _fields_ = [
        ("Signature", c_uint32),
        ("FileHeader", IMAGE_FILE_HEADER),
        ("OptionalHeader", IMAGE_OPTIONAL_HEADER),
    ]
class IMAGE_SECTION_HEADER(Structure):
    """ctypes layout of one 40-byte entry of the PE section table.

    ``VirtualAddress`` is relative to the image base; together with
    ``VirtualSize`` / ``SizeOfRawData`` it tells where the section bytes are
    and how many of them to read.
    """

    _fields_ = [
        ("Name", c_char * 8),
        ("VirtualSize", c_uint32),
        ("VirtualAddress", c_uint32),
        ("SizeOfRawData", c_uint32),
        ("PointerToRawData", c_uint32),
        ("PointerToRelocations", c_uint32),
        ("PointerToLinenumbers", c_uint32),
        ("NumberOfRelocations", c_uint16),
        ("NumberOfLinenumbers", c_uint16),
        ("Characteristics", c_uint32),
    ]
def find_mz_occurrences(file_path, memory_map, chunk_size=1 << 20):
    """Return the virtual address of every ``MZ`` byte pair in the dumped memory.

    Each memory range is scanned in pieces of at most *chunk_size* bytes so
    that memory use stays bounded even for multi-gigabyte ranges; the last
    byte of every piece is carried over so a pair that straddles two pieces
    is still found.

    Args:
        file_path: path of the minidump file.
        memory_map: dict ``{start_va: (file_offset, size)}`` as produced by
            ``parse_minidump``; ``None`` or an empty dict yields ``[]``.
        chunk_size: maximum number of bytes read from the file at once.

    Returns:
        List of virtual addresses, in the iteration order of *memory_map*
        and ascending within each range.

    Raises:
        ValueError: if *chunk_size* is smaller than 1.
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    if not memory_map:
        return []

    occurrences = []
    with open(file_path, 'rb') as file:
        for block_va, (offset, size) in memory_map.items():
            file.seek(offset)
            scanned = 0  # bytes of this range consumed from the file so far
            tail = b''   # last byte of the previous piece (overlap for straddling pairs)
            while scanned < size:
                chunk = file.read(min(chunk_size, size - scanned))
                if not chunk:
                    break  # the dump ends before the range does
                window = tail + chunk
                window_va = block_va + scanned - len(tail)
                pos = window.find(b'MZ')
                while pos != -1:
                    occurrences.append(window_va + pos)
                    pos = window.find(b'MZ', pos + 1)
                scanned += len(chunk)
                tail = chunk[-1:]
    return occurrences
def get_data(file_path, memory_map, va, size, checked_addr=True):
    """Read *size* bytes of dumped memory starting at virtual address *va*.

    The request may span several adjacent memory ranges; the bytes of each
    range are fetched from its file offset and concatenated.

    Args:
        file_path: path of the minidump file.
        memory_map: dict ``{start_va: (file_offset, size)}`` as produced by
            ``parse_minidump``.
        va: virtual address of the first byte to read.
        size: number of bytes to read.
        checked_addr: when true, print a diagnostic before returning ``None``.

    Returns:
        Exactly *size* bytes, or ``None`` when part of the requested span is
        not covered by any memory range, when the dump file ends before the
        data does, or when *size* is negative.
    """
    if size < 0:
        return None

    res = bytearray()
    with open(file_path, 'rb') as file:
        while len(res) < size:
            current_va = va + len(res)
            # Locate the memory range that contains current_va.
            block = next(
                (
                    (block_va, file_offset, block_size)
                    for block_va, (file_offset, block_size) in memory_map.items()
                    if block_va <= current_va < block_va + block_size
                ),
                None,
            )
            if block is None:
                if checked_addr:
                    print(f"unable to find memory block for VA (0x{current_va:x})")
                return None

            block_va, file_offset, block_size = block
            offset_block = current_va - block_va
            # Take what is left of this range, capped by what is still needed.
            chunk_size = min(block_size - offset_block, size - len(res))
            file.seek(file_offset + offset_block)
            chunk = file.read(chunk_size)
            if len(chunk) != chunk_size:
                # The range is listed in the dump but its bytes are missing
                # (truncated file). Returning a short buffer would let
                # callers copy past its end, so fail instead.
                if checked_addr:
                    print(f"unable to get data for memory block for VA (0x{current_va:x})")
                return None
            res += chunk
    return bytes(res)
def _read_struct(file_path, memory_map, va, struct_type, checked_addr=True):
    """Read one *struct_type* ctypes structure from virtual address *va*.

    Returns the populated structure, or None when fewer than
    ``sizeof(struct_type)`` bytes could be obtained.
    """
    wanted = sizeof(struct_type)
    data = get_data(file_path, memory_map, va, wanted, checked_addr=checked_addr)
    if data is None or len(data) < wanted:
        return None
    # from_buffer_copy validates the buffer length, unlike a raw memmove.
    return struct_type.from_buffer_copy(data)


def parse_pe_module(file_path, memory_map, va):
    """Try to rebuild the PE image whose DOS header starts at *va*.

    Args:
        file_path: path of the minidump file.
        memory_map: dict ``{start_va: (file_offset, size)}`` as produced by
            ``parse_minidump``.
        va: virtual address of a candidate ``MZ`` header.

    Returns:
        The header bytes (``SizeOfHeaders`` long) followed by the bytes of
        every non-empty section in section-table order, or ``None`` when
        *va* is not a PE image or part of it is missing from the dump.

    NOTE(review): sections are concatenated back to back, each with
    ``max(SizeOfRawData, VirtualSize)`` bytes; they are not placed at their
    ``PointerToRawData`` offsets, so the result is not guaranteed to match
    the on-disk layout described by the headers. Confirm this is intended.
    """
    # unchecked part: probably PE (failures here are silent, most MZ hits are noise)
    dos_header = _read_struct(file_path, memory_map, va, IMAGE_DOS_HEADER, checked_addr=False)
    if dos_header is None:
        return None
    if dos_header.e_magic != 0x5A4D:  # "MZ"
        return None
    if dos_header.e_lfanew < 0:
        return None

    nt_va = va + dos_header.e_lfanew
    nt_header = _read_struct(file_path, memory_map, nt_va, IMAGE_NT_HEADERS, checked_addr=False)
    if nt_header is None:
        return None
    if nt_header.Signature != 0x00004550:  # IMAGE_NT_SIGNATURE
        return None

    # checked part: it's PE (missing data is now reported by get_data)
    headers_size = nt_header.OptionalHeader.SizeOfHeaders
    headers = get_data(file_path, memory_map, va, headers_size)
    if not headers or len(headers) < headers_size:
        return None
    parts = [headers]

    # The section table starts right after the optional header, whose real
    # size comes from the file header rather than from sizeof().
    sections_va = (
        nt_va
        + IMAGE_NT_HEADERS.OptionalHeader.offset
        + nt_header.FileHeader.SizeOfOptionalHeader
    )
    for index in range(nt_header.FileHeader.NumberOfSections):
        section = _read_struct(
            file_path,
            memory_map,
            sections_va + index * sizeof(IMAGE_SECTION_HEADER),
            IMAGE_SECTION_HEADER,
        )
        if section is None:
            return None

        section_size = max(section.SizeOfRawData, section.VirtualSize)
        if section_size == 0:
            # An empty section contributes no bytes; it must not be mistaken
            # for a failed read and abort the whole module.
            continue
        data = get_data(file_path, memory_map, va + section.VirtualAddress, section_size)
        if data is None or len(data) < section_size:
            return None
        parts.append(data)

    return b"".join(parts)
def main(argv=None):
    """Extract every complete PE image found in the minidump named on the command line.

    Each image is written to the current directory in a file named after
    its virtual address (``0x<va>``).

    Args:
        argv: argument vector; defaults to ``sys.argv``.

    Returns:
        Process exit status: 0 on success, 1 on usage error or when the
        input is not a minidump.
    """
    argv = sys.argv if argv is None else argv
    if len(argv) < 2:
        print(f"usage: {argv[0] if argv else 'script'} <minidump-file>", file=sys.stderr)
        return 1

    dump_path = argv[1]
    memory_map = parse_minidump(dump_path)
    if memory_map is None:
        print(f"{dump_path}: not a minidump (MDMP signature not found)", file=sys.stderr)
        return 1

    for va in find_mz_occurrences(dump_path, memory_map):
        pe = parse_pe_module(dump_path, memory_map, va)
        if pe:
            print(f"found full PE on VA = 0x{va:x}")
            # The context manager closes the file even if the write fails.
            with open(f"0x{va:x}", "wb") as f:
                f.write(pe)
    return 0


if __name__ == "__main__":
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment