-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathsynth_controller.py
More file actions
154 lines (128 loc) · 5.16 KB
/
Copy pathsynth_controller.py
File metadata and controls
154 lines (128 loc) · 5.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
import liblo
import threading
from osc_receiver import OscReceiver
import time
import subprocess
import re
import logging
class SynthController:
DEFAULT_PORT = 57120
@staticmethod
def kill_potential_engine_from_previous_process():
subprocess.call("killall --quiet sclang", shell=True)
subprocess.call("killall --quiet scsynth", shell=True)
def __init__(self, logger=None):
if logger is None:
logger = logging.getLogger("SynthController")
self.logger = logger
self._sc_process = None
self._sc_listener = None
self._listening_to_engine = False
def launch_engine(self, mode):
self.stop_engine()
f = open("sc/engine.sc")
engine = f.read()
f.close()
f = open("sc/%s.sc" % mode)
engine += f.read()
f.close()
out = open("sc/_compiled.sc", "w")
f = open("sc/boot.sc")
for line in f:
line = line.replace("//$ENGINE", engine)
print >>out, line,
f.close()
out.close()
self._sc_process = subprocess.Popen("sclang sc/_compiled.sc", shell=True,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE)
self._listening_to_engine = True
self.lang_port = None
initialized = False
while self.lang_port is None or not initialized:
line = self._sc_process.stdout.readline().strip()
print "SC: %s" % line
m = re.search('langPort=(\d+)', line)
if m:
self.lang_port = int(m.group(1))
elif line == "Receiving notification messages from server localhost":
initialized = True
self._sc_output_thread = threading.Thread(name="SynthController._sc_output_thread",
target=self._read_sc_output)
self._sc_output_thread.daemon = True
self._sc_output_thread.start()
def _read_sc_output(self):
while self._listening_to_engine:
line = self._sc_process.stdout.readline()
if line:
print "SC: %s" % line,
def connect(self, port):
self._lock = threading.Lock()
self.target = liblo.Address(port)
def subscribe_to_info(self):
self._load_results = {}
if not self._sc_listener:
self._sc_listener = OscReceiver(proto=liblo.TCP, name="SynthController")
self._sc_listener.add_method("/loaded", "ii", self._handle_loaded)
self._sc_listener.start()
self._send("/info_subscribe", self._sc_listener.port)
def stop_engine(self):
if self._sc_listener:
self._sc_listener.stop()
if self._sc_process:
self._sc_process.stdin.write("thisProcess.shutdown;\n")
self._sc_process.stdin.write("0.exit;\n")
if self._listening_to_engine:
self._listening_to_engine = False
self._send("/shutdown")
self.logger.info("waiting for SC output thread to finish")
self._sc_output_thread.join()
self.logger.info("closing SC pipe")
self._sc_process.stdin.close()
self._sc_process.stdout.close()
self.logger.info("waiting for SC process to exit")
self._sc_process.wait()
self.logger.info("SC process exited")
self.target = None
def load_sound(self, sound_id, filename):
self._send("/load", sound_id, filename)
num_frames_loaded = self._get_load_result(sound_id)
return num_frames_loaded
def _get_load_result(self, sound_id, timeout=10.0):
t = time.time()
while True:
if sound_id in self._load_results:
result = self._load_results[sound_id]
del self._load_results[sound_id]
return result
elif (time.time() - t) > timeout:
return None
else:
self._sc_listener.serve()
time.sleep(0.01)
def _handle_loaded(self,path, args, types, src, data):
sound_id, result = args
print "got /loaded %s" % args #TEMP
self._load_results[sound_id] = result
def free_sound(self, sound_id):
self._send("/free", sound_id)
def free_sounds(self):
self._send("/free_all")
def play_segment(self, segment_id, sound_id, begin, end, period_duration, looped_duration, channel, pan):
if looped_duration:
self._send("/loop", segment_id, sound_id, begin, end, period_duration, looped_duration, channel, pan)
else:
self._send("/play", segment_id, sound_id, begin, end, period_duration, channel, pan)
def pan(self, segment_id, pan):
self._send("/pan", segment_id, pan)
def stop_all(self):
self._send("/stop_all")
def sync_beep(self):
self._send("/sync_beep")
def subscribe_to_amp(self, port):
self._send("/amp_subscribe", port)
def subscribe_to_waveform(self, port):
self._send("/waveform_subscribe", port)
def _send(self, command, *args):
with self._lock:
liblo.send(self.target, command, *args)