-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathLLAP.py
143 lines (121 loc) · 4.7 KB
/
LLAP.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
from time import time
__author__ = 'timhodson'
# a python module to support llap
# currently mainly the llap Thermister device Persona
class LLAP:
RESPONSES = {
'AWAKE': 'Device is awake',
'TMPA': 'temperature reading',
'TEMP': 'temperature reading',
'RHUM': 'humidity reading',
'BATTLOW': 'battery low',
'BATT': 'battery reading',
'SLEEPING': 'Device is going to sleep',
'STARTED': 'Device has started',
'ERROR': 'There is an error'
}
REQUESTS = {
'APVER': 'LLAP version number',
'BATT': 'Battery level in volts',
'TEMP': 'request the temperature',
'HUM': 'request the humidity',
# not bothered with the other 'tweeks' that we could do.
}
observers = []
def __init__(self):
# might want to do something clever with detecting serial ports and what not?
pass
def split_response(self, response):
"""Split the response up into bits
Keyword arguments:
response -- the string to split into bits.
Returns:
bits -- the bits of the response
"""
self.validate_msg(response)
bits = {
'raw': response,
'deviceId': response[1:3],
'time': time()
}
for key in self.RESPONSES.keys():
if key in response:
bits['responseType'] = response[3:3 + len(key)]
bits['responseValue'] = response[3 + len(key):].rstrip("-")
self.notify_observers(bits['responseType'], bits)
return bits
def get_responses(self, msg):
"""Get the parsed responses out of an arbitrary message string.
You are assumed to have got the LLAP message yourself
by whatever means from the serial port
Keyword arguments:
msg -- a String which may contain LLAP responses from a device.
Returns:
responses -- a list of dictionaries for each response found in the message.
You don't have to do anything with this if you have already registered a listener for the messages.
"""
responses = []
if len(msg) % 12 != 0:
self.notify_observers(
'ERROR',
{
'responseType': 'ERROR',
'responseValue': 'Serial message not divisible by twelve',
'msg': msg,
'time': time()
}
)
if len(msg) >= 12:
# attempt to process as much as we can...
# find the first lower case a in the string and trim msg to that.
msg = msg[msg.index('a'):]
msg_count = len(msg) / 12
s = 0
for i in range(1, msg_count + 1):
e = i * 12
responses.append(self.split_response(msg[s:e]))
s += 12 # increment start ready for next loop
return responses
def build_request(self, devid, type):
self.validate_request(devid, type)
request = '{0}{1}{2:-<9}'.format('a', devid, type)
self.validate_msg(request)
return request
@staticmethod
def validate_msg(msg):
if len(msg) > 12:
# todo - raise some more specific Exceptions
raise LlapException("completed request is too long")
@staticmethod
def validate_request(d, t):
if len(d) > 2 or len(t) > 9:
raise LlapException("invalid parameters to build_request")
def register_observer(self, event, observer):
"""Register an observer to deal with each response found.
Each callback is self contained and cannot be chained with other callbacks.
For example: you cannot register a callback to add a timestamp
to every message and then expect other callbacks to also see that timestamp.
Keyword arguments:
event -- the type of message to react to
observer -- a reference to the callback to execute
"""
self.observers.append({event: observer})
def unregister_observer(self, event, observer):
newList = []
flag = False
for observerDict in self.observers:
if observerDict.keys()[0] == event:
if observerDict[event] == observer:
flag = True
if flag is not True:
newList.append(observerDict)
self.observers = newList
def notify_observers(self, event, data):
for observer in self.observers:
if observer.keys()[0] == event or observer.keys()[0] == 'ALL':
observer.values()[0](data)
class LlapException(Exception):
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)