-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathplugin_setter.py
55 lines (38 loc) · 1.36 KB
/
plugin_setter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import reapy
import numpy as np
import queue
import threading
from logger import setup_logger
from pythonosc import dispatcher, osc_server
from reapy import reascript_api as RPR
# Module-level logger for this script (used below to report the server address).
# NOTE(review): this name is the same as the stdlib ``logging`` module's name;
# that is only harmless as long as the stdlib module is not imported here too.
logging = setup_logger('Plugin setter')
# Create a queue for incoming data packages: filled by set_plugin_params()
# (the OSC handler) and drained by process_data() (the worker loop).
data_queue = queue.Queue()
def process_data():
    """Apply queued parameter packages to the first FX of the first track.

    Runs forever. Each package taken from ``data_queue`` is an iterable of
    values; value ``i`` is written to parameter ``i`` of
    ``project.tracks[0].fxs[0]`` through ``RPR.TrackFX_SetParam``.

    A failure while handling one package (for example an ``IndexError`` when
    the project has no track or the track has no FX) is logged and the loop
    moves on to the next package, so a single bad package cannot kill the
    worker. ``data_queue.task_done()`` is called for every package, whether
    it was applied or not.
    """
    while True:
        # Take one data package from the queue and process it
        # (blocks until the OSC handler enqueues something).
        params = data_queue.get()
        print(params)
        try:
            with reapy.inside_reaper():
                # Link and get current project
                reapy.connect()
                project = reapy.Project()
                # Get track and plugin
                track = project.tracks[0]
                plugin = track.fxs[0]
                # Look these up once; they do not change within a package.
                track_id = track.id
                fx_index = plugin.index
                for i, value in enumerate(params):
                    RPR.TrackFX_SetParam(track_id, fx_index, i, value)
        except Exception:
            # Boundary of the worker loop: report and keep serving.
            # NOTE(review): assumes setup_logger() returns a standard
            # logging.Logger (which provides .exception) - confirm.
            logging.exception('Failed to apply plugin parameters')
        finally:
            # Confirm data package has been processed, even on failure, so
            # that a data_queue.join() elsewhere can never hang on it.
            data_queue.task_done()
def set_plugin_params(unused_addr, *args):
    """OSC handler: enqueue the message arguments as one parameter package.

    ``unused_addr`` is the first positional argument supplied by the
    dispatcher (the OSC address, per python-osc's handler convention) and is
    ignored. All remaining arguments are packed into a NumPy array and put on
    ``data_queue``.
    """
    data_queue.put(np.array(args))
# Set up OSC dispatcher and server: every '/interpolated_data' message is
# handed to set_plugin_params, which enqueues its arguments.
# NOTE: rebinding ``dispatcher`` replaces the imported module with the
# Dispatcher instance; the name is kept as-is for backward compatibility.
dispatcher = dispatcher.Dispatcher()
dispatcher.map('/interpolated_data', set_plugin_params)
server = osc_server.ThreadingOSCUDPServer(('localhost', 5106), dispatcher)
logging.info(f'Serving on: {server.server_address}')
# Start the queue consumer before serving. process_data() loops forever with
# no stop signal, so it must be a daemon thread: otherwise the interpreter
# could never exit once serve_forever() returns or is interrupted (Ctrl-C).
data_thread = threading.Thread(target=process_data, daemon=True)
data_thread.start()
# Blocks the main thread, receiving OSC messages until interrupted.
server.serve_forever()