Skip to content

Commit

Permalink
CredentialsManager introduction
Browse files Browse the repository at this point in the history
  • Loading branch information
cpainchaud committed Mar 5, 2024
1 parent 9c2834b commit f60e235
Show file tree
Hide file tree
Showing 8 changed files with 294 additions and 90 deletions.
6 changes: 3 additions & 3 deletions pylo/API/APIConnector.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ def create_from_credentials_in_file(hostname_or_profile_name: str, request_if_mi
credentials = pylo.get_credentials_from_file(hostname_or_profile_name, credential_file)

if credentials is not None:
return APIConnector(credentials['hostname'], credentials['port'], credentials['api_user'],
credentials['api_key'], skip_ssl_cert_check=not credentials['verify_ssl'],
org_id=credentials['org_id'], name=credentials['name'])
return APIConnector(credentials.hostname, credentials.port, credentials.api_user,
credentials.api_key, skip_ssl_cert_check=not credentials.verify_ssl,
org_id=credentials.org_id, name=credentials.name)

if not request_if_missing:
return None
Expand Down
180 changes: 135 additions & 45 deletions pylo/API/CredentialsManager.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
from typing import Dict, TypedDict, Union, List, Optional
import json
import os
from typing import Dict, TypedDict, Union, List

from ..Exception import PyloEx
from .. import log

try:
import paramiko
except ImportError:
log.debug("Paramiko library not found, SSH based encryption will not be available")
paramiko = None



class CredentialFileEntry(TypedDict):
name: str
Expand All @@ -16,6 +22,40 @@ class CredentialFileEntry(TypedDict):
verify_ssl: bool


class CredentialProfile:
name: str
hostname: str
port: int
api_user: str
api_key: str
org_id: int
verify_ssl: bool

def __init__(self, name: str, hostname: str, port: int, api_user: str, api_key: str, org_id: int, verify_ssl: bool, originating_file: Optional[str] = None):
self.name = name
self.hostname = hostname
self.port = port
self.api_user = api_user
self.api_key = api_key
self.org_id = org_id
self.verify_ssl = verify_ssl
self.originating_file = originating_file

self.raw_json: Optional[CredentialFileEntry] = None


@staticmethod
def from_credentials_file_entry(credential_file_entry: CredentialFileEntry, originating_file: Optional[str] = None):
return CredentialProfile(credential_file_entry['name'],
credential_file_entry['hostname'],
credential_file_entry['port'],
credential_file_entry['api_user'],
credential_file_entry['api_key'],
credential_file_entry['org_id'],
credential_file_entry['verify_ssl'],
originating_file)


CredentialsFileType = Union[CredentialFileEntry | List[CredentialFileEntry]]


Expand All @@ -37,59 +77,109 @@ def check_profile_json_structure(profile: Dict) -> None:
raise PyloEx("The profile {} does not contain a verify_ssl".format(profile))


def get_all_credentials_from_file(credential_file: str ) -> List[CredentialProfile]:
log.debug("Loading credentials from file: {}".format(credential_file))
with open(credential_file, 'r') as f:
credentials: CredentialsFileType = json.load(f)
profiles: List[CredentialProfile] = []
if isinstance(credentials, list):
for profile in credentials:
check_profile_json_structure(profile)
profiles.append(CredentialProfile.from_credentials_file_entry(profile, credential_file))
else:
check_profile_json_structure(credentials)
profiles.append(CredentialProfile.from_credentials_file_entry(credentials, credential_file))

return profiles


def get_credentials_from_file(hostname_or_profile_name: str = None,
credential_file: str = None) -> CredentialFileEntry:
"""
Credentials files will be looked for in the following order:
1. The path provided in the credential_file argument
2. The path provided in the PYLO_CREDENTIAL_FILE environment variable
3. The path ~/.pylo/credentials.json
4. Current working directory credentials.json
"""
credential_file: str = None) -> CredentialProfile:

if hostname_or_profile_name is None:
log.debug("No hostname_or_profile_name provided, profile_name=default will be used")
hostname_or_profile_name = "default"

if credential_file is None:
log.debug("No credential_file provided, looking for one in the environment variable PYLO_CREDENTIAL_FILE")
credential_file = os.environ.get('PYLO_CREDENTIAL_FILE', None)
if credential_file is None:
log.debug("No credential_file provided, looking for one in the default path ~/.pylo/credentials.json")
credential_file = os.path.expanduser("~/.pylo/credentials.json")
if not os.path.exists(credential_file):
log.debug("No credential_file provided, looking for one in the current working directory credentials.json")
credential_file = os.path.join(os.getcwd(), "credentials.json")
if not os.path.exists(credential_file):
raise PyloEx("No credential file found. Please provide a path to a credential file or create one "
"in the default location ~/.pylo/credentials.json")
credential_files: List[str] = []
if credential_file is not None:
credential_files.append(credential_file)
else:
credential_files = list_potential_credential_files()

credentials: List[CredentialProfile] = []

for file in credential_files:
log.debug("Loading credentials from file: {}".format(credential_file))
with open(credential_file, 'r') as f:
credentials.extend(get_all_credentials_from_file(file))

for credential_profile in credentials:
if credential_profile.name.lower() == hostname_or_profile_name.lower():
return credential_profile
if credential_profile.hostname.lower() == hostname_or_profile_name.lower():
return credential_profile

raise PyloEx("No profile found in credential file '{}' with hostname: {}".
format(credential_file, hostname_or_profile_name))


def list_potential_credential_files() -> List[str]:
"""
List the potential locations where a credential file could be found and return them if they exist
:return:
"""
potential_credential_files = []
if os.environ.get('PYLO_CREDENTIAL_FILE', None) is not None:
potential_credential_files.append(os.environ.get('PYLO_CREDENTIAL_FILE'))
potential_credential_files.append(os.path.expanduser("~/.pylo/credentials.json"))
potential_credential_files.append(os.path.join(os.getcwd(), "credentials.json"))

return [file for file in potential_credential_files if os.path.exists(file)]


def get_all_credentials() -> List[CredentialProfile]:
"""
Get all credentials from all potential credential files
:return:
"""
credential_files = list_potential_credential_files()
credentials = []
for file in credential_files:
credentials.extend(get_all_credentials_from_file(file))
return credentials


def create_credential_in_file(file_full_path: str, data: CredentialFileEntry, overwrite_existing_profile = False) -> None:
# if file already exists, load it and append the new credential to it
if os.path.isdir(file_full_path):
file_full_path = os.path.join(file_full_path, "credentials.json")

if os.path.exists(file_full_path):
with open(file_full_path, 'r') as f:
credentials: CredentialsFileType = json.load(f)
found_profile = None
available_profiles_names: List[str] = []
# if it is a list, we need to find the right one
if isinstance(credentials, list):
# check if the profile already exists
for profile in credentials:
check_profile_json_structure(profile)
available_profiles_names.append(profile['name'])
if profile['name'].lower() == hostname_or_profile_name.lower():
found_profile = profile
break
if profile['hostname'].lower() == hostname_or_profile_name.lower():
found_profile = profile
break
if found_profile is None:
raise PyloEx("No profile named '{}' found in credential file '{}' with hostname/profile: {}."
" Available profiles are: {}".
format(hostname_or_profile_name, credential_file, hostname_or_profile_name, available_profiles_names))

if profile['name'].lower() == data['name'].lower():
if overwrite_existing_profile:
profile = data
break
else:
raise PyloEx("Profile with name {} already exists in file {}".format(data['name'], file_full_path))
credentials.append(data)
else:
log.debug("Credentials file is not a list, assuming it is a single profile")
check_profile_json_structure(credentials)
found_profile = credentials
if data['name'].lower() == credentials['name'].lower():
if overwrite_existing_profile:
credentials = data
else:
raise PyloEx("Profile with name {} already exists in file {}".format(data['name'], file_full_path))
else:
credentials = [credentials, data]
else:
credentials = [data]

return found_profile
# write to the file
with open(file_full_path, 'w') as f:
json.dump(credentials, f, indent=4)

raise PyloEx("No profile found in credential file '{}' with hostname: {}".
format(credential_file, hostname_or_profile_name))
def create_credential_in_default_file(data: CredentialFileEntry) -> None:
create_credential_in_file(os.path.expanduser("~/.pylo/credentials.json"), data)
8 changes: 4 additions & 4 deletions pylo/Organization.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ def get_from_api_using_credential_file(hostname_or_profile_name: str = None,
"""
credentials = get_credentials_from_file(hostname_or_profile_name, credential_file)

connector = pylo.APIConnector(hostname=credentials['hostname'], port=credentials['port'],
apiuser=credentials['api_user'], apikey=credentials['api_key'],
org_id=credentials['org_id'],
skip_ssl_cert_check=not credentials['verify_ssl'], name=hostname_or_profile_name)
connector = pylo.APIConnector(hostname=credentials.hostname, port=credentials.port,
apiuser=credentials.api_user, apikey=credentials.api_key,
org_id=credentials.org_id,
skip_ssl_cert_check=not credentials.verify_ssl, name=hostname_or_profile_name)

objects = connector.get_pce_objects(list_of_objects_to_load=list_of_objects_to_load,
include_deleted_workloads=include_deleted_workloads)
Expand Down
62 changes: 32 additions & 30 deletions pylo/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def execute_native_parsers(args: Dict, org: pylo.Organization, native_parsers: o
attr.execute(args[attr.get_arg_name()], org, padding=' ')

parser = argparse.ArgumentParser(description='TODO LATER')
parser.add_argument('--pce', type=str, required=True,
parser.add_argument('--pce', type=str, required=False,
help='hostname of the PCE')
parser.add_argument('--debug', action='store_true',
help='Enables extra debugging output in PYLO framework')
Expand Down Expand Up @@ -64,8 +64,9 @@ def execute_native_parsers(args: Dict, org: pylo.Organization, native_parsers: o
if args['debug']:
pylo.log_set_debug()

hostname = args['pce']
credential_profile_name = args['pce']
settings_use_cache = args['use_cache']
org: Optional[pylo.Organization] = None

# We are getting the command object associated to the command name if it was not already set (via forced_command_name)
if selected_command is None:
Expand All @@ -76,37 +77,38 @@ def execute_native_parsers(args: Dict, org: pylo.Organization, native_parsers: o
connector: Optional[pylo.APIConnector] = None
config_data = None

if settings_use_cache:
print(" * Loading objects from cached PCE '{}' data... ".format(hostname), end="", flush=True)
org = pylo.Organization.get_from_cache_file(hostname)
print("OK!")
connector = pylo.APIConnector.create_from_credentials_in_file(hostname, request_if_missing=False)
if connector is not None:
org.connector = connector
else:
print(" * Looking for PCE/profile '{}' credentials... ".format(hostname), end="", flush=True)
connector = pylo.APIConnector.create_from_credentials_in_file(hostname, request_if_missing=True)
print("OK!")
if not selected_command.credentials_manager_mode:
if settings_use_cache:
print(" * Loading objects from cached PCE '{}' data... ".format(credential_profile_name), end="", flush=True)
org = pylo.Organization.get_from_cache_file(credential_profile_name)
print("OK!")
connector = pylo.APIConnector.create_from_credentials_in_file(credential_profile_name, request_if_missing=False)
if connector is not None:
org.connector = connector
else:
print(" * Looking for PCE/profile '{}' credentials... ".format(credential_profile_name), end="", flush=True)
connector = pylo.APIConnector.create_from_credentials_in_file(credential_profile_name, request_if_missing=True)
print("OK!")

print(" * Downloading PCE objects from API... ".format(credential_profile_name), end="", flush=True)
config_data = connector.get_pce_objects(list_of_objects_to_load=selected_command.load_specific_objects_only)
print("OK!")

print(" * Downloading PCE objects from API... ".format(hostname), end="", flush=True)
config_data = connector.get_pce_objects(list_of_objects_to_load=selected_command.load_specific_objects_only)
print("OK!")
org = pylo.Organization(1)
org.connector = connector

org = pylo.Organization(1)
org.connector = connector
if not selected_command.skip_pce_config_loading:
print(" * Loading objects from PCE '{}' via API... ".format(credential_profile_name), end="", flush=True)
org.pce_version = connector.get_software_version()
org.load_from_json(config_data, list_of_objects_to_load=selected_command.load_specific_objects_only)
print("OK!")

print()
if not selected_command.skip_pce_config_loading:
print(" * Loading objects from PCE '{}' via API... ".format(hostname), end="", flush=True)
org.pce_version = connector.get_software_version()
org.load_from_json(config_data, list_of_objects_to_load=selected_command.load_specific_objects_only)
print("OK!")

print()
if not selected_command.skip_pce_config_loading:
print(" * PCE statistics: ")
print(org.stats_to_str(padding=' '))
print(" * PCE statistics: ")
print(org.stats_to_str(padding=' '))

print(flush=True)
print(flush=True)

print("**** {} UTILITY ****".format(selected_command.name.upper()), flush=True)
if selected_command.native_parsers is None:
Expand All @@ -116,9 +118,9 @@ def execute_native_parsers(args: Dict, org: pylo.Organization, native_parsers: o
execute_native_parsers(args, org, native_parsers)

if native_parsers is not None:
commands.available_commands[selected_command.name].main(args, org, config_data=config_data, connector=connector, native_parsers=native_parsers, pce_cache_was_used=settings_use_cache)
commands.available_commands[selected_command.name].main(args, org=org, config_data=config_data, connector=connector, native_parsers=native_parsers, pce_cache_was_used=settings_use_cache)
else:
commands.available_commands[selected_command.name].main(args, org, config_data=config_data, connector=connector, pce_cache_was_used=settings_use_cache)
commands.available_commands[selected_command.name].main(args, org=org, config_data=config_data, connector=connector, pce_cache_was_used=settings_use_cache)
print("**** END OF {} UTILITY ****".format(selected_command.name.upper()))
print()

Expand Down
5 changes: 4 additions & 1 deletion pylo/cli/commands/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
class Command:
def __init__(self, name: str, main_func, parser_func, load_specific_objects_only: Optional[List[str]] = None,
skip_pce_config_loading: bool = False,
native_parsers_as_class: Optional = None):
native_parsers_as_class: Optional = None,
credentials_manager_mode: bool = False):
self.name: str = name
self.main = main_func
self.fill_parser = parser_func
self.load_specific_objects_only: Optional[List[str]] = load_specific_objects_only
self.skip_pce_config_loading = skip_pce_config_loading
self.native_parsers = native_parsers_as_class
self.credentials_manager_mode = credentials_manager_mode
available_commands[name] = self


Expand All @@ -27,3 +29,4 @@ def __init__(self, name: str, main_func, parser_func, load_specific_objects_only
from .workload_import import command_object
from .ven_idle_to_visibility import command_object
from .workload_reset_names_to_null import command_object
from .credential_manager import command_object
Loading

0 comments on commit f60e235

Please sign in to comment.