-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathicu.py
289 lines (236 loc) · 12.1 KB
/
icu.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
import cv2
import numpy as np
from skimage.metrics import structural_similarity as ssim
from ultralytics import YOLO
import os
import datetime
import yt_dlp
import time
import torch
import threading
# Global variables shared between main() and the fall-detection worker thread.
fall_detect_thread = None  # background threading.Thread running fall_detect_loop, or None
fall_detect_end_time = 0  # epoch seconds; fall_detect_loop exits once time.time() passes this
curr_frame = None  # latest frame read by main(); also read/drawn on by the worker thread
frame_width = 0  # video frame width in pixels (set in main from the capture)
frame_height = 0  # video frame height in pixels (set in main from the capture)
timestamp = 0  # current position in the video, seconds (frame_count / fps, set in main)
fall_danger_model = None  # YOLO fall-detection model (loaded in main, used by the worker)
def load_video(source):
    """Open a video stream from a camera, a local file, or an http(s) URL.

    'cctv' (case-insensitive) opens the default capture device; an existing
    file path is opened directly; an http/https link is first downloaded with
    yt-dlp and the downloaded file is opened.

    Raises ValueError when the source is unrecognized or the download fails.
    """
    if source.lower() == 'cctv':
        # Device index 0 is the default camera; adjust for multi-camera rigs.
        return cv2.VideoCapture(0)
    if os.path.isfile(source):
        return cv2.VideoCapture(source)
    if source.startswith('http'):
        try:
            options = {
                'format': 'best[ext=mp4][height<=360]',
                'outtmpl': '%(title)s.%(ext)s',
            }
            with yt_dlp.YoutubeDL(options) as downloader:
                info = downloader.extract_info(source, download=True)
                local_path = downloader.prepare_filename(info)
            return cv2.VideoCapture(local_path)
        except Exception as exc:
            raise ValueError(f"Error downloading video: {exc}")
    raise ValueError("Invalid video source. Provide a valid file path, URL, or 'cctv'.")
def contains_class(results, class_name):
    """Return True if any detection box in *results* carries *class_name*.

    Each result is expected to expose `.boxes` (each with `.cls[0]`) and a
    `.names` id->label mapping; ids missing from the mapping are treated
    as the label 'Unknown'.
    """
    for detection in results:
        names = detection.names
        for box in detection.boxes:
            class_id = int(box.cls[0])
            found = names[class_id] if class_id in names else 'Unknown'
            if found == class_name:
                return True
    return False
