-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpackage_dependecies.py
161 lines (133 loc) · 6.02 KB
/
package_dependecies.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
import argparse
import os
import shutil
import subprocess
import threading
from concurrent.futures import ThreadPoolExecutor
from tempfile import mkdtemp

import requests
import yaml
def get_default_branch(owner, repo):
    """Fetch the default branch name of a GitHub repository.

    Args:
        owner: Repository owner (user or organization).
        repo: Repository name.

    Returns:
        The default branch name reported by the GitHub API, or "main"
        if the request fails for any reason.
    """
    url = f"https://api.github.com/repos/{owner}/{repo}"
    try:
        # Bound the request so a hung connection cannot stall the whole run.
        response = requests.get(url, timeout=10)
    except requests.RequestException as exc:
        print(f"Error fetching default branch: {exc}")
        return "main"  # Fallback to "main"
    if response.status_code == 200:
        return response.json().get("default_branch", "main")
    print(f"Error fetching default branch: {response.status_code}")
    return "main"  # Fallback to "main"
def fetch_tree_with_git(repo_url, branch=None, temp_dir=None):
    """Clone the repository (bare) and fetch its file tree.

    Args:
        repo_url: URL of the git repository (may end in ".git").
        branch: Branch to list; resolved via the GitHub API when falsy.
        temp_dir: Directory in which the bare clone is created.

    Returns:
        (tree, branch) where tree is a nested dict of folders whose leaf
        values are the string "file", or (None, None) if listing failed.

    Raises:
        subprocess.CalledProcessError: if the bare clone fails.
    """
    # Extract owner and repo from the URL.
    # BUGFIX: rstrip('.git') strips any trailing '.', 'g', 'i', 't'
    # characters (e.g. a repo named "legit" -> "le"); remove the exact
    # ".git" suffix instead.
    trimmed_url = repo_url[:-4] if repo_url.endswith(".git") else repo_url
    parts = trimmed_url.split('/')
    owner, repo = parts[-2], parts[-1]
    # Get the default branch if not provided.
    if not branch:
        branch = get_default_branch(owner, repo)
    # Clone only the repository metadata (bare) into the temporary directory.
    repo_download_path = os.path.join(temp_dir, f"{owner}-{repo}-{branch}-bare")
    subprocess.run(["git", "clone", "--bare", repo_url, repo_download_path], check=True)
    result = subprocess.run(
        ["git", "ls-tree", "-r", "--name-only", branch],
        cwd=repo_download_path,
        capture_output=True,
        text=True
    )
    if result.returncode != 0:
        print(f"Error: {result.stderr}")
        return None, None
    # Build the nested tree dict from the flat list of paths.
    files = result.stdout.splitlines()
    tree = {}
    for file in files:
        parts = file.split("/")
        current = tree
        for part in parts[:-1]:
            current = current.setdefault(part, {})
        current[parts[-1]] = "file"
    return tree, branch
def find_all_files_in_tree(tree, filename, path=""):
    """Recursively collect every occurrence of *filename* in the tree.

    Args:
        tree: Nested dict of folders; leaves are non-dict values.
        filename: File name to search for.
        path: Path prefix accumulated during recursion.

    Returns:
        List of (parent_folder_name, full_path) tuples; the parent folder
        name is "" for a match at the tree root.
    """
    matches = []
    for entry_name, node in tree.items():
        full_path = f"{path}/{entry_name}" if path else entry_name
        if entry_name == filename:
            # Last component of the containing directory ("" at the root).
            parent = path.split("/")[-1] if "/" in path else path
            matches.append((parent, full_path))
        elif isinstance(node, dict):
            # Descend into subfolders only; leaves are plain markers.
            matches += find_all_files_in_tree(node, filename, full_path)
    return matches
def create_raw_url(owner, repo, branch, file_path):
    """Build the raw.githubusercontent.com URL for *file_path* on *branch*."""
    return "/".join(
        ["https://raw.githubusercontent.com", owner, repo, branch, file_path]
    )
