"""PyTheory Live — interactive MIDI synthesizer with TUI.""" import curses import random import sys import threading import time import os from pytheory.live import LiveEngine from pytheory.rhythm import INSTRUMENTS, Pattern NOTE_NAMES = ['C', 'C#', 'D', 'D#', 'E', 'F', 'F#', 'G', 'G#', 'A', 'A#', 'B'] def note_name(midi): return f"{NOTE_NAMES[midi % 12]}{midi // 12 - 1}" class LiveTUI: def __init__(self, seed=None, port="OP-XY", n_channels=8, drum_pattern="rock", buffer_size=128): self.seed = seed or random.randint(0, 9999) self.port = port self.n_channels = n_channels self.buffer_size = buffer_size self.engine = None self.log_lines = [] self.max_log = 500 self.running = True self.instruments = sorted([k for k in INSTRUMENTS.keys() if k not in ("808_bass",)]) self.drum_patterns = sorted(Pattern.list_presets()) self.current_drum = drum_pattern self.picks = [] self.bpm = "—" self.status = "Init" self.status_color = 3 self._build_engine() def _build_engine(self): rng = random.Random(self.seed) self.picks = rng.sample(self.instruments, min(self.n_channels, len(self.instruments))) self.engine = LiveEngine(buffer_size=self.buffer_size) for i, inst in enumerate(self.picks, 1): self.engine.channel(i, instrument=inst, reverb=0.3) if self.current_drum and self.current_drum not in ("none", "-"): self.engine.drums(self.current_drum, volume=0.5) self.engine.cc(0, "lowpass", min_val=300, max_val=8000) def log(self, msg, color=0): self.log_lines.append((time.time(), msg, color)) if len(self.log_lines) > self.max_log: self.log_lines = self.log_lines[-self.max_log:] def _patch_engine_logging(self): original_cb = self.engine._midi_callback def logging_cb(event, data=None): msg, _ = event if len(msg) == 0: return if msg[0] == 0xF8: if self.engine._bpm > 10: self.bpm = f"{self.engine._bpm:.0f}" elif msg[0] == 0xFA: self.log("▶ Start", 5) self.status = "Playing" self.status_color = 1 elif msg[0] == 0xFC: self.log("■ Stop", 4) self.status = "Stopped" self.status_color = 4 elif msg[0] == 0xFB: self.log("▶ Continue", 5) self.status = "Playing" self.status_color = 1 elif len(msg) >= 3: status = msg[0] ch = (status & 0x0F) + 1 msg_type = status & 0xF0 if msg_type == 0x90 and msg[2] > 0: inst = self.picks[ch - 1] if 1 <= ch <= 8 else "?" self.log(f"♪ {ch}:{inst} {note_name(msg[1])} v={msg[2]}", 1) elif msg_type == 0xB0: self.log(f"⚙ CC{msg[1]}={msg[2]}", 3) elif msg_type == 0xE0: bend = ((msg[2] << 7) | msg[1]) - 8192 self.log(f"↕ Bend ch{ch} {bend:+d}", 3) original_cb(event, data) self.engine._midi_callback = logging_cb def run(self, stdscr): curses.curs_set(0) curses.use_default_colors() curses.init_pair(1, curses.COLOR_GREEN, -1) curses.init_pair(2, curses.COLOR_CYAN, -1) curses.init_pair(3, curses.COLOR_YELLOW, -1) curses.init_pair(4, curses.COLOR_RED, -1) curses.init_pair(5, curses.COLOR_MAGENTA, -1) curses.init_pair(6, curses.COLOR_BLACK, curses.COLOR_BLUE) curses.init_pair(7, curses.COLOR_BLACK, curses.COLOR_GREEN) curses.init_pair(8, curses.COLOR_BLACK, curses.COLOR_YELLOW) curses.init_pair(9, curses.COLOR_BLACK, curses.COLOR_RED) stdscr.nodelay(True) stdscr.timeout(60) self._patch_engine_logging() # Pre-render with progress self.status = "Rendering" self.status_color = 3 stdscr.erase() stdscr.addstr(1, 2, "PyTheory Live", curses.A_BOLD) stdscr.addstr(2, 2, "Pre-rendering wavetables...", curses.color_pair(3)) stdscr.refresh() n_samples = 44100 * 3 count = 0 for _, channel in self.engine.channels.items(): if channel.is_drums: continue for midi_note in range(36, 97): channel._get_wave(midi_note, n_samples) count += 1 if count % 50 == 0: stdscr.addstr(3, 2, f" {count} wavetables...", curses.color_pair(2)) stdscr.refresh() self.log(f"Cached {count} wavetables", 1) self.log("Starting engine...", 3) self.status = "Starting" self.status_color = 3 # Start engine def run_engine(): import io capture = io.StringIO() old = sys.stdout sys.stdout = capture try: self.engine.start(port=self.port) except Exception as e: self.log(f"Engine error: {e}", 4) finally: sys.stdout = old output = capture.getvalue() if output.strip(): for line in output.strip().split('\n'): self.log(line.strip(), 2) engine_thread = threading.Thread(target=run_engine, daemon=True) engine_thread.start() cmd_buf = "" cursor_pos = 0 cmd_history = [] history_idx = -1 tab_matches = [] tab_idx = -1 tab_prefix = "" self.kbd_active = False self._kbd_held = {} # key → last_press_time while self.running: try: # Update status from engine state if (self.engine._stream and self.engine._stream.active and self.status == "Starting"): self.status = "Listening" self.status_color = 2 self.log("Audio stream active", 1) h, w = stdscr.getmaxyx() if h < 10 or w < 40: time.sleep(0.1) continue stdscr.erase() div = max(30, w * 3 // 5) cfg_x = div + 2 # ═══ HEADER BAR ═══ header = f" PyTheory Live " stdscr.addstr(0, 0, header, curses.color_pair(6) | curses.A_BOLD) # Status badge badge_colors = {1: 7, 2: 6, 3: 8, 4: 9} badge_cp = badge_colors.get(self.status_color, 6) badge = f" {self.status} " x = len(header) stdscr.addstr(0, x, badge, curses.color_pair(badge_cp) | curses.A_BOLD) x += len(badge) kbd_mode = "" if self.engine._keyboard_channel: kbd_mode = f" KBD:ch{self.engine._keyboard_channel}" rec_mode = " ●REC" if self.engine._recording else "" info = f" BPM:{self.bpm} drums:{self.current_drum} seed:{self.seed}{kbd_mode}{rec_mode}" try: stdscr.addstr(0, x, info[:w - x], curses.color_pair(6)) # Fill rest of header remaining = w - x - len(info) if remaining > 0: stdscr.addstr(0, x + len(info), " " * remaining, curses.color_pair(6)) except curses.error: pass # ═══ DIVIDER ═══ for y in range(1, h - 2): try: stdscr.addch(y, div, '│', curses.color_pair(2) | curses.A_DIM) except curses.error: pass # ═══ LEFT: EVENTS ═══ try: stdscr.addstr(1, 1, " Events ", curses.color_pair(2) | curses.A_BOLD) except curses.error: pass log_h = h - 5 visible = self.log_lines[-log_h:] if log_h > 0 else [] for i, (ts, msg, color) in enumerate(visible): ly = 2 + i if ly >= h - 3: break # Fade old messages age = time.time() - ts attr = curses.A_DIM if age > 8 else 0 try: stdscr.addstr(ly, 1, msg[:div - 2], curses.color_pair(color) | attr) except curses.error: pass # ═══ RIGHT: CONFIG ═══ try: stdscr.addstr(1, cfg_x, " Config ", curses.color_pair(2) | curses.A_BOLD) except curses.error: pass y = 3 rw = w - cfg_x - 1 for i, inst in enumerate(self.picks, 1): try: stdscr.addstr(y, cfg_x, f"{i}", curses.A_BOLD) stdscr.addstr(y, cfg_x + 1, ":", curses.A_DIM) stdscr.addstr(y, cfg_x + 2, inst[:rw - 2], curses.color_pair(1)) except curses.error: pass y += 1 y += 1 # VU meters per channel y += 1 try: stdscr.addstr(y, cfg_x, "Levels:", curses.A_BOLD | curses.A_DIM) except curses.error: pass y += 1 for i, inst in enumerate(self.picks, 1): if i in self.engine.channels: lv = self.engine.channels[i].level bars = int(min(lv, 1.0) * 16) meter = "|" * bars + "-" * (16 - bars) color = 1 if bars < 15 else 3 if bars < 18 else 4 try: stdscr.addstr(y, cfg_x, f"{i}", curses.A_BOLD) stdscr.addstr(y, cfg_x + 1, f" {meter}", curses.color_pair(color)) except curses.error: pass y += 1 y += 1 rec_indicator = " ● REC " if self.engine._recording else "" kbd_indicator = f" kbd:ch{self.engine._keyboard_channel} oct{self.engine._keyboard_octave}" if self.engine._keyboard_channel else "" pairs = [ ("Drums", self.current_drum, 5), ("BPM", str(self.bpm), 0), ("Latency", f"{self.engine.buffer_size / 44100 * 1000:.1f}ms", 0), ("MIDI", self.port, 0), ("Seed", str(self.seed), 2), ] if rec_indicator: pairs.append(("", rec_indicator, 4)) if kbd_indicator: pairs.append(("", kbd_indicator, 2)) for label, val, cp in pairs: try: stdscr.addstr(y, cfg_x, f"{label}:", curses.A_BOLD) stdscr.addstr(y, cfg_x + len(label) + 1, f" {val}"[:rw], curses.color_pair(cp)) except curses.error: pass y += 1 y += 1 cmds = [ "ch [inst]", "fx ", "pan <-1..1>", "drums [pattern|-]", "kbd [ch] [oct]", "rec / stop / export", "save/load ", "seed [n] / list", "exit", ] try: stdscr.addstr(y, cfg_x, "Commands:", curses.color_pair(3)) except curses.error: pass for i, c in enumerate(cmds): try: stdscr.addstr(y + 1 + i, cfg_x + 1, c[:rw], curses.color_pair(2) | curses.A_DIM) except curses.error: pass # ═══ INPUT BAR ═══ iy = h - 2 try: stdscr.addstr(iy - 1, 0, "─" * (w - 1), curses.A_DIM) if self.kbd_active: stdscr.addstr(iy, 0, " KEYBOARD ", curses.color_pair(7) | curses.A_BOLD) stdscr.addstr(iy, 10, " Esc=exit Up/Down=octave ", curses.color_pair(3)) else: stdscr.addstr(iy, 0, " $ ", curses.color_pair(1) | curses.A_BOLD) stdscr.addstr(iy, 3, cmd_buf[:w - 5]) # Cursor at position cx = 3 + cursor_pos if cx < w - 1: # Show character under cursor inverted, or block at end if cursor_pos < len(cmd_buf): stdscr.addstr(iy, cx, cmd_buf[cursor_pos], curses.A_REVERSE) else: stdscr.addstr(iy, cx, " ", curses.A_REVERSE) except curses.error: pass stdscr.refresh() # ═══ INPUT ═══ ch = stdscr.getch() if ch == -1: continue # KEYBOARD MODE: all keys go to MIDI if self.kbd_active: if ch == 27: # Escape exits keyboard mode # Release all held notes for k in list(self._kbd_held): self.engine.keyboard_note(k, on=False) self._kbd_held.clear() self.kbd_active = False self.engine._keyboard_channel = None self.log("Keyboard off (Esc)", 3) elif ch == curses.KEY_UP: self.engine._keyboard_octave = min(8, self.engine._keyboard_octave + 1) self.log(f"Octave ↑ {self.engine._keyboard_octave}", 2) elif ch == curses.KEY_DOWN: self.engine._keyboard_octave = max(0, self.engine._keyboard_octave - 1) self.log(f"Octave ↓ {self.engine._keyboard_octave}", 2) elif 32 <= ch < 127: key = chr(ch).lower() now = time.time() if key in self._kbd_held: # Key repeat — just refresh the timer self._kbd_held[key] = now else: # New key press played = self.engine.keyboard_note(key, on=True) if played: self._kbd_held[key] = now # Release keys that haven't been pressed for 150ms now = time.time() expired = [k for k, t in self._kbd_held.items() if now - t > 0.15] for k in expired: self.engine.keyboard_note(k, on=False) del self._kbd_held[k] continue if ch == 10 or ch == 13: if cmd_buf.strip(): cmd_history.append(cmd_buf) self._handle_command(cmd_buf.strip()) cmd_buf = "" cursor_pos = 0 history_idx = -1 elif ch == 27: if self.engine._keyboard_channel and not cmd_buf: # Escape exits keyboard mode self.engine._keyboard_channel = None self.log("Keyboard off (Esc)", 3) else: cmd_buf = "" cursor_pos = 0 elif ch == curses.KEY_BACKSPACE or ch == 127: if cursor_pos > 0: cmd_buf = cmd_buf[:cursor_pos - 1] + cmd_buf[cursor_pos:] cursor_pos -= 1 elif ch == curses.KEY_LEFT: cursor_pos = max(0, cursor_pos - 1) elif ch == curses.KEY_RIGHT: cursor_pos = min(len(cmd_buf), cursor_pos + 1) elif ch == curses.KEY_HOME or ch == 1: # Ctrl-A cursor_pos = 0 elif ch == curses.KEY_END or ch == 5: # Ctrl-E cursor_pos = len(cmd_buf) elif ch == curses.KEY_UP: if cmd_history and history_idx < len(cmd_history) - 1: history_idx += 1 cmd_buf = cmd_history[-(history_idx + 1)] cursor_pos = len(cmd_buf) elif ch == curses.KEY_DOWN: if history_idx > 0: history_idx -= 1 cmd_buf = cmd_history[-(history_idx + 1)] cursor_pos = len(cmd_buf) else: history_idx = -1 cmd_buf = "" cursor_pos = 0 elif ch == 9: # Tab if tab_matches and tab_prefix == cmd_buf: tab_idx = (tab_idx + 1) % len(tab_matches) cmd_buf = tab_matches[tab_idx] else: tab_matches = self._complete(cmd_buf) tab_prefix = cmd_buf if len(tab_matches) == 1: cmd_buf = tab_matches[0] tab_matches = [] elif tab_matches: tab_idx = 0 cmd_buf = tab_matches[0] else: tab_matches = [] cursor_pos = len(cmd_buf) elif 32 <= ch < 127: cmd_buf = cmd_buf[:cursor_pos] + chr(ch) + cmd_buf[cursor_pos:] cursor_pos += 1 tab_matches = [] tab_idx = -1 except KeyboardInterrupt: self.running = False self.engine.stop() def _complete(self, text): """Return list of completions for current input.""" parts = text.split() commands = ["ch", "fx", "pan", "drums", "kbd", "rec", "stop", "export", "save", "load", "seed", "list", "patterns", "octave", "help", "exit"] fx_params = ["volume", "pan", "lowpass", "lowpass_q", "reverb", "chorus", "detune", "spread", "analog", "distortion", "delay", "tremolo_depth", "saturation", "phaser", "sub_osc", "noise_mix"] if not parts: return [c + " " for c in commands] # Completing first word if len(parts) == 1 and not text.endswith(" "): prefix = parts[0].lower() return [c + " " for c in commands if c.startswith(prefix)] verb = parts[0].lower() # ch if verb == "ch" and len(parts) == 3 and not text.endswith(" "): prefix = parts[2].lower() return [f"ch {parts[1]} {i} " for i in self.instruments if i.startswith(prefix)] if verb == "ch" and len(parts) == 2 and text.endswith(" "): return [f"ch {parts[1]} {i} " for i in self.instruments] # drums if verb == "drums" and len(parts) == 2 and not text.endswith(" "): prefix = parts[1].lower() matches = [p for p in self.drum_patterns if p.startswith(prefix)] return [f"drums {m} " for m in matches] if verb == "drums" and len(parts) == 1 and text.endswith(" "): return [f"drums {p} " for p in self.drum_patterns] # fx if verb == "fx" and len(parts) == 3 and not text.endswith(" "): prefix = parts[2].lower() return [f"fx {parts[1]} {p} " for p in fx_params if p.startswith(prefix)] if verb == "fx" and len(parts) == 2 and text.endswith(" "): return [f"fx {parts[1]} {p} " for p in fx_params] return [] def _handle_command(self, cmd): parts = cmd.split() if not parts: return verb = parts[0].lower() if verb in ("quit", "q", "exit"): self.running = False elif verb in ("help", "h"): self.log("ch [inst] | fx | drums [pat|-]", 2) self.log("seed [n] | list | patterns | exit", 2) elif verb == "ch" and len(parts) == 2: try: n = int(parts[1]) if 1 <= n <= len(self.picks): self.log(f"Ch {n}: {self.picks[n-1]}", 2) else: self.log(f"Channel 1-{len(self.picks)}", 4) except ValueError: self.log("ch <1-8> [instrument]", 4) elif verb == "ch" and len(parts) >= 3: try: n = int(parts[1]) inst = parts[2] if inst not in INSTRUMENTS: self.log(f"Unknown: {inst}", 4) return if not (1 <= n <= len(self.picks)): self.log(f"Channel 1-{len(self.picks)}", 4) return self.picks[n - 1] = inst self.engine.channel(n, instrument=inst, reverb=0.3) self.log(f"Ch {n} → {inst}", 1) except (ValueError, IndexError): self.log("ch <1-8> ", 4) elif verb == "fx" and len(parts) >= 4: try: n = int(parts[1]) param = parts[2] val = float(parts[3]) if not (1 <= n <= len(self.picks)): self.log(f"Channel 1-{len(self.picks)}", 4) return if n not in self.engine.channels: self.log(f"Channel {n} not active", 4) return channel = self.engine.channels[n] if param == "reverb": channel.reverb = val channel._cache.clear() elif param == "lowpass": channel.lowpass = val channel._cache.clear() elif param == "volume": channel.volume = val elif hasattr(channel, param): setattr(channel, param, val) channel._cache.clear() else: self.log(f"Unknown param: {param}", 4) return self.log(f"Ch {n} {param}={val}", 1) except (ValueError, IndexError): self.log("fx <1-8> ", 4) elif verb == "fx" and len(parts) == 2: try: n = int(parts[1]) if n in self.engine.channels: ch = self.engine.channels[n] self.log(f"Ch {n}: vol={ch.volume} lp={ch.lowpass} rev={ch.reverb}", 2) else: self.log(f"Channel {n} not active", 4) except ValueError: self.log("fx ", 4) elif verb == "fx" and len(parts) <= 1: self.log("Volume/Mix:", 3) self.log(" volume pan reverb delay", 2) self.log("Filter:", 3) self.log(" lowpass lowpass_q", 2) self.log("Modulation:", 3) self.log(" chorus detune spread tremolo_depth", 2) self.log(" phaser analog", 2) self.log("Drive:", 3) self.log(" distortion saturation", 2) self.log("Synth:", 3) self.log(" sub_osc noise_mix", 2) self.log("", 0) self.log("fx ", 2) elif verb == "drums" and len(parts) == 1: self.log(f"Current: {self.current_drum}", 5) for i in range(0, len(self.drum_patterns), 4): row = " ".join(f"{x:17s}" for x in self.drum_patterns[i:i+4]) self.log(f" {row}", 2) elif verb == "drums" and len(parts) >= 2: pat = " ".join(parts[1:]) if pat in ("none", "off", "mute", "-"): self.current_drum = "none" self.engine._drum_pattern = None self.engine._drum_channel = None self.log("Drums off", 4) else: try: self.current_drum = pat self.engine.drums(pat, volume=0.5) self.log(f"Drums → {pat}", 5) except Exception as e: self.log(f"Error: {e}", 4) elif verb == "seed" and len(parts) == 1: self.log(f"Seed: {self.seed}", 2) elif verb == "seed" and len(parts) >= 2: try: self.seed = int(parts[1]) rng = random.Random(self.seed) self.picks = rng.sample(self.instruments, 8) for i, inst in enumerate(self.picks, 1): self.engine.channel(i, instrument=inst, reverb=0.3) self.log(f"Seed → {self.seed}", 1) except ValueError: self.log("seed ", 4) elif verb == "list": for i in range(0, len(self.instruments), 3): row = " ".join(f"{x:22s}" for x in self.instruments[i:i+3]) self.log(f" {row}", 2) elif verb == "patterns": for i in range(0, len(self.drum_patterns), 4): row = " ".join(f"{x:17s}" for x in self.drum_patterns[i:i+4]) self.log(f" {row}", 2) elif verb == "pan" and len(parts) >= 3: try: n = int(parts[1]) val = float(parts[2]) val = max(-1.0, min(1.0, val)) if n in self.engine.channels: self.engine.channels[n].pan = val self.log(f"Ch {n} pan={val:+.1f}", 1) else: self.log(f"Channel {n} not active", 4) except ValueError: self.log("pan <1-8> <-1..1>", 4) elif verb == "kbd": if len(parts) >= 2: try: ch_num = int(parts[1]) self.engine._keyboard_channel = ch_num if len(parts) >= 3: self.engine._keyboard_octave = int(parts[2]) except ValueError: self.log("kbd [octave]", 4) return else: self.engine._keyboard_channel = self.engine._keyboard_channel or 1 self.kbd_active = True # Make sure wavetables are cached for this channel ch_num = self.engine._keyboard_channel if ch_num in self.engine.channels: channel = self.engine.channels[ch_num] if not channel._cache: self.log("Rendering wavetables...", 3) for midi_note in range(36, 97): channel._get_wave(midi_note, 44100 * 3) self.log(f"♪ Keyboard ON ch{ch_num} oct{self.engine._keyboard_octave} (Esc=exit, ↑↓=octave)", 1) elif verb == "rec": self.engine.start_recording() self.log("● Recording...", 4) elif verb == "stop" and self.engine._recording: self.engine.stop_recording() n = len(self.engine._record_events) self.log(f"■ Stopped ({n} events)", 3) elif verb == "export": fname = parts[1] if len(parts) > 1 else "recording.mid" score = self.engine.export_recording(fname) if score: self.log(f"Exported → {fname}", 1) else: self.log("Nothing to export", 4) elif verb == "save" and len(parts) >= 2: fname = parts[1] if not fname.endswith(".json"): fname += ".json" self.engine.seed = self.seed self.engine.save_config(fname) self.log(f"Saved → {fname}", 1) elif verb == "load" and len(parts) >= 2: fname = parts[1] if not fname.endswith(".json"): fname += ".json" try: self.engine.load_config(fname) self.log(f"Loaded ← {fname}", 1) # Update picks from channels for ch, channel in self.engine.channels.items(): if 1 <= ch <= 8: self.picks[ch - 1] = channel.synth_name except Exception as e: self.log(f"Error: {e}", 4) elif verb == "octave" and len(parts) >= 2: try: self.engine._keyboard_octave = int(parts[1]) self.log(f"Keyboard octave → {self.engine._keyboard_octave}", 2) except ValueError: self.log("octave <0-8>", 4) else: self.log(f"? {cmd}", 4) def main(): import argparse parser = argparse.ArgumentParser(description="PyTheory Live — real-time MIDI synthesizer") parser.add_argument("seed", nargs="?", type=int, default=None, help="Random seed for instruments") parser.add_argument("--port", "-p", default="OP-XY", help="MIDI port name (default: OP-XY)") parser.add_argument("--channels", "-c", type=int, default=8, help="Number of channels (default: 8)") parser.add_argument("--drums", "-d", default="rock", help="Drum pattern (default: rock, 'none' to disable)") parser.add_argument("--buffer", "-b", type=int, default=128, help="Audio buffer size (default: 128)") args = parser.parse_args() tui = LiveTUI(seed=args.seed, port=args.port, n_channels=args.channels, drum_pattern=args.drums, buffer_size=args.buffer) curses.wrapper(tui.run) if __name__ == "__main__": main()