Skip to content

Commit

Permalink
Add the ability for candore to resume an extraction after an error
Browse files Browse the repository at this point in the history
This change makes some structural changes to allow the extractor to
store extraction to files that it can later resume from.
Due to the async nature of the extractor, I haven't allowed this
save/resume to happen within an entity, so the latest entity being
extracted would lose its progress and have to start again.
  • Loading branch information
JacobCallahan committed Aug 15, 2024
1 parent 2633855 commit 870326c
Show file tree
Hide file tree
Showing 10 changed files with 119 additions and 43 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

## Introduction

`Candore` is a command line data integrity tool. It is built to verify whether a change made to a product has any impact on the product's data.

**The change** could be:
- Upgrade of the product to new version
Expand Down
7 changes: 5 additions & 2 deletions candore/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from candore.modules.extractor import Extractor
from candore.modules.finder import Finder
from candore.modules.report import Reporting
from candore.config import candore_settings


class Candore:
Expand All @@ -22,7 +21,9 @@ def __init__(self, settings):
def list_endpoints(self):
return self.api_lister.lister_endpoints()

async def save_all_entities(self, mode, output_file, full, max_pages=None, skip_percent=None):
async def save_all_entities(
self, mode, output_file, full, max_pages=None, skip_percent=None, resume=None
):
"""Save all the entities to a json file
:param mode: Pre or Post
Expand All @@ -39,6 +40,8 @@ async def save_all_entities(self, mode, output_file, full, max_pages=None, skip_
extractor.full = True
extractor.max_pages = max_pages
extractor.skip_percent = skip_percent
if resume:
extractor.load_resume_info()
data = await extractor.extract_all_entities()
if hasattr(self.settings, 'rpms'):
data.update({'installed_rpms': await extractor.extract_all_rpms()})
Expand Down
6 changes: 4 additions & 2 deletions candore/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def candore(ctx, version, settings_file, components_file, conf_dir):
settings=candore_settings(
option_settings_file=settings_file,
option_components_file=components_file,
conf_dir=conf_dir
conf_dir=conf_dir,
)
)
ctx.__dict__["candore"] = candore_obj
Expand All @@ -49,8 +49,9 @@ def apis(ctx):
@click.option("--full", is_flag=True, help="Extract data from all the pages of a component")
@click.option("--max-pages", type=int, help="The maximum number of pages to extract per entity")
@click.option("--skip-percent", type=int, help="The percentage of pages to skip per entity")
@click.option("--resume", is_flag=True, help="Resume the extraction from the last completed entity")
@click.pass_context
def extract(ctx, mode, output, full, max_pages, skip_percent):
def extract(ctx, mode, output, full, max_pages, skip_percent, resume):
loop = asyncio.get_event_loop()
candore_obj = ctx.parent.candore
loop.run_until_complete(
Expand All @@ -60,6 +61,7 @@ def extract(ctx, mode, output, full, max_pages, skip_percent):
full=full,
max_pages=max_pages,
skip_percent=skip_percent,
resume=resume,
)
)

Expand Down
4 changes: 2 additions & 2 deletions candore/modules/comparator.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,9 @@ def custom_key(elem):
def compare_all_pres_with_posts(self, pre_data, post_data, unique_key="", var_details=None):
if unique_key:
self.big_key.append(unique_key)
if isinstance(pre_data, dict):
if isinstance(pre_data, dict) and post_data:
self._is_data_type_dict(pre_data, post_data, unique_key=unique_key)
elif isinstance(pre_data, list):
elif isinstance(pre_data, list) and post_data:
self._is_data_type_list(pre_data, post_data, unique_key=unique_key)
else:
if pre_data != post_data:
Expand Down
109 changes: 83 additions & 26 deletions candore/modules/extractor.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
import asyncio # noqa: F401
import json
import math
from functools import cached_property
from candore.modules.ssh import Session
import re
from functools import cached_property
from pathlib import Path

import aiohttp

from candore.modules.ssh import Session

# Max observed request duration in testing was approximately 888 seconds
# so we set the timeout to 2000 seconds to be overly safe
EXTENDED_TIMEOUT = aiohttp.ClientTimeout(total=2000, connect=60, sock_read=2000, sock_connect=60)
RESUME_FILE = Path("_resume_info.json")
PARTIAL_FILE = Path("_partial_extraction.json")


class Extractor:
Expand All @@ -27,6 +33,12 @@ def __init__(self, settings, apilister=None):
self.apilister = apilister
self.full = False
self.semaphore = asyncio.Semaphore(self.settings.candore.max_connections)
self._all_data = {}
self._api_endpoints = None
self._completed_entities = []
self._current_entity = None
self._current_endpoint = None
self._retry_limit = 3

@cached_property
def dependent_components(self):
Expand All @@ -40,7 +52,9 @@ def ignore_components(self):

@cached_property
def api_endpoints(self):
return self.apilister.lister_endpoints()
if not self._api_endpoints:
self._api_endpoints = self.apilister.lister_endpoints()
return self._api_endpoints

async def _start_session(self):
if not self.client:
Expand All @@ -56,13 +70,37 @@ async def __aenter__(self):

async def __aexit__(self, exc_type, exc_val, exc_tb):
    """Close the HTTP session and, if we are exiting due to an error,
    persist extraction progress so a later run can use ``--resume``.

    :param exc_type: exception class raised inside the ``async with`` body, or None
    :param exc_val: exception instance, or None on a clean exit
    :param exc_tb: traceback, or None
    """
    await self._end_session()
    if exc_val:
        # Use the module-level PARTIAL_FILE / RESUME_FILE constants so the
        # writer and load_resume_info() always agree on the file names.
        PARTIAL_FILE.write_text(json.dumps(self._all_data))
        RESUME_FILE.write_text(json.dumps(self.to_resume_dict(), indent=4))

async def _retry_get(self, retries=None, **get_params):
    """GET with retries on client errors.

    :param retries: remaining attempts; None (default) starts at
        ``self._retry_limit``
    :param get_params: keyword arguments forwarded to ``client.get``
        (``url``, ``params``, ``timeout``, ...)
    :return: tuple of (HTTP status, decoded JSON dict — empty on non-200)
    :raises aiohttp.ClientError: when all retries are exhausted
    """
    # BUG FIX: the original used `if not retries:`, which also matches
    # retries == 0 and reset the counter back to the limit, retrying forever
    # on a persistent failure. Only the initial None should seed the counter.
    if retries is None:
        retries = self._retry_limit
    try:
        async with self.client.get(**get_params) as response:
            if response.status == 200:
                json_data = await response.json()
                return response.status, json_data
            else:
                return response.status, {}
    except aiohttp.ClientError:
        if retries > 0:
            return await self._retry_get(retries=retries - 1, **get_params)
        else:
            print(
                f"Failed to get data from {get_params.get('url')} "
                f"in {self._retry_limit} retries."
            )
            raise

async def paged_results(self, **get_params):
    """Fetch one page of results from an endpoint.

    :param get_params: keyword arguments forwarded to the GET request
    :return: the ``results`` list from the JSON payload on HTTP 200,
        otherwise None
    """
    # Goes through _retry_get so transient client errors are retried;
    # the extended timeout covers the slowest observed requests.
    status, _paged_results = await self._retry_get(**get_params, timeout=EXTENDED_TIMEOUT)
    if status == 200:
        return _paged_results.get("results")
    return None

async def fetch_page(self, page, _request):
async with self.semaphore:
Expand Down Expand Up @@ -95,18 +133,17 @@ async def fetch_component_entities(self, **comp_params):
_request = {"url": self.base + "/" + endpoint, "params": {}}
if data and dependency:
_request["params"].update({f"{dependency}_id": data})
async with self.client.get(**_request) as response:
if response.status == 200:
results = await response.json()
if "results" in results:
entity_data.extend(results.get("results"))
else:
# Return an empty directory for endpoints
# like services, api etc
# which does not have results
return entity_data
status, results = await self._retry_get(**_request)
if status == 200:
if "results" in results:
entity_data.extend(results.get("results"))
else:
# Return an empty directory for endpoints
# like services, api etc
# which does not have results
return entity_data
else:
return entity_data
total_pages = results.get("total") // results.get("per_page") + 1
if total_pages > 1:
print(f"Endpoint {endpoint} has {total_pages} pages.")
Expand Down Expand Up @@ -154,11 +191,12 @@ async def component_params(self, component_endpoint):

async def process_entities(self, endpoints):
"""
endpoints = ['katello/api/actiovationkeys']
endpoints = ['katello/api/activationkeys']
"""
comp_data = []
entities = None
for endpoint in endpoints:
self._current_endpoint = endpoint
comp_params = await self.component_params(component_endpoint=endpoint)
if comp_params:
entities = []
Expand All @@ -183,21 +221,40 @@ async def extract_all_entities(self):
:return:
"""
all_data = {}
for component, endpoints in self.api_endpoints.items():
if endpoints:
self._current_entity = component
if endpoints and component not in self._completed_entities:
comp_entities = await self.process_entities(endpoints=endpoints)
all_data[component] = comp_entities
return all_data
self._all_data[component] = comp_entities
self._completed_entities.append(component)
return self._all_data

