Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TCP client re-work #149

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/app/tmtcc.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ def main(): # noqa: C901
while True:
state = tmtc_backend.periodic_op(None)
if state.request == BackendRequest.TERMINATION_NO_ERROR:
tmtc_backend.close_com_if()
sys.exit(0)
elif state.request == BackendRequest.DELAY_IDLE:
_LOGGER.info("TMTC Client in IDLE mode")
Expand Down
2 changes: 1 addition & 1 deletion tests/com/test_tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def setUp(self) -> None:
"tcp",
space_packet_ids=[self.expected_packet_id],
target_address=EthAddr.from_tuple(self.addr),
tm_polling_freqency=0.05,
inner_thread_delay=0.05,
)
self.conn_socket: Optional[socket.socket] = None
self.server_received_packets = deque()
Expand Down
1 change: 1 addition & 0 deletions tmtccmd/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Contains core methods called by entry point files to setup and start a tmtccmd application.
It also re-exports commonly used classes and functions.
"""

import logging
import sys
import os
Expand Down
1 change: 1 addition & 0 deletions tmtccmd/cfdp/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Please note that this module does not contain configuration helpers, for example
to convert CLI or GUI parameters into the internalized CFDP classes. You can find all those
helpers inside the :py:mod:`tmtccmd.config.cfdp` module."""

from .request import CfdpRequestWrapper
from spacepackets.cfdp import TransactionId
1 change: 1 addition & 0 deletions tmtccmd/com/dummy.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Dummy Virtual Communication Interface. Currently serves to use the TMTC program without needing
external hardware or an extra socket
"""

from typing import Optional

from deprecated.sphinx import deprecated
Expand Down
192 changes: 98 additions & 94 deletions tmtccmd/com/tcp.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""TCP communication interface"""

