Skip to content

Commit

Permalink
Add support for Seplos BMS (Louisvdw#530)
Browse files Browse the repository at this point in the history
* Initial version of Seplos support

TODOS (at least):
* revisit the stupid timing stuff, still does not read data reliably
* validate return code and checksums
* use address to support multiple packs
* read and populate alarm data
* cleanup logging
* calculate real checksums instead of hard wired for one command

* Fix most open issues:
* fixes serial reading from Seplos by simply using readline()
* validates checksums, return codes
* clean up logging
* proper encoding of arbitrary cid2 commands including address, length and checksums

Still TODO:
* read alarm data

* adds reading of alarm data, populates the Protection class.

* adds state of charge/discharge switches

* remove wildcard imports from seplos

* remove wildcard imports from battery template

* fix wildcard import and black lint errors

* Revert battery_template.py changes

The template is already reworked in another PR.

* fix typo

* remove unused code

* add documentation about hardcoded/config max (dis-)charge values (see Louisvdw#4)

---------

Co-authored-by: Manuel <[email protected]>
Co-authored-by: Louis Van Der Walt <[email protected]>
  • Loading branch information
3 people authored May 2, 2023
1 parent 1f99936 commit 9cf3b9a
Show file tree
Hide file tree
Showing 3 changed files with 303 additions and 0 deletions.
3 changes: 3 additions & 0 deletions etc/dbus-serialbattery/battery.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ class Protection(object):
This class holds Warning and alarm states for different types of Checks
They are of type integer, 2 represents an Alarm, 1 a Warning, 0 if everything is fine
"""
ALARM = 2
WARNING = 1
NOALARM = 0

def __init__(self):
self.voltage_high: int = None
Expand Down
2 changes: 2 additions & 0 deletions etc/dbus-serialbattery/dbus-serialbattery.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from daly import Daly
from ant import Ant
from jkbms import Jkbms
from seplos import Seplos

# from sinowealth import Sinowealth
from renogy import Renogy
Expand All @@ -42,6 +43,7 @@
{"bms": Renogy, "baud": 9600, "address": b"\x30"},
{"bms": Renogy, "baud": 9600, "address": b"\xF7"},
{"bms": Ecs, "baud": 19200},
{"bms": Seplos, "baud": 19200},
]
expected_bms_types = [
battery_type
Expand Down
298 changes: 298 additions & 0 deletions etc/dbus-serialbattery/seplos.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,298 @@
# -*- coding: utf-8 -*-
from battery import Protection, Battery, Cell
from utils import logger
import utils
import serial



class Seplos(Battery):
def __init__(self, port, baud, address=0x00):
super(Seplos, self).__init__(port, baud, address)
self.type = self.BATTERYTYPE
self.poll_interval = 5000

BATTERYTYPE = "Seplos"

COMMAND_STATUS = 0x42
COMMAND_ALARM = 0x44
COMMAND_PROTOCOL_VERSION = 0x4F
COMMAND_VENDOR_INFO = 0x51

@staticmethod
def int_from_1byte_hex_ascii(data: bytes, offset: int, signed=False):
return int.from_bytes(
bytes.fromhex(data[offset : offset + 2].decode("ascii")),
byteorder="big",
signed=signed,
)

@staticmethod
def int_from_2byte_hex_ascii(data: bytes, offset: int, signed=False):
return int.from_bytes(
bytes.fromhex(data[offset : offset + 4].decode("ascii")),
byteorder="big",
signed=signed,
)

@staticmethod
def get_checksum(frame: bytes) -> int:
"""implements the Seplos checksum algorithm, returns 4 bytes"""
checksum = 0
for b in frame:
checksum += b
checksum %= 0xFFFF
checksum ^= 0xFFFF
checksum += 1
return checksum

@staticmethod
def get_info_length(info: bytes) -> int:
"""implements the Seplos checksum for the info length"""
lenid = len(info)
if lenid == 0:
return 0

lchksum = (lenid & 0xF) + ((lenid >> 4) & 0xF) + ((lenid >> 8) & 0xF)
lchksum %= 16
lchksum ^= 0xF
lchksum += 1

return (lchksum << 12) + lenid

@staticmethod
def encode_cmd(address: int, cid2: int, info: bytes = b"") -> bytes:
"""encodes a command sent to a battery (cid1=0x46)"""
cid1 = 0x46

info_length = Seplos.get_info_length(info)

frame = "{:02X}{:02X}{:02X}{:02X}{:04X}".format(
0x20, address, cid1, cid2, info_length
).encode()
frame += info

checksum = Seplos.get_checksum(frame)
encoded = b"~" + frame + "{:04X}".format(checksum).encode() + b"\r"
return encoded

def test_connection(self):
# call a function that will connect to the battery, send a command and retrieve the result.
# The result or call should be unique to this BMS. Battery name or version, etc.
# Return True if success, False for failure

try:
return self.read_status_data()
except Exception as err:
logger.error(f"Unexpected {err=}, {type(err)=}")
return False

def get_settings(self):
# After successful connection get_settings will be called to set up the battery.
# Set the current limits, populate cell count, etc.
# Return True if success, False for failure

# BMS does not provide max charge-/discharge, so we have to use hardcoded/config values
self.max_battery_charge_current = utils.MAX_BATTERY_CHARGE_CURRENT
self.max_battery_discharge_current = utils.MAX_BATTERY_DISCHARGE_CURRENT

self.max_battery_voltage = utils.MAX_CELL_VOLTAGE * self.cell_count
self.min_battery_voltage = utils.MIN_CELL_VOLTAGE * self.cell_count

# init the cell array
for _ in range(self.cell_count):
self.cells.append(Cell(False))

return True

def refresh_data(self):
# call all functions that will refresh the battery data.
# This will be called for every iteration (self.poll_interval)
# Return True if success, False for failure
result_status = self.read_status_data()
# sleep(0.5)
result_alarm = self.read_alarm_data()

return result_status and result_alarm

@staticmethod
def decode_alarm_byte(data_byte: int, alarm_bit: int, warn_bit: int):
if data_byte & (1 << alarm_bit) != 0:
return Protection.ALARM
if data_byte & (1 << warn_bit) != 0:
return Protection.WARNING
return Protection.OK

def read_alarm_data(self):
data = self.read_serial_data_seplos(
self.encode_cmd(address=0x00, cid2=self.COMMAND_ALARM, info=b"01")
)
# check if connection success
if data is False:
return False

logger.debug("alarm info raw {}".format(data))
return self.decode_alarm_data(bytes.fromhex(data.decode("ascii")))

def decode_alarm_data(self, data: bytes):
logger.debug("alarm info decoded {}".format(data))
voltage_alarm_byte = data[30]
self.protection.voltage_cell_low = Seplos.decode_alarm_byte(
data_byte=voltage_alarm_byte, alarm_bit=3, warn_bit=2
)
# cell high voltage is actually unused because DBUS does not seem to support it, decoding anyway
# c.f. https://github.com/victronenergy/venus/wiki/dbus#battery
self.protection.voltage_cell_high = Seplos.decode_alarm_byte(
data_byte=voltage_alarm_byte, alarm_bit=1, warn_bit=0
)
self.protection.voltage_low = Seplos.decode_alarm_byte(
data_byte=voltage_alarm_byte, alarm_bit=7, warn_bit=6
)
self.protection.voltage_high = Seplos.decode_alarm_byte(
data_byte=voltage_alarm_byte, alarm_bit=5, warn_bit=4
)

temperature_alarm_byte = data[31]
self.protection.temp_low_charge = Seplos.decode_alarm_byte(
data_byte=temperature_alarm_byte, alarm_bit=3, warn_bit=2
)
self.protection.temp_high_charge = Seplos.decode_alarm_byte(
data_byte=temperature_alarm_byte, alarm_bit=1, warn_bit=0
)
self.protection.temp_low_discharge = Seplos.decode_alarm_byte(
data_byte=temperature_alarm_byte, alarm_bit=7, warn_bit=6
)
self.protection.temp_high_discharge = Seplos.decode_alarm_byte(
data_byte=temperature_alarm_byte, alarm_bit=5, warn_bit=4
)

current_alarm_byte = data[33]
self.protection.current_over = Seplos.decode_alarm_byte(
data_byte=current_alarm_byte, alarm_bit=1, warn_bit=0
)
self.protection.current_under = Seplos.decode_alarm_byte(
data_byte=current_alarm_byte, alarm_bit=3, warn_bit=2
)

soc_alarm_byte = data[34]
self.protection.soc_low = Seplos.decode_alarm_byte(
data_byte=soc_alarm_byte, alarm_bit=3, warn_bit=2
)

switch_byte = data[35]
self.discharge_fet = True if switch_byte & 0b01 != 0 else False
self.charge_fet = True if switch_byte & 0b10 != 0 else False
return True

def read_status_data(self):
logger.debug("read status data")
data = self.read_serial_data_seplos(
self.encode_cmd(address=0x00, cid2=0x42, info=b"01")
)

# check if connection success
if data is False:
return False

cell_count_offset = 4
voltage_offset = 6
temps_offset = 72
self.cell_count = Seplos.int_from_1byte_hex_ascii(
data=data, offset=cell_count_offset
)
if self.cell_count == len(self.cells):
for i in range(self.cell_count):
voltage = (
Seplos.int_from_2byte_hex_ascii(data, voltage_offset + i * 4) / 1000
)
self.cells[i].voltage = voltage
logger.debug("Voltage cell[{}]={}V".format(i, voltage))
for i in range(min(4, self.cell_count)):
temp = (
Seplos.int_from_2byte_hex_ascii(data, temps_offset + i * 4) - 2731
) / 10
self.cells[i].temp = temp
logger.debug("Temp cell[{}]={}°C".format(i, temp))

self.temp1 = (
Seplos.int_from_2byte_hex_ascii(data, temps_offset + 4 * 4) - 2731
) / 10
self.temp2 = (
Seplos.int_from_2byte_hex_ascii(data, temps_offset + 5 * 4) - 2731
) / 10
self.current = (
Seplos.int_from_2byte_hex_ascii(data, offset=96, signed=True) / 100
)
self.voltage = Seplos.int_from_2byte_hex_ascii(data, offset=100) / 100
self.capacity_remain = Seplos.int_from_2byte_hex_ascii(data, offset=104) / 100
self.capacity = Seplos.int_from_2byte_hex_ascii(data, offset=110) / 100
self.soc = Seplos.int_from_2byte_hex_ascii(data, offset=114) / 10
self.cycles = Seplos.int_from_2byte_hex_ascii(data, offset=122)
self.hardware_version = "Seplos BMS {} cells".format(self.cell_count)

logger.debug("Current = {}A , Voltage = {}V".format(self.current, self.voltage))
logger.debug(
"Capacity = {}/{}Ah , SOC = {}%".format(
self.capacity_remain, self.capacity, self.soc
)
)
logger.debug("Cycles = {}".format(self.cycles))
logger.debug(
"Environment temp = {}°C , Power temp = {}°C".format(
self.temp1, self.temp2
)
)
logger.debug("HW:" + self.hardware_version)

return True

@staticmethod
def is_valid_frame(data: bytes) -> bool:
"""checks if data contains a valid frame
* minimum length is 18 Byte
* checksum needs to be valid
* also checks for error code as return code in cid2
* not checked: lchksum
"""
if len(data) < 18:
logger.warning("short read, data={}".format(data))
return False

chksum = Seplos.get_checksum(data[1:-5])
if chksum != Seplos.int_from_2byte_hex_ascii(data, -5):
logger.warning("checksum error")
return False

cid2 = data[7:9]
if cid2 != b"00":
logger.warning("command returned with error code {}".format(cid2))
return False

return True

def read_serial_data_seplos(self, command):
logger.debug("read serial data seplos")

with serial.Serial(self.port, baudrate=self.baud_rate, timeout=1) as ser:
ser.flushOutput()
ser.flushInput()
written = ser.write(command)
logger.debug(
"wrote {} bytes to serial port {}, command={}".format(
written, self.port, command
)
)

data = ser.readline()

if not Seplos.is_valid_frame(data):
return False

length_pos = 10
return_data = data[length_pos + 3 : -5]
info_length = Seplos.int_from_2byte_hex_ascii(b"0" + data[length_pos:], 0)
logger.debug(
"return info data of length {} : {}".format(info_length, return_data)
)

return return_data

0 comments on commit 9cf3b9a

Please sign in to comment.