-
Notifications
You must be signed in to change notification settings - Fork 197
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add mypy support and fixup project to give no errors #512
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,4 @@ | ||
from __future__ import annotations | ||
import struct | ||
import logging | ||
import threading | ||
|
@@ -52,7 +53,7 @@ def reset(self): | |
|
||
def wait( | ||
self, emcy_code: Optional[int] = None, timeout: float = 10 | ||
) -> "EmcyError": | ||
) -> Optional[EmcyError]: | ||
"""Wait for a new EMCY to arrive. | ||
|
||
:param emcy_code: EMCY code to wait for | ||
|
@@ -86,10 +87,14 @@ def __init__(self, cob_id: int): | |
self.cob_id = cob_id | ||
|
||
def send(self, code: int, register: int = 0, data: bytes = b""): | ||
if self.network is None: | ||
raise RuntimeError("A Network is required") | ||
Comment on lines
+90
to
+91
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Superseded by #525? |
||
payload = EMCY_STRUCT.pack(code, register, data) | ||
self.network.send_message(self.cob_id, payload) | ||
|
||
def reset(self, register: int = 0, data: bytes = b""): | ||
if self.network is None: | ||
raise RuntimeError("A Network is required") | ||
payload = EMCY_STRUCT.pack(0, register, data) | ||
self.network.send_message(self.cob_id, payload) | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,18 +3,21 @@ | |
from collections.abc import MutableMapping | ||
import logging | ||
import threading | ||
from typing import Callable, Dict, Iterator, List, Optional, Union | ||
from typing import Callable, Dict, Iterator, List, Optional, Union, TYPE_CHECKING, TextIO | ||
|
||
try: | ||
import can | ||
from can import Listener | ||
from can import CanError | ||
except ImportError: | ||
# Do not fail if python-can is not installed | ||
can = None | ||
CanError = Exception | ||
class Listener: | ||
""" Dummy listener """ | ||
# Type checkers don't like this conditional logic, so it is only run when | ||
# not type checking | ||
if not TYPE_CHECKING: | ||
# Do not fail if python-can is not installed | ||
can = None | ||
CanError = Exception | ||
class Listener: | ||
""" Dummy listener """ | ||
Comment on lines
+13
to
+20
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed in master, I believe? |
||
|
||
from canopen.node import RemoteNode, LocalNode | ||
from canopen.sync import SyncProducer | ||
|
@@ -24,6 +27,9 @@ class Listener: | |
from canopen.objectdictionary.eds import import_from_node | ||
from canopen.objectdictionary import ObjectDictionary | ||
|
||
if TYPE_CHECKING: | ||
from can.typechecking import CanData | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
Callback = Callable[[int, bytearray, float], None] | ||
|
@@ -45,7 +51,7 @@ def __init__(self, bus: Optional[can.BusABC] = None): | |
#: List of :class:`can.Listener` objects. | ||
#: Includes at least MessageListener. | ||
self.listeners = [MessageListener(self)] | ||
self.notifier = None | ||
self.notifier: Optional[can.Notifier] = None | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Factored out to #534 |
||
self.nodes: Dict[int, Union[RemoteNode, LocalNode]] = {} | ||
self.subscribers: Dict[int, List[Callback]] = {} | ||
self.send_lock = threading.Lock() | ||
|
@@ -138,15 +144,15 @@ def __exit__(self, type, value, traceback): | |
|
||
def add_node( | ||
self, | ||
node: Union[int, RemoteNode, LocalNode], | ||
object_dictionary: Union[str, ObjectDictionary, None] = None, | ||
node: Union[int, RemoteNode], | ||
object_dictionary: Union[str, ObjectDictionary, TextIO, None] = None, | ||
upload_eds: bool = False, | ||
) -> RemoteNode: | ||
"""Add a remote node to the network. | ||
|
||
:param node: | ||
Can be either an integer representing the node ID, a | ||
:class:`canopen.RemoteNode` or :class:`canopen.LocalNode` object. | ||
:class:`canopen.RemoteNode` object. | ||
Comment on lines
-149
to
+155
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think what is wrong here is the return type annotation. It's perfectly alright to add an existing |
||
:param object_dictionary: | ||
Can be either a string for specifying the path to an | ||
Object Dictionary file or a | ||
|
@@ -161,14 +167,16 @@ def add_node( | |
if upload_eds: | ||
logger.info("Trying to read EDS from node %d", node) | ||
object_dictionary = import_from_node(node, self) | ||
node = RemoteNode(node, object_dictionary) | ||
self[node.id] = node | ||
return node | ||
nodeobj = RemoteNode(node, object_dictionary) | ||
else: | ||
nodeobj = node | ||
self[nodeobj.id] = nodeobj | ||
return nodeobj | ||
|
||
def create_node( | ||
self, | ||
node: int, | ||
object_dictionary: Union[str, ObjectDictionary, None] = None, | ||
object_dictionary: Union[str, ObjectDictionary, TextIO, None] = None, | ||
) -> LocalNode: | ||
"""Create a local node in the network. | ||
|
||
|
@@ -183,11 +191,13 @@ def create_node( | |
The Node object that was added. | ||
""" | ||
if isinstance(node, int): | ||
node = LocalNode(node, object_dictionary) | ||
self[node.id] = node | ||
return node | ||
nodeobj = LocalNode(node, object_dictionary) | ||
else: | ||
nodeobj = node | ||
self[nodeobj.id] = nodeobj | ||
return nodeobj | ||
|
||
def send_message(self, can_id: int, data: bytes, remote: bool = False) -> None: | ||
def send_message(self, can_id: int, data: CanData, remote: bool = False) -> None: | ||
"""Send a raw CAN message to the network. | ||
|
||
This method may be overridden in a subclass if you need to integrate | ||
|
@@ -215,7 +225,7 @@ def send_message(self, can_id: int, data: bytes, remote: bool = False) -> None: | |
self.check() | ||
|
||
def send_periodic( | ||
self, can_id: int, data: bytes, period: float, remote: bool = False | ||
self, can_id: int, data: CanData, period: float, remote: bool = False | ||
) -> PeriodicMessageTask: | ||
"""Start sending a message periodically. | ||
|
||
|
@@ -295,7 +305,7 @@ class PeriodicMessageTask: | |
def __init__( | ||
self, | ||
can_id: int, | ||
data: bytes, | ||
data: CanData, | ||
period: float, | ||
bus, | ||
remote: bool = False, | ||
|
@@ -335,10 +345,12 @@ def update(self, data: bytes) -> None: | |
old_data = self.msg.data | ||
self.msg.data = new_data | ||
if hasattr(self._task, "modify_data"): | ||
assert self._task is not None # This will never be None, but mypy needs this | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Superseded by #534 |
||
self._task.modify_data(self.msg) | ||
elif new_data != old_data: | ||
# Stop and start (will mess up period unfortunately) | ||
self._task.stop() | ||
if self._task is not None: | ||
self._task.stop() | ||
Comment on lines
-341
to
+353
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Superseded by #534 |
||
self._start() | ||
|
||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cosmetics, let's keep these separate from the mypy error fixing. There are lots of things like this to do, but better to keep a PR focused and revisit such cleanups later. IMHO.