async def extract_all_rpms(self):
    """Extract all installed RPMs from the server.

    Runs ``rpm -qa`` over SSH and parses each line with the regex from
    ``settings.rpms.regex_pattern``; non-matching lines are skipped.

    :return: dict built from each match's groups minus the last group
        (presumably {name: version} — pattern comes from settings)
    """
    with Session(settings=self.settings) as ssh_client:
        rpms = ssh_client.execute('rpm -qa').stdout
        rpms = rpms.splitlines()
        # Compile the pattern once instead of once per RPM line.
        matcher = re.compile(rf'{self.settings.rpms.regex_pattern}')
        rpms_matches = [matcher.match(rpm) for rpm in rpms]
        rpms_list = [rpm_match.groups()[:-1] for rpm_match in rpms_matches if rpm_match]
        return dict(rpms_list)

def to_resume_dict(self):
"""Exports our latest extraction progress information to a dictionary"""
return {
"api_endpoints": self._api_endpoints,
"completed_entities": self._completed_entities,
"current_entity": self._current_entity,
"current_endpoint": self._current_endpoint,
}

def load_resume_info(self):
    """Resume extraction from the state saved by a previous failed run.

    Reads the resume bookkeeping and partial-extraction files written in
    ``__aexit__``, repopulates the extractor's internal state, then deletes
    both files so the next run starts fresh.

    :raises FileNotFoundError: if either file is missing
    :raises KeyError: if the resume file lacks an expected field
    """
    # BUG FIX: json.load() expects a file object; read_text() returns a str,
    # so the original raised AttributeError. json.loads() parses the string.
    resume_info = json.loads(RESUME_FILE.read_text())
    self._api_endpoints = resume_info["api_endpoints"]
    self._completed_entities = resume_info["completed_entities"]
    self._current_entity = resume_info["current_entity"]
    self._current_endpoint = resume_info["current_endpoint"]
    self._all_data = json.loads(PARTIAL_FILE.read_text())
    RESUME_FILE.unlink()
    PARTIAL_FILE.unlink()
