From 870326cbe31edb88936e2ebcbd5a2fb32d6ce2b7 Mon Sep 17 00:00:00 2001 From: Jacob Callahan Date: Mon, 15 Jul 2024 16:49:12 -0400 Subject: [PATCH] Add the ability for candore to resume an extraction after an error This change makes some structural changes to allow the extractor to store extraction to files that it can later resume from. Due to the async nature of the extractor, I haven't allowed this save/resume to happen within an entity, so the latest entity being extracted would lose its progress and have to start again. --- README.md | 2 +- candore/__init__.py | 7 ++- candore/cli.py | 6 +- candore/modules/comparator.py | 4 +- candore/modules/extractor.py | 109 ++++++++++++++++++++++++++-------- candore/modules/report.py | 18 ++++-- candore/modules/ssh.py | 5 +- candore/modules/variations.py | 5 +- candore/utils.py | 3 +- scripts/gen_constants.py | 3 + 10 files changed, 119 insertions(+), 43 deletions(-) diff --git a/README.md b/README.md index 6cba132..2a14316 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ ## Introduction -`Candore` is the command line interface data integrity tool. The tool is build to verify the change made in a product has any impact on data in product. +`Candore` is the command line interface data integrity tool. The tool is build to verify the change made in a product has any impact on data in product. 
**The change** could be: - Upgrade of the product to new version diff --git a/candore/__init__.py b/candore/__init__.py index d86f7ef..21c2265 100644 --- a/candore/__init__.py +++ b/candore/__init__.py @@ -11,7 +11,6 @@ from candore.modules.extractor import Extractor from candore.modules.finder import Finder from candore.modules.report import Reporting -from candore.config import candore_settings class Candore: @@ -22,7 +21,9 @@ def __init__(self, settings): def list_endpoints(self): return self.api_lister.lister_endpoints() - async def save_all_entities(self, mode, output_file, full, max_pages=None, skip_percent=None): + async def save_all_entities( + self, mode, output_file, full, max_pages=None, skip_percent=None, resume=None + ): """Save all the entities to a json file :param mode: Pre or Post @@ -39,6 +40,8 @@ async def save_all_entities(self, mode, output_file, full, max_pages=None, skip_ extractor.full = True extractor.max_pages = max_pages extractor.skip_percent = skip_percent + if resume: + extractor.load_resume_info() data = await extractor.extract_all_entities() if hasattr(self.settings, 'rpms'): data.update({'installed_rpms': await extractor.extract_all_rpms()}) diff --git a/candore/cli.py b/candore/cli.py index 0cb9e13..2f8c1a0 100644 --- a/candore/cli.py +++ b/candore/cli.py @@ -28,7 +28,7 @@ def candore(ctx, version, settings_file, components_file, conf_dir): settings=candore_settings( option_settings_file=settings_file, option_components_file=components_file, - conf_dir=conf_dir + conf_dir=conf_dir, ) ) ctx.__dict__["candore"] = candore_obj @@ -49,8 +49,9 @@ def apis(ctx): @click.option("--full", is_flag=True, help="Extract data from all the pages of a component") @click.option("--max-pages", type=int, help="The maximum number of pages to extract per entity") @click.option("--skip-percent", type=int, help="The percentage of pages to skip per entity") +@click.option("--resume", is_flag=True, help="Resume the extraction from the last completed 
entity") @click.pass_context -def extract(ctx, mode, output, full, max_pages, skip_percent): +def extract(ctx, mode, output, full, max_pages, skip_percent, resume): loop = asyncio.get_event_loop() candore_obj = ctx.parent.candore loop.run_until_complete( @@ -60,6 +61,7 @@ def extract(ctx, mode, output, full, max_pages, skip_percent): full=full, max_pages=max_pages, skip_percent=skip_percent, + resume=resume, ) ) diff --git a/candore/modules/comparator.py b/candore/modules/comparator.py index 7f0e538..458743f 100644 --- a/candore/modules/comparator.py +++ b/candore/modules/comparator.py @@ -132,9 +132,9 @@ def custom_key(elem): def compare_all_pres_with_posts(self, pre_data, post_data, unique_key="", var_details=None): if unique_key: self.big_key.append(unique_key) - if isinstance(pre_data, dict): + if isinstance(pre_data, dict) and post_data: self._is_data_type_dict(pre_data, post_data, unique_key=unique_key) - elif isinstance(pre_data, list): + elif isinstance(pre_data, list) and post_data: self._is_data_type_list(pre_data, post_data, unique_key=unique_key) else: if pre_data != post_data: diff --git a/candore/modules/extractor.py b/candore/modules/extractor.py index 7c6b90c..3a9c34e 100644 --- a/candore/modules/extractor.py +++ b/candore/modules/extractor.py @@ -1,13 +1,19 @@ import asyncio # noqa: F401 +import json import math -from functools import cached_property -from candore.modules.ssh import Session import re +from functools import cached_property +from pathlib import Path + import aiohttp +from candore.modules.ssh import Session + # Max observed request duration in testing was approximately 888 seconds # so we set the timeout to 2000 seconds to be overly safe EXTENDED_TIMEOUT = aiohttp.ClientTimeout(total=2000, connect=60, sock_read=2000, sock_connect=60) +RESUME_FILE = Path("_resume_info.json") +PARTIAL_FILE = Path("_partial_extraction.json") class Extractor: @@ -27,6 +33,12 @@ def __init__(self, settings, apilister=None): self.apilister = apilister 
self.full = False self.semaphore = asyncio.Semaphore(self.settings.candore.max_connections) + self._all_data = {} + self._api_endpoints = None + self._completed_entities = [] + self._current_entity = None + self._current_endpoint = None + self._retry_limit = 3 @cached_property def dependent_components(self): @@ -40,7 +52,9 @@ def ignore_components(self): @cached_property def api_endpoints(self): - return self.apilister.lister_endpoints() + if not self._api_endpoints: + self._api_endpoints = self.apilister.lister_endpoints() + return self._api_endpoints async def _start_session(self): if not self.client: @@ -56,13 +70,37 @@ async def __aenter__(self): async def __aexit__(self, exc_type, exc_val, exc_tb): await self._end_session() + if exc_val: + with PARTIAL_FILE.open("w") as partial_file: + json.dump(self._all_data, partial_file) + with RESUME_FILE.open("w") as resume_file: + json.dump(self.to_resume_dict(), resume_file, indent=4) + + async def _retry_get(self, retries=None, **get_params): + if retries is None: + retries = self._retry_limit + try: + async with self.client.get(**get_params) as response: + if response.status == 200: + json_data = await response.json() + return response.status, json_data + else: + return response.status, {} + except aiohttp.ClientError: + if retries > 0: + return await self._retry_get(retries=retries - 1, **get_params) + else: + print( + f"Failed to get data from {get_params.get('url')} " + f"in {self._retry_limit} retries." 
+ ) + raise async def paged_results(self, **get_params): - async with self.client.get(**get_params, timeout=EXTENDED_TIMEOUT) as response: - if response.status == 200: - _paged_results = await response.json() - _paged_results = _paged_results.get("results") - return _paged_results + status, _paged_results = await self._retry_get(**get_params, timeout=EXTENDED_TIMEOUT) + if status == 200: + _paged_results = _paged_results.get("results") + return _paged_results async def fetch_page(self, page, _request): async with self.semaphore: @@ -95,18 +133,17 @@ async def fetch_component_entities(self, **comp_params): _request = {"url": self.base + "/" + endpoint, "params": {}} if data and dependency: _request["params"].update({f"{dependency}_id": data}) - async with self.client.get(**_request) as response: - if response.status == 200: - results = await response.json() - if "results" in results: - entity_data.extend(results.get("results")) - else: - # Return an empty directory for endpoints - # like services, api etc - # which does not have results - return entity_data + status, results = await self._retry_get(**_request) + if status == 200: + if "results" in results: + entity_data.extend(results.get("results")) else: + # Return an empty directory for endpoints + # like services, api etc + # which does not have results return entity_data + else: + return entity_data total_pages = results.get("total") // results.get("per_page") + 1 if total_pages > 1: print(f"Endpoint {endpoint} has {total_pages} pages.") @@ -154,11 +191,12 @@ async def component_params(self, component_endpoint): async def process_entities(self, endpoints): """ - endpoints = ['katello/api/actiovationkeys'] + endpoints = ['katello/api/activationkeys'] """ comp_data = [] entities = None for endpoint in endpoints: + self._current_endpoint = endpoint comp_params = await self.component_params(component_endpoint=endpoint) if comp_params: entities = [] @@ -183,12 +221,13 @@ async def extract_all_entities(self): 
:return: """ - all_data = {} for component, endpoints in self.api_endpoints.items(): - if endpoints: + self._current_entity = component + if endpoints and component not in self._completed_entities: comp_entities = await self.process_entities(endpoints=endpoints) - all_data[component] = comp_entities - return all_data + self._all_data[component] = comp_entities + self._completed_entities.append(component) + return self._all_data async def extract_all_rpms(self): """Extracts all installed RPMs from server""" @@ -196,8 +235,26 @@ async def extract_all_rpms(self): rpms = ssh_client.execute('rpm -qa').stdout rpms = rpms.splitlines() name_version_pattern = rf'{self.settings.rpms.regex_pattern}' - rpms_matches = [ - re.compile(name_version_pattern).match(rpm) for rpm in rpms - ] + rpms_matches = [re.compile(name_version_pattern).match(rpm) for rpm in rpms] rpms_list = [rpm_match.groups()[:-1] for rpm_match in rpms_matches if rpm_match] return dict(rpms_list) + + def to_resume_dict(self): + """Exports our latest extraction progress information to a dictionary""" + return { + "api_endpoints": self._api_endpoints, + "completed_entities": self._completed_entities, + "current_entity": self._current_entity, + "current_endpoint": self._current_endpoint, + } + + def load_resume_info(self): + """Resumes our extraction from the last known state""" + resume_info = json.loads(RESUME_FILE.read_text()) + self._api_endpoints = resume_info["api_endpoints"] + self._completed_entities = resume_info["completed_entities"] + self._current_entity = resume_info["current_entity"] + self._current_endpoint = resume_info["current_endpoint"] + self._all_data = json.loads(PARTIAL_FILE.read_text()) + RESUME_FILE.unlink() + PARTIAL_FILE.unlink() diff --git a/candore/modules/report.py b/candore/modules/report.py index 40400b0..a3d75c1 100644 --- a/candore/modules/report.py +++ b/candore/modules/report.py @@ -80,12 +80,22 @@ def _generate_csv_report(self, output_file, inverse): # Convert json to csv and 
write to output file csv_writer = csv.writer(output_file.open("w")) # Table Column Names - columns = ["Path", "Pre-Upgrade", "Post-Upgrade", "Variation?" if not inverse else 'Constant?'] + columns = [ + "Path", + "Pre-Upgrade", + "Post-Upgrade", + "Variation?" if not inverse else 'Constant?', + ] csv_writer.writerow(columns) # Writing Rows for var_path, vals in self.results.items(): - csv_writer.writerow([ - var_path, vals["pre"], vals["post"], - vals["variation" if not inverse else "constant"]]) + csv_writer.writerow( + [ + var_path, + vals["pre"], + vals["post"], + vals["variation" if not inverse else "constant"], + ] + ) print("Wrote CSV report to {}".format(output_file)) print("CSV report contains {} results".format(len(self.results))) diff --git a/candore/modules/ssh.py b/candore/modules/ssh.py index e8376ca..5ef9969 100644 --- a/candore/modules/ssh.py +++ b/candore/modules/ssh.py @@ -1,11 +1,10 @@ -from hussh import Connection from functools import cached_property -from candore.config import candore_settings from urllib.parse import urlparse +from hussh import Connection + class Session: - def __init__(self, settings=None): self.settings = settings self.hostname = urlparse(settings.candore.base_url).hostname diff --git a/candore/modules/variations.py b/candore/modules/variations.py index d29595c..23c0dd6 100644 --- a/candore/modules/variations.py +++ b/candore/modules/variations.py @@ -3,7 +3,9 @@ `conf/variations` yaml file and convert them into processable list """ from functools import cached_property -from candore.utils import yaml_reader, get_yaml_paths + +from candore.utils import get_yaml_paths +from candore.utils import yaml_reader class Variations: @@ -20,7 +22,6 @@ def expected_variations(self): yaml_data = self.variations.get("expected_variations") if self.variations else None return get_yaml_paths(yaml_data=yaml_data) - @cached_property def skipped_variations(self): yaml_data = self.variations.get("skipped_variations") if self.variations else None 
diff --git a/candore/utils.py b/candore/utils.py index 0e461a7..72fef6f 100644 --- a/candore/utils.py +++ b/candore/utils.py @@ -2,6 +2,7 @@ An utility helpers module """ from pathlib import Path + import yaml @@ -40,4 +41,4 @@ def get_yaml_paths(yaml_data, prefix="", separator="/"): paths.extend(get_yaml_paths(item, prefix, separator)) else: paths.append(f"{prefix}{yaml_data}") - return paths \ No newline at end of file + return paths diff --git a/scripts/gen_constants.py b/scripts/gen_constants.py index 331a64a..2bee980 100644 --- a/scripts/gen_constants.py +++ b/scripts/gen_constants.py @@ -4,6 +4,7 @@ import yaml KEEP_FIELDS = ["name", "label", "title", "url", "description", "path"] +SKIP_ENTITIES = ["errata", "package_groups", "repository_sets"] SKIP_DICT = {} HELP_TEXT = """ This script processes a comparison report, in the form of a csv file, and outputs a constants file. @@ -26,6 +27,8 @@ def filter_parts(parts): for check in KEEP_FIELDS: + if parts[0] in SKIP_ENTITIES: + return if check in parts[-1]: return True