Skip to content

Commit

Permalink
Merge pull request #901 from mr-tz/update-result-json
Browse files Browse the repository at this point in the history
fix extractions and improve language_strings_missed contents
  • Loading branch information
mr-tz authored Nov 13, 2023
2 parents 727c544 + 801559a commit d5662cd
Show file tree
Hide file tree
Showing 7 changed files with 187 additions and 103 deletions.
35 changes: 32 additions & 3 deletions floss/language/go/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ def find_stack_strings_with_regex(
if not binary_string:
continue

if binary_string.endswith(b"\x00"):
binary_string = binary_string[:-1]

addr = m.start()
# need to subtract opcode bytes offset
off_regex = len(m.group(0)) - len(binary_string)
Expand Down Expand Up @@ -98,6 +101,9 @@ def find_i386_stackstrings(section_data, offset, min_length):
def get_stackstrings(pe: pefile.PE, min_length: int) -> Iterable[StaticString]:
"""
Find stackstrings in the given PE file.
TODO(mr-tz): algorithms need improvements / rethinking of approach
https://github.com/mandiant/flare-floss/issues/828
"""

for section in pe.sections:
Expand Down Expand Up @@ -269,7 +275,9 @@ def get_string_blob_strings(pe: pefile.PE, min_length) -> Iterable[StaticString]
with floss.utils.timing("find struct string candidates"):
struct_strings = list(sorted(set(get_struct_string_candidates(pe)), key=lambda s: s.address))
if not struct_strings:
logger.warning("Failed to find struct string candidates: Is this a Go binary?")
logger.warning(
"Failed to find struct string candidates: Is this a Go binary? If so, the Go version may be unsupported."
)
return

with floss.utils.timing("find string blob"):
Expand Down Expand Up @@ -354,12 +362,14 @@ def get_string_blob_strings(pe: pefile.PE, min_length) -> Iterable[StaticString]
last_buf = string_blob_buf[last_pointer_offset:]
for size in range(len(last_buf), 0, -1):
try:
s = last_buf[:size].decode("utf-8")
_ = last_buf[:size].decode("utf-8")
except UnicodeDecodeError:
continue
else:
try:
string = StaticString.from_utf8(last_buf[:size], last_pointer, min_length)
string = StaticString.from_utf8(
last_buf[:size], pe.get_offset_from_rva(last_pointer - image_base), min_length
)
yield string
except ValueError:
pass
Expand All @@ -382,6 +392,25 @@ def extract_go_strings(sample, min_length) -> List[StaticString]:
return go_strings


def get_static_strings_from_blob_range(sample: pathlib.Path, static_strings: List[StaticString]) -> List[StaticString]:
    """
    Return the given static strings whose file offset lies within the string blob.

    The sample is parsed as a PE file and the string blob range is looked up
    from the struct string candidates. Both range bounds are converted to file
    offsets before filtering; the start is inclusive and the end is exclusive.

    An empty list is returned when there are no struct string candidates or
    when looking up the string blob range raises ValueError.
    """
    pe = pefile.PE(data=pathlib.Path(sample).read_bytes(), fast_load=True)

    # de-duplicate the candidates and order them by address
    candidates = sorted(set(get_struct_string_candidates(pe)), key=lambda c: c.address)
    if len(candidates) == 0:
        return []

    try:
        blob_start, blob_end = find_string_blob_range(pe, candidates)
    except ValueError:
        return []

    # subtract the image base to get the RVA that get_offset_from_rva expects
    base = pe.OPTIONAL_HEADER.ImageBase
    first_offset = pe.get_offset_from_rva(blob_start - base)
    stop_offset = pe.get_offset_from_rva(blob_end - base)

    return [s for s in static_strings if first_offset <= s.offset < stop_offset]


def main(argv=None):
parser = argparse.ArgumentParser(description="Get Go strings")
parser.add_argument("path", help="file or path to analyze")
Expand Down
59 changes: 29 additions & 30 deletions floss/language/identify.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import re
from enum import Enum
from typing import Iterable
from typing import Tuple, Iterable
from pathlib import Path

import pefile
Expand All @@ -14,38 +14,43 @@
logger = floss.logging_.getLogger(__name__)


VERSION_UNKNOWN_OR_NA = "version unknown"


class Language(Enum):
    """
    Closed set of language identifiers used by the language identification code.

    Each member's value is the lowercase string form of the identifier.
    """

    # languages reported by the identification routines
    GO = "go"
    RUST = "rust"
    DOTNET = "dotnet"

    # no supported language was identified (also used for non-PE input)
    UNKNOWN = "unknown"

    # NOTE(review): presumably selects "no language-specific handling" - confirm against callers
    DISABLED = "none"


def identify_language(sample: Path, static_strings: Iterable[StaticString]) -> Language:
"""
Identify the language of the binary given
"""
if is_rust_bin(static_strings):
return Language.RUST
def identify_language_and_version(sample: Path, static_strings: Iterable[StaticString]) -> Tuple[Language, str]:
is_rust, version = get_if_rust_and_version(static_strings)
if is_rust:
logger.info("Rust binary found with version: %s", version)
return Language.RUST, version

# Open the file as PE for further checks
try:
pe = pefile.PE(str(sample))
except pefile.PEFormatError as err:
logger.debug(f"NOT a valid PE file: {err}")
return Language.UNKNOWN
return Language.UNKNOWN, VERSION_UNKNOWN_OR_NA

if is_go_bin(pe):
return Language.GO
is_go, version = get_if_go_and_version(pe)
if is_go:
logger.info("Go binary found with version %s", version)
return Language.GO, version
elif is_dotnet_bin(pe):
return Language.DOTNET
return Language.DOTNET, VERSION_UNKNOWN_OR_NA
else:
return Language.UNKNOWN
return Language.UNKNOWN, VERSION_UNKNOWN_OR_NA


def is_rust_bin(static_strings: Iterable[StaticString]) -> bool:
def get_if_rust_and_version(static_strings: Iterable[StaticString]) -> Tuple[bool, str]:
"""
Check if the binary given is compiled with Rust compiler or not
Return if the binary given is compiled with Rust compiler and its version
reference: https://github.com/mandiant/flare-floss/issues/766
"""

Expand All @@ -62,19 +67,17 @@ def is_rust_bin(static_strings: Iterable[StaticString]) -> bool:
matches = regex_hash.search(string)
if matches and matches["hash"] in rust_commit_hash.keys():
version = rust_commit_hash[matches["hash"]]
logger.info("Rust binary found with version: %s", version)
return True
return True, version
if regex_version.search(string):
logger.info("Rust binary found with version: %s", string)
return True
return True, string

return False
return False, VERSION_UNKNOWN_OR_NA


def is_go_bin(pe: pefile.PE) -> bool:
def get_if_go_and_version(pe: pefile.PE) -> Tuple[bool, str]:
"""
Check if the binary given is compiled with Go compiler or not
it checks the magic header of the pclntab structure -pcHeader-
Return if the binary given is compiled with Go compiler and its version
this checks the magic header of the pclntab structure -pcHeader-
the magic values varies through the version
reference:
https://github.com/0xjiayu/go_parser/blob/865359c297257e00165beb1683ef6a679edc2c7f/pclntbl.py#L46
Expand All @@ -101,11 +104,9 @@ def is_go_bin(pe: pefile.PE) -> bool:
if magic in section_data:
pclntab_va = section_data.index(magic) + section_va
if verify_pclntab(section, pclntab_va):
logger.info("Go binary found with version %s", get_go_version(magic))
return True
return True, get_go_version(magic)

# if not found, search in all the available sections

for magic in go_magic:
for section in pe.sections:
section_va = section.VirtualAddress
Expand All @@ -114,10 +115,8 @@ def is_go_bin(pe: pefile.PE) -> bool:
if magic in section_data:
pclntab_va = section_data.index(magic) + section_va
if verify_pclntab(section, pclntab_va):
# just for testing
logger.info("Go binary found with version %s", get_go_version(magic))
return True
return False
return True, get_go_version(magic)
return False, VERSION_UNKNOWN_OR_NA


def get_go_version(magic):
Expand All @@ -137,7 +136,7 @@ def get_go_version(magic):
elif magic == MAGIC_120:
return "1.20"
else:
return "unknown"
return VERSION_UNKNOWN_OR_NA


def verify_pclntab(section, pclntab_va: int) -> bool:
Expand Down
24 changes: 24 additions & 0 deletions floss/language/rust/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,20 @@ def extract_rust_strings(sample: pathlib.Path, min_length: int) -> List[StaticSt
return rust_strings


def get_static_strings_from_rdata(sample, static_strings) -> List[StaticString]:
    """
    Return the given static strings whose file offset lies within the raw data
    of the section returned by get_rdata_section.

    The sample is parsed as a PE file. The range starts at the section's
    PointerToRawData (inclusive) and spans SizeOfRawData bytes (end exclusive).

    An empty list is returned when get_rdata_section raises ValueError.
    """
    pe = pefile.PE(data=pathlib.Path(sample).read_bytes(), fast_load=True)

    try:
        rdata = get_rdata_section(pe)
    except ValueError:
        return []

    # file-offset window covered by the section's raw data
    first_offset = rdata.PointerToRawData
    stop_offset = first_offset + rdata.SizeOfRawData

    return [s for s in static_strings if first_offset <= s.offset < stop_offset]


def get_string_blob_strings(pe: pefile.PE, min_length: int) -> Iterable[StaticString]:
image_base = pe.OPTIONAL_HEADER.ImageBase

Expand All @@ -145,6 +159,11 @@ def get_string_blob_strings(pe: pefile.PE, min_length: int) -> Iterable[StaticSt
# select only UTF-8 strings and adjust offset
static_strings = filter_and_transform_utf8_strings(fixed_strings, start_rdata)

# TODO(mr-tz) - handle miss in rust-hello64.exe
# .rdata:00000001400C1270 0A aPanickedAfterP db 0Ah ; DATA XREF: .rdata:00000001400C12B8↓o
# .rdata:00000001400C1271 70 61 6E 69 63 6B 65 64… db 'panicked after panic::always_abort(), aborting.',0Ah,0
# .rdata:00000001400C12A2 00 00 00 00 00 00 align 8

struct_string_addrs = map(lambda c: c.address, get_struct_string_candidates(pe))

if pe.FILE_HEADER.Machine == pefile.MACHINE_TYPE["IMAGE_FILE_MACHINE_I386"]:
Expand All @@ -157,6 +176,11 @@ def get_string_blob_strings(pe: pefile.PE, min_length: int) -> Iterable[StaticSt
xrefs_lea = find_lea_xrefs(pe)
xrefs = itertools.chain(struct_string_addrs, xrefs_lea)

# TODO(mr-tz) - handle movdqa rust-hello64.exe
# .text:0000000140026046 66 0F 6F 05 02 71 09 00 movdqa xmm0, cs:xmmword_1400BD150
# .text:000000014002604E 66 0F 6F 0D 0A 71 09 00 movdqa xmm1, cs:xmmword_1400BD160
# .text:0000000140026056 66 0F 6F 15 12 71 09 00 movdqa xmm2, cs:xmmword_1400BD170

else:
logger.error("unsupported architecture: %s", pe.FILE_HEADER.Machine)
return []
Expand Down
Loading

0 comments on commit d5662cd

Please sign in to comment.