Skip to content

Commit

Permalink
first commit
Browse files Browse the repository at this point in the history
  • Loading branch information
chris35469 committed Jul 5, 2024
0 parents commit b4dd650
Show file tree
Hide file tree
Showing 4 changed files with 389 additions and 0 deletions.
61 changes: 61 additions & 0 deletions cam.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import camera
import machine
import utime
from time import sleep

class CameraBoard():
def __init__(self, os, photo_directory = "/sd/"):
self.os = os
self.cam_loading = True
self.photo_directory = photo_directory

self.microsd_config = {
'miso':8,
'mosi':9,
'cs':21,
'sck':7,
}
self.sd = machine.SDCard(slot=3, width=1,
sck=machine.Pin(self.microsd_config['sck']),
mosi=machine.Pin(self.microsd_config['mosi']),
miso=machine.Pin(self.microsd_config['miso']),
cs=machine.Pin(self.microsd_config['cs']))

self.expansion_board_init()

def cam_init(self):
while self.cam_loading:
cam_ready = camera.init() # Camera
print("Camera ready?: ", cam_ready)
self.cam_loading = not cam_ready
sleep(1)

def expansion_board_init(self):
print("mounting")
print(self.sd.info())
self.os.mount(self.sd, "/sd")
print(self.os.listdir("/sd"))

def get_time(self):
current_time = utime.localtime()
hour = current_time[3]
minute = current_time[4]
sec = current_time[5]
return str(hour) + "_" + str(minute) + "_" + str(sec)

def save_photo(self, file_name, data):
file_path = self.photo_directory + file_name + ".jpg"
imgFile = open(file_path, "wb")
imgFile.write(data)
imgFile.close()

def take_photo(self):
#self.cam_init()
p=camera.capture()
print("photo_captured")
#self.close()
return p

def close(self):
camera.deinit()

61 changes: 61 additions & 0 deletions cam_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from cam import CameraBoard
from comms import Comms
import time
from math import floor

cam = CameraBoard(os)
comms = Comms()
comms.create_wifi_connection('wifi_credentials.json')
comms.create_rest_connection("{connection_id}", "{rest_end_point}")

def get_photo():
buf = cam.take_photo()
return buf

def convert_score(score):
return str(floor(score * 100))

# Take a photo and send a post request. The reponses
def analyze_vision():
try:
# Returns byte array of photo
img = get_photo()
time = cam.get_time()
photo_label = f"{time}_"

# Query contents of the image. Uses the following model by default
# https://api-inference.huggingface.co/models/hustvl/yolos-small
res = comms.img_query("py_anywhere", img)
print("--------------------------------")
if len(res) > 0:
for item in res:
if len(photo_label) < 30: # Restrict size of filename.
photo_label = f"{photo_label}_{item['label']}_{convert_score(item['score'])}"
print(item)
cam.save_photo(photo_label, img)
print(photo_label)

else:
print("No objects detected")
analyze_vision()
except Exception as e:
print(e)
time.sleep(3)
analyze_vision()


def main():
#comms.post_json("py_anywhere", {'msg': "hello world esp"})
#comms.post_img("py_anywhere", "final_photo", img)
#comms.post_query("py_anywhere", "How many suns are in the solar system?")
cam.cam_init()
analyze_vision()
time.sleep(3)
comms.wifi.disconnect()
#cam.close()

main()




262 changes: 262 additions & 0 deletions comms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
from micropython import const
import uasyncio as asyncio
import aioble
import bluetooth
import struct
from machine import Pin
from random import randint
import network
import urequests as requests
import ujson
import utime

class Wifi():
def __init__(self, credentials_file):
print("init wifi")
self.credentials_file = credentials_file
credentials = self.load_wifi_credentials()
self.ssid = credentials['ssid']
self.password = credentials['password']
self.sta = None
self.connect_s3()

def load_wifi_credentials(self):
try:
with open(self.credentials_file, 'r') as file:
data = ujson.load(file)
return data
except:
raise RuntimeError(f"unable to find {self.credentials_file} file")


def connect_s3(self):
self.sta = network.WLAN(network.STA_IF)
try:
self.sta.disconnect()
except Exception as e:
print(e)

self.sta.active(True)
# Fill in your network name (SSID) and password here:
if not self.sta.isconnected():
self.sta.connect(self.ssid, self.password)
print("connecting....")
while not self.sta.isconnected():
utime.sleep(1) # sleep for a second
print("Network connected")

def connect(self):
self.sta = network.WLAN(network.STA_IF)
#self.sta.disconnect()
if not self.sta.isconnected():
print('connecting to network...')
self.sta.active(True)
self.sta.connect(self.ssid, self.password)
while not self.sta.isconnected():
pass
print("wifi connected")
#self.post_req()
return 0
print("wifi connected")
#self.post_req()

def disconnect(self):
self.sta.disconnect()
print(self.ssid + " disconnected")


class BLE():
def __init__(self, service_uuid, name):
self.service_uuid = bluetooth.UUID(service_uuid)
self.service = self._register_service(self.service_uuid)
self.name = name
self.characteristics = {}
self.ADV_INTERVAL_MS = 250_000
print("init BLE.")


# Helper to encode the data characteristic UTF-8
def _encode_data(self, data):
return str(data).encode('utf-8')

# Helper to decode string data
def _decode_str(self, data):
try:
string_msg = data.decode("utf-8")
return string_msg
except Exception as e:
print("Error decoding data:", e)
return None

