From b4dd6507e725557c92c82b6ae3cc7b7f2d179843 Mon Sep 17 00:00:00 2001 From: Chris Crawford Date: Fri, 5 Jul 2024 12:23:45 -0500 Subject: [PATCH] first commit --- cam.py | 61 ++++++++++ cam_test.py | 61 ++++++++++ comms.py | 262 ++++++++++++++++++++++++++++++++++++++++++ wifi_credentials.json | 5 + 4 files changed, 389 insertions(+) create mode 100644 cam.py create mode 100644 cam_test.py create mode 100644 comms.py create mode 100644 wifi_credentials.json diff --git a/cam.py b/cam.py new file mode 100644 index 0000000..f42464f --- /dev/null +++ b/cam.py @@ -0,0 +1,61 @@ +import camera +import machine +import utime +from time import sleep + +class CameraBoard(): + def __init__(self, os, photo_directory = "/sd/"): + self.os = os + self.cam_loading = True + self.photo_directory = photo_directory + + self.microsd_config = { + 'miso':8, + 'mosi':9, + 'cs':21, + 'sck':7, + } + self.sd = machine.SDCard(slot=3, width=1, + sck=machine.Pin(self.microsd_config['sck']), + mosi=machine.Pin(self.microsd_config['mosi']), + miso=machine.Pin(self.microsd_config['miso']), + cs=machine.Pin(self.microsd_config['cs'])) + + self.expansion_board_init() + + def cam_init(self): + while self.cam_loading: + cam_ready = camera.init() # Camera + print("Camera ready?: ", cam_ready) + self.cam_loading = not cam_ready + sleep(1) + + def expansion_board_init(self): + print("mounting") + print(self.sd.info()) + self.os.mount(self.sd, "/sd") + print(self.os.listdir("/sd")) + + def get_time(self): + current_time = utime.localtime() + hour = current_time[3] + minute = current_time[4] + sec = current_time[5] + return str(hour) + "_" + str(minute) + "_" + str(sec) + + def save_photo(self, file_name, data): + file_path = self.photo_directory + file_name + ".jpg" + imgFile = open(file_path, "wb") + imgFile.write(data) + imgFile.close() + + def take_photo(self): + #self.cam_init() + p=camera.capture() + print("photo_captured") + #self.close() + return p + + def close(self): + camera.deinit() + diff --git a/cam_test.py b/cam_test.py new file mode 100644 index 0000000..c4a31e4 --- /dev/null +++ b/cam_test.py @@ -0,0 +1,61 @@ +from cam import CameraBoard +from comms import Comms +import time +from math import floor + +cam = CameraBoard(os) +comms = Comms() +comms.create_wifi_connection('wifi_credentials.json') +comms.create_rest_connection("{connection_id}", "{rest_end_point}") + +def get_photo(): + buf = cam.take_photo() + return buf + +def convert_score(score): + return str(floor(score * 100)) + +# Take a photo and send a post request. The reponses +def analyze_vision(): + try: + # Returns byte array of photo + img = get_photo() + time = cam.get_time() + photo_label = f"{time}_" + + # Query contents of the image. Uses the following model by default + # https://api-inference.huggingface.co/models/hustvl/yolos-small + res = comms.img_query("py_anywhere", img) + print("--------------------------------") + if len(res) > 0: + for item in res: + if len(photo_label) < 30: # Restrict size of filename. + photo_label = f"{photo_label}_{item['label']}_{convert_score(item['score'])}" + print(item) + cam.save_photo(photo_label, img) + print(photo_label) + + else: + print("No objects detected") + analyze_vision() + except Exception as e: + print(e) + time.sleep(3) + analyze_vision() + + +def main(): + #comms.post_json("py_anywhere", {'msg': "hello world esp"}) + #comms.post_img("py_anywhere", "final_photo", img) + #comms.post_query("py_anywhere", "How many suns are in the solar system?") + cam.cam_init() + analyze_vision() + time.sleep(3) + comms.wifi.disconnect() + #cam.close() + +main() + + + + diff --git a/comms.py b/comms.py new file mode 100644 index 0000000..c6363da --- /dev/null +++ b/comms.py @@ -0,0 +1,262 @@ +from micropython import const +import uasyncio as asyncio +import aioble +import bluetooth +import struct +from machine import Pin +from random import randint +import network +import urequests as requests +import ujson +import utime + +class Wifi(): + def __init__(self, credentials_file): + print("init wifi") + self.credentials_file = credentials_file + credentials = self.load_wifi_credentials() + self.ssid = credentials['ssid'] + self.password = credentials['password'] + self.sta = None + self.connect_s3() + + def load_wifi_credentials(self): + try: + with open(self.credentials_file, 'r') as file: + data = ujson.load(file) + return data + except: + raise RuntimeError(f"unable to find {self.credentials_file} file") + + + def connect_s3(self): + self.sta = network.WLAN(network.STA_IF) + try: + self.sta.disconnect() + except Exception as e: + print(e) + + self.sta.active(True) + # Fill in your network name (SSID) and password here: + if not self.sta.isconnected(): + self.sta.connect(self.ssid, self.password) + print("connecting....") + while not self.sta.isconnected(): + utime.sleep(1) # sleep for a second + print("Network connected") + + def connect(self): + self.sta = network.WLAN(network.STA_IF) + #self.sta.disconnect() + if not self.sta.isconnected(): + print('connecting to network...') + self.sta.active(True) + self.sta.connect(self.ssid, self.password) + while not self.sta.isconnected(): + pass + print("wifi connected") + #self.post_req() + return 0 + print("wifi connected") + #self.post_req() + + def disconnect(self): + self.sta.disconnect() + print(self.ssid + " disconnected") + + +class BLE(): + def __init__(self, service_uuid, name): + self.service_uuid = bluetooth.UUID(service_uuid) + self.service = self._register_service(self.service_uuid) + self.name = name + self.characteristics = {} + self.ADV_INTERVAL_MS = 250_000 + print("init BLE.") + + + # Helper to encode the data characteristic UTF-8 + def _encode_data(self, data): + return str(data).encode('utf-8') + + # Helper to decode string data + def _decode_str(self, data): + try: + string_msg = data.decode("utf-8") + return string_msg + except Exception as e: + print("Error decoding data:", e) + return None + + # Helper to decode int data + def _decode_int(self, data): + try: + number = int.from_bytes(data, 'big') + return number + except Exception as e: + print("Error decoding data:", e) + return None + + def _add_characteristic(self, id, characteristic): + self.characteristics[id] = characteristic + + def get_characteristics(self): + print(self.characteristics) + return self.characteristics + + def register_characteristic(self, characteristic_id, UUID, read=False, write=False, notify=False, capture=False): + if self.service is not None: + UUID = bluetooth.UUID(UUID) + _characteristic = aioble.Characteristic(self.service, UUID, read=read, notify=notify, write=write, capture=capture) + self._add_characteristic(characteristic_id, _characteristic) + else: + print("Service not defined.") + + def remove_characteristic(self, id, characteristic): + self.characteristics[id].pop() + + def _register_service(self, UUID): + print(aioble) + return aioble.Service(self.service_uuid) + + + def register_services(self): + aioble.register_services(self.service) + + # Get sensor readings + def get_random_value(self): + return randint(0,100) + + def write_characteristic(self, value, characteristic): + self.characteristics[characteristic].write(self._encode_data(value), send_update=True) + + # Get new value and update characteristic + async def sensor_task(self, characteristic, sensor_callback): + while True: + value = sensor_callback() + #self.write_characteristic(value, characteristic) + self.characteristics[characteristic].write(self._encode_data(value), send_update=True) + #print('New random value written: ', value) + await asyncio.sleep_ms(1000) + + async def wait_for_int_write(self, characteristic): + while True: + try: + connection, data = await self.characteristics[characteristic].written() + number = self._decode_int(data) + print("number: " + number) + except Exception as e: + print("Error decoding int data:", e) + return None + + async def wait_for_str_write(self, characteristic): + while True: + try: + connection, data = await self.characteristics[characteristic].written() + string = self._decode_str(data) + print("string: " + string) + except Exception as e: + print("Error decoding int data:", e) + return None + + # Serially wait for connections. Don't advertise while a central is connected. + async def wait_for_connection(self): + print("INTERVAL: " + " " + str(self.ADV_INTERVAL_MS)) + print("Service UUID: " + " " + str(self.service_uuid)) + while True: + try: + async with await aioble.advertise( + self.ADV_INTERVAL_MS, + name=self.name, + services=[self.service_uuid], + ) as connection: + print("Connection from", connection.device) + await connection.disconnected() + except asyncio.CancelledError: + # Catch the CancelledError + print("Peripheral task cancelled") + except Exception as e: + print("Error in peripheral_task:", e) + finally: + # Ensure the loop continues to the next iteration + await asyncio.sleep_ms(100) + +class MQTT_CLIENT_COMM(): + def __init__(self): + print("init mqtt client") + + def create_mqtt_connection(self): + print("create_mqtt_connection") + +class Comms(): + def __init__(self): + self.ble = None + self.wifi = None + self.mqtt = None + self.rest_connections = {} + print("init comms") + + def create_ble_connection(self, service_uuid, name): + self.ble = BLE(service_uuid, name) + return self.ble + + def create_wifi_connection(self, credentials_file): + self.wifi = Wifi(credentials_file) + + def create_rest_connection(self, end_point_id, base_url): + self.rest_connections[end_point_id] = REST(base_url) + print(self.rest_connections) + + def get_rest_connection(self, end_point_id): + return self.rest_connections[end_point_id] + + def post_json(self, end_point_id, json): + self.rest_connections[end_point_id].post_json(json) + + def post_query(self, end_point_id, query): + self.rest_connections[end_point_id].post_query(query) + + def post_img(self, end_point_id, file_name, img): + self.get_rest_connection(end_point_id).post_img(file_name, img) + + def img_query(self, end_point_id, img, query_type="yolo"): + return self.get_rest_connection(end_point_id).img_query(img, query_type) + + def is_wifi_connected(self): + return network.WLAN(network.STA_IF).isconnected() + +class REST(): + def __init__(self, end_point_url): + self.end_point = end_point_url + + def post_json(self, json_obj): + #BASE_URL = f"https://northcsc.pythonanywhere.com/?input=esp" + print(f"POST: {self.end_point}") + post_data = ujson.dumps(json_obj) + res = requests.post(url=self.end_point, headers = ({'content-type': 'application/json'}), data = (post_data)).json() + #print(res) + return res + + def post_query(self, query, query_type="sentiment"): + _headers = {'content-type': 'application/json'} + _end_point = f"{self.end_point}/{query_type}/" + json_obj = ujson.dumps({'input': query}) + res = requests.post(url=_end_point, headers = (_headers), data = (json_obj)).json() + print(res) + + def post_img(self, file_name, img): + _headers = {'content-type': 'image/jpeg'} + _end_point = f"{self.end_point}/?file_name={file_name}" + print("end_point") + print(_end_point) + res = requests.post(url=_end_point, headers = (_headers), data = (img)).json() + print(res) + return res + + def img_query(self, img, query_type="yolo"): + _headers = {'content-type': 'image/jpeg'} + _end_point = f"{self.end_point}/{query_type}/" + print("request_sent") + res = requests.post(url=_end_point, headers = (_headers), data = (img)).json() + return res + diff --git a/wifi_credentials.json b/wifi_credentials.json new file mode 100644 index 0000000..00a30d2 --- /dev/null +++ b/wifi_credentials.json @@ -0,0 +1,5 @@ +{ + "ssid": "", + "password": "" +} +