18 changes: 14 additions & 4 deletions candore/modules/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,22 @@ def _generate_csv_report(self, output_file, inverse):
# Convert json to csv and write to output file
csv_writer = csv.writer(output_file.open("w"))
# Table Column Names
columns = ["Path", "Pre-Upgrade", "Post-Upgrade", "Variation?" if not inverse else 'Constant?']
columns = [
"Path",
"Pre-Upgrade",
"Post-Upgrade",
"Variation?" if not inverse else 'Constant?',
]
csv_writer.writerow(columns)
# Writing Rows
for var_path, vals in self.results.items():
csv_writer.writerow([
var_path, vals["pre"], vals["post"],
vals["variation" if not inverse else "constant"]])
csv_writer.writerow(
[
var_path,
vals["pre"],
vals["post"],
vals["variation" if not inverse else "constant"],
]
)
print("Wrote CSV report to {}".format(output_file))
print("CSV report contains {} results".format(len(self.results)))
5 changes: 2 additions & 3 deletions candore/modules/ssh.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
from hussh import Connection
from functools import cached_property
from candore.config import candore_settings
from urllib.parse import urlparse

from hussh import Connection


class Session:

def __init__(self, settings=None):
self.settings = settings
self.hostname = urlparse(settings.candore.base_url).hostname
Expand Down
5 changes: 3 additions & 2 deletions candore/modules/variations.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
`conf/variations` yaml file and convert them into processable list
"""
from functools import cached_property
from candore.utils import yaml_reader, get_yaml_paths

from candore.utils import get_yaml_paths
from candore.utils import yaml_reader


class Variations:
Expand All @@ -20,7 +22,6 @@ def expected_variations(self):
yaml_data = self.variations.get("expected_variations") if self.variations else None
return get_yaml_paths(yaml_data=yaml_data)


@cached_property
def skipped_variations(self):
yaml_data = self.variations.get("skipped_variations") if self.variations else None
Expand Down
3 changes: 2 additions & 1 deletion candore/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
An utility helpers module
"""
from pathlib import Path

import yaml


Expand Down Expand Up @@ -40,4 +41,4 @@ def get_yaml_paths(yaml_data, prefix="", separator="/"):
paths.extend(get_yaml_paths(item, prefix, separator))
else:
paths.append(f"{prefix}{yaml_data}")
return paths
return paths
3 changes: 3 additions & 0 deletions scripts/gen_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import yaml

KEEP_FIELDS = ["name", "label", "title", "url", "description", "path"]
SKIP_ENTITIES = ["errata", "package_groups", "repository_sets"]
SKIP_DICT = {}
HELP_TEXT = """
This script processes a comparison report, in the form of a csv file, and outputs a constants file.
Expand All @@ -26,6 +27,8 @@

def filter_parts(parts):
    """Return True when the last path part contains a field worth keeping.

    :param parts: non-empty sequence of path components; parts[0] is the
        entity name, parts[-1] the field name
    :return: True to keep, None (falsy) for skipped entities or no match
    """
    # The entity-level skip does not depend on the loop variable, so it is
    # checked once up front instead of on every KEEP_FIELDS iteration.
    if parts[0] in SKIP_ENTITIES:
        return
    for check in KEEP_FIELDS:
        if check in parts[-1]:
            return True

Expand Down

0 comments on commit 870326c

Please sign in to comment.