import logging
import queue
import socket
Expand Down Expand Up @@ -36,7 +37,7 @@ def __init__(
self,
com_if_id: str,
space_packet_ids: Sequence[PacketId],
tm_polling_freqency: float,
inner_thread_delay: float,
target_address: EthAddr,
max_packets_stored: Optional[int] = None,
):
Expand All @@ -45,24 +46,23 @@ def __init__(
:param com_if_id:
:param space_packet_ids: Valid packet IDs for CCSDS space packets. Those will be used
to parse for space packets inside the TCP stream.
:param tm_polling_freqency: Polling frequency in seconds
:param inner_thread_delay: Polling frequency of TCP thread in seconds.
"""
self.com_if_id = com_if_id
self.com_type = TcpCommunicationType.SPACE_PACKETS
self.space_packet_ids = space_packet_ids
self.tm_polling_frequency = tm_polling_freqency
self.__inner_thread_delay = inner_thread_delay
self.target_address = target_address
self.max_packets_stored = max_packets_stored
self.connected = False

self.__tcp_socket: Optional[socket.socket] = None
self.__conn_lock = threading.Lock()
self.__connected = False
self.__tcp_socket = None
self.__last_connection_time = 0
self.__tm_thread_kill_signal = threading.Event()
self.__thread_kill_signal = threading.Event()
# Separate thread to request TM packets periodically if no TCs are being sent
self.__tcp_conn_thread: Optional[threading.Thread] = threading.Thread(
target=self.__tcp_tm_client, daemon=True
)
self.__tcp_thread = threading.Thread(target=self.__tcp_task, daemon=True)
self.__tm_queue = queue.Queue()
self.__tc_queue = queue.Queue()
self.__analysis_queue = deque()
self.tm_packet_list = []

Expand All @@ -80,71 +80,51 @@ def initialize(self, args: any = None) -> any:
pass

def open(self, args: any = None):
self.__tm_thread_kill_signal.clear()
if self.is_open():
return
self.__thread_kill_signal.clear()
try:
self.set_up_socket()
self.__init_socket()
self.__connect_socket()
except IOError as e:
_LOGGER.exception("Issues setting up the TCP socket")
raise e
self.set_up_tcp_thread()
self.__tcp_conn_thread.start()
self.__tcp_thread = threading.Thread(target=self.__tcp_task)
self.__tcp_thread.start()
self.__connected = True

def is_open(self) -> bool:
return self.connected
with self.__conn_lock:
return self.__connected

def set_up_socket(self):
def __init_socket(self):
if self.__tcp_socket is None:
self.__tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
self.__tcp_socket.settimeout(2.0)
self.__tcp_socket.connect(self.target_address.to_tuple)
self.connected = True
except socket.timeout as e:
_LOGGER.warning(
"Could not connect to socket with address"
f" {self.target_address}: {e}"
)
finally:
self.__tcp_socket.settimeout(None)
self.__tcp_socket.settimeout(2.0)

def set_up_tcp_thread(self):
if self.__tcp_conn_thread is None:
self.__tcp_conn_thread = threading.Thread(
target=self.__tcp_tm_client, daemon=True
def __connect_socket(self):
try:
self.__tcp_socket.connect(self.target_address.to_tuple)
except socket.timeout as e:
_LOGGER.warning(
"Could not connect to socket with address"
f" {self.target_address}: {e}"
)
finally:
self.__tcp_socket.settimeout(None)

def close(self, args: any = None) -> None:
self.__tm_thread_kill_signal.set()
socket_was_closed = False
if self.__tcp_conn_thread is not None:
if self.__tcp_conn_thread.is_alive():
self.__tcp_conn_thread.join(self.tm_polling_frequency)
if self.connected:
try:
self.__tcp_socket.shutdown(socket.SHUT_RDWR)
except OSError:
_LOGGER.warning(
"TCP socket endpoint was already closed or not connected"
)
self.__tcp_socket.close()
socket_was_closed = True
if self.__tcp_socket is not None and not socket_was_closed:
self.__tcp_socket.close()
if not self.is_open():
return
self.__thread_kill_signal.set()
if self.__tcp_thread is not None:
self.__tcp_thread.join(self.__inner_thread_delay)
with self.__conn_lock:
self.__connected = False
self.__tcp_socket = None
self.__tcp_conn_thread = None

def send(self, data: bytes):
try:
if not self.connected:
self.set_up_socket()
self.__tcp_socket.sendto(data, self.target_address.to_tuple)
except BrokenPipeError as e:
raise SendError(f"{e}", e)
except ConnectionRefusedError or OSError as e:
self.connected = False
self.__tcp_socket.close()
self.__tcp_socket = None
raise SendError(f"TCP connection attempt failed with exception: {e}", e)
self.__tc_queue.put(data)

def receive(self, poll_timeout: float = 0) -> TelemetryListT:
self.__tm_queue_to_packet_list()
Expand All @@ -168,50 +148,74 @@ def __tm_queue_to_packet_list(self):
while self.__analysis_queue:
self.tm_packet_list.append(self.__analysis_queue.popleft())

def __tcp_tm_client(self):
while True and not self.__tm_thread_kill_signal.is_set():
if self.connected:
try:
self.__receive_tm_packets()
except ConnectionRefusedError:
_LOGGER.warning("TCP connection attempt failed..")
time.sleep(self.tm_polling_frequency)
def __tcp_task(self):
while True and not self.__thread_kill_signal.is_set():
try:
self.__tmtc_event_loop()
except ConnectionRefusedError:
_LOGGER.warning("TCP connection attempt failed..")
time.sleep(self.__inner_thread_delay)

def __receive_tm_packets(self):
def __tmtc_event_loop(self):
try:
while True:
ready = select.select([self.__tcp_socket], [], [], 0)
if not ready[0]:
outputs = []
queue_size = self.__tc_queue.qsize()
if queue_size > 0:
outputs.append(self.__tcp_socket)
(readable, writable, _) = select.select(
[self.__tcp_socket], outputs, [], self.__inner_thread_delay
)
if self.__thread_kill_signal.is_set():
self.__tcp_socket.close()
break
bytes_recvd = self.__tcp_socket.recv(4096)
if bytes_recvd == b"":
self.__close_tcp_socket()
_LOGGER.info("TCP server has been closed")
return
else:
self.connected = True
if (
self.max_packets_stored is not None
and self.__tm_queue.qsize() >= self.max_packets_stored
):
_LOGGER.warning(
"Number of packets in TCP queue too large. "
"Overwriting old packets.."
)
self.__tm_queue.get()
# TODO: If segments are received but the receiver is unable to parse packets
# properly, it might make sense to have a timeout which then also
# logs that there might be an issue reading packets
self.__tm_queue.put(bytes(bytes_recvd))
if queue_size > 0 and writable and writable[0]:
self.__tc_handling(queue_size)
if readable and readable[0]:
self.__tm_handling()
except KeyboardInterrupt:
_LOGGER.info("Keyboard interrupt, shutting down TCP task")
self.__force_shutdown()
except ConnectionResetError:
self.__close_tcp_socket()
self.__force_shutdown()
_LOGGER.exception("ConnectionResetError. TCP server might not be up")

def __tc_handling(self, queue_size: int):
try:
self.__tcp_socket.sendto(
self.__tc_queue.get(), self.target_address.to_tuple
)
queue_size -= 1
except BrokenPipeError as e:
raise SendError(f"{e}", e)
except ConnectionRefusedError or OSError as e:
self.__force_shutdown()
raise SendError(f"TCP connection attempt failed with exception: {e}", e)

def __tm_handling(self):
bytes_recvd = self.__tcp_socket.recv(4096)
if bytes_recvd == b"":
self.__force_shutdown()
_LOGGER.info("TCP server has been closed")
return
if (
self.max_packets_stored is not None
and self.__tm_queue.qsize() >= self.max_packets_stored
):
_LOGGER.warning(
"Number of packets in TCP queue too large. " "Overwriting old packets.."
)
self.__tm_queue.get()
# TODO: If segments are received but the receiver is unable to parse packets
# properly, it might make sense to have a timeout which then also
# logs that there might be an issue reading packets
self.__tm_queue.put(bytes(bytes_recvd))

def data_available(self, timeout: float = 0, parameters: any = 0) -> int:
self.__tm_queue_to_packet_list()
return len(self.tm_packet_list)

def __close_tcp_socket(self):
self.connected = False
def __force_shutdown(self):
self.__tcp_socket.close()
self.__tcp_socket = None
with self.__conn_lock:
self.__connected = False
1 change: 1 addition & 0 deletions tmtccmd/com/udp.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""UDP Communication Interface"""

import logging
import select
import socket
Expand Down
1 change: 1 addition & 0 deletions tmtccmd/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* :py:mod:`tmtccmd.config.args` - Various helper methods and classes to create the argument parsers
and arguments converts to create the data structures expected by this library from passed CLI
arguments."""

import logging
from pathlib import Path
from typing import Optional
Expand Down
1 change: 1 addition & 0 deletions tmtccmd/config/args.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Argument parser module."""

from __future__ import annotations

import argparse
Expand Down
2 changes: 1 addition & 1 deletion tmtccmd/config/com.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ def create_default_tcpip_interface(tcpip_cfg: TcpipCfg) -> Optional[ComInterface
communication_interface = TcpSpacePacketsComIF(
com_if_id=tcpip_cfg.com_if_key,
space_packet_ids=tcpip_cfg.space_packet_ids,
tm_polling_freqency=0.5,
inner_thread_delay=0.5,
target_address=tcpip_cfg.send_addr,
)
return communication_interface
Expand Down
1 change: 1 addition & 0 deletions tmtccmd/fsfw/tmtc_printer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Contains classes and functions that perform all printing functionalities.
"""

import logging
import enum
from typing import List, Optional
Expand Down
1 change: 1 addition & 0 deletions tmtccmd/gui/frontend.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""PyQt front end components for the tmtccmd framework.
@author R. Mueller, P. Scheurenbrand, D. Nguyen
"""

import os
import sys
import webbrowser
Expand Down
1 change: 1 addition & 0 deletions tmtccmd/logging/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
@brief This module is used to set up the global loggers
"""

from tmtccmd.version import get_version
import logging
import os
Expand Down
1 change: 1 addition & 0 deletions tmtccmd/pus/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

* :py:class:`tmtccmd.pus.VerificationWrapper` helper class
"""

from enum import IntEnum
from typing import Optional

Expand Down
1 change: 1 addition & 0 deletions tmtccmd/pus/s5_fsfw_event.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""`FSFW <https://egit.irs.uni-stuttgart.de/fsfw/fsfw>`_ specific PUS event support"""

from .s5_fsfw_event_defs import * # noqa re-export
from .tm.s5_fsfw_event import * # noqa re-export
from .tc.s5_event import * # noqa re-export
1 change: 1 addition & 0 deletions tmtccmd/pus/s5_satrs_event.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""`sat-rs <https://egit.irs.uni-stuttgart.de/rust/sat-rs.git>`_ specific PUS event support."""

from .s5_satrs_event_defs import * # noqa re-export
from .tc.s5_event import * # noqa re-export
1 change: 1 addition & 0 deletions tmtccmd/pus/s8_fsfw_action.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""`FSFW <https://egit.irs.uni-stuttgart.de/fsfw/fsfw>`_ specific PUS actions support"""

from .s8_fsfw_action_defs import * # noqa re-export
from .tc.s8_fsfw_action import * # noqa re-export
from .tm.s8_fsfw_action import * # noqa re-export
1 change: 1 addition & 0 deletions tmtccmd/pus/tc/s200_fsfw_mode.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Core components for mode commanding (custom PUS service)."""

import enum
import struct
from typing import Union
Expand Down
1 change: 1 addition & 0 deletions tmtccmd/pus/tc/s20_fsfw_param.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Contains definitions and functions related to PUS Service 20 Telecommands.
"""

from __future__ import annotations

import struct
Expand Down
1 change: 1 addition & 0 deletions tmtccmd/pus/tc/s3_fsfw_hk.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Contains definitions and functions related to PUS Service 3 Telecommands.
"""

import struct
from typing import Tuple

Expand Down
Loading
Loading