diff --git a/floss/language/go/extract.py b/floss/language/go/extract.py index 9aa0ffd27..c1e5bf95f 100644 --- a/floss/language/go/extract.py +++ b/floss/language/go/extract.py @@ -37,6 +37,9 @@ def find_stack_strings_with_regex( if not binary_string: continue + if binary_string.endswith(b"\x00"): + binary_string = binary_string[:-1] + addr = m.start() # need to subtract opcode bytes offset off_regex = len(m.group(0)) - len(binary_string) @@ -98,6 +101,9 @@ def find_i386_stackstrings(section_data, offset, min_length): def get_stackstrings(pe: pefile.PE, min_length: int) -> Iterable[StaticString]: """ Find stackstrings in the given PE file. + + TODO(mr-tz): algorithms need improvements / rethinking of approach + https://github.com/mandiant/flare-floss/issues/828 """ for section in pe.sections: @@ -269,7 +275,9 @@ def get_string_blob_strings(pe: pefile.PE, min_length) -> Iterable[StaticString] with floss.utils.timing("find struct string candidates"): struct_strings = list(sorted(set(get_struct_string_candidates(pe)), key=lambda s: s.address)) if not struct_strings: - logger.warning("Failed to find struct string candidates: Is this a Go binary?") + logger.warning( + "Failed to find struct string candidates: Is this a Go binary? If so, the Go version may be unsupported." + ) return with floss.utils.timing("find string blob"): @@ -354,12 +362,14 @@ def get_string_blob_strings(pe: pefile.PE, min_length) -> Iterable[StaticString] last_buf = string_blob_buf[last_pointer_offset:] for size in range(len(last_buf), 0, -1): try: - s = last_buf[:size].decode("utf-8") + _ = last_buf[:size].decode("utf-8") except UnicodeDecodeError: continue else: try: - string = StaticString.from_utf8(last_buf[:size], last_pointer, min_length) + string = StaticString.from_utf8( + last_buf[:size], pe.get_offset_from_rva(last_pointer - image_base), min_length + ) yield string except ValueError: pass @@ -382,6 +392,25 @@ def extract_go_strings(sample, min_length) -> List[StaticString]: return go_strings +def get_static_strings_from_blob_range(sample: pathlib.Path, static_strings: List[StaticString]) -> List[StaticString]: + pe = pefile.PE(data=pathlib.Path(sample).read_bytes(), fast_load=True) + + struct_strings = list(sorted(set(get_struct_string_candidates(pe)), key=lambda s: s.address)) + if not struct_strings: + return [] + + try: + string_blob_start, string_blob_end = find_string_blob_range(pe, struct_strings) + except ValueError: + return [] + + image_base = pe.OPTIONAL_HEADER.ImageBase + string_blob_start = pe.get_offset_from_rva(string_blob_start - image_base) + string_blob_end = pe.get_offset_from_rva(string_blob_end - image_base) + + return list(filter(lambda s: string_blob_start <= s.offset < string_blob_end, static_strings)) + + def main(argv=None): parser = argparse.ArgumentParser(description="Get Go strings") parser.add_argument("path", help="file or path to analyze") diff --git a/floss/language/identify.py b/floss/language/identify.py index 84202fbef..fbe4a8963 100644 --- a/floss/language/identify.py +++ b/floss/language/identify.py @@ -2,7 +2,7 @@ import re from enum import Enum -from typing import Iterable +from typing import Tuple, Iterable from pathlib import Path import pefile @@ -14,38 +14,43 @@ logger = floss.logging_.getLogger(__name__) +VERSION_UNKNOWN_OR_NA = "version unknown" + + class Language(Enum): GO = "go" RUST = "rust" DOTNET = "dotnet" UNKNOWN = "unknown" + DISABLED = "none" -def identify_language(sample: Path, static_strings: Iterable[StaticString]) -> Language: - """ - Identify the language of the binary given - """ - if is_rust_bin(static_strings): - return Language.RUST +def identify_language_and_version(sample: Path, static_strings: Iterable[StaticString]) -> Tuple[Language, str]: + is_rust, version = get_if_rust_and_version(static_strings) + if is_rust: + logger.info("Rust binary found with version: %s", version) + return Language.RUST, version # Open the file as PE for further checks try: pe = pefile.PE(str(sample)) except pefile.PEFormatError as err: logger.debug(f"NOT a valid PE file: {err}") - return Language.UNKNOWN + return Language.UNKNOWN, VERSION_UNKNOWN_OR_NA - if is_go_bin(pe): - return Language.GO + is_go, version = get_if_go_and_version(pe) + if is_go: + logger.info("Go binary found with version %s", version) + return Language.GO, version elif is_dotnet_bin(pe): - return Language.DOTNET + return Language.DOTNET, VERSION_UNKNOWN_OR_NA else: - return Language.UNKNOWN + return Language.UNKNOWN, VERSION_UNKNOWN_OR_NA -def is_rust_bin(static_strings: Iterable[StaticString]) -> bool: +def get_if_rust_and_version(static_strings: Iterable[StaticString]) -> Tuple[bool, str]: """ - Check if the binary given is compiled with Rust compiler or not + Return if the binary given is compiled with Rust compiler and its version reference: https://github.com/mandiant/flare-floss/issues/766 """ @@ -62,19 +67,17 @@ def is_rust_bin(static_strings: Iterable[StaticString]) -> bool: matches = regex_hash.search(string) if matches and matches["hash"] in rust_commit_hash.keys(): version = rust_commit_hash[matches["hash"]] - logger.info("Rust binary found with version: %s", version) - return True + return True, version if regex_version.search(string): - logger.info("Rust binary found with version: %s", string) - return True + return True, string - return False + return False, VERSION_UNKNOWN_OR_NA -def is_go_bin(pe: pefile.PE) -> bool: +def get_if_go_and_version(pe: pefile.PE) -> Tuple[bool, str]: """ - Check if the binary given is compiled with Go compiler or not - it checks the magic header of the pclntab structure -pcHeader- + Return if the binary given is compiled with Go compiler and its version + this checks the magic header of the pclntab structure -pcHeader- the magic values varies through the version reference: https://github.com/0xjiayu/go_parser/blob/865359c297257e00165beb1683ef6a679edc2c7f/pclntbl.py#L46 @@ -101,11 +104,9 @@ def is_go_bin(pe: pefile.PE) -> bool: if magic in section_data: pclntab_va = section_data.index(magic) + section_va if verify_pclntab(section, pclntab_va): - logger.info("Go binary found with version %s", get_go_version(magic)) - return True + return True, get_go_version(magic) # if not found, search in all the available sections - for magic in go_magic: for section in pe.sections: section_va = section.VirtualAddress @@ -114,10 +115,8 @@ def is_go_bin(pe: pefile.PE) -> bool: if magic in section_data: pclntab_va = section_data.index(magic) + section_va if verify_pclntab(section, pclntab_va): - # just for testing - logger.info("Go binary found with version %s", get_go_version(magic)) - return True - return False + return True, get_go_version(magic) + return False, VERSION_UNKNOWN_OR_NA def get_go_version(magic): @@ -137,7 +136,7 @@ def get_go_version(magic): elif magic == MAGIC_120: return "1.20" else: - return "unknown" + return VERSION_UNKNOWN_OR_NA def verify_pclntab(section, pclntab_va: int) -> bool: diff --git a/floss/language/rust/extract.py b/floss/language/rust/extract.py index 3d3dc3960..71c3495e0 100644 --- a/floss/language/rust/extract.py +++ b/floss/language/rust/extract.py @@ -123,6 +123,20 @@ def extract_rust_strings(sample: pathlib.Path, min_length: int) -> List[StaticSt return rust_strings +def get_static_strings_from_rdata(sample, static_strings) -> List[StaticString]: + pe = pefile.PE(data=pathlib.Path(sample).read_bytes(), fast_load=True) + + try: + rdata_section = get_rdata_section(pe) + except ValueError: + return [] + + start_rdata = rdata_section.PointerToRawData + end_rdata = start_rdata + rdata_section.SizeOfRawData + + return list(filter(lambda s: start_rdata <= s.offset < end_rdata, static_strings)) + + def get_string_blob_strings(pe: pefile.PE, min_length: int) -> Iterable[StaticString]: image_base = pe.OPTIONAL_HEADER.ImageBase @@ -145,6 +159,11 @@ def get_string_blob_strings(pe: pefile.PE, min_length: int) -> Iterable[StaticSt # select only UTF-8 strings and adjust offset static_strings = filter_and_transform_utf8_strings(fixed_strings, start_rdata) + # TODO(mr-tz) - handle miss in rust-hello64.exe + # .rdata:00000001400C1270 0A aPanickedAfterP db 0Ah ; DATA XREF: .rdata:00000001400C12B8↓o + # .rdata:00000001400C1271 70 61 6E 69 63 6B 65 64… db 'panicked after panic::always_abort(), aborting.',0Ah,0 + # .rdata:00000001400C12A2 00 00 00 00 00 00 align 8 + struct_string_addrs = map(lambda c: c.address, get_struct_string_candidates(pe)) if pe.FILE_HEADER.Machine == pefile.MACHINE_TYPE["IMAGE_FILE_MACHINE_I386"]: @@ -157,6 +176,11 @@ def get_string_blob_strings(pe: pefile.PE, min_length: int) -> Iterable[StaticSt xrefs_lea = find_lea_xrefs(pe) xrefs = itertools.chain(struct_string_addrs, xrefs_lea) + # TODO(mr-tz) - handle movdqa rust-hello64.exe + # .text:0000000140026046 66 0F 6F 05 02 71 09 00 movdqa xmm0, cs:xmmword_1400BD150 + # .text:000000014002604E 66 0F 6F 0D 0A 71 09 00 movdqa xmm1, cs:xmmword_1400BD160 + # .text:0000000140026056 66 0F 6F 15 12 71 09 00 movdqa xmm2, cs:xmmword_1400BD170 + else: logger.error("unsupported architecture: %s", pe.FILE_HEADER.Machine) return [] diff --git a/floss/main.py b/floss/main.py index acd288acf..a49edea73 100644 --- a/floss/main.py +++ b/floss/main.py @@ -54,7 +54,7 @@ from floss.stackstrings import extract_stackstrings from floss.tightstrings import extract_tightstrings from floss.string_decoder import decode_strings -from floss.language.identify import Language, identify_language +from floss.language.identify import Language, identify_language_and_version SIGNATURES_PATH_DEFAULT_STRING = "(embedded signatures)" EXTENSIONS_SHELLCODE_32 = ("sc32", "raw32") @@ -198,9 +198,11 @@ def make_parser(argv): advanced_group.add_argument( "--language", type=str, - choices=[l.value for l in Language if l != Language.UNKNOWN] + ["none"], - default="", - help="use language-specific string extraction, disable using 'none'" if show_all_options else argparse.SUPPRESS, + choices=[l.value for l in Language if l != Language.UNKNOWN], + default=Language.UNKNOWN.value, + help="use language-specific string extraction, auto-detect language by default, disable using 'none'" + if show_all_options + else argparse.SUPPRESS, ) advanced_group.add_argument( "-l", @@ -547,35 +549,44 @@ def main(argv=None) -> int: static_runtime = get_runtime_diff(interim) # set language configurations - lang_id: Language - if args.language == Language.GO.value: - lang_id = Language.GO - elif args.language == Language.RUST.value: - lang_id = Language.RUST - elif args.language == Language.DOTNET.value: - lang_id = Language.DOTNET - elif args.language == "none": - lang_id = Language.UNKNOWN + selected_lang = Language(args.language) + if selected_lang == Language.DISABLED: + results.metadata.language = "" + results.metadata.language_version = "" + results.metadata.language_selected = "" else: - lang_id = identify_language(sample, static_strings) + lang_id, lang_version = identify_language_and_version(sample, static_strings) + + if selected_lang == Language.UNKNOWN: + pass + elif selected_lang != lang_id: + logger.warning( + "the selected language '%s' differs to the automatically identified language '%s (%s)' - extracted " + "strings may be incomplete or inaccurate", + selected_lang.value, + lang_id.value, + lang_version, + ) + results.metadata.language_selected = selected_lang.value - if lang_id == Language.GO: + results.metadata.language = lang_id.value + results.metadata.language_version = lang_version + + if results.metadata.language == Language.GO.value: if analysis.enable_tight_strings or analysis.enable_stack_strings or analysis.enable_decoded_strings: logger.warning( "FLOSS handles Go static strings, but string deobfuscation may be inaccurate and take a long time" ) - results.metadata.language = Language.GO.value - elif lang_id == Language.RUST: + elif results.metadata.language == Language.RUST.value: if analysis.enable_tight_strings or analysis.enable_stack_strings or analysis.enable_decoded_strings: logger.warning( "FLOSS handles Rust static strings, but string deobfuscation may be inaccurate and take a long time" ) - results.metadata.language = Language.RUST.value - elif lang_id == Language.DOTNET: + elif results.metadata.language == Language.DOTNET.value: logger.warning(".NET language-specific string extraction is not supported yet") - logger.warning("Furthermore, FLOSS does NOT attempt to deobfuscate any strings from .NET binaries") + logger.warning("FLOSS does NOT attempt to deobfuscate any strings from .NET binaries") # enable .NET strings once we can extract them # results.metadata.language = Language.DOTNET.value @@ -585,7 +596,7 @@ def main(argv=None) -> int: analysis.enable_tight_strings = False analysis.enable_decoded_strings = False - if results.metadata.language != "": + if results.metadata.language not in ("", "unknown"): if args.enabled_types == [] and args.disabled_types == []: prompt = input("Do you want to enable string deobfuscation? (this could take a long time) [y/N] ") @@ -603,40 +614,42 @@ def main(argv=None) -> int: # in order of expected run time, fast to slow # 1. static strings (done above) + # a) includes language-specific strings, if applicable # 2. stack strings # 3. tight strings # 4. decoded strings if results.analysis.enable_static_strings: + logger.info("extracting static strings") results.strings.static_strings = static_strings results.metadata.runtime.static_strings = static_runtime - if not lang_id: - logger.info("extracting static strings") - else: - if lang_id == Language.GO: - logger.info("extracting language-specific Go strings") - - interim = time() - results.strings.language_strings = floss.language.go.extract.extract_go_strings(sample, args.min_length) - results.metadata.runtime.language_strings = get_runtime_diff(interim) + if results.metadata.language == Language.GO.value: + logger.info("extracting language-specific Go strings") - results.strings.language_strings_missed = floss.language.utils.get_missed_strings( - static_strings, results.strings.language_strings, args.min_length - ) + interim = time() + results.strings.language_strings = floss.language.go.extract.extract_go_strings(sample, args.min_length) + results.metadata.runtime.language_strings = get_runtime_diff(interim) + + # missed strings only includes non-identified strings in searched range + # here currently only focus on strings in string blob range + string_blob_strings = floss.language.go.extract.get_static_strings_from_blob_range(sample, static_strings) + results.strings.language_strings_missed = floss.language.utils.get_missed_strings( + string_blob_strings, results.strings.language_strings, args.min_length + ) - elif lang_id == Language.RUST: - logger.info("extracting language-specific Rust strings") + elif results.metadata.language == Language.RUST.value: + logger.info("extracting language-specific Rust strings") - interim = time() - results.strings.language_strings = floss.language.rust.extract.extract_rust_strings( - sample, args.min_length - ) - results.metadata.runtime.language_strings = get_runtime_diff(interim) + interim = time() + results.strings.language_strings = floss.language.rust.extract.extract_rust_strings(sample, args.min_length) + results.metadata.runtime.language_strings = get_runtime_diff(interim) - results.strings.language_strings_missed = floss.language.utils.get_missed_strings( - static_strings, results.strings.language_strings, args.min_length - ) + # currently Rust strings are only extracted from the .rdata section + rdata_strings = floss.language.rust.extract.get_static_strings_from_rdata(sample, static_strings) + results.strings.language_strings_missed = floss.language.utils.get_missed_strings( + rdata_strings, results.strings.language_strings, args.min_length + ) if ( results.analysis.enable_decoded_strings or results.analysis.enable_stack_strings diff --git a/floss/render/default.py b/floss/render/default.py index b145539cd..898aad94f 100644 --- a/floss/render/default.py +++ b/floss/render/default.py @@ -46,8 +46,19 @@ def width(s: str, character_count: int) -> str: def render_meta(results: ResultDocument, console, verbose): rows: List[Tuple[str, str]] = list() + + lang = f"{results.metadata.language}" if results.metadata.language else "" + lang_v = ( + f" ({results.metadata.language_version})" + if results.metadata.language != "unknown" and results.metadata.language_version + else "" + ) + lang_s = f" - selected: {results.metadata.language_selected}" if results.metadata.language_selected else "" + language_value = f"{lang}{lang_v}{lang_s}" + if verbose == Verbosity.DEFAULT: rows.append((width("file path", MIN_WIDTH_LEFT_COL), width(results.metadata.file_path, MIN_WIDTH_RIGHT_COL))) + rows.append(("identified language", language_value)) else: rows.extend( [ @@ -55,7 +66,7 @@ def render_meta(results: ResultDocument, console, verbose): ("start date", results.metadata.runtime.start_date.strftime("%Y-%m-%d %H:%M:%S")), ("runtime", strtime(results.metadata.runtime.total)), ("version", results.metadata.version), - ("identified language", results.metadata.language), + ("identified language", language_value), ("imagebase", f"0x{results.metadata.imagebase:x}"), ("min string length", f"{results.metadata.min_length}"), ] @@ -145,7 +156,7 @@ def strtime(seconds): def render_language_strings(language, language_strings, language_strings_missed, console, verbose, disable_headers): strings = sorted(language_strings + language_strings_missed, key=lambda s: s.offset) - render_heading(f"FLOSS {language.upper()} STRINGS", len(strings), console, verbose, disable_headers) + render_heading(f"FLOSS {language.upper()} STRINGS ({len(strings)})", console, verbose, disable_headers) offset_len = len(f"{strings[-1].offset}") for s in strings: if verbose == Verbosity.DEFAULT: @@ -153,7 +164,6 @@ def render_language_strings(language, language_strings, language_strings_missed, else: colored_string = string_style(sanitize(s.string, is_ascii_only=False)) console.print(f"0x{s.offset:>0{offset_len}x} {colored_string}") - console.print("\n") def render_static_substrings(strings, encoding, offset_len, console, verbose, disable_headers): @@ -166,11 +176,10 @@ def render_static_substrings(strings, encoding, offset_len, console, verbose, di else: colored_string = string_style(sanitize(s.string)) console.print(f"0x{s.offset:>0{offset_len}x} {colored_string}") - console.print("\n") def render_staticstrings(strings, console, verbose, disable_headers): - render_heading("FLOSS STATIC STRINGS", len(strings), console, verbose, disable_headers) + render_heading(f"FLOSS STATIC STRINGS ({len(strings)})", console, verbose, disable_headers) ascii_strings = list(filter(lambda s: s.encoding == StringEncoding.ASCII, strings)) unicode_strings = list(filter(lambda s: s.encoding == StringEncoding.UTF16LE, strings)) @@ -184,6 +193,7 @@ def render_staticstrings(strings, console, verbose, disable_headers): offset_len = max(ascii_offset_len, unicode_offset_len) render_static_substrings(ascii_strings, "ASCII", offset_len, console, verbose, disable_headers) + console.print("\n") render_static_substrings(unicode_strings, "UTF-16LE", offset_len, console, verbose, disable_headers) @@ -249,13 +259,13 @@ def render_decoded_strings(decoded_strings: List[DecodedString], console, verbos console.print("\n") -def render_heading(heading, n, console, verbose, disable_headers): +def render_heading(heading, console, verbose, disable_headers): """ example:: - =========================== - ‖ FLOSS TIGHT STRINGS (0) ‖ - =========================== + ───────────────────────── + FLOSS TIGHT STRINGS (0) + ───────────────────────── """ if disable_headers: return @@ -314,6 +324,10 @@ def render(results: floss.results.ResultDocument, verbose, disable_headers, colo render_meta(results, console, verbose) console.print("\n") + if results.analysis.enable_static_strings: + render_staticstrings(results.strings.static_strings, console, verbose, disable_headers) + console.print("\n") + if results.metadata.language in ( floss.language.identify.Language.GO.value, floss.language.identify.Language.RUST.value, @@ -326,23 +340,22 @@ def render(results: floss.results.ResultDocument, verbose, disable_headers, colo verbose, disable_headers, ) - - elif results.analysis.enable_static_strings: - render_staticstrings(results.strings.static_strings, console, verbose, disable_headers) console.print("\n") if results.analysis.enable_stack_strings: - render_heading("FLOSS STACK STRINGS", len(results.strings.stack_strings), console, verbose, disable_headers) + render_heading(f"FLOSS STACK STRINGS ({len(results.strings.stack_strings)})", console, verbose, disable_headers) render_stackstrings(results.strings.stack_strings, console, verbose, disable_headers) console.print("\n") if results.analysis.enable_tight_strings: - render_heading("FLOSS TIGHT STRINGS", len(results.strings.tight_strings), console, verbose, disable_headers) + render_heading(f"FLOSS TIGHT STRINGS ({len(results.strings.tight_strings)})", console, verbose, disable_headers) render_stackstrings(results.strings.tight_strings, console, verbose, disable_headers) console.print("\n") if results.analysis.enable_decoded_strings: - render_heading("FLOSS DECODED STRINGS", len(results.strings.decoded_strings), console, verbose, disable_headers) + render_heading( + f"FLOSS DECODED STRINGS ({len(results.strings.decoded_strings)})", console, verbose, disable_headers + ) render_decoded_strings(results.strings.decoded_strings, console, verbose, disable_headers) console.file.seek(0) diff --git a/floss/results.py b/floss/results.py index 21a88834c..50849bca4 100644 --- a/floss/results.py +++ b/floss/results.py @@ -1,5 +1,6 @@ # Copyright (C) 2021 Mandiant, Inc. All Rights Reserved. +import re import json import datetime from enum import Enum @@ -139,6 +140,9 @@ def from_utf8(cls, buf, addr, min_length): except UnicodeDecodeError: raise ValueError("not utf-8") + if not re.sub(r"[\r\n\t]", "", decoded_string).isprintable(): + raise ValueError("not printable") + if len(decoded_string) < min_length: raise ValueError("too short") return cls(string=decoded_string, offset=addr, encoding=StringEncoding.UTF8) @@ -187,6 +191,8 @@ class Metadata: min_length: int = 0 runtime: Runtime = field(default_factory=Runtime) language: str = "" + language_version: str = "" + language_selected: str = "" # configured by user @dataclass diff --git a/tests/test_language_id.py b/tests/test_language_id.py index 57f42f9d9..3240d5eb7 100644 --- a/tests/test_language_id.py +++ b/tests/test_language_id.py @@ -1,30 +1,30 @@ -import os from pathlib import Path import pytest from floss.utils import get_static_strings -from floss.language.identify import Language, identify_language +from floss.language.identify import VERSION_UNKNOWN_OR_NA, Language, identify_language_and_version @pytest.mark.parametrize( - "binary_file, expected_result", + "binary_file, expected_result, expected_version", [ - ("data/language/go/go-hello/bin/go-hello.exe", Language.GO), - ("data/language/rust/rust-hello/bin/rust-hello.exe", Language.RUST), - ("data/test-decode-to-stack.exe", Language.UNKNOWN), - ("data/language/dotnet/dotnet-hello/bin/dotnet-hello.exe", Language.DOTNET), - ("data/src/shellcode-stackstrings/bin/shellcode-stackstrings.bin", Language.UNKNOWN), + ("data/language/go/go-hello/bin/go-hello.exe", Language.GO, "1.20"), + ("data/language/rust/rust-hello/bin/rust-hello.exe", Language.RUST, "1.69.0"), + ("data/test-decode-to-stack.exe", Language.UNKNOWN, VERSION_UNKNOWN_OR_NA), + ("data/language/dotnet/dotnet-hello/bin/dotnet-hello.exe", Language.DOTNET, VERSION_UNKNOWN_OR_NA), + ("data/src/shellcode-stackstrings/bin/shellcode-stackstrings.bin", Language.UNKNOWN, VERSION_UNKNOWN_OR_NA), ], ) -def test_language_detection(binary_file, expected_result): +def test_language_detection(binary_file, expected_result, expected_version): CD = Path(__file__).resolve().parent abs_path = (CD / binary_file).resolve() - # check if the file exists + assert abs_path.exists(), f"File {binary_file} does not exist" static_strings = get_static_strings(abs_path, 4) - language = identify_language(abs_path, static_strings) - # Check the expected result + language, version = identify_language_and_version(abs_path, static_strings) + assert language == expected_result, f"Expected: {expected_result.value}, Actual: {language.value}" + assert version == expected_version, f"Expected: {expected_version}, Actual: {version}"