-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapp.py
executable file
·116 lines (92 loc) · 2.87 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#!/usr/bin/env python
from __future__ import unicode_literals
import time
import logging
from flask import Flask, render_template, Response, request
from flask_socketio import SocketIO, emit, disconnect
from utils import requires_auth, socket_requires_auth
from camera import Camera
from controller import LauncherController
from motors import MotorsController
# Flask application and the Socket.IO server layered on top of it.
app = Flask(__name__)
socketio = SocketIO(app)
# Attach a stream handler (INFO and above) to the Flask application logger.
stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.INFO)
app.logger.addHandler(stream_handler)
# One Camera object per available camera id, keyed by the id converted to a
# string; stream() looks cameras up by str(stream_id).
camera_ids = Camera.get_available_cameras()
cameras = {str(i) : Camera(i) for i in camera_ids}
print(camera_ids)
# Controller objects driven by the 'rocket' and 'motor' socket handlers.
launcher = LauncherController()
motors = MotorsController()
# Round-trip time per client, keyed by get_client_id(); written by
# connected() and pong(), read by motor() and rocket().
rtts = {}
# Factor applied to a stored round-trip time to obtain a step duration.
RTTS_INCREMENT = 1.15
def get_client_id():
    """Return an identifier for the client that triggered the current event.

    Inside a Socket.IO event handler Flask-SocketIO exposes the session id of
    the sending client as ``request.sid``. Using it keeps the entries of
    ``rtts`` separate for each connected client instead of having every
    client share (and delete) one common slot.

    Returns:
        The Socket.IO session id of the current client, or ``0`` (the former
        constant value) when no session id is available.
    """
    try:
        return request.sid
    except (AttributeError, RuntimeError):
        # AttributeError: plain HTTP request that carries no session id.
        # RuntimeError: called outside of any request context.
        return 0
@app.route('/')
@requires_auth
def index():
    """Serve the home page by rendering the ``index.html`` template."""
    page = render_template('index.html')
    return page
@socketio.on('connected')
def connected():
    """Handle the ``connected`` event by seeding the client's RTT entry with 1."""
    client_id = get_client_id()
    rtts[client_id] = 1
@socketio.on('disconnect')
def unconnect():
    """Drop the RTT entry of the client that disconnected, if one exists."""
    # pop() with a default does nothing when the client has no entry.
    rtts.pop(get_client_id(), None)
@socketio.on('list_cameras')
@socket_requires_auth
def list_cameras():
    """Emit a ``cameras`` event carrying the module-level ``camera_ids``."""
    available_ids = camera_ids
    emit('cameras', available_ids)
@socketio.on('rttpong')
@socket_requires_auth
def pong(data):
    """Record the round-trip time measured from an echoed ``rttping``.

    motor() and rocket() emit ``rttping`` with the server time; the client is
    expected to send that timestamp back in ``data["timestamp"]``. The
    elapsed time is stored in ``rtts`` for the current client, clamped to the
    range [0, 1], and an ``rttpong`` acknowledgement is emitted.

    Args:
        data: Event payload; expected to be a mapping with a numeric
            ``"timestamp"`` entry.
    """
    try:
        elapsed = time.time() - data["timestamp"]
    except (KeyError, TypeError):
        # Missing or non-numeric timestamp: keep the previous measurement
        # rather than failing inside the event handler.
        print('ERROR in pong: invalid timestamp')
        return
    # The timestamp comes from the client, so it may lie in the future; a
    # negative value would later be turned into a negative step duration.
    rtts[get_client_id()] = max(0, min(elapsed, 1))
    emit('rttpong', 'done')
@socketio.on('stream')
@socket_requires_auth
def stream(stream_id):
    """Emit a ``frame`` event with the current image of the requested camera.

    Args:
        stream_id: Camera identifier; converted with ``str()`` before being
            looked up in ``cameras``.

    An unknown identifier is reported on standard output and nothing is
    emitted.
    """
    stream_id = str(stream_id)
    if stream_id not in cameras:
        print("ERROR: Invalid stream ID %s" % stream_id)
        return
    # The base64 frame is prefixed so the payload is a JPEG data URI.
    encoded_frame = cameras[stream_id].get_frame_base64()
    payload = {
        'id': stream_id,
        'raw': 'data:image/jpeg;base64,' + encoded_frame,
    }
    emit('frame', payload)
@socketio.on('motor')
@socket_requires_auth
def motor(command):
    """Run one motor step as requested by the client.

    An ``rttping`` carrying the current server time is emitted first so the
    client can answer with ``rttpong``. The command is then validated and
    passed to ``motors.do_step``.

    Args:
        command: Event payload; expected to be a mapping with
            ``"direction"`` ("fwd", "bwd" or "none") and ``"steering"``
            ("left", "right" or "none").

    The step duration is the client's stored RTT multiplied by
    ``RTTS_INCREMENT``, or 1 when no RTT is stored for the client. Invalid
    or incomplete commands are reported on standard output and ignored.
    """
    emit('rttping', {"timestamp": time.time()})
    if not isinstance(command, dict):
        # Treat a malformed payload like one with both fields missing.
        command = {}
    direction = command.get("direction")
    steering = command.get("steering")
    if direction not in ("fwd", "bwd", "none"):
        print('ERROR in motor: wrong direction')
        return
    if steering not in ("left", "right", "none"):
        print('ERROR in motor: wrong steering')
        return
    # Single lookup: the entry may be removed by a disconnect between a
    # membership test and a subsequent index.
    rtt = rtts.get(get_client_id())
    duration = 1 if rtt is None else rtt * RTTS_INCREMENT
    motors.do_step(direction, steering, duration=duration)
@socketio.on('rocket')
@socket_requires_auth
def rocket(data):
    """Execute one launcher command requested by the client.

    An ``rttping`` carrying the current server time is emitted first so the
    client can answer with ``rttpong``.

    Args:
        data: Event payload; expected to be a mapping whose ``"command"``
            entry is one of "stop", "up", "down", "right", "left", "fire".

    "fire" calls ``launcher.fire()``; every other valid command calls
    ``launcher.step`` with a duration equal to the client's stored RTT
    multiplied by ``RTTS_INCREMENT``, or 0.05 when no RTT is stored. Invalid
    or incomplete payloads are reported on standard output and ignored.
    """
    emit('rttping', {"timestamp": time.time()})
    # A payload that is not a mapping, or lacks "command", is rejected below.
    command = data.get("command") if isinstance(data, dict) else None
    if command not in ("stop", "up", "down", "right", "left", "fire"):
        print('ERROR in rocket: invalid command')
        return
    if command == "fire":
        launcher.fire()
        return
    # Single lookup: the entry may be removed by a disconnect between a
    # membership test and a subsequent index.
    rtt = rtts.get(get_client_id())
    duration = 0.05 if rtt is None else rtt * RTTS_INCREMENT
    launcher.step(command, duration)
if __name__ == '__main__':
    # Serve on all interfaces, port 5000, with debug mode switched on.
    # NOTE(review): debug mode combined with a public bind address is
    # normally a development-only setup -- confirm before deploying.
    run_options = dict(host='0.0.0.0', port=5000, debug=True)
    socketio.run(app, **run_options)