def detect_specific_movement(prev_roi, curr_roi):
    """Classify motion inside a patient's bounding box between two frames.

    Dense Farneback optical flow is computed on grayscale versions of the
    two BGR ROIs, then mean/max flow magnitude is thresholded in fixed
    sub-rectangles that heuristically map to head, hands, legs and chest.

    Returns a list drawn from ["Head", "Hand", "Leg", "Breathlessness"],
    in that order, possibly empty.
    """
    prev_gray = cv2.cvtColor(prev_roi, cv2.COLOR_BGR2GRAY)
    curr_gray = cv2.cvtColor(curr_roi, cv2.COLOR_BGR2GRAY)
    flow = cv2.calcOpticalFlowFarneback(prev_gray, curr_gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)
    magnitude, _ = cv2.cartToPolar(flow[..., 0], flow[..., 1])

    h, w = prev_roi.shape[:2]
    # Body-part rectangles as (x1, y1, x2, y2); proportions are heuristic
    # and presume a roughly upright, centered person in the ROI.
    head_box = (w // 4, 0, w * 3 // 4, h // 3)
    left_hand_box = (0, h // 3, w // 3, h * 2 // 3)
    right_hand_box = (w * 2 // 3, h // 3, w, h * 2 // 3)
    leg_box = (0, h * 2 // 3, w, h)
    chest_box = (w // 4, h // 3, w * 3 // 4, h * 2 // 3)

    def moved(box, threshold):
        # A region "moved" when both its average and its peak flow
        # magnitude clear the (scaled) threshold.
        x1, y1, x2, y2 = box
        patch = magnitude[y1:y2, x1:x2]
        return np.mean(patch) > threshold and np.max(patch) > threshold * 2

    movements = []
    if moved(head_box, 1.0):
        movements.append("Head")
    if moved(left_hand_box, 1.5) or moved(right_hand_box, 1.5):
        movements.append("Hand")
    if moved(leg_box, 2.0):
        movements.append("Leg")

    # Breathlessness heuristic: moderate sustained chest motion — strong
    # enough to see, not so strong it is whole-body movement.
    cx1, cy1, cx2, cy2 = chest_box
    chest_mean = np.mean(magnitude[cy1:cy2, cx1:cx2])
    if 2.0 < chest_mean < 8.0:  # thresholds may need per-camera tuning
        movements.append("Breathlessness")
    return movements
def fall_detect_loop(cv2_):
    """Background worker: run the fall-detection model on the latest frame.

    Polls the shared `curr_frame` global until `fall_detect_end_time`
    passes. On a 'fall' detection it appends a timestamped entry to
    movement_log.txt and draws a red alert border plus '! ALERT !' text
    on the frame for the next `alert_duration` seconds.

    NOTE(review): this thread draws on `curr_frame` while main() is also
    reading/writing it, and appends to movement_log.txt which main()
    holds open in 'w' mode — both are race-prone; confirm intended.
    """
    global fall_detect_end_time, curr_frame, frame_width, frame_height, timestamp, fall_danger_model
    alert_start_time = None
    alert_duration = 5  # seconds the alert overlay stays visible
    while time.time() < fall_detect_end_time:
        if curr_frame is not None:
            results = fall_danger_model(curr_frame)
            detection = ''
            for result in results:
                boxes = result.boxes.cpu().numpy()
                for box in boxes:
                    cls_id = int(box.cls[0])
                    label = result.names[cls_id] if cls_id in result.names else 'Unknown'
                    # BUG FIX: the original `label in ('fall')` tested substring
                    # membership in the STRING 'fall' (the parentheses do not
                    # make a tuple), so labels like 'fa' or 'all' also matched.
                    # Tuple membership matches only the exact label.
                    if label in ('fall',):
                        detection += ' ' + label
                        with open('movement_log.txt', 'a', encoding='utf-8') as f:
                            f.write(f"{datetime.timedelta(seconds=int(timestamp))}: {label}\n")
            if detection:
                # (Re)start the alert window on every frame with a fall.
                alert_start_time = time.time()
            if alert_start_time and time.time() - alert_start_time < alert_duration:
                cv2_.rectangle(curr_frame, (0, 0), (frame_width, frame_height), (0, 0, 255), 5)
                cv2_.putText(curr_frame, '! ALERT !', (20, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 0, 255), 2)
            else:
                alert_start_time = None
        time.sleep(0.1)  # small delay to prevent CPU overuse
def fall_detect(cv2_):
    """Start — or extend — the background fall-detection window.

    If a worker thread is already alive, push its deadline out so it keeps
    running for at least 5 more seconds; otherwise spawn a new thread
    running fall_detect_loop with a 5-second deadline.
    """
    global fall_detect_thread, fall_detect_end_time
    now = time.time()
    worker_alive = fall_detect_thread is not None and fall_detect_thread.is_alive()
    if worker_alive:
        # Never shorten an already-later deadline.
        fall_detect_end_time = max(fall_detect_end_time, now + 5)
    else:
        fall_detect_end_time = now + 5
        fall_detect_thread = threading.Thread(target=fall_detect_loop, args=(cv2_,))
        fall_detect_thread.start()
def main():
    """Run the full patient-monitoring pipeline on a user-chosen video source.

    Loads two YOLO models (person labelling + fall detection), opens the
    source via load_video(), then per frame: detects people, and for each
    'patient' box with no doctor/nurse present checks SSIM-based motion and
    specific movements, logging them to movement_log.txt, annotating the
    frame, and writing it to an output .mp4 while displaying it live.
    Press 'q' in the display window to stop.
    """
    global curr_frame, frame_width, frame_height, timestamp, fall_danger_model
    # Load the YOLOv8 model
    # `device` is a module global set in the __main__ guard before main() runs.
    model = YOLO('person_labelling.pt').to(device)
    fall_danger_model = YOLO('fall_detection.pt').to(device)
    # Get the video source from the user
    video_source = input("Enter the video file path, URL, or 'cctv' for live feed: ")
    try:
        # Open the input video
        cap = load_video(video_source)
        if not cap.isOpened():
            print("Error: Unable to open video source.")
            return
        # Get video properties (shared globals so the fall thread can draw overlays)
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = int(cap.get(cv2.CAP_PROP_FPS))
        # Define the codec and create VideoWriter object for saving the output video
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        if video_source.lower() == 'cctv':
            video_name = 'cctv_output.mp4'
        else:
            video_name = os.path.splitext(os.path.basename(video_source))[0] + '_output.mp4'
        output_video_path = video_name
        out = cv2.VideoWriter(output_video_path, fourcc, fps, (frame_width, frame_height))
        # Read the first frame for motion detection initialization
        ret, prev_frame = cap.read()
        if not ret:
            print("Failed to read the first frame.")
            return
        frame_count = 0
        # Kick off the background fall-detection thread (5 s window).
        fall_detect(cv2)
        # NOTE(review): alert_start_time is never assigned again inside main(),
        # so the alert branch below can never fire here — the fall thread draws
        # its own alert overlay instead. Confirm whether this is intentional.
        alert_start_time = None
        alert_duration = 5  # 5 seconds
        # Open the movement log file for writing in real-time
        # NOTE(review): the fall thread appends to this same file while it is
        # held open here in 'w' mode — verify the interleaving is acceptable.
        with open('movement_log.txt', 'w') as log_file:
            while cap.isOpened():
                ret, curr_frame = cap.read()
                if not ret:
                    break
                frame_count += 1
                timestamp = frame_count / fps  # Calculate timestamp in seconds
                # Perform inference with the main model
                results = model(curr_frame)
                # Check for the presence of doctor or nurse
                doctor_or_nurse_present = contains_class(results, 'doctor') or contains_class(results, 'nurse')
                # Process each detection result
                for result in results:
                    boxes = result.boxes.cpu().numpy()
                    for box in boxes:
                        x1, y1, x2, y2 = map(int, box.xyxy[0])
                        conf = box.conf[0]
                        cls_id = int(box.cls[0])
                        label = result.names[cls_id] if cls_id in result.names else 'Unknown'
                        # Only analyse motion for unattended patients.
                        if label == 'patient' and not doctor_or_nurse_present:
                            # Motion detection within patient box
                            roi_prev = prev_frame[y1:y2, x1:x2]
                            roi_curr = curr_frame[y1:y2, x1:x2]
                            # Ensure the regions are the same size
                            if roi_prev.shape == roi_curr.shape:
                                # Compute SSIM between the current and previous frames
                                ssim_index = ssim(cv2.cvtColor(roi_prev, cv2.COLOR_BGR2GRAY), cv2.cvtColor(roi_curr, cv2.COLOR_BGR2GRAY))
                                # Define a threshold for SSIM to detect motion
                                ssim_threshold = 0.95  # This value may need adjustment based on your video
                                if ssim_index < ssim_threshold:
                                    color = (255, 0, 0)  # Blue if significant motion detected
                                    # Detect specific movements
                                    movements = detect_specific_movement(roi_prev, roi_curr)
                                    if movements:
                                        color = (0, 0, 255)  # Red if specific movements detected
                                        movement_text = ", ".join(movements)
                                        # Log the movement with timestamp
                                        log_entry = f"{datetime.timedelta(seconds=int(timestamp))}: {movement_text}\n"
                                        log_file.write(log_entry)
                                        log_file.flush()  # Ensure the log is written in real-time
                                        # Extend/start the fall-detection window.
                                        fall_detect(cv2)
                                    else:
                                        movement_text = ""
                                else:
                                    color = (0, 255, 255)  # Yellow if no significant motion detected and no doctor/nurse
                                    movement_text = ""
                            else:
                                color = (0, 255, 255)  # Yellow if regions do not match exactly
                                movement_text = ""
                        else:
                            color = (0, 255, 0)  # Green for all other cases
                            movement_text = ""
                        # Draw the detection box and label in the chosen status color.
                        cv2.rectangle(curr_frame, (x1, y1), (x2, y2), color, 2)
                        cv2.putText(curr_frame, f'{label} {conf:.2f}', (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, color, 2)
                        if movement_text:
                            cv2.putText(curr_frame, movement_text, (x1, y2 + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
                # Check if alert should be displayed (dead branch — see NOTE above)
                if alert_start_time and time.time() - alert_start_time < alert_duration:
                    cv2.rectangle(curr_frame, (0, 0), (frame_width, frame_height), (0, 0, 255), 5)
                    cv2.putText(curr_frame, '! ALERT !', (20, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 0, 255), 2)
                else:
                    alert_start_time = None
                # Display the frame
                cv2.imshow('Live Prediction', curr_frame)
                # Write the frame with predictions to the output video
                out.write(curr_frame)
                # Break the loop if 'q' is pressed
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
                # Update frames for motion detection
                # NOTE(review): this aliases (not copies) the annotated frame, so
                # next iteration's "previous" ROI includes the drawn boxes/text —
                # confirm whether a .copy() taken before drawing was intended.
                prev_frame = curr_frame
        # Release resources
        cap.release()
        out.release()
        cv2.destroyAllWindows()
    except Exception as e:
        # Broad catch keeps the demo from crashing; the error is only printed.
        print(f"Error: {e}")
if __name__ == "__main__":
    # `device` is intentionally a module global: main() reads it when moving
    # the YOLO models to GPU ('cuda') or CPU.
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    main()