-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwire.py
55 lines (45 loc) · 1.83 KB
/
wire.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#!/usr/bin/env python3
"""Pass input directly to output.
See https://www.assembla.com/spaces/portaudio/subversion/source/HEAD/portaudio/trunk/test/patest_wire.c
"""
import argparse
import logging
def int_or_str(text):
"""Helper function for argument parsing."""
try:
return int(text)
except ValueError:
return text
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("-i", "--input-device", type=int_or_str,
help="input device ID or substring")
parser.add_argument("-o", "--output-device", type=int_or_str,
help="output device ID or substring")
parser.add_argument("-c", "--channels", type=int, default=2,
help="number of channels")
parser.add_argument("-t", "--dtype", help="audio data type")
parser.add_argument("-s", "--samplerate", type=float, help="sampling rate")
parser.add_argument("-b", "--blocksize", type=int, help="block size")
parser.add_argument("-l", "--latency", type=float, help="latency in seconds")
args = parser.parse_args()
try:
import sounddevice as sd
cumulated_status = sd.CallbackFlags()
def callback(indata, outdata, frames, time, status):
global cumulated_status
cumulated_status |= status
outdata[:] = indata
with sd.Stream(device=(args.input_device, args.output_device),
samplerate=args.samplerate, blocksize=args.blocksize,
dtype=args.dtype, latency=args.latency,
channels=args.channels, callback=callback):
print("#" * 80)
print("press Return to quit")
print("#" * 80)
input()
if cumulated_status:
logging.warning(str(cumulated_status))
except KeyboardInterrupt:
parser.exit('\nInterrupted by user')
except Exception as e:
parser.exit(type(e).__name__ + ': ' + str(e))