forked from martiby/ess
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmultiplus2.py
128 lines (107 loc) · 4.9 KB
/
multiplus2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import logging
import time
from vebus import VEBus
"""
Multiplus-II, ESS Mode 3
22.01.2023 Martin Steppuhn
"""
class MultiPlus2:
    """
    Multiplus-II, ESS Mode 3, accessed through a VEBus connection.

    update() establishes the connection (while offline) or reads a fresh set
    of values into ``self.data`` (while online). command() sends pending
    sleep / wakeup requests and the power setpoint.

    Attributes:
        data: None until connect() has read a version; afterwards a dictionary
            with the merged device information plus a ``'state'`` key, or
            ``{'error': 'offline', 'state': 'offline'}`` after a data timeout.
        online: True once connect() has completed all of its steps, False
            again after a data timeout.
    """

    # Seconds during which no further sleep / wakeup request is sent after one
    # has been issued.
    CMD_LOCK_SECONDS = 3
    # Seconds during which a power of 0 keeps being sent after the last
    # setpoint with abs(power) >= 1.
    ZERO_TRAIL_SECONDS = 5

    def __init__(self, port, timeout=10):
        """
        :param port: passed through to VEBus as ``port``
        :param timeout: seconds without a complete, valid read after which
                        the device is marked offline
        """
        self.vebus = VEBus(port=port, log='vebus')
        self.log = logging.getLogger('mp2')
        self.timeout = timeout
        self.data_timeout = time.perf_counter() + self.timeout
        self.data = None  # Dictionary with all information from Multiplus
        self.online = False  # True if connection is established
        self.cmd_lock_time = None  # sleep / wakeup "timer" (time.perf_counter())
        # Send 0 Watt for a while before the power command is no longer sent.
        # Starts at "now", so the trailing-zero branch of command() is active
        # on the first call with abs(power) < 1.
        self.power_delay_time = time.perf_counter()
        self._wakeup = False
        self._sleep = False

    def sleep(self) -> None:
        """Request a sleep command; it is sent by a later command() call."""
        self._sleep = True

    def wakeup(self) -> None:
        """Request a wakeup command; it is sent by a later command() call."""
        self._wakeup = True

    def connect(self) -> None:
        """
        Try to establish the connection: read the version, initialise the
        address and scan for the ESS assistant.

        Each step only runs if the previous one returned a truthy result.
        On full success ``online`` becomes True, ``data['state']`` is set to
        ``'init'`` and the data timeout is restarted. On failure the method
        returns silently (errors are hidden while scanning).
        """
        version = self.vebus.get_version()  # hide errors while scanning
        if not version:
            return
        self.data = {'mk2_version': version}  # init dictionary
        time.sleep(0.1)
        if not self.vebus.init_address():
            return
        time.sleep(0.1)
        if not self.vebus.scan_ess_assistant():
            return
        self.log.info("ess assistant setpoint ramid={}".format(self.vebus.ess_setpoint_ram_id))
        self.data['state'] = 'init'
        self.online = True
        self.data_timeout = time.perf_counter() + self.timeout  # start timeout

    def command(self, power) -> None:
        """
        Send one command to the Multiplus; does nothing while offline.

        A pending wakeup (checked first) or sleep request is sent if no
        command lock is active, and starts a lock of CMD_LOCK_SECONDS. While
        the lock is active, pending requests stay pending and the power
        setpoint is handled instead.

        :param power: power setpoint; values with abs(power) >= 1 are sent
                      as-is, smaller values cause 0 to be sent until the
                      trailing timer (ZERO_TRAIL_SECONDS) has expired
        """
        if not self.online:
            return

        t = time.perf_counter()
        unlocked = self.cmd_lock_time is None

        if self._wakeup and unlocked:
            self.cmd_lock_time = t + self.CMD_LOCK_SECONDS  # lock command
            self._wakeup = False
            self.vebus.wakeup()
            self.log.info("wakeup")
        elif self._sleep and unlocked:
            self.cmd_lock_time = t + self.CMD_LOCK_SECONDS  # lock command
            self._sleep = False
            self.vebus.sleep()
            self.log.info("sleep")
        elif abs(power) >= 1:
            if self.power_delay_time is None:
                # first non-zero setpoint after the trailing timer had ended
                self.log.info("set_power start {}".format(power))
            self.log.debug("set_power {}".format(power))
            self.vebus.set_power(power)  # send command to multiplus
            # keep sending zero for a while after the last value >= 1
            self.power_delay_time = t + self.ZERO_TRAIL_SECONDS
        elif self.power_delay_time is not None:
            self.vebus.set_power(0)
            if t > self.power_delay_time:
                self.power_delay_time = None
                self.log.debug("set_power zero trailing timer end")

        # reset command lock timer
        if self.cmd_lock_time is not None and t > self.cmd_lock_time:
            self.cmd_lock_time = None

    def update(self, pause_time=0.1) -> None:
        """
        Read all information from Multiplus-II.

        While offline, a connection attempt is made instead. While online,
        a complete successful read replaces ``self.data`` and restarts the
        data timeout. If the data timeout has expired, ``online`` is cleared
        and ``self.data`` becomes ``{'error': 'offline', 'state': 'offline'}``.

        The result is stored in ``self.data``; nothing is returned.

        :param pause_time: pause time between commands (seconds)
        """
        if not self.online:
            self.connect()
        else:
            data = self._read_all(pause_time)
            if data is not None:
                # Combine both masks bitwise. Adding them would carry into
                # the next higher bit whenever a bit is set in both masks
                # (e.g. 0x40 + 0x40 == 0x80).
                led = data.get('led_light', 0) | data.get('led_blink', 0)
                state = data.get('device_state_id', None)
                data['state'] = self._decode_state(state, led)
                self.data = data
                self.data_timeout = time.perf_counter() + self.timeout  # reset data timeout with valid rx

        if time.perf_counter() > self.data_timeout:
            self.online = False
            self.data = {'error': 'offline', 'state': 'offline'}

    def _read_all(self, pause_time):
        """
        Trigger a snapshot and read AC info, snapshot and LED info in turn.

        :return: one dictionary merged from the three reads (later reads
                 override earlier keys), or None as soon as one read returns
                 a falsy result
        """
        self.vebus.send_snapshot_request()  # trigger snapshot
        time.sleep(pause_time)
        ac_info = self.vebus.get_ac_info()
        time.sleep(pause_time)
        if not ac_info:
            return None

        snapshot = self.vebus.read_snapshot()
        time.sleep(pause_time)
        if not snapshot:
            return None

        led_info = self.vebus.get_led()
        if not led_info:
            return None

        merged = {}
        merged.update(ac_info)
        merged.update(snapshot)
        merged.update(led_info)
        return merged

    @staticmethod
    def _decode_state(state_id, led):
        """
        Map the device state id and the combined LED bitmask to a state name.

        Checked in this order: state id 2 is 'sleep'; LED bit 0x40 is
        'low_bat', 0x80 is 'temperature', 0x20 is 'overload'; state id 8 or 9
        is 'on'; state id 4 is 'wait'. Anything else yields a diagnostic
        string containing both raw values.
        """
        if state_id == 2:
            return 'sleep'
        if led & 0x40:
            return 'low_bat'
        if led & 0x80:
            return 'temperature'
        if led & 0x20:
            return 'overload'
        if state_id in (8, 9):
            return 'on'
        if state_id == 4:
            return 'wait'
        return '?{}?0x{:02X}?'.format(state_id, led)