Files
pytheory/test_live.py
T
2026-03-29 20:23:03 -04:00

718 lines
30 KiB
Python

"""PyTheory Live — interactive MIDI synthesizer with TUI."""
import curses
import random
import sys
import threading
import time
import os
from pytheory.live import LiveEngine
from pytheory.rhythm import INSTRUMENTS, Pattern
NOTE_NAMES = ['C', 'C#', 'D', 'D#', 'E', 'F', 'F#', 'G', 'G#', 'A', 'A#', 'B']
def note_name(midi):
return f"{NOTE_NAMES[midi % 12]}{midi // 12 - 1}"
class LiveTUI:
def __init__(self, seed=None, port="OP-XY"):
self.seed = seed or random.randint(0, 9999)
self.port = port
self.engine = None
self.log_lines = []
self.max_log = 500
self.running = True
self.instruments = sorted([k for k in INSTRUMENTS.keys()
if k not in ("808_bass",)])
self.drum_patterns = sorted(Pattern.list_presets())
self.current_drum = "rock"
self.picks = []
self.bpm = ""
self.status = "Init"
self.status_color = 3 # yellow
self._build_engine()
def _build_engine(self):
rng = random.Random(self.seed)
self.picks = rng.sample(self.instruments, 8)
self.engine = LiveEngine(buffer_size=128)
for i, inst in enumerate(self.picks, 1):
self.engine.channel(i, instrument=inst, reverb=0.3)
self.engine.drums(self.current_drum, volume=0.5)
self.engine.cc(0, "lowpass", min_val=300, max_val=8000)
def log(self, msg, color=0):
self.log_lines.append((time.time(), msg, color))
if len(self.log_lines) > self.max_log:
self.log_lines = self.log_lines[-self.max_log:]
def _patch_engine_logging(self):
original_cb = self.engine._midi_callback
def logging_cb(event, data=None):
msg, _ = event
if len(msg) == 0:
return
if msg[0] == 0xF8:
if self.engine._bpm > 10:
self.bpm = f"{self.engine._bpm:.0f}"
elif msg[0] == 0xFA:
self.log("▶ Start", 5)
self.status = "Playing"
self.status_color = 1
elif msg[0] == 0xFC:
self.log("■ Stop", 4)
self.status = "Stopped"
self.status_color = 4
elif msg[0] == 0xFB:
self.log("▶ Continue", 5)
self.status = "Playing"
self.status_color = 1
elif len(msg) >= 3:
status = msg[0]
ch = (status & 0x0F) + 1
msg_type = status & 0xF0
if msg_type == 0x90 and msg[2] > 0:
inst = self.picks[ch - 1] if 1 <= ch <= 8 else "?"
self.log(f"{ch}:{inst} {note_name(msg[1])} v={msg[2]}", 1)
elif msg_type == 0xB0:
self.log(f"⚙ CC{msg[1]}={msg[2]}", 3)
elif msg_type == 0xE0:
bend = ((msg[2] << 7) | msg[1]) - 8192
self.log(f"↕ Bend ch{ch} {bend:+d}", 3)
original_cb(event, data)
self.engine._midi_callback = logging_cb
def run(self, stdscr):
curses.curs_set(0)
curses.use_default_colors()
curses.init_pair(1, curses.COLOR_GREEN, -1)
curses.init_pair(2, curses.COLOR_CYAN, -1)
curses.init_pair(3, curses.COLOR_YELLOW, -1)
curses.init_pair(4, curses.COLOR_RED, -1)
curses.init_pair(5, curses.COLOR_MAGENTA, -1)
curses.init_pair(6, curses.COLOR_BLACK, curses.COLOR_BLUE)
curses.init_pair(7, curses.COLOR_BLACK, curses.COLOR_GREEN)
curses.init_pair(8, curses.COLOR_BLACK, curses.COLOR_YELLOW)
curses.init_pair(9, curses.COLOR_BLACK, curses.COLOR_RED)
stdscr.nodelay(True)
stdscr.timeout(60)
self._patch_engine_logging()
# Pre-render with progress
self.status = "Rendering"
self.status_color = 3
stdscr.erase()
stdscr.addstr(1, 2, "PyTheory Live", curses.A_BOLD)
stdscr.addstr(2, 2, "Pre-rendering wavetables...", curses.color_pair(3))
stdscr.refresh()
n_samples = 44100 * 3
count = 0
for _, channel in self.engine.channels.items():
if channel.is_drums:
continue
for midi_note in range(36, 97):
channel._get_wave(midi_note, n_samples)
count += 1
if count % 50 == 0:
stdscr.addstr(3, 2, f" {count} wavetables...", curses.color_pair(2))
stdscr.refresh()
self.log(f"Cached {count} wavetables", 1)
self.log("Starting engine...", 3)
self.status = "Starting"
self.status_color = 3
# Start engine
def run_engine():
import io
capture = io.StringIO()
old = sys.stdout
sys.stdout = capture
try:
self.engine.start(port=self.port)
except Exception as e:
self.log(f"Engine error: {e}", 4)
finally:
sys.stdout = old
output = capture.getvalue()
if output.strip():
for line in output.strip().split('\n'):
self.log(line.strip(), 2)
engine_thread = threading.Thread(target=run_engine, daemon=True)
engine_thread.start()
cmd_buf = ""
cursor_pos = 0
cmd_history = []
history_idx = -1
tab_matches = []
tab_idx = -1
tab_prefix = ""
self.kbd_active = False
while self.running:
try:
# Update status from engine state
if (self.engine._stream and self.engine._stream.active
and self.status == "Starting"):
self.status = "Listening"
self.status_color = 2
self.log("Audio stream active", 1)
h, w = stdscr.getmaxyx()
if h < 10 or w < 40:
time.sleep(0.1)
continue
stdscr.erase()
div = max(30, w * 3 // 5)
cfg_x = div + 2
# ═══ HEADER BAR ═══
header = f" PyTheory Live "
stdscr.addstr(0, 0, header, curses.color_pair(6) | curses.A_BOLD)
# Status badge
badge_colors = {1: 7, 2: 6, 3: 8, 4: 9}
badge_cp = badge_colors.get(self.status_color, 6)
badge = f" {self.status} "
x = len(header)
stdscr.addstr(0, x, badge, curses.color_pair(badge_cp) | curses.A_BOLD)
x += len(badge)
kbd_mode = ""
if self.engine._keyboard_channel:
kbd_mode = f" KBD:ch{self.engine._keyboard_channel}"
rec_mode = " ●REC" if self.engine._recording else ""
info = f" BPM:{self.bpm} drums:{self.current_drum} seed:{self.seed}{kbd_mode}{rec_mode}"
try:
stdscr.addstr(0, x, info[:w - x], curses.color_pair(6))
# Fill rest of header
remaining = w - x - len(info)
if remaining > 0:
stdscr.addstr(0, x + len(info), " " * remaining, curses.color_pair(6))
except curses.error:
pass
# ═══ DIVIDER ═══
for y in range(1, h - 2):
try:
stdscr.addch(y, div, '', curses.color_pair(2) | curses.A_DIM)
except curses.error:
pass
# ═══ LEFT: EVENTS ═══
try:
stdscr.addstr(1, 1, " Events ", curses.color_pair(2) | curses.A_BOLD)
except curses.error:
pass
log_h = h - 5
visible = self.log_lines[-log_h:] if log_h > 0 else []
for i, (ts, msg, color) in enumerate(visible):
ly = 2 + i
if ly >= h - 3:
break
# Fade old messages
age = time.time() - ts
attr = curses.A_DIM if age > 8 else 0
try:
stdscr.addstr(ly, 1, msg[:div - 2],
curses.color_pair(color) | attr)
except curses.error:
pass
# ═══ RIGHT: CONFIG ═══
try:
stdscr.addstr(1, cfg_x, " Config ",
curses.color_pair(2) | curses.A_BOLD)
except curses.error:
pass
y = 3
rw = w - cfg_x - 1
for i, inst in enumerate(self.picks, 1):
try:
stdscr.addstr(y, cfg_x, f"{i}", curses.A_BOLD)
stdscr.addstr(y, cfg_x + 1, ":", curses.A_DIM)
stdscr.addstr(y, cfg_x + 2, inst[:rw - 2],
curses.color_pair(1))
except curses.error:
pass
y += 1
y += 1
# VU meters per channel
y += 1
try:
stdscr.addstr(y, cfg_x, "Levels:", curses.A_BOLD | curses.A_DIM)
except curses.error:
pass
y += 1
for i, inst in enumerate(self.picks, 1):
if i in self.engine.channels:
lv = self.engine.channels[i].level
bars = int(min(lv, 1.0) * 16)
meter = "|" * bars + "-" * (16 - bars)
color = 1 if bars < 15 else 3 if bars < 18 else 4
try:
stdscr.addstr(y, cfg_x, f"{i}", curses.A_BOLD)
stdscr.addstr(y, cfg_x + 1, f" {meter}", curses.color_pair(color))
except curses.error:
pass
y += 1
y += 1
rec_indicator = " ● REC " if self.engine._recording else ""
kbd_indicator = f" kbd:ch{self.engine._keyboard_channel} oct{self.engine._keyboard_octave}" if self.engine._keyboard_channel else ""
pairs = [
("Drums", self.current_drum, 5),
("BPM", str(self.bpm), 0),
("Latency", f"{self.engine.buffer_size / 44100 * 1000:.1f}ms", 0),
("MIDI", self.port, 0),
("Seed", str(self.seed), 2),
]
if rec_indicator:
pairs.append(("", rec_indicator, 4))
if kbd_indicator:
pairs.append(("", kbd_indicator, 2))
for label, val, cp in pairs:
try:
stdscr.addstr(y, cfg_x, f"{label}:", curses.A_BOLD)
stdscr.addstr(y, cfg_x + len(label) + 1, f" {val}"[:rw],
curses.color_pair(cp))
except curses.error:
pass
y += 1
y += 1
cmds = [
"ch <n> [inst]",
"fx <n> <param> <val>",
"pan <n> <-1..1>",
"drums [pattern|-]",
"kbd [ch] [oct]",
"rec / stop / export",
"save/load <file>",
"seed [n] / list",
"exit",
]
try:
stdscr.addstr(y, cfg_x, "Commands:", curses.color_pair(3))
except curses.error:
pass
for i, c in enumerate(cmds):
try:
stdscr.addstr(y + 1 + i, cfg_x + 1, c[:rw],
curses.color_pair(2) | curses.A_DIM)
except curses.error:
pass
# ═══ INPUT BAR ═══
iy = h - 2
try:
stdscr.addstr(iy - 1, 0, "" * (w - 1), curses.A_DIM)
if self.kbd_active:
stdscr.addstr(iy, 0, " KEYBOARD ",
curses.color_pair(7) | curses.A_BOLD)
stdscr.addstr(iy, 10, " Esc=exit Up/Down=octave ",
curses.color_pair(3))
else:
stdscr.addstr(iy, 0, " $ ",
curses.color_pair(1) | curses.A_BOLD)
stdscr.addstr(iy, 3, cmd_buf[:w - 5])
# Cursor at position
cx = 3 + cursor_pos
if cx < w - 1:
# Show character under cursor inverted, or block at end
if cursor_pos < len(cmd_buf):
stdscr.addstr(iy, cx, cmd_buf[cursor_pos],
curses.A_REVERSE)
else:
stdscr.addstr(iy, cx, " ", curses.A_REVERSE)
except curses.error:
pass
stdscr.refresh()
# ═══ INPUT ═══
ch = stdscr.getch()
if ch == -1:
continue
# KEYBOARD MODE: all keys go to MIDI
if self.kbd_active:
if ch == 27: # Escape exits keyboard mode
self.kbd_active = False
self.engine._keyboard_channel = None
self.log("Keyboard off (Esc)", 3)
elif ch == curses.KEY_UP:
self.engine._keyboard_octave = min(8, self.engine._keyboard_octave + 1)
self.log(f"Octave ↑ {self.engine._keyboard_octave}", 2)
elif ch == curses.KEY_DOWN:
self.engine._keyboard_octave = max(0, self.engine._keyboard_octave - 1)
self.log(f"Octave ↓ {self.engine._keyboard_octave}", 2)
elif 32 <= ch < 127:
key = chr(ch).lower()
played = self.engine.keyboard_note(key, on=True)
if played:
ch_num = self.engine._keyboard_channel
if ch_num in self.engine.channels:
channel = self.engine.channels[ch_num]
nv = len(channel.voices)
vol = channel.volume
lv = channel.level
# Check if wavetable has audio
cache_peek = ""
if channel._cache:
first_wave = next(iter(channel._cache.values()))
peak = abs(first_wave).max()
cache_peek = f" wpeak={peak:.3f}"
stream_ok = "stream:OK" if self.engine._stream and self.engine._stream.active else "stream:OFF"
self.log(f" key:{key} v={nv} vol={vol} lv={lv:.3f}{cache_peek} {stream_ok}", 2)
def _off(k=key):
time.sleep(0.25)
self.engine.keyboard_note(k, on=False)
threading.Thread(target=_off, daemon=True).start()
else:
self.log(f" unmapped:{key}", 3)
continue
if ch == 10 or ch == 13:
if cmd_buf.strip():
cmd_history.append(cmd_buf)
self._handle_command(cmd_buf.strip())
cmd_buf = ""
cursor_pos = 0
history_idx = -1
elif ch == 27:
if self.engine._keyboard_channel and not cmd_buf:
# Escape exits keyboard mode
self.engine._keyboard_channel = None
self.log("Keyboard off (Esc)", 3)
else:
cmd_buf = ""
cursor_pos = 0
elif ch == curses.KEY_BACKSPACE or ch == 127:
if cursor_pos > 0:
cmd_buf = cmd_buf[:cursor_pos - 1] + cmd_buf[cursor_pos:]
cursor_pos -= 1
elif ch == curses.KEY_LEFT:
cursor_pos = max(0, cursor_pos - 1)
elif ch == curses.KEY_RIGHT:
cursor_pos = min(len(cmd_buf), cursor_pos + 1)
elif ch == curses.KEY_HOME or ch == 1: # Ctrl-A
cursor_pos = 0
elif ch == curses.KEY_END or ch == 5: # Ctrl-E
cursor_pos = len(cmd_buf)
elif ch == curses.KEY_UP:
if cmd_history and history_idx < len(cmd_history) - 1:
history_idx += 1
cmd_buf = cmd_history[-(history_idx + 1)]
cursor_pos = len(cmd_buf)
elif ch == curses.KEY_DOWN:
if history_idx > 0:
history_idx -= 1
cmd_buf = cmd_history[-(history_idx + 1)]
cursor_pos = len(cmd_buf)
else:
history_idx = -1
cmd_buf = ""
cursor_pos = 0
elif ch == 9: # Tab
if tab_matches and tab_prefix == cmd_buf:
tab_idx = (tab_idx + 1) % len(tab_matches)
cmd_buf = tab_matches[tab_idx]
else:
tab_matches = self._complete(cmd_buf)
tab_prefix = cmd_buf
if len(tab_matches) == 1:
cmd_buf = tab_matches[0]
tab_matches = []
elif tab_matches:
tab_idx = 0
cmd_buf = tab_matches[0]
else:
tab_matches = []
cursor_pos = len(cmd_buf)
elif 32 <= ch < 127:
cmd_buf = cmd_buf[:cursor_pos] + chr(ch) + cmd_buf[cursor_pos:]
cursor_pos += 1
tab_matches = []
tab_idx = -1
except KeyboardInterrupt:
self.running = False
self.engine.stop()
def _complete(self, text):
"""Return list of completions for current input."""
parts = text.split()
commands = ["ch", "fx", "pan", "drums", "kbd", "rec", "stop", "export",
"save", "load", "seed", "list", "patterns", "octave", "help", "exit"]
fx_params = ["volume", "pan", "lowpass", "lowpass_q", "reverb", "chorus",
"detune", "spread", "analog", "distortion", "delay",
"tremolo_depth", "saturation", "phaser", "sub_osc", "noise_mix"]
if not parts:
return [c + " " for c in commands]
# Completing first word
if len(parts) == 1 and not text.endswith(" "):
prefix = parts[0].lower()
return [c + " " for c in commands if c.startswith(prefix)]
verb = parts[0].lower()
# ch <n> <instrument>
if verb == "ch" and len(parts) == 3 and not text.endswith(" "):
prefix = parts[2].lower()
return [f"ch {parts[1]} {i} " for i in self.instruments
if i.startswith(prefix)]
if verb == "ch" and len(parts) == 2 and text.endswith(" "):
return [f"ch {parts[1]} {i} " for i in self.instruments]
# drums <pattern>
if verb == "drums" and len(parts) == 2 and not text.endswith(" "):
prefix = parts[1].lower()
matches = [p for p in self.drum_patterns if p.startswith(prefix)]
return [f"drums {m} " for m in matches]
if verb == "drums" and len(parts) == 1 and text.endswith(" "):
return [f"drums {p} " for p in self.drum_patterns]
# fx <n> <param> <val>
if verb == "fx" and len(parts) == 3 and not text.endswith(" "):
prefix = parts[2].lower()
return [f"fx {parts[1]} {p} " for p in fx_params
if p.startswith(prefix)]
if verb == "fx" and len(parts) == 2 and text.endswith(" "):
return [f"fx {parts[1]} {p} " for p in fx_params]
return []
def _handle_command(self, cmd):
parts = cmd.split()
if not parts:
return
verb = parts[0].lower()
if verb in ("quit", "q", "exit"):
self.running = False
elif verb in ("help", "h"):
self.log("ch <n> [inst] | fx <n> <param> <val> | drums [pat|-]", 2)
self.log("seed [n] | list | patterns | exit", 2)
elif verb == "ch" and len(parts) == 2:
try:
n = int(parts[1])
if 1 <= n <= 8:
self.log(f"Ch {n}: {self.picks[n-1]}", 2)
else:
self.log("Channel 1-8", 4)
except ValueError:
self.log("ch <1-8> [instrument]", 4)
elif verb == "ch" and len(parts) >= 3:
try:
n = int(parts[1])
inst = parts[2]
if inst not in INSTRUMENTS:
self.log(f"Unknown: {inst}", 4)
return
if not (1 <= n <= 8):
self.log("Channel 1-8", 4)
return
self.picks[n - 1] = inst
self.engine.channel(n, instrument=inst, reverb=0.3)
self.log(f"Ch {n}{inst}", 1)
except (ValueError, IndexError):
self.log("ch <1-8> <instrument>", 4)
elif verb == "fx" and len(parts) >= 4:
try:
n = int(parts[1])
param = parts[2]
val = float(parts[3])
if not (1 <= n <= 8):
self.log("Channel 1-8", 4)
return
if n not in self.engine.channels:
self.log(f"Channel {n} not active", 4)
return
channel = self.engine.channels[n]
if param == "reverb":
channel.reverb = val
channel._cache.clear()
elif param == "lowpass":
channel.lowpass = val
channel._cache.clear()
elif param == "volume":
channel.volume = val
elif hasattr(channel, param):
setattr(channel, param, val)
channel._cache.clear()
else:
self.log(f"Unknown param: {param}", 4)
return
self.log(f"Ch {n} {param}={val}", 1)
except (ValueError, IndexError):
self.log("fx <1-8> <param> <value>", 4)
elif verb == "fx" and len(parts) == 2:
try:
n = int(parts[1])
if n in self.engine.channels:
ch = self.engine.channels[n]
self.log(f"Ch {n}: vol={ch.volume} lp={ch.lowpass} rev={ch.reverb}", 2)
else:
self.log(f"Channel {n} not active", 4)
except ValueError:
self.log("fx <ch> <param> <value>", 4)
elif verb == "fx" and len(parts) <= 1:
self.log("Volume/Mix:", 3)
self.log(" volume pan reverb delay", 2)
self.log("Filter:", 3)
self.log(" lowpass lowpass_q", 2)
self.log("Modulation:", 3)
self.log(" chorus detune spread tremolo_depth", 2)
self.log(" phaser analog", 2)
self.log("Drive:", 3)
self.log(" distortion saturation", 2)
self.log("Synth:", 3)
self.log(" sub_osc noise_mix", 2)
self.log("", 0)
self.log("fx <ch> <param> <value>", 2)
elif verb == "drums" and len(parts) == 1:
self.log(f"Current: {self.current_drum}", 5)
for i in range(0, len(self.drum_patterns), 4):
row = " ".join(f"{x:17s}" for x in self.drum_patterns[i:i+4])
self.log(f" {row}", 2)
elif verb == "drums" and len(parts) >= 2:
pat = " ".join(parts[1:])
if pat in ("none", "off", "mute", "-"):
self.current_drum = "none"
self.engine._drum_pattern = None
self.engine._drum_channel = None
self.log("Drums off", 4)
else:
try:
self.current_drum = pat
self.engine.drums(pat, volume=0.5)
self.log(f"Drums → {pat}", 5)
except Exception as e:
self.log(f"Error: {e}", 4)
elif verb == "seed" and len(parts) == 1:
self.log(f"Seed: {self.seed}", 2)
elif verb == "seed" and len(parts) >= 2:
try:
self.seed = int(parts[1])
rng = random.Random(self.seed)
self.picks = rng.sample(self.instruments, 8)
for i, inst in enumerate(self.picks, 1):
self.engine.channel(i, instrument=inst, reverb=0.3)
self.log(f"Seed → {self.seed}", 1)
except ValueError:
self.log("seed <number>", 4)
elif verb == "list":
for i in range(0, len(self.instruments), 3):
row = " ".join(f"{x:22s}" for x in self.instruments[i:i+3])
self.log(f" {row}", 2)
elif verb == "patterns":
for i in range(0, len(self.drum_patterns), 4):
row = " ".join(f"{x:17s}" for x in self.drum_patterns[i:i+4])
self.log(f" {row}", 2)
elif verb == "pan" and len(parts) >= 3:
try:
n = int(parts[1])
val = float(parts[2])
val = max(-1.0, min(1.0, val))
if n in self.engine.channels:
self.engine.channels[n].pan = val
self.log(f"Ch {n} pan={val:+.1f}", 1)
else:
self.log(f"Channel {n} not active", 4)
except ValueError:
self.log("pan <1-8> <-1..1>", 4)
elif verb == "kbd":
if len(parts) >= 2:
try:
ch_num = int(parts[1])
self.engine._keyboard_channel = ch_num
if len(parts) >= 3:
self.engine._keyboard_octave = int(parts[2])
except ValueError:
self.log("kbd <channel> [octave]", 4)
return
else:
self.engine._keyboard_channel = self.engine._keyboard_channel or 1
self.kbd_active = True
# Make sure wavetables are cached for this channel
ch_num = self.engine._keyboard_channel
if ch_num in self.engine.channels:
channel = self.engine.channels[ch_num]
if not channel._cache:
self.log("Rendering wavetables...", 3)
for midi_note in range(36, 97):
channel._get_wave(midi_note, 44100 * 3)
self.log(f"♪ Keyboard ON ch{ch_num} oct{self.engine._keyboard_octave} (Esc=exit, ↑↓=octave)", 1)
elif verb == "rec":
self.engine.start_recording()
self.log("● Recording...", 4)
elif verb == "stop" and self.engine._recording:
self.engine.stop_recording()
n = len(self.engine._record_events)
self.log(f"■ Stopped ({n} events)", 3)
elif verb == "export":
fname = parts[1] if len(parts) > 1 else "recording.mid"
score = self.engine.export_recording(fname)
if score:
self.log(f"Exported → {fname}", 1)
else:
self.log("Nothing to export", 4)
elif verb == "save" and len(parts) >= 2:
fname = parts[1]
if not fname.endswith(".json"):
fname += ".json"
self.engine.seed = self.seed
self.engine.save_config(fname)
self.log(f"Saved → {fname}", 1)
elif verb == "load" and len(parts) >= 2:
fname = parts[1]
if not fname.endswith(".json"):
fname += ".json"
try:
self.engine.load_config(fname)
self.log(f"Loaded ← {fname}", 1)
# Update picks from channels
for ch, channel in self.engine.channels.items():
if 1 <= ch <= 8:
self.picks[ch - 1] = channel.synth_name
except Exception as e:
self.log(f"Error: {e}", 4)
elif verb == "octave" and len(parts) >= 2:
try:
self.engine._keyboard_octave = int(parts[1])
self.log(f"Keyboard octave → {self.engine._keyboard_octave}", 2)
except ValueError:
self.log("octave <0-8>", 4)
else:
self.log(f"? {cmd}", 4)
def main():
seed = int(sys.argv[1]) if len(sys.argv) > 1 else None
port = sys.argv[2] if len(sys.argv) > 2 else "OP-XY"
tui = LiveTUI(seed=seed, port=port)
curses.wrapper(tui.run)
if __name__ == "__main__":
main()