AP_camera: Add example scripts for camera tracking
khanasif786 committed Aug 30, 2024
1 parent 6d844d8 commit d70f55d
Showing 2 changed files with 320 additions and 0 deletions.
libraries/AP_Camera/examples/send_camera_information.py (148 additions, 0 deletions)
@@ -0,0 +1,148 @@
import threading
import time
from pymavlink import mavutil
import argparse
import socket
import struct

# Define CAMERA_CAP_FLAGS as constants
CAMERA_CAP_FLAGS_CAPTURE_VIDEO = 1 # Camera is able to record video
CAMERA_CAP_FLAGS_CAPTURE_IMAGE = 2 # Camera is able to capture images
CAMERA_CAP_FLAGS_HAS_MODES = 4 # Camera has separate Video and Image/Photo modes
CAMERA_CAP_FLAGS_CAN_CAPTURE_IMAGE_IN_VIDEO_MODE = 8 # Camera can capture images while in video mode
CAMERA_CAP_FLAGS_CAN_CAPTURE_VIDEO_IN_IMAGE_MODE = 16 # Camera can capture videos while in Photo/Image mode
CAMERA_CAP_FLAGS_HAS_IMAGE_SURVEY_MODE = 32 # Camera has image survey mode
CAMERA_CAP_FLAGS_HAS_BASIC_ZOOM = 64 # Camera has basic zoom control
CAMERA_CAP_FLAGS_HAS_BASIC_FOCUS = 128 # Camera has basic focus control
CAMERA_CAP_FLAGS_HAS_VIDEO_STREAM = 256 # Camera has video streaming capabilities
CAMERA_CAP_FLAGS_HAS_TRACKING_POINT = 512 # Camera supports tracking of a point
CAMERA_CAP_FLAGS_HAS_TRACKING_RECTANGLE = 1024 # Camera supports tracking of a selection rectangle
CAMERA_CAP_FLAGS_HAS_TRACKING_GEO_STATUS = 2048 # Camera supports tracking geo status
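# Note: pymavlink's generated dialect also exposes these enum values (e.g.
# mavutil.mavlink.CAMERA_CAP_FLAGS_CAPTURE_VIDEO); the literals above are kept
# only so the example is self-describing.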

class CameraTrackingScript:
    def __init__(self, ip, port, sysid, compid):
        self.ip = ip
        self.port = port
        self.sysid = sysid
        self.compid = compid
        self.connection = None
        self.udp_ip = "127.0.0.1"  # localhost, where the OpenCV tracking script listens
        self.udp_port = 14580      # port to send normalized tracking rectangles to
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    def connect_to_mavlink(self):
        self.connection = mavutil.mavlink_connection(f'udp:{self.ip}:{self.port}', source_system=self.sysid)
        print("Searching for vehicle")
        while not self.connection.probably_vehicle_heartbeat(self.connection.wait_heartbeat()):
            print(".", end="")
        print("\nFound vehicle")
        self.connection.wait_heartbeat()
        print("Heartbeat received from system (system %u component %u)" % (self.connection.target_system, self.connection.target_component))
        self.connection.mav.heartbeat_send(
            type=mavutil.mavlink.MAV_TYPE_ONBOARD_CONTROLLER,
            autopilot=mavutil.mavlink.MAV_AUTOPILOT_INVALID,
            base_mode=0,
            custom_mode=0,
            system_status=mavutil.mavlink.MAV_STATE_UNINIT,
            mavlink_version=3
        )

    def send_camera_information(self):
        flags = (
            CAMERA_CAP_FLAGS_CAPTURE_VIDEO |
            CAMERA_CAP_FLAGS_CAPTURE_IMAGE |
            CAMERA_CAP_FLAGS_HAS_MODES |
            CAMERA_CAP_FLAGS_CAN_CAPTURE_IMAGE_IN_VIDEO_MODE |
            CAMERA_CAP_FLAGS_CAN_CAPTURE_VIDEO_IN_IMAGE_MODE |
            CAMERA_CAP_FLAGS_HAS_IMAGE_SURVEY_MODE |
            CAMERA_CAP_FLAGS_HAS_BASIC_ZOOM |
            CAMERA_CAP_FLAGS_HAS_BASIC_FOCUS |
            CAMERA_CAP_FLAGS_HAS_VIDEO_STREAM |
            CAMERA_CAP_FLAGS_HAS_TRACKING_POINT |
            CAMERA_CAP_FLAGS_HAS_TRACKING_RECTANGLE |
            CAMERA_CAP_FLAGS_HAS_TRACKING_GEO_STATUS
        )
        self.connection.mav.camera_information_send(
            int(time.time() * 1000) & 0xFFFFFFFF,                       # time_boot_ms
            b"CameraVendor123" + b'\0' * (32 - len("CameraVendor123")), # vendor_name
            b"CameraModel123" + b'\0' * (32 - len("CameraModel123")),   # model_name
            (1 << 24) | (0 << 16) | (0 << 8) | 1,                       # firmware_version
            float('nan'),  # focal_length
            float('nan'),  # sensor_size_h
            float('nan'),  # sensor_size_v
            640,           # resolution_h
            480,           # resolution_v
            0,             # lens_id
            flags,         # flags
            0,             # cam_definition_version
            b"",           # cam_definition_uri
            0              # gimbal_device_id
        )
        print("Camera information message sent")

    def handle_camera_track_point(self, msg):
        print("Received MAV_CMD_CAMERA_TRACK_POINT command.")
        # param1/param2 are the normalized point coordinates (already floats)
        param1 = msg.param1
        param2 = msg.param2
        print(f"Tracking point parameters: param1={param1}, param2={param2}")

    def handle_camera_track_rectangle(self, msg):
        print("Received MAV_CMD_CAMERA_TRACK_RECTANGLE command.")
        # Normalized rectangle corners (floats, 0..1): param1/param2 are the
        # top-left corner, param3/param4 the bottom-right corner
        norm_x1 = msg.param1
        norm_y1 = msg.param2
        norm_x2 = msg.param3
        norm_y2 = msg.param4
        print(f"Tracking rectangle parameters: top-left=({norm_x1}, {norm_y1}), bottom-right=({norm_x2}, {norm_y2})")

        # Forward the normalized corners as floats to the OpenCV script
        self.sock.sendto(struct.pack('!ffff', norm_x1, norm_y1, norm_x2, norm_y2), (self.udp_ip, self.udp_port))
        print("Sent normalized tracking coordinates to OpenCV script.")

    def send_heartbeat(self):
        while True:
            self.connection.mav.heartbeat_send(
                type=mavutil.mavlink.MAV_TYPE_ONBOARD_CONTROLLER,
                autopilot=mavutil.mavlink.MAV_AUTOPILOT_INVALID,
                base_mode=0,
                custom_mode=0,
                system_status=mavutil.mavlink.MAV_STATE_UNINIT,
                mavlink_version=3
            )
            time.sleep(1)

    def run(self):
        self.connect_to_mavlink()
        self.send_camera_information()

        # Start the heartbeat thread
        heartbeat_thread = threading.Thread(target=self.send_heartbeat)
        heartbeat_thread.daemon = True
        heartbeat_thread.start()

        while True:
            msg = self.connection.recv_match(type='COMMAND_LONG', blocking=True)
            if msg and msg.get_type() == 'COMMAND_LONG':
                if msg.target_system == self.sysid:
                    if msg.command == mavutil.mavlink.MAV_CMD_CAMERA_TRACK_POINT:
                        self.handle_camera_track_point(msg)
                    elif msg.command == mavutil.mavlink.MAV_CMD_CAMERA_TRACK_RECTANGLE:
                        self.handle_camera_track_rectangle(msg)
                else:
                    print("Received but not for us")

def main():
    parser = argparse.ArgumentParser(description="Example camera backend: advertises camera information and handles camera tracking commands.")
    parser.add_argument('--sysid', type=int, help='System ID', required=True)
    parser.add_argument('--compid', type=int, help='Component ID', required=True)
    args = parser.parse_args()

    ip = "127.0.0.1"
    port = 14570

    script = CameraTrackingScript(ip, port, args.sysid, args.compid)
    script.run()


if __name__ == "__main__":
    main()
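
For manual testing it can help to drive this example from the ground-station side. The sketch below only illustrates how a GCS could request rectangle tracking with pymavlink; the connection string, target IDs and rectangle values are assumptions for a local SITL setup and are not part of this commit (whether the command reaches this script depends on how the autopilot forwards camera tracking commands).

from pymavlink import mavutil

# Hypothetical GCS-side test: ask the camera backend to track a rectangle.
gcs = mavutil.mavlink_connection('udp:127.0.0.1:14550')  # assumed GCS link
gcs.wait_heartbeat()
gcs.mav.command_long_send(
    1,                       # target_system: the --sysid given to send_camera_information.py (assumed 1)
    100,                     # target_component (assumed)
    mavutil.mavlink.MAV_CMD_CAMERA_TRACK_RECTANGLE,
    0,                       # confirmation
    0.25, 0.25, 0.75, 0.75,  # normalized top-left and bottom-right corners
    0, 0, 0                  # unused params
)
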
libraries/AP_Camera/examples/tracking.py (172 additions, 0 deletions)
@@ -0,0 +1,172 @@
import cv2
import socket
import struct
import threading
import math
from pymavlink import mavutil
import time

class GimbalControl:
    def __init__(self, connection_str):
        self.master = mavutil.mavlink_connection(connection_str)
        self.master.wait_heartbeat()
        print("Heartbeat from the system (system %u component %u)" %
              (self.master.target_system, self.master.target_component))
        self.center_x = 0
        self.center_y = 0
        self.lock = threading.Lock()  # Lock protecting the shared center coordinates
        self.control_thread = threading.Thread(target=self.send_command)
        self.control_thread.start()

    def send_gimbal_manager_pitch_yaw_angles(self, pitch, yaw, pitch_rate, yaw_rate):
        msg = self.master.mav.gimbal_manager_set_pitchyaw_encode(
            self.master.target_system,
            self.master.target_component,
            0,  # flags
            0,  # gimbal_device_id
            pitch,
            yaw,
            pitch_rate,
            yaw_rate
        )
        self.master.mav.send(msg)

    def send_command(self):
        while True:
            start_time = time.time()  # Record the start time of the loop

            with self.lock:  # Lock when accessing shared variables
                centre_x_copy = int(self.center_x)
                centre_y_copy = int(self.center_y)

            if centre_x_copy == 0 and centre_y_copy == 0:
                diff_x = 0
                diff_y = 0
            else:
                diff_x = (centre_x_copy - (640 / 2)) / 2
                diff_y = -(centre_y_copy - (480 / 2)) / 2

            self.send_gimbal_manager_pitch_yaw_angles(float("NaN"), float("NaN"), math.radians(diff_y), math.radians(diff_x))

            # 50Hz
            elapsed_time = time.time() - start_time
            sleep_time = max(0, 0.02 - elapsed_time)  # Ensure no negative sleep time
            time.sleep(sleep_time)

    def update_center(self, x, y):
        with self.lock:  # Lock when updating shared variables
            self.center_x = x
            self.center_y = y


class VideoStreamer:
    def __init__(self, input_port, output_port):
        self.input_port = input_port
        self.output_port = output_port
        self.cap = self.initialize_video_capture()
        self.frame_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.frame_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        self.out = self.initialize_video_writer()
        self.frame = None

        self.tracker = cv2.legacy.TrackerKCF_create()  # KCF tracker from the opencv-contrib legacy namespace
        self.initBB = None
        self.last_change_time = 0

    def initialize_video_capture(self):
        # Receives an RTP/H.264 stream on input_port (requires OpenCV built with GStreamer support)
        cap = cv2.VideoCapture(
            f'udpsrc port={self.input_port} ! application/x-rtp,encoding-name=H264,payload=96 ! rtph264depay ! h264parse ! queue ! avdec_h264 ! videoconvert ! appsink',
            cv2.CAP_GSTREAMER
        )
        if not cap.isOpened():
            print("Error: Could not open video stream.")
            exit()
        return cap

    def initialize_video_writer(self):
        # Re-encodes the annotated frames and sends them as RTP/H.264 to output_port
        out = cv2.VideoWriter(
            f'appsrc ! videoconvert ! x264enc speed-preset=ultrafast tune=zerolatency ! rtph264pay config-interval=1 ! udpsink host=127.0.0.1 port={self.output_port}',
            cv2.CAP_GSTREAMER,
            0,   # fourcc (unused with a GStreamer pipeline)
            30,  # fps
            (self.frame_width, self.frame_height)
        )
        if not out.isOpened():
            print("Error: Could not open video writer.")
            self.cap.release()
            exit()
        return out

    def update_tracker(self, new_roi):
        self.initBB = new_roi
        self.tracker = cv2.legacy.TrackerKCF_create()  # Re-create the tracker for the new ROI
        if self.frame is not None:
            self.tracker.init(self.frame, new_roi)

    def process_frame(self, gimbal_control):
        while True:
            ret, self.frame = self.cap.read()
            if not ret:
                print("Error: Unable to read frame from video stream.")
                break

            # If a bounding box exists, update the tracker and draw the rectangle
            if self.initBB is not None:
                (success, box) = self.tracker.update(self.frame)
                if success:
                    (x, y, w, h) = [int(v) for v in box]
                    center_x = x + (w // 2)
                    center_y = y + (h // 2)
                    gimbal_control.update_center(center_x, center_y)
                    cv2.rectangle(self.frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
                    self.last_change_time = time.time()
                else:
                    # Stop steering the gimbal if tracking has been lost for 0.2 s
                    if time.time() - self.last_change_time >= 0.2:
                        gimbal_control.update_center(0, 0)

            # Write the frame to the output RTP stream
            self.out.write(self.frame)

            # Display the frame
            cv2.imshow("Frame", self.frame)
            key = cv2.waitKey(1) & 0xFF
            if key == ord("q"):
                break

        self.cleanup()

    def cleanup(self):
        self.cap.release()
        self.out.release()
        cv2.destroyAllWindows()


class UDPReceiver:
    def __init__(self, gimbal_control, video_streamer, port=14580):
        self.gimbal_control = gimbal_control
        self.video_streamer = video_streamer
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.bind(('127.0.0.1', port))
        self.udp_thread = threading.Thread(target=self.receive_udp_data)
        self.udp_thread.daemon = True
        self.udp_thread.start()

    def receive_udp_data(self):
        while True:
            data, addr = self.sock.recvfrom(16)
            # Normalized rectangle corners: (x1, y1) top-left, (x2, y2) bottom-right
            x1, y1, x2, y2 = struct.unpack('!ffff', data)
            print(f"Received new tracking coordinates: x1={x1}, y1={y1}, x2={x2}, y2={y2}")
            # Convert normalized coordinates to pixel values
            x1 = int(x1 * self.video_streamer.frame_width)
            y1 = int(y1 * self.video_streamer.frame_height)
            x2 = int(x2 * self.video_streamer.frame_width)
            y2 = int(y2 * self.video_streamer.frame_height)
            # OpenCV trackers expect (x, y, width, height)
            self.video_streamer.update_tracker((x1, y1, x2 - x1, y2 - y1))


def main():
    gimbal_control = GimbalControl('127.0.0.1:14560')
    video_streamer = VideoStreamer(input_port=5600, output_port=5700)
    udp_receiver = UDPReceiver(gimbal_control, video_streamer)

    video_streamer.process_frame(gimbal_control)


if __name__ == "__main__":
    main()
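
To exercise the UDP hand-off and the tracker re-initialisation without running the MAVLink side at all, a normalized rectangle can be injected directly into the port this script listens on. A minimal sketch, assuming tracking.py is already running and receiving video; the rectangle values are arbitrary:

import socket
import struct

# Mimic send_camera_information.py: one packet of four network-order floats,
# top-left (0.25, 0.25) and bottom-right (0.75, 0.75) in normalized coordinates.
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.sendto(struct.pack('!ffff', 0.25, 0.25, 0.75, 0.75), ("127.0.0.1", 14580))
sock.close()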
