diff --git a/examples/app/tmtcc.py b/examples/app/tmtcc.py index 57fb9dc7..0e04972c 100755 --- a/examples/app/tmtcc.py +++ b/examples/app/tmtcc.py @@ -312,6 +312,7 @@ def main(): # noqa: C901 while True: state = tmtc_backend.periodic_op(None) if state.request == BackendRequest.TERMINATION_NO_ERROR: + tmtc_backend.close_com_if() sys.exit(0) elif state.request == BackendRequest.DELAY_IDLE: _LOGGER.info("TMTC Client in IDLE mode") diff --git a/tests/com/test_tcp.py b/tests/com/test_tcp.py index 225bdfc2..10d37ac9 100644 --- a/tests/com/test_tcp.py +++ b/tests/com/test_tcp.py @@ -35,7 +35,7 @@ def setUp(self) -> None: "tcp", space_packet_ids=[self.expected_packet_id], target_address=EthAddr.from_tuple(self.addr), - tm_polling_freqency=0.05, + inner_thread_delay=0.05, ) self.conn_socket: Optional[socket.socket] = None self.server_received_packets = deque() diff --git a/tmtccmd/__init__.py b/tmtccmd/__init__.py index 5c9bc9d1..6e006ac5 100644 --- a/tmtccmd/__init__.py +++ b/tmtccmd/__init__.py @@ -1,6 +1,7 @@ """Contains core methods called by entry point files to setup and start a tmtccmd application. It also re-exports commonly used classes and functions. """ + import logging import sys import os diff --git a/tmtccmd/cfdp/__init__.py b/tmtccmd/cfdp/__init__.py index 3e7be295..c462af9a 100644 --- a/tmtccmd/cfdp/__init__.py +++ b/tmtccmd/cfdp/__init__.py @@ -1,5 +1,6 @@ """Please note that this module does not contain configuration helpers, for example to convert CLI or GUI parameters into the internalized CFDP classes. You can find all those helpers inside the :py:mod:`tmtccmd.config.cfdp` module.""" + from .request import CfdpRequestWrapper from spacepackets.cfdp import TransactionId diff --git a/tmtccmd/com/dummy.py b/tmtccmd/com/dummy.py index 3c139b76..35593d6e 100644 --- a/tmtccmd/com/dummy.py +++ b/tmtccmd/com/dummy.py @@ -1,6 +1,7 @@ """Dummy Virtual Communication Interface. Currently serves to use the TMTC program without needing external hardware or an extra socket """ + from typing import Optional from deprecated.sphinx import deprecated diff --git a/tmtccmd/com/tcp.py b/tmtccmd/com/tcp.py index 6b8bbf4d..48a31998 100644 --- a/tmtccmd/com/tcp.py +++ b/tmtccmd/com/tcp.py @@ -1,4 +1,5 @@ """TCP communication interface""" + import logging import queue import socket @@ -36,7 +37,7 @@ def __init__( self, com_if_id: str, space_packet_ids: Sequence[PacketId], - tm_polling_freqency: float, + inner_thread_delay: float, target_address: EthAddr, max_packets_stored: Optional[int] = None, ): @@ -45,24 +46,23 @@ def __init__( :param com_if_id: :param space_packet_ids: Valid packet IDs for CCSDS space packets. Those will be used to parse for space packets inside the TCP stream. - :param tm_polling_freqency: Polling frequency in seconds + :param inner_thread_delay: Polling frequency of TCP thread in seconds. """ self.com_if_id = com_if_id self.com_type = TcpCommunicationType.SPACE_PACKETS self.space_packet_ids = space_packet_ids - self.tm_polling_frequency = tm_polling_freqency + self.__inner_thread_delay = inner_thread_delay self.target_address = target_address self.max_packets_stored = max_packets_stored - self.connected = False - - self.__tcp_socket: Optional[socket.socket] = None + self.__conn_lock = threading.Lock() + self.__connected = False + self.__tcp_socket = None self.__last_connection_time = 0 - self.__tm_thread_kill_signal = threading.Event() + self.__thread_kill_signal = threading.Event() # Separate thread to request TM packets periodically if no TCs are being sent - self.__tcp_conn_thread: Optional[threading.Thread] = threading.Thread( - target=self.__tcp_tm_client, daemon=True - ) + self.__tcp_thread = threading.Thread(target=self.__tcp_task, daemon=True) self.__tm_queue = queue.Queue() + self.__tc_queue = queue.Queue() self.__analysis_queue = deque() self.tm_packet_list = [] @@ -80,71 +80,51 @@ def initialize(self, args: any = None) -> any: pass def open(self, args: any = None): - self.__tm_thread_kill_signal.clear() + if self.is_open(): + return + self.__thread_kill_signal.clear() try: - self.set_up_socket() + self.__init_socket() + self.__connect_socket() except IOError as e: _LOGGER.exception("Issues setting up the TCP socket") raise e - self.set_up_tcp_thread() - self.__tcp_conn_thread.start() + self.__tcp_thread = threading.Thread(target=self.__tcp_task) + self.__tcp_thread.start() + self.__connected = True def is_open(self) -> bool: - return self.connected + with self.__conn_lock: + return self.__connected - def set_up_socket(self): + def __init_socket(self): if self.__tcp_socket is None: self.__tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - try: - self.__tcp_socket.settimeout(2.0) - self.__tcp_socket.connect(self.target_address.to_tuple) - self.connected = True - except socket.timeout as e: - _LOGGER.warning( - "Could not connect to socket with address" - f" {self.target_address}: {e}" - ) - finally: - self.__tcp_socket.settimeout(None) + self.__tcp_socket.settimeout(2.0) - def set_up_tcp_thread(self): - if self.__tcp_conn_thread is None: - self.__tcp_conn_thread = threading.Thread( - target=self.__tcp_tm_client, daemon=True + def __connect_socket(self): + try: + self.__tcp_socket.connect(self.target_address.to_tuple) + except socket.timeout as e: + _LOGGER.warning( + "Could not connect to socket with address" + f" {self.target_address}: {e}" ) + finally: + self.__tcp_socket.settimeout(None) def close(self, args: any = None) -> None: - self.__tm_thread_kill_signal.set() - socket_was_closed = False - if self.__tcp_conn_thread is not None: - if self.__tcp_conn_thread.is_alive(): - self.__tcp_conn_thread.join(self.tm_polling_frequency) - if self.connected: - try: - self.__tcp_socket.shutdown(socket.SHUT_RDWR) - except OSError: - _LOGGER.warning( - "TCP socket endpoint was already closed or not connected" - ) - self.__tcp_socket.close() - socket_was_closed = True - if self.__tcp_socket is not None and not socket_was_closed: - self.__tcp_socket.close() + if not self.is_open(): + return + self.__thread_kill_signal.set() + if self.__tcp_thread is not None: + self.__tcp_thread.join(self.__inner_thread_delay) + with self.__conn_lock: + self.__connected = False self.__tcp_socket = None - self.__tcp_conn_thread = None def send(self, data: bytes): - try: - if not self.connected: - self.set_up_socket() - self.__tcp_socket.sendto(data, self.target_address.to_tuple) - except BrokenPipeError as e: - raise SendError(f"{e}", e) - except ConnectionRefusedError or OSError as e: - self.connected = False - self.__tcp_socket.close() - self.__tcp_socket = None - raise SendError(f"TCP connection attempt failed with exception: {e}", e) + self.__tc_queue.put(data) def receive(self, poll_timeout: float = 0) -> TelemetryListT: self.__tm_queue_to_packet_list() @@ -168,50 +148,74 @@ def __tm_queue_to_packet_list(self): while self.__analysis_queue: self.tm_packet_list.append(self.__analysis_queue.popleft()) - def __tcp_tm_client(self): - while True and not self.__tm_thread_kill_signal.is_set(): - if self.connected: - try: - self.__receive_tm_packets() - except ConnectionRefusedError: - _LOGGER.warning("TCP connection attempt failed..") - time.sleep(self.tm_polling_frequency) + def __tcp_task(self): + while True and not self.__thread_kill_signal.is_set(): + try: + self.__tmtc_event_loop() + except ConnectionRefusedError: + _LOGGER.warning("TCP connection attempt failed..") + time.sleep(self.__inner_thread_delay) - def __receive_tm_packets(self): + def __tmtc_event_loop(self): try: while True: - ready = select.select([self.__tcp_socket], [], [], 0) - if not ready[0]: + outputs = [] + queue_size = self.__tc_queue.qsize() + if queue_size > 0: + outputs.append(self.__tcp_socket) + (readable, writable, _) = select.select( + [self.__tcp_socket], outputs, [], self.__inner_thread_delay + ) + if self.__thread_kill_signal.is_set(): + self.__tcp_socket.close() break - bytes_recvd = self.__tcp_socket.recv(4096) - if bytes_recvd == b"": - self.__close_tcp_socket() - _LOGGER.info("TCP server has been closed") - return - else: - self.connected = True - if ( - self.max_packets_stored is not None - and self.__tm_queue.qsize() >= self.max_packets_stored - ): - _LOGGER.warning( - "Number of packets in TCP queue too large. " - "Overwriting old packets.." - ) - self.__tm_queue.get() - # TODO: If segments are received but the receiver is unable to parse packets - # properly, it might make sense to have a timeout which then also - # logs that there might be an issue reading packets - self.__tm_queue.put(bytes(bytes_recvd)) + if queue_size > 0 and writable and writable[0]: + self.__tc_handling(queue_size) + if readable and readable[0]: + self.__tm_handling() + except KeyboardInterrupt: + _LOGGER.info("Keyboard interrupt, shutting down TCP task") + self.__force_shutdown() except ConnectionResetError: - self.__close_tcp_socket() + self.__force_shutdown() _LOGGER.exception("ConnectionResetError. TCP server might not be up") + def __tc_handling(self, queue_size: int): + try: + self.__tcp_socket.sendto( + self.__tc_queue.get(), self.target_address.to_tuple + ) + queue_size -= 1 + except BrokenPipeError as e: + raise SendError(f"{e}", e) + except ConnectionRefusedError or OSError as e: + self.__force_shutdown() + raise SendError(f"TCP connection attempt failed with exception: {e}", e) + + def __tm_handling(self): + bytes_recvd = self.__tcp_socket.recv(4096) + if bytes_recvd == b"": + self.__force_shutdown() + _LOGGER.info("TCP server has been closed") + return + if ( + self.max_packets_stored is not None + and self.__tm_queue.qsize() >= self.max_packets_stored + ): + _LOGGER.warning( + "Number of packets in TCP queue too large. " "Overwriting old packets.." + ) + self.__tm_queue.get() + # TODO: If segments are received but the receiver is unable to parse packets + # properly, it might make sense to have a timeout which then also + # logs that there might be an issue reading packets + self.__tm_queue.put(bytes(bytes_recvd)) + def data_available(self, timeout: float = 0, parameters: any = 0) -> int: self.__tm_queue_to_packet_list() return len(self.tm_packet_list) - def __close_tcp_socket(self): - self.connected = False + def __force_shutdown(self): self.__tcp_socket.close() - self.__tcp_socket = None + with self.__conn_lock: + self.__connected = False diff --git a/tmtccmd/com/udp.py b/tmtccmd/com/udp.py index 2edded3e..b1a232ad 100644 --- a/tmtccmd/com/udp.py +++ b/tmtccmd/com/udp.py @@ -1,4 +1,5 @@ """UDP Communication Interface""" + import logging import select import socket diff --git a/tmtccmd/config/__init__.py b/tmtccmd/config/__init__.py index 9bebab2e..cd1b8184 100644 --- a/tmtccmd/config/__init__.py +++ b/tmtccmd/config/__init__.py @@ -7,6 +7,7 @@ * :py:mod:`tmtccmd.config.args` - Various helper methods and classes to create the argument parsers and arguments converts to create the data structures expected by this library from passed CLI arguments.""" + import logging from pathlib import Path from typing import Optional diff --git a/tmtccmd/config/args.py b/tmtccmd/config/args.py index 7ed4a33c..e9580b93 100644 --- a/tmtccmd/config/args.py +++ b/tmtccmd/config/args.py @@ -1,4 +1,5 @@ """Argument parser module.""" + from __future__ import annotations import argparse diff --git a/tmtccmd/config/com.py b/tmtccmd/config/com.py index 9073e9ea..4b54cb0b 100644 --- a/tmtccmd/config/com.py +++ b/tmtccmd/config/com.py @@ -222,7 +222,7 @@ def create_default_tcpip_interface(tcpip_cfg: TcpipCfg) -> Optional[ComInterface communication_interface = TcpSpacePacketsComIF( com_if_id=tcpip_cfg.com_if_key, space_packet_ids=tcpip_cfg.space_packet_ids, - tm_polling_freqency=0.5, + inner_thread_delay=0.5, target_address=tcpip_cfg.send_addr, ) return communication_interface diff --git a/tmtccmd/fsfw/tmtc_printer.py b/tmtccmd/fsfw/tmtc_printer.py index 53ee078a..1c0320b3 100644 --- a/tmtccmd/fsfw/tmtc_printer.py +++ b/tmtccmd/fsfw/tmtc_printer.py @@ -1,5 +1,6 @@ """Contains classes and functions that perform all printing functionalities. """ + import logging import enum from typing import List, Optional diff --git a/tmtccmd/gui/frontend.py b/tmtccmd/gui/frontend.py index dd418de4..70e6600b 100644 --- a/tmtccmd/gui/frontend.py +++ b/tmtccmd/gui/frontend.py @@ -1,6 +1,7 @@ """PyQt front end components for the tmtccmd framework. @author R. Mueller, P. Scheurenbrand, D. Nguyen """ + import os import sys import webbrowser diff --git a/tmtccmd/logging/__init__.py b/tmtccmd/logging/__init__.py index 7bbdd4ef..65c1bf40 100644 --- a/tmtccmd/logging/__init__.py +++ b/tmtccmd/logging/__init__.py @@ -1,6 +1,7 @@ """ @brief This module is used to set up the global loggers """ + from tmtccmd.version import get_version import logging import os diff --git a/tmtccmd/pus/__init__.py b/tmtccmd/pus/__init__.py index cd022e36..89e15f47 100644 --- a/tmtccmd/pus/__init__.py +++ b/tmtccmd/pus/__init__.py @@ -5,6 +5,7 @@ * :py:class:`tmtccmd.pus.VerificationWrapper` helper class """ + from enum import IntEnum from typing import Optional diff --git a/tmtccmd/pus/s5_fsfw_event.py b/tmtccmd/pus/s5_fsfw_event.py index 4ad2dd3f..b2803542 100644 --- a/tmtccmd/pus/s5_fsfw_event.py +++ b/tmtccmd/pus/s5_fsfw_event.py @@ -1,4 +1,5 @@ """`FSFW `_ specific PUS event support""" + from .s5_fsfw_event_defs import * # noqa re-export from .tm.s5_fsfw_event import * # noqa re-export from .tc.s5_event import * # noqa re-export diff --git a/tmtccmd/pus/s5_satrs_event.py b/tmtccmd/pus/s5_satrs_event.py index cef627c3..9f041811 100644 --- a/tmtccmd/pus/s5_satrs_event.py +++ b/tmtccmd/pus/s5_satrs_event.py @@ -1,3 +1,4 @@ """`sat-rs `_ specific PUS event support.""" + from .s5_satrs_event_defs import * # noqa re-export from .tc.s5_event import * # noqa re-export diff --git a/tmtccmd/pus/s8_fsfw_action.py b/tmtccmd/pus/s8_fsfw_action.py index 0c1d0936..b9a69c84 100644 --- a/tmtccmd/pus/s8_fsfw_action.py +++ b/tmtccmd/pus/s8_fsfw_action.py @@ -1,4 +1,5 @@ """`FSFW `_ specific PUS actions support""" + from .s8_fsfw_action_defs import * # noqa re-export from .tc.s8_fsfw_action import * # noqa re-export from .tm.s8_fsfw_action import * # noqa re-export diff --git a/tmtccmd/pus/tc/s200_fsfw_mode.py b/tmtccmd/pus/tc/s200_fsfw_mode.py index dd160a4a..45f1691a 100644 --- a/tmtccmd/pus/tc/s200_fsfw_mode.py +++ b/tmtccmd/pus/tc/s200_fsfw_mode.py @@ -1,4 +1,5 @@ """Core components for mode commanding (custom PUS service).""" + import enum import struct from typing import Union diff --git a/tmtccmd/pus/tc/s20_fsfw_param.py b/tmtccmd/pus/tc/s20_fsfw_param.py index 54193c38..9e249e4a 100644 --- a/tmtccmd/pus/tc/s20_fsfw_param.py +++ b/tmtccmd/pus/tc/s20_fsfw_param.py @@ -1,5 +1,6 @@ """Contains definitions and functions related to PUS Service 20 Telecommands. """ + from __future__ import annotations import struct diff --git a/tmtccmd/pus/tc/s3_fsfw_hk.py b/tmtccmd/pus/tc/s3_fsfw_hk.py index 69d9ffd2..abfea662 100644 --- a/tmtccmd/pus/tc/s3_fsfw_hk.py +++ b/tmtccmd/pus/tc/s3_fsfw_hk.py @@ -1,5 +1,6 @@ """Contains definitions and functions related to PUS Service 3 Telecommands. """ + import struct from typing import Tuple diff --git a/tmtccmd/pus/tm/s200_fsfw_mode.py b/tmtccmd/pus/tm/s200_fsfw_mode.py index f724f79c..931c4d6f 100644 --- a/tmtccmd/pus/tm/s200_fsfw_mode.py +++ b/tmtccmd/pus/tm/s200_fsfw_mode.py @@ -1,5 +1,6 @@ """Base class for Service 200 mode commanding reply handling. """ + from __future__ import annotations import struct diff --git a/tmtccmd/pus/tm/s2_rawcmd.py b/tmtccmd/pus/tm/s2_rawcmd.py index 8aaa6b25..6993a22e 100644 --- a/tmtccmd/pus/tm/s2_rawcmd.py +++ b/tmtccmd/pus/tm/s2_rawcmd.py @@ -1,5 +1,6 @@ """Base class for implementation of PUS Service 2 handling. """ + from __future__ import annotations from typing import Optional diff --git a/tmtccmd/pus/tm/s8_fsfw_action.py b/tmtccmd/pus/tm/s8_fsfw_action.py index dbcc7db7..b0a98d48 100644 --- a/tmtccmd/pus/tm/s8_fsfw_action.py +++ b/tmtccmd/pus/tm/s8_fsfw_action.py @@ -1,5 +1,6 @@ """Contains classes and functions to handle PUS Service 8 telemetry. """ + from __future__ import annotations import struct from typing import Optional diff --git a/tmtccmd/tmtc/ccsds_seq_sender.py b/tmtccmd/tmtc/ccsds_seq_sender.py index 5b3e289d..404d00b0 100644 --- a/tmtccmd/tmtc/ccsds_seq_sender.py +++ b/tmtccmd/tmtc/ccsds_seq_sender.py @@ -1,4 +1,5 @@ """Used to send multiple TCs in sequence""" + import enum import logging from datetime import timedelta diff --git a/tmtccmd/tmtc/ccsds_tm_listener.py b/tmtccmd/tmtc/ccsds_tm_listener.py index abbf6b8e..aed5b507 100644 --- a/tmtccmd/tmtc/ccsds_tm_listener.py +++ b/tmtccmd/tmtc/ccsds_tm_listener.py @@ -1,4 +1,5 @@ """Contains the TmListener which can be used to listen to Telemetry in the background""" + from typing import Dict, List, Tuple from spacepackets.ccsds.spacepacket import get_apid_from_raw_space_packet diff --git a/tmtccmd/util/hammingcode.py b/tmtccmd/util/hammingcode.py index f7f4c8ab..8787b076 100644 --- a/tmtccmd/util/hammingcode.py +++ b/tmtccmd/util/hammingcode.py @@ -3,6 +3,7 @@ Documentation: https://en.wikipedia.org/wiki/Hamming_code They can be used to identify up to two bit errors and correct one bit error per 256 byte block. """ + import logging from enum import Enum