def get_package_dependencies(repo_url, branch, temp_dir, filename_to_find="package.xml"):
    """Fetch package dependencies and construct raw URLs.

    Clones *repo_url* (bare) into *temp_dir*, finds every *filename_to_find*
    in its tree, and maps each containing folder name (or the repo name for
    a root-level match) to a raw.githubusercontent.com URL.

    Args:
        repo_url: URL of the git repository (may end in ".git").
        branch: Branch to inspect; resolved to the default branch when falsy.
        temp_dir: Directory used for the bare clone.
        filename_to_find: File name to locate (defaults to "package.xml").

    Returns:
        dict mapping folder name -> raw file URL; empty on failure.
    """
    # Fetch tree and (possibly resolved) branch.
    tree_structure, branch = fetch_tree_with_git(repo_url, branch, temp_dir)
    # Extract owner and repo.
    # BUGFIX: rstrip('.git') strips any trailing '.', 'g', 'i', 't'
    # characters (e.g. "legit" -> "le"); remove the exact ".git" suffix.
    trimmed_url = repo_url[:-4] if repo_url.endswith(".git") else repo_url
    parts = trimmed_url.split('/')
    owner, repo = parts[-2], parts[-1]
    results = {}
    # Find all instances of the file and construct URLs.
    if tree_structure:
        found_files = find_all_files_in_tree(tree_structure, filename_to_find)
        for folder_name, file_path in found_files:
            raw_url = create_raw_url(owner, repo, branch, file_path)
            # A root-level match has no containing folder; use the repo name.
            if not folder_name:
                folder_name = repo
            results[folder_name] = raw_url
    else:
        print(f"Failed to fetch the repository structure: {repo_url}")
    return results
def parse_and_validate_yaml(distro_yaml_path, output_file, max_threads=15):
    """Parse the distro.yaml, validate repositories, and store results.

    Args:
        distro_yaml_path: Path to the distro.yaml file to read.
        output_file: Results file path (recreated on every run).
        max_threads: Maximum number of concurrent worker threads.
    """
    # Delete the output file if it exists so we never append to stale results.
    if os.path.exists(output_file):
        os.remove(output_file)
    with open(distro_yaml_path, "r") as yaml_file:
        distro_data = yaml.safe_load(yaml_file)
    # Temporary directory for bare repositories.
    temp_dir = mkdtemp()
    reconciliation_report = {"total": 0, "matched": 0, "mismatched": 0, "unmatched": 0}
    # BUGFIX: the report counters and the output file are mutated by many
    # worker threads at once; without a lock, counter updates can be lost
    # and appended lines can interleave.
    report_lock = threading.Lock()

    def process_repo(repo_name, repo_data):
        """Validate one repository entry and append its matches to the output."""
        key_value_results = {}
        repo_url = repo_data.get("url")
        if not repo_url:
            with report_lock:
                reconciliation_report["unmatched"] += 1
            return
        branch = repo_data.get("version", "main")
        dependencies = get_package_dependencies(repo_url, branch, temp_dir)
        declared_packages = repo_data.get("packages", [])
        matched = mismatched = 0
        for folder_name, raw_url in dependencies.items():
            if folder_name in declared_packages:
                key_value_results[folder_name] = raw_url
                matched += 1
            else:
                mismatched += 1
        # Apply all shared-state mutations for this repo in one critical
        # section; writing the file under the lock keeps lines whole.
        with report_lock:
            reconciliation_report["matched"] += matched
            reconciliation_report["mismatched"] += mismatched
            reconciliation_report["total"] += len(declared_packages)
            with open(output_file, "a") as result_file:
                for key, value in key_value_results.items():
                    result_file.write(f"{key} => {value}\n")

    try:
        with ThreadPoolExecutor(max_threads) as executor:
            futures = [
                executor.submit(process_repo, repo_name, repo_data)
                for repo_name, repo_data in distro_data.items()
            ]
            # BUGFIX: unchecked futures swallow worker exceptions silently;
            # report each failure while still processing the other repos.
            for future in futures:
                exc = future.exception()
                if exc is not None:
                    print(f"Repository worker failed: {exc}")
    finally:
        # Clean up temporary directory.
        shutil.rmtree(temp_dir)
    # Print reconciliation report.
    print("\nReconciliation Report:")
    print(f"Total Packages Declared: {reconciliation_report['total']}")
    print(f"Matched Packages: {reconciliation_report['matched']}")
    print(f"Mismatched Packages: {reconciliation_report['mismatched']}")
    print(f"Unmatched Repositories: {reconciliation_report['unmatched']}")
if __name__ == "__main__":
    # Command-line entry point: pkglist + output path, optional thread cap.
    arg_parser = argparse.ArgumentParser(
        description="Parse distro.yaml and validate package dependencies."
    )
    arg_parser.add_argument("pkglist", type=str, help="Path to the distro.yaml file.")
    arg_parser.add_argument("output", type=str, help="Path to the output file.")
    arg_parser.add_argument(
        "--max-threads",
        type=int,
        default=15,
        help="Maximum number of concurrent threads (default: 15).",
    )
    cli_args = arg_parser.parse_args()
    parse_and_validate_yaml(cli_args.pkglist, cli_args.output, cli_args.max_threads)