From 66115aad261448bfefb8333e8f8418a1f2b12d97 Mon Sep 17 00:00:00 2001 From: Carl Flottmann Date: Fri, 17 Jan 2025 15:18:25 +1000 Subject: [PATCH] refactor: refactoring existing source code analysis functionality --- src/macaron/__main__.py | 11 +- src/macaron/config/defaults.ini | 4 + .../pypi_heuristics/heuristics.py | 3 + .../pypi_sourcecode_analyzer.py | 444 ++++++++++++------ .../suspicious_patterns.yaml} | 6 +- src/macaron/slsa_analyzer/analyze_context.py | 8 +- src/macaron/slsa_analyzer/analyzer.py | 14 +- .../checks/detect_malicious_metadata_check.py | 57 ++- .../package_registry/pypi_registry.py | 122 ++--- src/macaron/util.py | 15 +- 10 files changed, 421 insertions(+), 263 deletions(-) rename src/macaron/malware_analyzer/pypi_heuristics/{ => sourcecode}/pypi_sourcecode_analyzer.py (58%) rename src/macaron/malware_analyzer/pypi_heuristics/{suspicious_pattern.yaml => sourcecode/suspicious_patterns.yaml} (95%) diff --git a/src/macaron/__main__.py b/src/macaron/__main__.py index bd0b378b5..3c4f99f05 100644 --- a/src/macaron/__main__.py +++ b/src/macaron/__main__.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022 - 2024, Oracle and/or its affiliates. All rights reserved. +# Copyright (c) 2022 - 2025, Oracle and/or its affiliates. All rights reserved. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. 
"""This is the main entrypoint to run Macaron.""" @@ -179,7 +179,7 @@ def analyze_slsa_levels_single(analyzer_single_args: argparse.Namespace) -> None analyzer_single_args.sbom_path, deps_depth, provenance_payload=prov_payload, - validate_malware_switch=analyzer_single_args.validate_malware_switch, + analyze_source=analyzer_single_args.analyze_source, ) sys.exit(status_code) @@ -486,10 +486,13 @@ def main(argv: list[str] | None = None) -> None: ) single_analyze_parser.add_argument( - "--validate-malware-switch", + "--analyze-source", required=False, action="store_true", - help=("Enable malware validation."), + help=( + "EXPERIMENTAL. For improved malware detection, analyze the source code of the" + + " (PyPI) package using a textual scan and dataflow analysis." + ), ) # Dump the default values. diff --git a/src/macaron/config/defaults.ini b/src/macaron/config/defaults.ini index f895c20aa..4dd5c3d6c 100644 --- a/src/macaron/config/defaults.ini +++ b/src/macaron/config/defaults.ini @@ -594,3 +594,7 @@ major_threshold = 20 epoch_threshold = 3 # The number of days +/- the day of publish the calendar versioning day may be. day_publish_error = 4 + +# yaml configuration file containing suspicious patterns. Can be full path or relative to +# folder where macaron is installed +suspicious_patterns_file = src/macaron/malware_analyzer/pypi_heuristics/sourcecode/suspicious_patterns.yaml diff --git a/src/macaron/malware_analyzer/pypi_heuristics/heuristics.py b/src/macaron/malware_analyzer/pypi_heuristics/heuristics.py index 1bd724fad..16a07f404 100644 --- a/src/macaron/malware_analyzer/pypi_heuristics/heuristics.py +++ b/src/macaron/malware_analyzer/pypi_heuristics/heuristics.py @@ -37,6 +37,9 @@ class Heuristics(str, Enum): #: Indicates that the package has an unusually large version number for a single release. ANOMALOUS_VERSION = "anomalous_version" + #: Indicates that the package source code contains suspicious code patterns. 
+ SUSPICIOUS_PATTERNS = "suspicious_patterns" + class HeuristicResult(str, Enum): """Result type indicating the outcome of a heuristic.""" diff --git a/src/macaron/malware_analyzer/pypi_heuristics/pypi_sourcecode_analyzer.py b/src/macaron/malware_analyzer/pypi_heuristics/sourcecode/pypi_sourcecode_analyzer.py similarity index 58% rename from src/macaron/malware_analyzer/pypi_heuristics/pypi_sourcecode_analyzer.py rename to src/macaron/malware_analyzer/pypi_heuristics/sourcecode/pypi_sourcecode_analyzer.py index edf7a1830..a616c8e57 100644 --- a/src/macaron/malware_analyzer/pypi_heuristics/pypi_sourcecode_analyzer.py +++ b/src/macaron/malware_analyzer/pypi_heuristics/sourcecode/pypi_sourcecode_analyzer.py @@ -1,4 +1,4 @@ -# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved. +# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. """ @@ -13,184 +13,253 @@ import ipaddress import logging import os -import pathlib import re +from dataclasses import dataclass import yaml +from macaron.config.defaults import defaults +from macaron.errors import ConfigurationError, HeuristicAnalyzerValueError from macaron.json_tools import JsonType +from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset logger: logging.Logger = logging.getLogger(__name__) +IMPORTS = "imports" +CONSTANTS = "constants" +CALLS = "calls" -class DataFlowTracer(ast.NodeVisitor): - """The class is used to create the symbol table and analyze the dataflow.""" - def __init__(self) -> None: - self.symbol_table: dict = {} # Store variable assignments - self.trace_path: list = [] +@dataclass(frozen=True) +class Import: + """Data class to hold information about extracted import statements. 
- def visit_Assign(self, node: ast.Assign) -> None: # noqa: N802 # pylint: disable=C0103 - """Visit the Assign node and build the symbol table.""" - for target in node.targets: - if isinstance(target, ast.Name): - target_name = target.id - if isinstance(node.value, ast.Name): - self.symbol_table[target_name] = str(node.value.id) - elif isinstance(node.value, ast.Constant): - self.symbol_table[target_name] = str(node.value.value) - # Handle other assignment types as needed (e.g., function calls, lists) - else: - self.symbol_table[target_name] = ast.unparse(node.value) - self.generic_visit(node) # Important for visiting nested assign + Name, alias, and module are referring to the following patterns of python import statements: + - [from ] import [as ] + """ - def trace_back(self, variable_name: str) -> list: - """Get the full path of the dataflow. + name: str + alias: str | None + module: str | None + lineno: int + statement: str - Parameters - ---------- - variable_name: str - The argument of the function call. + +class PyPISourcecodeAnalyzer: + """This class is used to analyze the source code of python PyPI packages. This analyzer is a work in progress. + + This analyzer works in two phases. In the first phase, it will perform a pattern-based scan of all python files + in the source code, looking for suspicious patterns defined by the YAML file in defaults.ini. By default, this + will include suspicious package imports, suspicious hardcoded constants, and suspicious function calls. If this + scan does not find any suspicious activity, the analysis will stop and the package will be marked as benign + by this analyzer. If the scan does find suspicious activity, the analyzer will move on to the second phase. + + In the second phase, the analyzer will perform dataflow analysis. This will track the flow of suspicious constants + and the results of suspicious function calls to where they are used, to determine if they are used in a malicious + manner. 
Suspicious activity includes data exfiltration, code execution, remote connections, operating system and + process manipulation, and encoded and obfuscated patterns. The types of activity, and their severity and quantity, + will then determine the probability of the package being malicious. + + Currently, this analyzer only supports the first phase, and will return simply boolean results on the maliciousness + of the package. + """ + + EXPECTED_PATTERN_CATEGORIES = [IMPORTS, CONSTANTS, CALLS] + + def __init__(self) -> None: + """Collect required data for analysing the source code.""" + self.suspicious_patterns = self._load_defaults() + + def _load_defaults(self) -> dict[str, dict[str, list]]: + """Load the suspicious pattern from suspicious_pattern.yaml. Returns ------- - list - The path of the dataflow. - """ - self.trace_path = [] - self._recursive_trace(variable_name) - return self.trace_path - - def _recursive_trace(self, variable_name: str) -> None: - """Recursively build the dataflow path by analyzing the symbol table. + dict[str: dict[str, list]] + The suspicious pattern. - Parameters - ---------- - variable_name: str - The argument of the function call. + Raises + ------ + ConfigurationError + if the suspicious pattern file is not in the expected format or cannot be accessed. """ - if variable_name in self.symbol_table: - value = self.symbol_table[variable_name] - if not self.trace_path: - self.trace_path.extend([variable_name, value]) + suspicious_patterns: dict[str, dict[str, list]] = {} + section_name = "heuristic.pypi" + + if defaults.has_section(section_name): + section = defaults[section_name] + else: + error_msg = f"Unable to find section {section_name}, which is required to load suspicious patterns." 
+ logger.debug(error_msg) + raise ConfigurationError(error_msg) + + configuration_name = "suspicious_patterns_file" + filename = section.get(configuration_name) + if filename is None: + error_msg = f"Unable to find {configuration_name} in configuration file." + logger.debug(error_msg) + raise ConfigurationError(error_msg) + + filename = os.path.normpath(filename) + try: + with open(filename, encoding="utf-8") as file: + configured_patterns: dict[str, JsonType] = yaml.safe_load(file) + except FileNotFoundError as file_error: + error_msg = f"Unable to open locate {filename}" + logger.debug(error_msg) + raise ConfigurationError(error_msg) from file_error + except yaml.YAMLError as yaml_error: + error_msg = f"Unable to parse {filename} as a yaml file." + logger.debug(error_msg) + raise ConfigurationError(error_msg) from yaml_error + + for expected_category in self.EXPECTED_PATTERN_CATEGORIES: + if expected_category not in configured_patterns: + error_msg = ( + f"Expected suspicious pattern category {expected_category} present in" + + f" {filename}: must have categories {self.EXPECTED_PATTERN_CATEGORIES}" + ) + logger.debug(error_msg) + raise ConfigurationError(error_msg) + + for category, patterns in configured_patterns.items(): + suspicious_patterns[category] = {} + if isinstance(patterns, list): + suspicious_patterns[category][category] = patterns + elif isinstance(patterns, dict): + for subcategory, subpatterns in patterns.items(): + if not isinstance(subpatterns, list): + error_msg = f"Expected subcategory {subcategory} items to be" + f" a list in {filename}" + logger.debug(error_msg) + raise ConfigurationError(error_msg) + + suspicious_patterns[category][subcategory] = subpatterns else: - self.trace_path.append(value) - if ( - isinstance(value, str) and value in self.symbol_table and self.symbol_table[value] != value - ): # only trace if it is a var name - self._recursive_trace(value) + error_msg = f"Expected category {category} to be either a list" + f" or 
dictionary in {filename}" + logger.debug(error_msg) + raise ConfigurationError(error_msg) - def generate_symbol_table(self, source_code: str) -> None: - """Generate the symbol table. + return suspicious_patterns + + def analyze_patterns(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]: + """Analyze the source code of the package for malicious patterns. + + This is the first phase of the source code analyzer. Parameters ---------- - source_code: str - The source code of the script. + pypi_package_json: PyPIPackageJsonAsset + The PyPI package JSON asset object. + + Returns + ------- + tuple[HeuristicResult, dict[str, JsonType]] + Containing the analysis results and relevant patterns identified. + + Raises + ------ + HeuristicAnalyzerValueError + if there is no source code available. """ - tree = ast.parse(source_code) - self.visit(tree) + analysis_result: dict = {} + result: HeuristicResult = HeuristicResult.PASS + source_code = pypi_package_json.package_sourcecode + if not source_code: + error_msg = "Unable to retrieve PyPI package source code" + logger.debug(error_msg) + raise HeuristicAnalyzerValueError(error_msg) -class PyPISourcecodeAnalyzer: - """This class is used to analyze the source code.""" + for filename, content in source_code.items(): + detail_info = {} - def __init__(self, pypi_package_json: PyPIPackageJsonAsset) -> None: - """Collect required data for analysing the source code.""" - self.source_code: dict[str, str] | None = pypi_package_json.get_sourcecode() - self.suspicious_pattern: dict[str, JsonType] | None = self._load_suspicious_pattern() - # self.extracted_suspicious_content: dict[str, JsonType] = {} - self.analysis_result: dict = {} - self.is_malware: bool = False + try: + _ = ast.parse(content) + except (SyntaxError, ValueError) as ast_parse_error: + logger.debug("File %s cannot be parsed as a python file: %s", filename, ast_parse_error) + continue - def analyze(self) -> tuple[bool, dict]: - 
"""Analyze the source code of the PyPI package. + imports = self._extract_imports(content) + import_names = set() + for i in imports: + if i.module: + import_names.add(".".join([i.module, i.name])) + import_names.add(i.name) - Returns - ------- - dict - The result of the analysis. - """ - if self.source_code and self.suspicious_pattern: - for filename, content in self.source_code.items(): - try: - imports = self._extract_imports_from_ast(content) - except SyntaxError: - imports = self._extract_imports_from_lines(content) - - if isinstance(self.suspicious_pattern["imports"], list): - suspicious_imports: set[str] | None = imports & set(self.suspicious_pattern["imports"]) - else: - suspicious_imports = None - - # No suspicious imports in the source code. Skip the further steps. - if not suspicious_imports: - logger.debug("No suspicious imports found in the file %s", filename) - continue - - # TODO: Currently the symbol table stores the data for dataflow analysis. - # In the future, the dataflow will be more complicated and even handle the cross-file dataflow. - tracer = DataFlowTracer() - tracer.generate_symbol_table(content) - logger.debug(tracer.symbol_table) - - # TODO: In the future, the probability policy to decide the file is malicious or not - # will be implemented. Therefore, the functioncall_analyzer.analyze() will return detail_info - # and analysis result. - functioncall_analyzer = FunctionCallAnalyzer(self.suspicious_pattern, tracer) - is_malware, detail_info = functioncall_analyzer.analyze(content) - if is_malware: - self.is_malware = is_malware - - # TODO: Currently, the result collector does not handle the situation that - # multiple same filename. In the future, this will be replace with absolute path. 
- if detail_info: - self.analysis_result[filename] = detail_info - - # TODO: Implement other suspicious setup in suspicious_pattern.yaml - # pattern = r"install_requires\s*=\s*\[(.*?)\]" - # matches: re.Match | None = re.search(pattern, content, re.DOTALL) - # if matches: - # install_requires: set[str] | None = set(re.findall(r"'(.*?)'", matches.group(1))) - # if ( - # install_requires - # and install_requires & set(self.suspicious_pattern["imports"]) - # and len(install_requires) < 4 - # # This threshold is based on historical malwares - # ): - # extracted_data["install_requires"] = install_requires - # TODO: In the future this result from each file will be used to calculate the probability. - # Then the is_malicious will be based on this value. - # Currently, the default policy is - return self.is_malware, self.analysis_result - - # def extract_susupicious_content(self) -> None: - # """Extract the suspicious content from the source code.""" - # if not self.source_code or not self.suspicious_pattern: - # return - # self.extracted_suspicious_content = self._extract_suspicious_content_from_source() - - def _load_suspicious_pattern(self) -> dict[str, JsonType] | None: - """Load the suspicious pattern from suspicious_pattern.yaml. + for category, patterns in self.suspicious_patterns[IMPORTS].items(): + category_info = [] + + suspicious_imports = set.intersection(import_names, set(patterns)) + if suspicious_imports: + category_info = [i for i in imports if i.name in suspicious_imports] + result = HeuristicResult.FAIL + + detail_info[category] = category_info + + analysis_result[filename] = {IMPORTS: detail_info} + + return result, analysis_result + + def analyze_dataflow(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]: + """Analyze the source code of the package for malicious dataflow. + + This is the second phase of the source code analyzer. Currently, this function is a placeholder for future + work. 
+ + Parameters + ---------- + pypi_package_json: PyPIPackageJsonAsset + The PyPI package JSON asset object. Returns ------- - dict[str, JsonType] | None - The suspicious pattern. + tuple[HeuristicResult, dict[str, JsonType]] + Containing the analysis results and relevant dataflows identified. + + Raises + ------ + HeuristicAnalyzerValueError + if there is no source code available. """ - filename: str = "suspicious_pattern.yaml" - curr_dir: pathlib.Path = pathlib.Path(__file__).parent.absolute() - suspicious_pattern_file: str = os.path.join(curr_dir, filename) - with open(suspicious_pattern_file, encoding="utf-8") as file: + analysis_result: dict = {} + result: HeuristicResult = HeuristicResult.SKIP + + source_code = pypi_package_json.package_sourcecode + if not source_code: + error_msg = "Unable to retrieve PyPI package source code" + logger.debug(error_msg) + raise HeuristicAnalyzerValueError(error_msg) + + for filename, content in source_code.items(): try: - suspicious_pattern: dict[str, JsonType] = yaml.safe_load(file) - except yaml.YAMLError as yaml_exception: - logger.debug("Error parsing the yaml file: '%s'", yaml_exception) - return None - return suspicious_pattern + _ = ast.parse(content) + except (SyntaxError, ValueError) as ast_parse_error: + logger.debug("File %s cannot be parsed as a python file: %s", filename, ast_parse_error) + continue - def _extract_imports_from_ast(self, content: str) -> set[str]: + # tracer = DataFlowTracer() + # tracer.generate_symbol_table(content) + + # functioncall_analyzer = FunctionCallAnalyzer(self.suspicious_pattern, tracer) + # is_malware, detail_info = functioncall_analyzer.analyze(content) + # if is_malware: + # result = HeuristicResult.FAIL + + # # TODO: Currently, the result collector does not handle the situation that + # # multiple same filename. In the future, this will be replace with absolute path. 
+ # if detail_info: + # analysis_result[filename] = detail_info + + return result, analysis_result + + def _extract_imports(self, content: str) -> set[Import]: + try: + return self._extract_imports_from_ast(content) + except SyntaxError: + return self._extract_imports_from_lines(content) + + def _extract_imports_from_ast(self, content: str) -> set[Import]: """Extract imports from source code using the parsed AST. Parameters @@ -213,18 +282,16 @@ def _extract_imports_from_ast(self, content: str) -> set[str]: for node in ast.walk(tree): if isinstance(node, ast.Import): for alias in node.names: - imports.add(alias.name) + imports.add(Import(alias.name, alias.asname, None, alias.lineno, "")) elif isinstance(node, ast.ImportFrom): module = node.module if module: _module = "." * node.level + module - imports.add(_module) for name in node.names: - imports.add(_module + "." + name.name) - + imports.add(Import(name.name, name.asname, _module, name.lineno, "")) return imports - def _extract_imports_from_lines(self, content: str) -> set[str]: + def _extract_imports_from_lines(self, content: str) -> set[Import]: """Extract imports from source code using per line pattern matching. Parameters @@ -275,7 +342,7 @@ def _extract_imports_from_lines(self, content: str) -> set[str]: # 3 - from import statement module components. imports = set() - for line in content.splitlines(): + for lineno, line in enumerate(content.splitlines()): line.strip() match = re.match(combined_pattern, line) if not match: @@ -285,15 +352,13 @@ def _extract_imports_from_lines(self, content: str) -> set[str]: # Standard import, handle commas and aliases if present. 
splits = self._prune_aliased_lines(match.group(1), alias_pattern) for split in splits: - imports.add(split) + imports.add(Import(split, None, None, lineno, "")) elif match.group(2): # From import - imports.add(match.group(2)) if match.group(3): splits = self._prune_aliased_lines(match.group(3), alias_pattern) for split in splits: - imports.add(match.group(2) + "." + split) - + imports.add(Import(split, None, match.group(2), lineno, "")) return imports def _prune_aliased_lines(self, text: str, alias_pattern: str) -> list[str]: @@ -306,6 +371,75 @@ def _prune_aliased_lines(self, text: str, alias_pattern: str) -> list[str]: return results +class DataFlowTracer(ast.NodeVisitor): + """The class is used to create the symbol table and analyze the dataflow.""" + + def __init__(self) -> None: + self.symbol_table: dict = {} # Store variable assignments + self.trace_path: list = [] + + def visit_Assign(self, node: ast.Assign) -> None: # noqa: N802 # pylint: disable=C0103 + """Visit the Assign node and build the symbol table.""" + for target in node.targets: + if isinstance(target, ast.Name): + target_name = target.id + if isinstance(node.value, ast.Name): + self.symbol_table[target_name] = str(node.value.id) + elif isinstance(node.value, ast.Constant): + self.symbol_table[target_name] = str(node.value.value) + # Handle other assignment types as needed (e.g., function calls, lists) + else: + self.symbol_table[target_name] = ast.unparse(node.value) + self.generic_visit(node) # Important for visiting nested assign + + def trace_back(self, variable_name: str) -> list: + """Get the full path of the dataflow. + + Parameters + ---------- + variable_name: str + The argument of the function call. + + Returns + ------- + list + The path of the dataflow. + """ + self.trace_path = [] + self._recursive_trace(variable_name) + return self.trace_path + + def _recursive_trace(self, variable_name: str) -> None: + """Recursively build the dataflow path by analyzing the symbol table. 
+ + Parameters + ---------- + variable_name: str + The argument of the function call. + """ + if variable_name in self.symbol_table: + value = self.symbol_table[variable_name] + if not self.trace_path: + self.trace_path.extend([variable_name, value]) + else: + self.trace_path.append(value) + if ( + isinstance(value, str) and value in self.symbol_table and self.symbol_table[value] != value + ): # only trace if it is a var name + self._recursive_trace(value) + + def generate_symbol_table(self, source_code: str) -> None: + """Generate the symbol table. + + Parameters + ---------- + source_code: str + The source code of the script. + """ + tree = ast.parse(source_code) + self.visit(tree) + + class FunctionCallAnalyzer(ast.NodeVisitor): """This class analyzes Python source code to identify potential suspicious behavior.""" @@ -348,8 +482,8 @@ def visit_Module(self, node: ast.Module) -> None: # noqa: N802 # pylint: disabl def visit_Call(self, node: ast.Call) -> None: # noqa: N802 # pylint: disable=C0103 """Visit the Call node.""" - suspicious_calls: dict = self.suspicious_patterns["ast_calls"] - suspicious_const: dict = self.suspicious_patterns["ast_constant"] + suspicious_calls: dict = self.suspicious_patterns[CALLS] + suspicious_const: dict = self.suspicious_patterns[CONSTANTS] function_call: str = ast.unparse(node.func) args: str = " ".join([ast.unparse(arg) for arg in node.args]) expr: str = ast.unparse(node) diff --git a/src/macaron/malware_analyzer/pypi_heuristics/suspicious_pattern.yaml b/src/macaron/malware_analyzer/pypi_heuristics/sourcecode/suspicious_patterns.yaml similarity index 95% rename from src/macaron/malware_analyzer/pypi_heuristics/suspicious_pattern.yaml rename to src/macaron/malware_analyzer/pypi_heuristics/sourcecode/suspicious_patterns.yaml index 9c15144d4..3838e23a7 100644 --- a/src/macaron/malware_analyzer/pypi_heuristics/suspicious_pattern.yaml +++ b/src/macaron/malware_analyzer/pypi_heuristics/sourcecode/suspicious_patterns.yaml @@ -1,4 +1,4 
@@ -# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved. +# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. @@ -20,7 +20,7 @@ imports: - subprocess - Request -ast_calls: +calls: os_detection: - os.name code_execution: @@ -59,7 +59,7 @@ ast_calls: reverse_shell: - os.dup2 -ast_constant: +constants: domains: - webhook.site - discord diff --git a/src/macaron/slsa_analyzer/analyze_context.py b/src/macaron/slsa_analyzer/analyze_context.py index c2f6a0042..c86ddaa85 100644 --- a/src/macaron/slsa_analyzer/analyze_context.py +++ b/src/macaron/slsa_analyzer/analyze_context.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022 - 2024, Oracle and/or its affiliates. All rights reserved. +# Copyright (c) 2022 - 2025, Oracle and/or its affiliates. All rights reserved. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. """This module contains the Analyze Context class. @@ -57,8 +57,8 @@ class ChecksOutputs(TypedDict): """True if the provenance exists and has been verified against a signed companion provenance.""" local_artifact_paths: list[str] """The local artifact absolute paths.""" - validate_malware_switch: bool - """True when the malware validation is enabled.""" + analyze_source: bool + """True when PyPI source code analysis has been enabled.""" class AnalyzeContext: @@ -115,7 +115,7 @@ def __init__( provenance_commit_digest=None, provenance_verified=False, local_artifact_paths=[], - validate_malware_switch=False, + analyze_source=False, ) @property diff --git a/src/macaron/slsa_analyzer/analyzer.py b/src/macaron/slsa_analyzer/analyzer.py index 894c82134..b73b3b639 100644 --- a/src/macaron/slsa_analyzer/analyzer.py +++ b/src/macaron/slsa_analyzer/analyzer.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022 - 2024, Oracle and/or its affiliates. All rights reserved. 
+# Copyright (c) 2022 - 2025, Oracle and/or its affiliates. All rights reserved. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. """This module handles the cloning and analyzing a Git repo.""" @@ -125,7 +125,7 @@ def run( sbom_path: str = "", deps_depth: int = 0, provenance_payload: InTotoPayload | None = None, - validate_malware_switch: bool = False, + analyze_source: bool = False, ) -> int: """Run the analysis and write results to the output path. @@ -142,6 +142,8 @@ def run( The depth of dependency resolution. Default: 0. provenance_payload : InToToPayload | None The provenance intoto payload for the main software component. + analyze_source : bool + When true, triggers source code analysis for PyPI packages. Defaults to False. Returns ------- @@ -174,7 +176,7 @@ def run( main_config, analysis, provenance_payload=provenance_payload, - validate_malware_switch=validate_malware_switch, + analyze_source=analyze_source, ) if main_record.status != SCMStatus.AVAILABLE or not main_record.context: @@ -292,7 +294,7 @@ def run_single( analysis: Analysis, existing_records: dict[str, Record] | None = None, provenance_payload: InTotoPayload | None = None, - validate_malware_switch: bool = False, + analyze_source: bool = False, ) -> Record: """Run the checks for a single repository target. @@ -309,6 +311,8 @@ def run_single( The mapping of existing records that the analysis has run successfully. provenance_payload : InToToPayload | None The provenance intoto payload for the analyzed software component. + analyze_source : bool + When true, triggers source code analysis for PyPI packages. Defaults to False. 
Returns ------- @@ -483,7 +487,7 @@ def run_single( analyze_ctx.dynamic_data["provenance_verified"] = provenance_is_verified analyze_ctx.dynamic_data["provenance_repo_url"] = provenance_repo_url analyze_ctx.dynamic_data["provenance_commit_digest"] = provenance_commit_digest - analyze_ctx.dynamic_data["validate_malware_switch"] = validate_malware_switch + analyze_ctx.dynamic_data["analyze_source"] = analyze_source if parsed_purl and parsed_purl.type in self.local_artifact_repo_mapper: local_artifact_repo_path = self.local_artifact_repo_mapper[parsed_purl.type] diff --git a/src/macaron/slsa_analyzer/checks/detect_malicious_metadata_check.py b/src/macaron/slsa_analyzer/checks/detect_malicious_metadata_check.py index 0e2fe0039..075f48516 100644 --- a/src/macaron/slsa_analyzer/checks/detect_malicious_metadata_check.py +++ b/src/macaron/slsa_analyzer/checks/detect_malicious_metadata_check.py @@ -11,7 +11,7 @@ from macaron.database.db_custom_types import DBJsonDict from macaron.database.table_definitions import CheckFacts -from macaron.errors import HeuristicAnalyzerValueError +from macaron.errors import ConfigurationError, HeuristicAnalyzerValueError from macaron.json_tools import JsonType, json_extract from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics @@ -23,7 +23,7 @@ from macaron.malware_analyzer.pypi_heuristics.metadata.unchanged_release import UnchangedReleaseAnalyzer from macaron.malware_analyzer.pypi_heuristics.metadata.unreachable_project_links import UnreachableProjectLinksAnalyzer from macaron.malware_analyzer.pypi_heuristics.metadata.wheel_absence import WheelAbsenceAnalyzer -from macaron.malware_analyzer.pypi_heuristics.pypi_sourcecode_analyzer import PyPISourcecodeAnalyzer +from macaron.malware_analyzer.pypi_heuristics.sourcecode.pypi_sourcecode_analyzer import PyPISourcecodeAnalyzer from 
macaron.malware_analyzer.pypi_heuristics.sourcecode.suspicious_setup import SuspiciousSetupAnalyzer from macaron.slsa_analyzer.analyze_context import AnalyzeContext from macaron.slsa_analyzer.build_tool.pip import Pip @@ -282,26 +282,26 @@ def _should_skip( return True return False - def validate_malware(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[bool, dict[str, JsonType] | None]: - """Validate the package is malicious. + def analyze_source(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]: + """Analyze the source code of the package with a textual scan, looking for malicious code patterns. Parameters ---------- pypi_package_json: PyPIPackageJsonAsset + The PyPI package JSON asset object. Returns ------- - tuple[bool, dict[str, JsonType] | None] - Returns True if the source code includes suspicious pattern. - Returns the result of the validation including the line number - and the suspicious arguments. - e.g. requests.get("http://malicious.com") - return the "http://malicious.com" + tuple[HeuristicResult, dict[str, JsonType]] + Containing the analysis results and relevant patterns identified. """ - # TODO: This redundant function might be removed - sourcecode_analyzer = PyPISourcecodeAnalyzer(pypi_package_json) - is_malware, detail_info = sourcecode_analyzer.analyze() - return is_malware, detail_info + logger.debug("Instantiating %s", PyPISourcecodeAnalyzer.__name__) + try: + sourcecode_analyzer = PyPISourcecodeAnalyzer() + return sourcecode_analyzer.analyze_patterns(pypi_package_json) + except (ConfigurationError, HeuristicAnalyzerValueError) as source_code_error: + logger.debug("Unable to perform source code analysis: %s", source_code_error) + return HeuristicResult.SKIP, {} def run_heuristics( self, pypi_package_json: PyPIPackageJsonAsset @@ -406,7 +406,7 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData: # Create an AssetLocator object for the PyPI package JSON object. 
pypi_package_json = PyPIPackageJsonAsset( - component=ctx.component, pypi_registry=pypi_registry, package_json={} + component=ctx.component, pypi_registry=pypi_registry, package_json={}, package_sourcecode={} ) pypi_registry_info.metadata.append(pypi_package_json) @@ -414,28 +414,33 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData: # Download the PyPI package JSON, but no need to persist it to the filesystem. if pypi_package_json.download(dest=""): try: - result, detail_info = self.run_heuristics(pypi_package_json) + heuristic_results, heuristics_detail_info = self.run_heuristics(pypi_package_json) except HeuristicAnalyzerValueError: return CheckResultData(result_tables=[], result_type=CheckResultType.UNKNOWN) - result_combo: tuple = tuple(result.values()) + result_combo: tuple = tuple(heuristic_results.values()) confidence: float | None = SUSPICIOUS_COMBO.get(result_combo, None) result_type = CheckResultType.FAILED if confidence is None: confidence = Confidence.HIGH result_type = CheckResultType.PASSED - elif ctx.dynamic_data["validate_malware_switch"]: - is_malware, validation_result = self.validate_malware(pypi_package_json) - if is_malware: # Find source code block matched the malicious pattern - confidence = Confidence.HIGH - elif validation_result: # Find suspicious source code, but cannot be confirmed - confidence = Confidence.MEDIUM - logger.debug(validation_result) + + # experimental analyze sourcecode feature + if ctx.dynamic_data["analyze_source"] and pypi_package_json.download_sourcecode(): + sourcecode_result, sourcecode_detail_info = self.analyze_source(pypi_package_json) + heuristic_results[Heuristics.SUSPICIOUS_PATTERNS] = sourcecode_result + heuristics_detail_info.update(sourcecode_detail_info) + + if sourcecode_result == HeuristicResult.FAIL: + if result_type == CheckResultType.PASSED: + # heuristics determined it benign, so lower the confidence + confidence = Confidence.LOW + result_type = CheckResultType.FAILED 
result_tables.append( MaliciousMetadataFacts( - result=result, - detail_information=detail_info, + result=heuristic_results, + detail_information=heuristics_detail_info, confidence=confidence, ) ) diff --git a/src/macaron/slsa_analyzer/package_registry/pypi_registry.py b/src/macaron/slsa_analyzer/package_registry/pypi_registry.py index 9636ccea7..ac22a6f6b 100644 --- a/src/macaron/slsa_analyzer/package_registry/pypi_registry.py +++ b/src/macaron/slsa_analyzer/package_registry/pypi_registry.py @@ -185,77 +185,73 @@ def download_package_json(self, url: str) -> dict: return res_obj - def fetch_sourcecode(self, src_url: str) -> dict[str, str] | None: - """Get the source code of the package. + def download_package_sourcecode(self, url: str) -> dict: + """Download the package source code from pypi registry. + + Parameters + ---------- + url: str + The package source code url. Returns ------- - str | None - The source code. + dict[str, bytes] + A dictionary of filenames and file contents. """ + sourcecode: dict = {} + # Get name of file. - _, _, file_name = src_url.rpartition("/") + _, _, file_name = url.rpartition("/") - # Create a temporary directory to store the downloaded source.
+ # temporary directory to unzip and read all source files with tempfile.TemporaryDirectory() as temp_dir: - try: - response = requests.get(src_url, stream=True, timeout=40) - response.raise_for_status() - except requests.exceptions.HTTPError as http_err: - logger.debug("HTTP error occurred: %s", http_err) - return None - - if response.status_code != 200: - return None + response = send_get_http_raw(url, stream=True) + if response is None: + error_msg = f"Unable to find package source code using URL: {url}" + logger.debug(error_msg) + raise InvalidHTTPResponseError(error_msg) source_file = os.path.join(temp_dir, file_name) with open(source_file, "wb") as file: try: for chunk in response.iter_content(): file.write(chunk) - except RequestException as error: - # Something went wrong with the request, abort. - logger.debug("Error while streaming source file: %s", error) - response.close() - return None - logger.debug("Begin fetching the source code from PyPI") - py_files_content: dict[str, str] = {} + except RequestException as stream_error: + error_msg = f"Error while streaming source file: {stream_error}" + logger.debug(error_msg) + raise InvalidHTTPResponseError(error_msg) from stream_error + if tarfile.is_tarfile(source_file): try: - with tarfile.open(source_file, "r:gz") as tar: - for member in tar.getmembers(): - if member.isfile() and member.name.endswith(".py") and member.size > 0: - file_obj = tar.extractfile(member) - if file_obj: - content = file_obj.read().decode("utf-8") - py_files_content[member.name] = content - except tarfile.ReadError as exception: - logger.debug("Error reading tar file: %s", exception) - return None + with tarfile.open(source_file, "r:gz") as sourcecode_tar: + for member in sourcecode_tar.getmembers(): + if member.isfile() and (file_obj := sourcecode_tar.extractfile(member)): + sourcecode[member.name] = file_obj.read() + + except tarfile.ReadError as read_error: + error_msg = f"Error reading source code tar file: {read_error}" +
logger.debug(error_msg) + raise InvalidHTTPResponseError(error_msg) from read_error + elif zipfile.is_zipfile(source_file): try: - with zipfile.ZipFile(source_file, "r") as zip_ref: - for info in zip_ref.infolist(): - if info.filename.endswith(".py") and not info.is_dir() and info.file_size > 0: - with zip_ref.open(info) as file_obj: - content = file_obj.read().decode("utf-8") - py_files_content[info.filename] = content - except zipfile.BadZipFile as bad_zip_exception: - logger.debug("Error reading zip file: %s", bad_zip_exception) - return None - except zipfile.LargeZipFile as large_zip_exception: - logger.debug("Zip file too large to read: %s", large_zip_exception) - return None - # except KeyError as zip_key_exception: - # logger.debug( - # "Error finding target '%s' in zip file '%s': %s", archive_target, source_file, zip_key_exception - # ) - # return None + with zipfile.ZipFile(source_file, "r") as sourcecode_zipfile: + for info in sourcecode_zipfile.infolist(): + if not info.is_dir(): + with sourcecode_zipfile.open(info) as file_obj: + sourcecode[info.filename] = file_obj.read() + + except (zipfile.BadZipFile, zipfile.LargeZipFile) as zipfile_error: + error_msg = f"Error reading source code zip file: {zipfile_error}" + logger.debug(error_msg) + raise InvalidHTTPResponseError(error_msg) from zipfile_error + else: - logger.debug("Unable to extract file: %s", file_name) + error_msg = f"Unable to extract source code from file {file_name}" + logger.debug(error_msg) + raise InvalidHTTPResponseError(error_msg) - logger.debug("Successfully fetch the source code from PyPI") - return py_files_content + return sourcecode def get_package_page(self, package_name: str) -> str | None: """Implement custom API to get package main page. @@ -375,6 +371,9 @@ class PyPIPackageJsonAsset: #: The asset content. package_json: dict + #: The source code of the package hosted on PyPI + package_sourcecode: dict + #: The size of the asset (in bytes). 
This attribute is added to match the AssetLocator #: protocol and is not used because pypi API registry does not provide it. @property @@ -504,16 +503,19 @@ def get_latest_release_upload_time(self) -> str | None: return upload_time return None - def get_sourcecode(self) -> dict[str, str] | None: - """Get source code of the package. + def download_sourcecode(self) -> bool: + """Get the source code of the package and store it in the package_sourcecode attribute. Returns ------- - dict[str, str] | None - The source code of each script in the package + bool + ``True`` if the source code is downloaded successfully; ``False`` if not. """ - url: str | None = self.get_sourcecode_url() + url = self.get_sourcecode_url() if url: - source_code: dict[str, str] | None = self.pypi_registry.fetch_sourcecode(url) - return source_code - return None + try: + self.package_sourcecode = self.pypi_registry.download_package_sourcecode(url) + return True + except InvalidHTTPResponseError as error: + logger.debug(error) + return False diff --git a/src/macaron/util.py b/src/macaron/util.py index 047d14125..132ce9d84 100644 --- a/src/macaron/util.py +++ b/src/macaron/util.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022 - 2024, Oracle and/or its affiliates. All rights reserved. +# Copyright (c) 2022 - 2025, Oracle and/or its affiliates. All rights reserved. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. """This module includes utilities functions for Macaron.""" @@ -126,7 +126,11 @@ def send_head_http_raw( def send_get_http_raw( - url: str, headers: dict | None = None, timeout: int | None = None, allow_redirects: bool = True + url: str, + headers: dict | None = None, + timeout: int | None = None, + allow_redirects: bool = True, + stream: bool = False, ) -> Response | None: """Send the GET HTTP request with the given url and headers. @@ -142,6 +146,8 @@ def send_get_http_raw( The request timeout (optional). 
allow_redirects: bool Whether to allow redirects. Default: True. + stream: bool + Indicates whether the response should be immediately downloaded (False) or streamed (True). Default: False. Returns ------- @@ -157,10 +163,7 @@ def send_get_http_raw( retry_counter = error_retries try: response = requests.get( - url=url, - headers=headers, - timeout=timeout, - allow_redirects=allow_redirects, + url=url, headers=headers, timeout=timeout, allow_redirects=allow_redirects, stream=stream ) except requests.exceptions.RequestException as error: logger.debug(error)