# Helper to decode int data
def _decode_int(self, data):
try:
number = int.from_bytes(data, 'big')
return number
except Exception as e:
print("Error decoding data:", e)
return None

def _add_characteristic(self, id, characteristic):
self.characteristics[id] = characteristic

def get_characteristics(self):
print(self.characteristics)
return self.characteristics

def register_characteristic(self, characteristic_id, UUID, read=False, write=False, notify=False, capture=False):
if self.service is not None:
UUID = bluetooth.UUID(UUID)
_characteristic = aioble.Characteristic(self.service, UUID, read=read, notify=notify, write=write, capture=capture)
self._add_characteristic(characteristic_id, _characteristic)
else:
print("Service not defined.")

def remove_characteristic(self, id, characteristic):
self.characteristics[id].pop()

def _register_service(self, UUID):
print(aioble)
return aioble.Service(self.service_uuid)


def register_services(self):
aioble.register_services(self.service)

# Get sensor readings
def get_random_value(self):
return randint(0,100)

def write_characteristic(self, value, characteristic):
self.characteristics[characteristic].write(self._encode_data(value), send_update=True)

# Get new value and update characteristic
async def sensor_task(self, characteristic, sensor_callback):
while True:
value = sensor_callback()
#self.write_characteristic(value, characteristic)
self.characteristics[characteristic].write(self._encode_data(value), send_update=True)
#print('New random value written: ', value)
await asyncio.sleep_ms(1000)

async def wait_for_int_write(self, characteristic):
while True:
try:
connection, data = await self.characteristics[characteristic].written()
number = self._decode_int(data)
print("number: " + number)
except Exception as e:
print("Error decoding int data:", e)
return None

async def wait_for_str_write(self, characteristic):
while True:
try:
connection, data = await self.characteristics[characteristic].written()
string = self._decode_str(data)
print("string: " + string)
except Exception as e:
print("Error decoding int data:", e)
return None

# Serially wait for connections. Don't advertise while a central is connected.
async def wait_for_connection(self):
print("INTERVAL: " + " " + str(self.ADV_INTERVAL_MS))
print("Service UUID: " + " " + str(self.service_uuid))
while True:
try:
async with await aioble.advertise(
self.ADV_INTERVAL_MS,
name=self.name,
services=[self.service_uuid],
) as connection:
print("Connection from", connection.device)
await connection.disconnected()
except asyncio.CancelledError:
# Catch the CancelledError
print("Peripheral task cancelled")
except Exception as e:
print("Error in peripheral_task:", e)
finally:
# Ensure the loop continues to the next iteration
await asyncio.sleep_ms(100)

class MQTT_CLIENT_COMM():
def __init__(self):
print("init mqtt client")

def create_mqtt_connection(self):
print("create_mqtt_connection")

class Comms():
def __init__(self):
self.ble = None
self.wifi = None
self.mqtt = None
self.rest_connections = {}
print("init comms")

def create_ble_connection(self, service_uuid, name):
self.ble = BLE(service_uuid, name)
return self.ble

def create_wifi_connection(self, credentials_file):
self.wifi = Wifi(credentials_file)

def create_rest_connection(self, end_point_id, base_url):
self.rest_connections[end_point_id] = REST(base_url)
print(self.rest_connections)

def get_rest_connection(self, end_point_id):
return self.rest_connections[end_point_id]

def post_json(self, end_point_id, json):
self.rest_connections[end_point_id].post_json(json)

def post_query(self, end_point_id, query):
self.rest_connections[end_point_id].post_query(query)

def post_img(self, end_point_id, file_name, img):
self.get_rest_connection(end_point_id).post_img(file_name, img)

def img_query(self, end_point_id, img, query_type="yolo"):
return self.get_rest_connection(end_point_id).img_query(img, query_type)

def is_wifi_connected(self):
return network.WLAN(network.STA_IF).isconnected()

class REST():
def __init__(self, end_point_url):
self.end_point = end_point_url

def post_json(self, json_obj):
#BASE_URL = f"https://northcsc.pythonanywhere.com/?input=esp"
print(f"POST: {self.end_point}")
post_data = ujson.dumps(json_obj)
res = requests.post(url=self.end_point, headers = ({'content-type': 'application/json'}), data = (post_data)).json()
#print(res)
return res

def post_query(self, query, query_type="sentiment"):
_headers = {'content-type': 'application/json'}
_end_point = f"{self.end_point}/{query_type}/"
json_obj = ujson.dumps({'input': query})
res = requests.post(url=_end_point, headers = (_headers), data = (json_obj)).json()
print(res)

def post_img(self, file_name, img):
_headers = {'content-type': 'image/jpeg'}
_end_point = f"{self.end_point}/?file_name={file_name}"
print("end_point")
print(_end_point)
res = requests.post(url=_end_point, headers = (_headers), data = (img)).json()
print(res)
return res

def img_query(self, img, query_type="yolo"):
_headers = {'content-type': 'image/jpeg'}
_end_point = f"{self.end_point}/{query_type}/"
print("request_sent")
res = requests.post(url=_end_point, headers = (_headers), data = (img)).json()
return res

5 changes: 5 additions & 0 deletions wifi_credentials.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"ssid": "",
"password": ""
}

0 comments on commit b4dd650

Please sign in to comment.