Skip to content

Commit

Permalink
Merge pull request #3 from bilikyar/Arduino_nano
Browse files Browse the repository at this point in the history
Arduino nano
  • Loading branch information
wwj718 authored May 8, 2019
2 parents 8367924 + b5aed2c commit 54dc8ea
Show file tree
Hide file tree
Showing 3 changed files with 237 additions and 0 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
此插件使用[pymata-io](https://github.com/bilikyar/pymata-aio)库作为Arduino_nano的固件。

如果有什么问题,尽管开issue,欢迎来交流。
127 changes: 127 additions & 0 deletions arduino_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
'''
arduino nano
requirement:
pip3 install pymata-aio --user
'''
import zmq
from time import sleep

from pymata_aio.pymata3 import PyMata3
from pymata_aio.constants import Constants

# zmq socket
port = 38782
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:%s" % port)


def main():
ConnectedToArduino = False
while True:
if not ConnectedToArduino:
try:
board = PyMata3()
except:
pass
else:
ConnectedToArduino = True
board.set_pin_mode(13, Constants.OUTPUT)
board.digital_write(13, 1)

arduino_code = socket.recv_json().get("arduino_code")

if not arduino_code:
socket.send_json({"result": {
"pin_2_state": board.get_pin_state(2),
"pin_3_state": board.get_pin_state(3),
"pin_4_state": board.get_pin_state(4),
"pin_5_state": board.get_pin_state(5),
"pin_6_state": board.get_pin_state(6),
"pin_7_state": board.get_pin_state(7),
"pin_8_state": board.get_pin_state(8),
"pin_9_state": board.get_pin_state(9),
"pin_10_state": board.get_pin_state(10),
"pin_11_state": board.get_pin_state(11),
"pin_12_state": board.get_pin_state(12),
"pin_13_state": board.get_pin_state(13),
"digital_pin_2": board.digital_read(2),
"digital_pin_3": board.digital_read(3),
"digital_pin_4": board.digital_read(4),
"digital_pin_5": board.digital_read(5),
"digital_pin_6": board.digital_read(6),
"digital_pin_7": board.digital_read(7),
"digital_pin_8": board.digital_read(8),
"digital_pin_9": board.digital_read(9),
"digital_pin_10": board.digital_read(10),
"digital_pin_11": board.digital_read(11),
"digital_pin_12": board.digital_read(12),
"digital_pin_13": board.digital_read(13),
"analog_pin_0": board.analog_read(0),
"analog_pin_1": board.analog_read(1),
"analog_pin_2": board.analog_read(2),
"analog_pin_3": board.analog_read(3),
"analog_pin_4": board.analog_read(4),
"analog_pin_5": board.analog_read(5),
"analog_pin_6": board.analog_read(6),
"analog_pin_7": board.analog_read(7),
}})
sleep(0.05)
continue

if arduino_code == "quit!":
output = eval("board.shutdown()", {}, {
"board": board, "Constants": Constants})
socket.send_json({"result": "quit!"})
break
else:
try:
output = eval(arduino_code, {}, {
"board": board, "Constants": Constants})
# output = exec(arduino_code) # 安全性问题
except Exception as e:
output = e
socket.send_json({
"result": {
"output": str(output),
"pin_2_state": board.get_pin_state(2),
"pin_3_state": board.get_pin_state(3),
"pin_4_state": board.get_pin_state(4),
"pin_5_state": board.get_pin_state(5),
"pin_6_state": board.get_pin_state(6),
"pin_7_state": board.get_pin_state(7),
"pin_8_state": board.get_pin_state(8),
"pin_9_state": board.get_pin_state(9),
"pin_10_state": board.get_pin_state(10),
"pin_11_state": board.get_pin_state(11),
"pin_12_state": board.get_pin_state(12),
"pin_13_state": board.get_pin_state(13),
"digital_pin_2": board.digital_read(2),
"digital_pin_3": board.digital_read(3),
"digital_pin_4": board.digital_read(4),
"digital_pin_5": board.digital_read(5),
"digital_pin_6": board.digital_read(6),
"digital_pin_7": board.digital_read(7),
"digital_pin_8": board.digital_read(8),
"digital_pin_9": board.digital_read(9),
"digital_pin_10": board.digital_read(10),
"digital_pin_11": board.digital_read(11),
"digital_pin_12": board.digital_read(12),
"digital_pin_13": board.digital_read(13),
"analog_pin_0": board.analog_read(0),
"analog_pin_1": board.analog_read(1),
"analog_pin_2": board.analog_read(2),
"analog_pin_3": board.analog_read(3),
"analog_pin_4": board.analog_read(4),
"analog_pin_5": board.analog_read(5),
"analog_pin_6": board.analog_read(6),
"analog_pin_7": board.analog_read(7),
}})
sleep(0.05)

socket.close()
context.term()


if __name__ == '__main__':
main()
107 changes: 107 additions & 0 deletions extension_arduino_nano.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@

'''
Arduino
requirement:
pip3 install pymata-aio --user
'''
import zmq
import subprocess
import pathlib
import platform
import time
import threading

from codelab_adapter import settings
from codelab_adapter.core_extension import Extension


def get_python3_path():
# If it is not working, Please replace python3_path with your local python3 path. shell: which python3
if (platform.system() == "Darwin"):
# which python3
# 不如用PATH python
# 不确定
path = "/usr/local/bin/python3"
if platform.system() == "Windows":
path = "python3"
if platform.system() == "Linux":
path = "/usr/bin/python3"
return path


python3_path = get_python3_path()


class arduinoExtension(Extension):
def __init__(self):
name = type(self).__name__ # class name
super().__init__(name)
self.scratch3_message = {}
self.TOPIC = "eim/arduino"
self.first_start = 1


def run(self):
# 抽象掉这部分 Class
port = 38782 # todo 随机分配
context = zmq.Context.instance()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:%s" % port)

codelab_adapter_server_dir = pathlib.Path.home(
) / "codelab_adapter" / "servers"
script = "{}/arduino_server.py".format(codelab_adapter_server_dir)

cmd = [python3_path, script]
arduino_server = subprocess.Popen(cmd)
settings.running_child_procs.append(arduino_server)

lock = threading.Lock()

def request():
while self._running:
lock.acquire()
self.scratch3_message = self.read()
lock.release()

bg_task = threading.Thread(target=request)
self.logger.debug("thread start")
bg_task.daemon = True
bg_task.start()

while self._running:
scratch3_message = self.scratch3_message
self.logger.debug("scratch3_message {}".format(scratch3_message))
self.scratch3_message = {}
if scratch3_message == {}:
scratch3_message = {"topic": self.TOPIC, "payload": ""}

topic = scratch3_message.get('topic')
arduino_code = scratch3_message.get("payload")

if topic == self.TOPIC:
socket.send_json({"arduino_code": arduino_code})

result = socket.recv_json().get("result")

if self.first_start == 1:
self.publish({"topic": "eim/arduino/init","payload": ""})
self.first_start = 0

# 发往scratch3.0
self.publish({"topic": self.TOPIC,"payload": result})
time.sleep(0.05)




# release socket
socket.send_json({"arduino_code": "quit!"})
result = socket.recv_json().get("result")
arduino_server.terminate()
arduino_server.wait()
socket.close()
context.term()


export = arduinoExtension

0 comments on commit 54dc8ea

Please sign in to comment.