-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathpyhaptic.py
executable file
·144 lines (124 loc) · 4.32 KB
/
pyhaptic.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
"""
.. module:: HapticInterface
:platform: Linux, Windows, Mac
:synopsis: Part of the Haptic Construction Kit https://github.com/Haptic-Construction-Kit/
.. moduleauthor:: see contributors.md
"""
import serial
import time
import struct
import logging
class HapticInterface:
"""Creates instances of haptic devices."""
VERSION = 1
def __init__(self, comm_choice):
self.ser = ""
self.comm_choice = comm_choice
self.ascii = True
def __build_binary(self, motor, rhythm, magnitude, duration):
#Commands are 16 bits each, big endian
#ttmmmmmmRRRMMddd
vibrate_type = 0
motor_mask = 63
rhythm_mask = 7
magnitude_mask = 3
duration_mask = 7
byte1 = vibrate_type << 6 | (motor & motor_mask)
byte2 = (rhythm & rhythm_mask) << 5 | (magnitude & magnitude_mask) << 3 | (duration & duration_mask)
result = struct.pack('>BB', byte1, byte2)
logging.debug('built command string: ' + result)
return result
def set_ascii(self):
self.ser.write("\xC0\x00")
response = self.ser.read(1)
if "\00" not in response:
for x in response:
logging.debug('Error Switching to ascii' + str(x))
else:
self.ascii = True
def set_binary(self):
self.__send("BGN\n")
#assume success until we get the exceptions implemented
self.ascii = False
def __send(self, command):
self.ser.write(command)
response = self.ser.readlines()
if not any("STS 0" in s for s in response):
logging.debug('Error sending ascii command: ' + command)
return response
def connect(self):
"""Attempts a connection over serial to the haptic device.
Args:
none.
Returns:
none.
Raises:
SerialException
"""
self.ser = serial.Serial(self.comm_choice, timeout=.1)
#check that meets min version
def qry_ver(self):
"""Query version number of controller firmware.
Args:
none.
Returns:
int. The version number of controller firmware:
"""
if not (self.ascii): #we need to be in ascii
self.set_ascii()
self.ser.write("QRY VER\n")
response = self.ser.readline()
response_list = response.split(" ")
ver = 0
if ((len(response_list) == 3) & (response_list[0] == "RSP")):
ver = int(response_list[2])
else:
#throw communication exception?
pass
logging.debug( 'Version Queried:' + str(ver) )
return ver
def qry_number_motors(self):
"""Query number of motors present.
Args:
none.
Returns:
int. The number of motors present:
"""
if not (self.ascii): #we need to be in ascii
self.set_ascii()
self.ser.write("QRY MTR\n")
response = self.ser.readline()
response_list = response.split(" ")
if ((len(response_list) == 3) & (response_list[0] == "RSP")):
motors = int(response_list[2])
else:
motors = 0
logging.debug('Motors Queried: ' + str(motors))
return motors
def qry_magnitudes(self):
pass
def qry_rhythms(self):
pass
def learn_rhythm(self):
return 'Rhythm Learned'
def learn_magnitude(self):
pass
def vibrate(self, motor, rhythm, magnitude, cycles):
"""Vibrate a motor.
Args:
| motor(int): Motor to select. 0 indexed.
| rhythm(int): Stored rhythm to select. 0 indexed. Use qry_rhythms to see available and learn_rhythm to store rhythms.
| magnitude(int): Stored magnitude to select. 0 indexed. Use qry_magnitudes to see available and learn_magnitude to store magnitudes.
| cycles(int): Number of times to run stored magnitude. 0-7 are valid. 7 is continuous. 0 is used to interrupt a vibration and disable motor.
Returns:
none.
"""
if(self.ascii):
self.set_binary()
self.ser.write(self.__build_binary(motor, rhythm, magnitude, cycles))
response = self.ser.read(1)
if "\00" not in response:
for x in response:
logging.debug( 'Error Vibrating' + str(x) )
def disconnect(self):
return 'Disconnect Successful'