-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathbasic_example6dof.py
91 lines (66 loc) · 2.57 KB
/
basic_example6dof.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
"""
Streaming 6Dof from QTM
"""
import asyncio
import xml.etree.ElementTree as ET
import pkg_resources
import qtm_rt
QTM_FILE = pkg_resources.resource_filename("qtm_rt", "data/Demo.qtm")
def create_body_index(xml_string):
""" Extract a name to index dictionary from 6dof settings xml """
xml = ET.fromstring(xml_string)
body_to_index = {}
for index, body in enumerate(xml.findall("*/Body/Name")):
body_to_index[body.text.strip()] = index
return body_to_index
def body_enabled_count(xml_string):
xml = ET.fromstring(xml_string)
return sum(enabled.text == "true" for enabled in xml.findall("*/Body/Enabled"))
async def main():
# Connect to qtm
connection = await qtm_rt.connect("127.0.0.1")
# Connection failed?
if connection is None:
print("Failed to connect")
return
# # Take control of qtm, context manager will automatically release control after scope end
# async with qtm_rt.TakeControl(connection, "password"):
# realtime = True
# if realtime:
# # Start new realtime
# await connection.new()
# else:
# # Load qtm file
# await connection.load(QTM_FILE)
# # start rtfromfile
# await connection.start(rtfromfile=True)
# Get 6dof settings from qtm
xml_string = await connection.get_parameters(parameters=["6d"])
body_index = create_body_index(xml_string)
print("{} of {} 6DoF bodies enabled".format(body_enabled_count(xml_string), len(body_index)))
wanted_body = "PORT ROTOR"
def on_packet(packet):
info, bodies = packet.get_6d()
print(
"Framenumber: {} - Body count: {}".format(
packet.framenumber, info.body_count
)
)
if wanted_body is not None and wanted_body in body_index:
# Extract one specific body
wanted_index = body_index[wanted_body]
position, rotation = bodies[wanted_index]
print("{} - Pos: {} - Rot: {}".format(wanted_body, position, rotation))
else:
# Print all bodies
for position, rotation in bodies:
print("Pos: {} - Rot: {}".format(position, rotation))
# Start streaming frames
await connection.stream_frames(components=["6d"], on_packet=on_packet)
# Wait asynchronously 5 seconds
await asyncio.sleep(5)
# Stop streaming
await connection.stream_frames_stop()
if __name__ == "__main__":
# Run our asynchronous function until complete
asyncio.get_event_loop().run_until_complete(main())