From 79349a65790cef45a485414ac546a096bbed4fbf Mon Sep 17 00:00:00 2001 From: Ben Selwyn-Smith Date: Wed, 29 Nov 2023 19:21:20 +1000 Subject: [PATCH] chore: rename comparison function; update commit finder failure message; update e2e test file path; extract purl type check into standalone function and add unit test Signed-off-by: Ben Selwyn-Smith --- src/macaron/repo_finder/commit_finder.py | 66 +++++++++++++++---- .../repo_finder/repo_finder_deps_dev.py | 12 ++-- src/macaron/slsa_analyzer/analyzer.py | 4 +- tests/e2e/repo_finder/commit_finder.py | 8 +-- tests/repo_finder/test_commit_finder.py | 39 +++++++++++ 5 files changed, 106 insertions(+), 23 deletions(-) diff --git a/src/macaron/repo_finder/commit_finder.py b/src/macaron/repo_finder/commit_finder.py index 5de075a50..a032ec06e 100644 --- a/src/macaron/repo_finder/commit_finder.py +++ b/src/macaron/repo_finder/commit_finder.py @@ -4,6 +4,7 @@ """This module contains the logic for matching PackageURL versions to repository commits via the tags they contain.""" import logging import re +from enum import Enum from re import Pattern from git import TagReference @@ -101,6 +102,17 @@ versioned_string = re.compile("[a-z]+[0-9]+$", flags=re.IGNORECASE) # e.g. RC1, M5, etc. +class PurlType(Enum): + """The type represented by a PURL in terms of repository versus artifact. + + Unsupported types are allowed as a third type. + """ + + REPOSITORY = (0,) + ARTIFACT = (1,) + UNSUPPORTED = (2,) + + def find_commit(git_obj: Git, purl: PackageURL) -> tuple[str, str]: """Try to find the commit matching the passed PURL. @@ -125,18 +137,40 @@ def find_commit(git_obj: Git, purl: PackageURL) -> tuple[str, str]: logger.debug("Missing version for analysis target: %s", purl.name) return "", "" + repo_type = abstract_purl_type(purl) + if repo_type == PurlType.REPOSITORY: + return extract_commit_from_version(git_obj, version) + if repo_type == PurlType.ARTIFACT: + return find_commit_from_version_and_name(git_obj, purl.name, version) + logger.debug("Type of PURL is not supported for commit finding: %s", purl.type) + return "", "" + + +def abstract_purl_type(purl: PackageURL) -> PurlType: + """Determine if the passed purl is a repository type, artifact type, or unsupported type. + + Parameters + ---------- + purl: PackageURL + A PURL that represents a repository, artifact, or something that is not supported. + + Returns + ------- + PurlType: + The identified type of the PURL. + """ available_domains = [git_service.hostname for git_service in GIT_SERVICES if git_service.hostname] domain = to_domain_from_known_purl_types(purl.type) or (purl.type if purl.type in available_domains else None) if domain: # PURL is a repository type. - return extract_commit_from_version(git_obj, version) + return PurlType.REPOSITORY try: repo_finder_deps_dev.DepsDevType(purl.type) - # PURL is a package manager type. - return find_commit_from_version_and_name(git_obj, purl.name, version) + # PURL is an artifact type. + return PurlType.ARTIFACT except ValueError: - logger.debug("Type of PURL is not supported for commit finding: %s", purl.type) - return "", "" + # PURL is an unsupported type. + return PurlType.UNSUPPORTED def extract_commit_from_version(git_obj: Git, version: str) -> tuple[str, str]: @@ -236,19 +270,25 @@ def find_commit_from_version_and_name(git_obj: Git, name: str, version: str) -> logger.debug("Missing tag name from tag dict: %s not in %s", tag_name, valid_tags.keys()) branch_name = _get_branch_of_commit(git_obj.get_commit_from_tag(tag_name)) + try: + hexsha = tag.commit.hexsha + except ValueError: + logger.debug("Error trying to retrieve digest of commit: %s", tag.commit) + return "", "" + if not branch_name: - logger.debug("No valid branch associated with tag (commit): %s (%s)", tag_name, tag.commit.hexsha) + logger.debug("No valid branch associated with tag (commit): %s (%s)", tag_name, hexsha) return "", "" logger.debug( "Found tag %s with commit %s of branch %s for artifact version %s@%s", tag, - tag.commit.hexsha, + hexsha, branch_name, name, version, ) - return branch_name, tag.commit.hexsha + return branch_name, hexsha def _build_version_pattern(name: str, version: str) -> tuple[Pattern | None, list[str], bool]: @@ -349,9 +389,9 @@ def match_tags(tag_list: list[str], artifact_name: str, artifact_version: str) - tag_list: list[str] The list of tags to check. artifact_name: str - The name of the artifact to match. + The name of the analysis target. artifact_version: str - The version of the artifact to match. + The version of the analysis target. Returns ------- @@ -419,13 +459,15 @@ def match_tags(tag_list: list[str], artifact_name: str, artifact_version: str) - # If multiple tags still remain, sort them based on the closest match in terms of individual parts. if len(matched_tags) > 1: matched_tags.sort( - key=lambda matched_tag: _count_parts_in_tag(matched_tag["version"], matched_tag["suffix"], parts) + key=lambda matched_tag: _compute_tag_version_similarity( + matched_tag["version"], matched_tag["suffix"], parts + ) ) return [_["tag"] for _ in matched_tags] -def _count_parts_in_tag(tag_version: str, tag_suffix: str, version_parts: list[str]) -> int: +def _compute_tag_version_similarity(tag_version: str, tag_suffix: str, version_parts: list[str]) -> int: """Return a sort value based on how well the tag version and tag suffix match the parts of the actual version. Parameters diff --git a/src/macaron/repo_finder/repo_finder_deps_dev.py b/src/macaron/repo_finder/repo_finder_deps_dev.py index 032686432..c3b8f5e87 100644 --- a/src/macaron/repo_finder/repo_finder_deps_dev.py +++ b/src/macaron/repo_finder/repo_finder_deps_dev.py @@ -4,7 +4,7 @@ """This module contains the PythonRepoFinderDD class to be used for finding repositories using deps.dev.""" import json import logging -from enum import Enum +from enum import StrEnum from urllib.parse import quote as encode from packageurl import PackageURL @@ -16,17 +16,17 @@ logger: logging.Logger = logging.getLogger(__name__) -class DepsDevType(Enum): +class DepsDevType(StrEnum): """ The package manager types supported by deps.dev. This enum should be updated based on updates to deps.dev. """ - MAVEN = ("maven",) - PYPI = ("pypi",) - NUGET = ("nuget",) - CARGO = ("cargo",) + MAVEN = "maven" + PYPI = "pypi" + NUGET = "nuget" + CARGO = "cargo" NPM = "npm" diff --git a/src/macaron/slsa_analyzer/analyzer.py b/src/macaron/slsa_analyzer/analyzer.py index 105a192ac..59b31197c 100644 --- a/src/macaron/slsa_analyzer/analyzer.py +++ b/src/macaron/slsa_analyzer/analyzer.py @@ -703,7 +703,9 @@ def _prepare_repo( if not digest and purl and purl.version: branch_name, digest = find_commit(git_obj, purl) if not (branch_name and digest): - logger.error("Could not map purl version to specific commit in repository.") + logger.error( + "Could not map the input purl string to a specific commit in the corresponding repository." + ) return None # Checking out the specific branch or commit. This operation varies depends on the git service that the diff --git a/tests/e2e/repo_finder/commit_finder.py b/tests/e2e/repo_finder/commit_finder.py index f63a2e1de..d7489acdd 100644 --- a/tests/e2e/repo_finder/commit_finder.py +++ b/tests/e2e/repo_finder/commit_finder.py @@ -18,12 +18,12 @@ # Set logging debug level. logger.setLevel(logging.DEBUG) -path = Path(__file__).parent.joinpath("resources", "java_tags.json") +java_tags_file_path = Path(__file__).parent.joinpath("resources", "java_tags.json") def test_commit_finder() -> int: """Test the commit finder's tag matching functionality.""" - with open(path, encoding="utf-8") as tag_file: + with open(java_tags_file_path, encoding="utf-8") as tag_file: json_data = json.load(tag_file) fail_count = 0 for item in json_data: @@ -52,7 +52,7 @@ def test_commit_finder() -> int: def update_commit_finder_results() -> None: """Run the commit finder with the current results file and update the match values (override the file).""" # pylint: disable=protected-access - with open(path, encoding="utf-8") as tag_file: + with open(java_tags_file_path, encoding="utf-8") as tag_file: json_data = json.load(tag_file) for item in json_data: name = str(item["name"]) @@ -60,7 +60,7 @@ def update_commit_finder_results() -> None: matched_tags = commit_finder.match_tags(item["tags"], name, version) matched_tag = matched_tags[0] if matched_tags else "" item["match"] = matched_tag - with open(path, "w", encoding="utf-8") as tag_file: + with open(java_tags_file_path, "w", encoding="utf-8") as tag_file: json.dump(json_data, tag_file, indent=4) diff --git a/tests/repo_finder/test_commit_finder.py b/tests/repo_finder/test_commit_finder.py index 2d7ea15f1..84e5ef563 100644 --- a/tests/repo_finder/test_commit_finder.py +++ b/tests/repo_finder/test_commit_finder.py @@ -6,11 +6,13 @@ import re import hypothesis +import pytest from hypothesis import given, settings from hypothesis.strategies import DataObject, data, text from packageurl import PackageURL from macaron.repo_finder import commit_finder +from macaron.repo_finder.commit_finder import PurlType logger: logging.Logger = logging.getLogger(__name__) @@ -42,6 +44,43 @@ def _test_version(tags: list[str], name: str, version: str, target_tag: str) -> assert matched_tags[0] == target_tag +@pytest.mark.parametrize( + ("purls", "expected"), + [ + pytest.param( + [ + "pkg:maven/apache/maven", + "pkg:maven/commons-io/commons-io@2.15.0", + "pkg:pypi/requests@2.31.0", + "pkg:npm/@colors/colors@1.4.0", + "pkg:nuget/system.text.json@8.0.0", + "pkg:cargo/mailmeld@1.0.0", + ], + PurlType.ARTIFACT, + id="Artifact PURLs", + ), + pytest.param( + [ + "pkg:github/apache/maven@69bc993b8089a2d3d1ddfd6c7d4f8dc6cc205995", + "pkg:github/oracle/macaron@v0.6.0", + "pkg:bitbucket/owner/project@tag_5", + ], + PurlType.REPOSITORY, + id="Repository PURLs", + ), + pytest.param( + ["pkg:gem/ruby-advisory-db-check@0.12.4", "pkg:unknown-domain/project/owner@tag"], + PurlType.UNSUPPORTED, + id="Unsupported PURLs", + ), + ], +) +def test_abstract_purl_type(purls: list[str], expected: PurlType) -> None: + """Test each purl in list is of expected type.""" + for purl in purls: + assert commit_finder.abstract_purl_type(PackageURL.from_string(purl)) == expected + + @given(text()) @settings(max_examples=1000) def test_pattern_generation(version: str) -> None: