From a15dc52f50149e8e78771744cfa8f0efe2cd2bfc Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Sun, 12 May 2024 13:52:54 +0200 Subject: [PATCH] improved the config hook module --- examples/app/tmtcc.py | 20 ++++------ tests/test_util.py | 31 ++++++++------- tmtccmd/config/defs.py | 13 ++++-- tmtccmd/config/hook.py | 84 +++++++++++++++++++-------------------- tmtccmd/config/objects.py | 10 +++-- tmtccmd/util/__init__.py | 14 ++++++- tmtccmd/util/conf_util.py | 8 ++-- tmtccmd/util/json.py | 3 +- tmtccmd/util/obj_id.py | 43 ++++++++++++++------ tmtccmd/util/retval.py | 7 +++- 10 files changed, 137 insertions(+), 96 deletions(-) diff --git a/examples/app/tmtcc.py b/examples/app/tmtcc.py index b550e6b6..3df8bb13 100755 --- a/examples/app/tmtcc.py +++ b/examples/app/tmtcc.py @@ -44,7 +44,6 @@ TcProcedureType, TcQueueEntryType, ) -from tmtccmd.util import ObjectIdDictT from spacepackets.seqcount import FileSeqCountProvider, PusFileSeqCountProvider _LOGGER = logging.getLogger() @@ -58,7 +57,7 @@ class ExampleHookClass(HookBase): def __init__(self, json_cfg_path: str): - super().__init__(json_cfg_path=json_cfg_path) + super().__init__(cfg_file_path=json_cfg_path) def get_communication_interface(self, com_if_key: str) -> Optional[ComInterface]: assert self.cfg_path is not None @@ -124,11 +123,6 @@ def perform_mode_operation(self, _tmtc_backend: CcsdsTmtcBackend, _mode: int): _LOGGER.info("Mode operation hook was called") pass - def get_object_ids(self) -> ObjectIdDictT: - from tmtccmd.config.objects import get_core_object_ids - - return get_core_object_ids() - class PusTmHandler(SpecificApidHandlerBase): def __init__( @@ -238,15 +232,15 @@ def send_cb(self, send_params: SendCbParams): log_entry = entry_helper.to_log_entry() _LOGGER.info(log_entry.log_str) - def queue_finished_cb(self, helper: ProcedureWrapper): - if helper.proc_type == TcProcedureType.TREE_COMMANDING: - def_proc = helper.to_tree_commanding_procedure() + def queue_finished_cb(self, info: ProcedureWrapper): + if info.proc_type == TcProcedureType.TREE_COMMANDING: + def_proc = info.to_tree_commanding_procedure() _LOGGER.info(f"Queue handling finished for command {def_proc.cmd_path}") - def feed_cb(self, helper: ProcedureWrapper, wrapper: FeedWrapper): + def feed_cb(self, info: ProcedureWrapper, wrapper: FeedWrapper): self.queue_helper.queue_wrapper = wrapper.queue_wrapper - if helper.proc_type == TcProcedureType.TREE_COMMANDING: - def_proc = helper.to_tree_commanding_procedure() + if info.proc_type == TcProcedureType.TREE_COMMANDING: + def_proc = info.to_tree_commanding_procedure() cmd_path = def_proc.cmd_path assert cmd_path is not None # Path starts with / so the first entry of the list will be an empty string. We cut diff --git a/tests/test_util.py b/tests/test_util.py index 09501666..32c836ef 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -1,48 +1,51 @@ from unittest import TestCase -from tmtccmd.util import ObjectIdU32 -from tmtccmd.util.obj_id import ObjectIdU8, ObjectIdU16 +from tmtccmd.util.obj_id import ( + ComponentIdU16, + ComponentIdU32, + ComponentIdU8, +) class TestObjectId(TestCase): def test_basic(self): - obj_id0 = ObjectIdU32(1, "Some Name") + obj_id0 = ComponentIdU32(1, "Some Name") self.assertEqual(str(obj_id0), "Object ID Some Name with ID 0x00000001") self.assertEqual( - obj_id0.__repr__(), "ObjectIdU32(object_id=1, name='Some Name')" + obj_id0.__repr__(), "ComponentIdU32(object_id=1, name='Some Name')" ) self.assertEqual(obj_id0.as_bytes, bytes([0x00, 0x00, 0x00, 0x01])) self.assertEqual(obj_id0.as_hex_string, "0x00000001") self.assertEqual(int(obj_id0), 1) - obj_id1 = ObjectIdU32(1, "Other Name") + obj_id1 = ComponentIdU32(1, "Other Name") self.assertEqual(obj_id0, obj_id1) - obj_from_raw = ObjectIdU32.from_bytes(obj_id0.as_bytes) + obj_from_raw = ComponentIdU32.from_bytes(obj_id0.as_bytes) self.assertEqual(obj_from_raw, obj_id0) with self.assertRaises(ValueError): - ObjectIdU32.from_bytes(bytes()) + ComponentIdU32.from_bytes(bytes()) with self.assertRaises(ValueError): - ObjectIdU32.from_bytes(bytes([0, 1, 2])) + ComponentIdU32.from_bytes(bytes([0, 1, 2])) with self.assertRaises(ValueError): obj_id1.obj_id = -1 def test_diff_types(self): - obj_id_u8 = ObjectIdU8(1, "U8 ID 0") + obj_id_u8 = ComponentIdU8(1, "U8 ID 0") self.assertEqual(obj_id_u8.as_bytes, bytes([1])) self.assertEqual(obj_id_u8.as_hex_string, "0x01") self.assertEqual(obj_id_u8.byte_len, 1) - obj_id_u16 = ObjectIdU16(2, "U16 ID 2") + obj_id_u16 = ComponentIdU16(2, "U16 ID 2") self.assertEqual(obj_id_u16.as_bytes, bytes([0, 2])) self.assertEqual(obj_id_u16.as_hex_string, "0x0002") self.assertEqual(obj_id_u16.byte_len, 2) - obj_id_u32 = ObjectIdU32(1, "U32 ID 1") + obj_id_u32 = ComponentIdU32(1, "U32 ID 1") test_dict = dict() test_dict.update({obj_id_u8: obj_id_u8.name}) test_dict.update({obj_id_u16: obj_id_u16.name}) test_dict.update({obj_id_u32: obj_id_u32.name}) self.assertEqual(len(test_dict), 3) - obj_id_u8_from_raw = ObjectIdU8.from_bytes(obj_id_u8.as_bytes) + obj_id_u8_from_raw = ComponentIdU8.from_bytes(obj_id_u8.as_bytes) self.assertEqual(obj_id_u8_from_raw, obj_id_u8) - obj_id_u16_from_raw = ObjectIdU16.from_bytes(obj_id_u16.as_bytes) + obj_id_u16_from_raw = ComponentIdU16.from_bytes(obj_id_u16.as_bytes) self.assertEqual(obj_id_u16_from_raw, obj_id_u16) - obj_id_u32_from_raw = ObjectIdU32.from_bytes(obj_id_u32.as_bytes) + obj_id_u32_from_raw = ComponentIdU32.from_bytes(obj_id_u32.as_bytes) self.assertEqual(obj_id_u32_from_raw, obj_id_u32) diff --git a/tmtccmd/config/defs.py b/tmtccmd/config/defs.py index 0c7658e3..b230d7cd 100644 --- a/tmtccmd/config/defs.py +++ b/tmtccmd/config/defs.py @@ -1,12 +1,13 @@ import enum from dataclasses import dataclass -from typing import Any, Tuple, Dict, Union, Optional +from typing import Any, Tuple, Mapping, Union, Optional from spacepackets.cfdp import TransmissionMode # Com Interface Types -ComIfValueT = Tuple[str, Any] -ComIfDictT = Dict[str, ComIfValueT] +ComIfValue = Tuple[str, Any] +ComIfMapping = Mapping[str, ComIfValue] +ComIfDictT = ComIfMapping @dataclass @@ -30,6 +31,10 @@ def default_json_path() -> str: return "tmtc_conf.json" +def default_toml_path() -> str: + return "tmtc_conf.toml" + + class CoreComInterfaces(str, enum.Enum): DUMMY = "dummy" UDP = "udp" @@ -40,7 +45,7 @@ class CoreComInterfaces(str, enum.Enum): UNSPECIFIED = "unspec" -CORE_COM_IF_DICT: ComIfDictT = { +CORE_COM_IF_DICT: ComIfMapping = { CoreComInterfaces.DUMMY: ("Dummy Interface", None), CoreComInterfaces.UDP: ("TCP/IP with UDP datagrams", None), CoreComInterfaces.TCP: ("TCP/IP with TCP", None), diff --git a/tmtccmd/config/hook.py b/tmtccmd/config/hook.py index f6d9c3ef..60de8f6d 100644 --- a/tmtccmd/config/hook.py +++ b/tmtccmd/config/hook.py @@ -12,7 +12,7 @@ from .com import ComCfgBase, ComInterface from .tmtc import TmtcDefinitionWrapper -from .defs import default_json_path, CORE_COM_IF_DICT, ComIfDictT +from .defs import default_json_path, CORE_COM_IF_DICT, ComIfMapping _LOGGER = logging.getLogger(__name__) @@ -24,33 +24,11 @@ class HookBase(ABC): TMTC commander core. """ - def __init__(self, json_cfg_path: Optional[str] = None): - self.cfg_path = json_cfg_path + def __init__(self, cfg_file_path: Optional[str] = None): + self.cfg_path = cfg_file_path if self.cfg_path is None: self.cfg_path = default_json_path() - @abstractmethod - def get_object_ids(self) -> ObjectIdDictT: - from tmtccmd.config.objects import get_core_object_ids - - """The user can specify an object ID dictionary here mapping object ID bytearrays to a - list. This list could contain containing the string representation or additional - information about that object ID. - """ - return get_core_object_ids() - - @deprecated( - version="8.0.0rc0", - reason="implement and use get_communication_interface instead", - ) - def assign_communication_interface(self, com_if_key: str) -> Optional[ComInterface]: - """Assign the communication interface used by the TMTC commander to send and receive - TMTC with. - - :param com_if_key: String key of the communication interface to be created. - """ - return self.get_communication_interface(com_if_key) - @abstractmethod def get_communication_interface(self, com_if_key: str) -> Optional[ComInterface]: from tmtccmd.config.com import create_com_interface_default @@ -59,28 +37,14 @@ def get_communication_interface(self, com_if_key: str) -> Optional[ComInterface] cfg_base = ComCfgBase(com_if_key=com_if_key, json_cfg_path=self.cfg_path) return create_com_interface_default(cfg_base) - def get_com_if_dict(self) -> ComIfDictT: - return CORE_COM_IF_DICT - - @deprecated( - version="8.0.0rc0", - reason="implement and use get_command_definitions instead", - ) - def get_tmtc_definitions(self) -> TmtcDefinitionWrapper: - """This is a dicitonary mapping services represented by strings to an operation code - dictionary. - - :return: - """ - from tmtccmd.config.globals import get_default_tmtc_defs - - return get_default_tmtc_defs() - @abstractmethod def get_command_definitions(self) -> CmdTreeNode: """This function should return the root node of the command definition tree.""" pass + def get_com_if_dict(self) -> ComIfMapping: + return CORE_COM_IF_DICT + def get_cmd_history(self) -> Optional[History]: """Optionlly return a history class for the past command paths which will be used when prompting a command path from the user in CLI mode.""" @@ -94,3 +58,39 @@ def perform_mode_operation(self, tmtc_backend: BackendBase, mode: int): :return: """ _LOGGER.warning("No custom mode operation implemented") + + @deprecated(version="8.0.0", reason="application specific code") + def get_object_ids(self) -> ObjectIdDictT: + from tmtccmd.config.objects import get_base_component_id_mapping + + """The user can specify an object ID dictionary here mapping object ID bytearrays to a + list. This list could contain containing the string representation or additional + information about that object ID. + """ + return get_base_component_id_mapping() + + @deprecated( + version="8.0.0rc0", + reason="implement and use get_command_definitions instead", + ) + def get_tmtc_definitions(self) -> TmtcDefinitionWrapper: + """This is a dicitonary mapping services represented by strings to an operation code + dictionary. + + :return: + """ + from tmtccmd.config.globals import get_default_tmtc_defs + + return get_default_tmtc_defs() + + @deprecated( + version="8.0.0rc0", + reason="implement and use get_communication_interface instead", + ) + def assign_communication_interface(self, com_if_key: str) -> Optional[ComInterface]: + """Assign the communication interface used by the TMTC commander to send and receive + TMTC with. + + :param com_if_key: String key of the communication interface to be created. + """ + return self.get_communication_interface(com_if_key) diff --git a/tmtccmd/config/objects.py b/tmtccmd/config/objects.py index 8b62d4e5..c09b0871 100644 --- a/tmtccmd/config/objects.py +++ b/tmtccmd/config/objects.py @@ -1,16 +1,20 @@ -from tmtccmd.util.obj_id import ObjectIdDictT, ObjectIdU32 +from tmtccmd.util.obj_id import ComponentIdMapping, ObjectIdU32 INVALID_ID = bytes([0xFF, 0xFF, 0xFF, 0xFF]) -def get_core_object_ids() -> ObjectIdDictT: +def get_base_component_id_mapping() -> ComponentIdMapping: """These are the object IDs for the tmtccmd core. The core will usually take care of inserting these into the object manager during the program initialization. :return Dictionary of the core object IDs """ - invalid_id = ObjectIdU32.from_bytes(obj_id_as_bytes=INVALID_ID) + invalid_id = ObjectIdU32.from_bytes(raw=INVALID_ID) invalid_id.name = "Invalid ID" object_id_dict = {INVALID_ID: invalid_id} return object_id_dict + + +def get_core_object_ids() -> ComponentIdMapping: + return get_base_component_id_mapping() diff --git a/tmtccmd/util/__init__.py b/tmtccmd/util/__init__.py index 5d696d93..fdce8f0e 100644 --- a/tmtccmd/util/__init__.py +++ b/tmtccmd/util/__init__.py @@ -1,2 +1,14 @@ -from .obj_id import ObjectIdU32, ObjectIdU16, ObjectIdU8, ObjectIdBase, ObjectIdDictT +from .obj_id import ( + ComponentIdU32, + ComponentIdU16, + ComponentIdU8, + ComponentIdBase, + ComponentIdMapping, + ObjectIdU32, + ObjectIdU16, + ObjectIdU8, + ObjectIdBase, + ObjectIdMapping, + ObjectIdDictT, +) from .retval import RetvalDictT diff --git a/tmtccmd/util/conf_util.py b/tmtccmd/util/conf_util.py index b942a1cc..33dc01c6 100644 --- a/tmtccmd/util/conf_util.py +++ b/tmtccmd/util/conf_util.py @@ -1,6 +1,6 @@ import collections.abc import logging -from typing import Tuple, Union +from typing import Any, Tuple, Union from contextlib import contextmanager @@ -62,11 +62,11 @@ def check_args_in_dict( def __handle_iterable_non_dict( - param: any, + param: Any, iterable: collections.abc.Iterable, might_be_integer: bool, - init_res_tuple: Tuple[bool, any], -) -> (bool, any): + init_res_tuple: Tuple[bool, Any], +) -> Tuple[bool, Any]: param_list = list() for idx, enum_value in enumerate(iterable): if isinstance(enum_value.value, str): diff --git a/tmtccmd/util/json.py b/tmtccmd/util/json.py index 7ae8494f..b0e8beaa 100644 --- a/tmtccmd/util/json.py +++ b/tmtccmd/util/json.py @@ -2,6 +2,7 @@ import logging import os import enum +from typing import Any _LOGGER = logging.getLogger(__name__) @@ -57,7 +58,7 @@ def check_json_file(json_cfg_path: str) -> bool: def save_to_json_with_prompt( - key: str, value: any, name: str, json_cfg_path: str, json_obj: any + key: str, value: Any, name: str, json_cfg_path: str, json_obj: Any ) -> bool: save_to_json = input( f"Do you want to store the {name} to the configuration file? (y/n): " diff --git a/tmtccmd/util/obj_id.py b/tmtccmd/util/obj_id.py index 657c2218..8946a87a 100644 --- a/tmtccmd/util/obj_id.py +++ b/tmtccmd/util/obj_id.py @@ -5,7 +5,7 @@ from spacepackets.util import UnsignedByteField -class ObjectIdBase(UnsignedByteField): +class ComponentIdBase(UnsignedByteField): """Base class for unsigned object IDs with different byte sizes""" def __init__(self, obj_id: int, byte_len: int, name: Optional[str] = None): @@ -44,11 +44,11 @@ def obj_id(self, obj_id: Union[int, bytes]): self.value = obj_id -class ObjectIdU32(ObjectIdBase): +class ComponentIdU32(ComponentIdBase): """A helper object for a unique object identifier which has a raw unsigned 32-bit representation. - >>> obj_id = ObjectIdU32(42, "Object with the answer to everything") + >>> obj_id = ComponentIdU32(42, "Object with the answer to everything") >>> int(obj_id) 42 >>> obj_id.name @@ -68,13 +68,13 @@ def __repr__(self): ) @classmethod - def from_bytes_typed(cls, obj_id_as_bytes: bytes) -> ObjectIdU32: - obj_id = ObjectIdU32(obj_id=0) + def from_bytes_typed(cls, obj_id_as_bytes: bytes) -> ComponentIdU32: + obj_id = ComponentIdU32(obj_id=0) obj_id.obj_id = obj_id_as_bytes return obj_id -class ObjectIdU16(ObjectIdBase): +class ComponentIdU16(ComponentIdBase): """A helper object for a unique object identifier which has a raw unsigned 16-bit representation. """ @@ -88,13 +88,13 @@ def __repr__(self): ) @classmethod - def from_bytes_typed(cls, obj_id_as_bytes: bytes) -> ObjectIdU16: - obj_id = ObjectIdU16(obj_id=0) + def from_bytes_typed(cls, obj_id_as_bytes: bytes) -> ComponentIdU16: + obj_id = ComponentIdU16(obj_id=0) obj_id.obj_id = obj_id_as_bytes return obj_id -class ObjectIdU8(ObjectIdBase): +class ComponentIdU8(ComponentIdBase): """A helper object for a unique object identifier which has a raw unsigned 8-bit representation. """ @@ -108,10 +108,29 @@ def __repr__(self): ) @classmethod - def from_bytes_typed(cls, obj_id_as_bytes: bytes) -> ObjectIdU8: - obj_id = ObjectIdU8(obj_id=0) + def from_bytes_typed(cls, obj_id_as_bytes: bytes) -> ComponentIdU8: + obj_id = ComponentIdU8(obj_id=0) obj_id.obj_id = obj_id_as_bytes return obj_id -ObjectIdDictT = Mapping[bytes, ObjectIdBase] +ComponentIdMapping = Mapping[bytes, ComponentIdBase] + + +ObjectIdBase = ComponentIdBase +"""Deprecated type defintion for :py:class:`ComponentIdBase`""" + +ObjectIdU32 = ComponentIdU32 +"""Deprecated type defintion for :py:class:`ComponentIdU32`""" + +ObjectIdU16 = ComponentIdU16 +"""Deprecated type defintion for :py:class:`ComponentIdU16`""" + +ObjectIdU8 = ComponentIdU8 +"""Deprecated type defintion for :py:class:`ComponentIdU8`""" + +ObjectIdMapping = ComponentIdMapping +"""Deprecated type defintion for :py:class:`ComponentIdMapping`""" + +ObjectIdDictT = ObjectIdMapping +"""Deprecated type defintion for :py:class:`ObjectIdMapping`""" diff --git a/tmtccmd/util/retval.py b/tmtccmd/util/retval.py index 37a39afc..1573a106 100644 --- a/tmtccmd/util/retval.py +++ b/tmtccmd/util/retval.py @@ -1,4 +1,4 @@ -from typing import Dict +from typing import Mapping class RetvalInfo: @@ -20,4 +20,7 @@ def unique_id(self) -> int: return self.id & 0xFF -RetvalDictT = Dict[int, RetvalInfo] +RetvalMapping = Mapping[int, RetvalInfo] + +RetvalDictT = RetvalMapping +"""Deprecatd typedef for :py:class:`RetvalMapping`."""