Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test and fix non-byte-aligned PDO mappings #454

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 28 additions & 25 deletions canopen/pdo/base.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

import threading
import math
from typing import Callable, Dict, Iterator, List, Optional, Union, TYPE_CHECKING
from collections.abc import Mapping
import logging
Expand Down Expand Up @@ -260,7 +260,7 @@ def _fill_map(self, needed):
self.map.append(var)

def _update_data_size(self):
self.data = bytearray(int(math.ceil(self.length / 8.0)))
self.data = bytearray(-(-self.length // 8)) # rounds up

@property
def name(self) -> str:
Expand Down Expand Up @@ -564,23 +564,29 @@ def get_data(self) -> bytes:
"""
byte_offset, bit_offset = divmod(self.offset, 8)

byte_last = -(-(self.offset + self.length) // 8) # rounds up
if bit_offset or self.length % 8:
# Need information of the current variable type (unsigned vs signed)
data_type = self.od.data_type
if data_type == objectdictionary.BOOLEAN:
# A boolean type needs to be treated as an U08
data_type = objectdictionary.UNSIGNED8
od_struct = self.od.STRUCT_TYPES[data_type]
data = od_struct.unpack_from(self.pdo_parent.data, byte_offset)[0]
# Mask of relevant bits for this mapping, starting at bit 0
mask = (1 << self.length) - 1
# Extract all needed bytes and convert to a number for bit operations
cur_msg_data = self.pdo_parent.data[byte_offset:byte_last]
cur_msg_bits = int.from_bytes(cur_msg_data, byteorder="little")
# Shift and mask to get the correct values
data = (data >> bit_offset) & ((1 << self.length) - 1)
data_bits = (cur_msg_bits >> bit_offset) & mask

# Check if the variable is signed and if the data is negative prepend signedness
if od_struct.format.islower() and (1 << (self.length - 1)) < data:
if od_struct.format.islower() and (1 << (self.length - 1)) < data_bits:
# fill up the rest of the bits to get the correct signedness
data = data | (~((1 << self.length) - 1))
data = od_struct.pack(data)
data_bits |= ~mask
data = od_struct.pack(data_bits)
else:
data = self.pdo_parent.data[byte_offset:byte_offset + len(self.od) // 8]
data = self.pdo_parent.data[byte_offset:byte_last]

return data

Expand All @@ -593,27 +599,24 @@ def set_data(self, data: bytes):
logger.debug("Updating %s to %s in %s",
self.name, binascii.hexlify(data), self.pdo_parent.name)

byte_last = -(-(self.offset + self.length) // 8) # rounds up
if bit_offset or self.length % 8:
cur_msg_data = self.pdo_parent.data[byte_offset:byte_offset + len(self.od) // 8]
# Need information of the current variable type (unsigned vs signed)
data_type = self.od.data_type
if data_type == objectdictionary.BOOLEAN:
# A boolean type needs to be treated as an U08
data_type = objectdictionary.UNSIGNED8
od_struct = self.od.STRUCT_TYPES[data_type]
cur_msg_data = od_struct.unpack(cur_msg_data)[0]
# data has to have the same size as old_data
data = od_struct.unpack(data)[0]
# Mask of relevant bits for this mapping, starting at bit 0
mask = (1 << self.length) - 1
# Extract all needed bytes and convert to a number for bit operations
cur_msg_data = self.pdo_parent.data[byte_offset:byte_last]
cur_msg_bits = int.from_bytes(cur_msg_data, byteorder="little")
# Mask out the old data value
# At the end we need to mask for correct variable length (bitwise operation failure)
shifted = (((1 << self.length) - 1) << bit_offset) & ((1 << len(self.od)) - 1)
bitwise_not = (~shifted) & ((1 << len(self.od)) - 1)
cur_msg_data = cur_msg_data & bitwise_not
cur_msg_bits &= ~(mask << bit_offset)
# Convert new value to a number for generic bit operations
data_bits = int.from_bytes(data, byteorder="little") & mask
# Set the new data on the correct position
data = (data << bit_offset) | cur_msg_data
od_struct.pack_into(self.pdo_parent.data, byte_offset, data)
data_bits = (data_bits << bit_offset) | cur_msg_bits

merged_data = data_bits.to_bytes(byte_last - byte_offset, byteorder="little")
self.pdo_parent.data[byte_offset:byte_last] = merged_data
else:
self.pdo_parent.data[byte_offset:byte_offset + len(data)] = data
self.pdo_parent.data[byte_offset:byte_last] = data[0:byte_last]

self.pdo_parent.update()

Expand Down
14 changes: 14 additions & 0 deletions test/sample.eds
Original file line number Diff line number Diff line change
Expand Up @@ -838,6 +838,20 @@ DataType=0x0001
AccessType=rw
PDOMapping=1

[2007]
ParameterName=UNSIGNED16 value
ObjectType=0x7
DataType=0x0006
AccessType=rw
PDOMapping=1

[2008]
ParameterName=UNSIGNED32 value
ObjectType=0x7
DataType=0x0007
AccessType=rw
PDOMapping=1

[2020]
ParameterName=Complex data type
ObjectType=0x7
Expand Down
29 changes: 29 additions & 0 deletions test/test_pdo.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,35 @@ def test_bit_mapping(self):
self.assertEqual(node.tpdo[0x2002].raw, 0xf)
self.assertEqual(node.pdo[0x1600][0x2002].raw, 0xf)

def test_bit_offsets(self):
node = canopen.Node(1, EDS_PATH)
pdo = node.pdo.tx[1]
pdo.add_variable('UNSIGNED8 value', length=4) # byte-aligned, partial byte length
pdo.add_variable('INTEGER8 value') # non-byte-aligned, one whole byte length
pdo.add_variable('UNSIGNED32 value', length=24) # non-aligned, partial last byte
pdo.add_variable('UNSIGNED16 value', length=12) # non-aligned, whole last byte
pdo.add_variable('INTEGER16 value', length=3) # byte-aligned, partial byte length
pdo.add_variable('INTEGER32 value', length=13) # non-aligned, whole last byte

# Write some values
pdo['UNSIGNED8 value'].raw = 3
pdo['INTEGER8 value'].raw = -2
pdo['UNSIGNED32 value'].raw = 0x987654
pdo['UNSIGNED16 value'].raw = 0x321
pdo['INTEGER16 value'].raw = -1
pdo['INTEGER32 value'].raw = -1071

# Check expected data
self.assertEqual(pdo.data, b'\xe3\x4f\x65\x87\x19\x32\x8f\xde')
acolomb marked this conversation as resolved.
Show resolved Hide resolved

# Read values from data
self.assertEqual(pdo['UNSIGNED8 value'].raw, 3)
self.assertEqual(pdo['INTEGER8 value'].raw, -2)
self.assertEqual(pdo['UNSIGNED32 value'].raw, 0x987654)
self.assertEqual(pdo['UNSIGNED16 value'].raw, 0x321)
self.assertEqual(pdo['INTEGER16 value'].raw, -1)
self.assertEqual(pdo['INTEGER32 value'].raw, -1071)

def test_save_pdo(self):
node = canopen.Node(1, EDS_PATH)
node.tpdo.save()
Expand Down
Loading