diff --git a/pyproject.toml b/pyproject.toml index 7f28f5c..6463bc6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,6 +37,7 @@ Issues = "https://github.com/kennethreitz/pytheory/issues" [project.scripts] pytheory = "pytheory.cli:main" +pytheory-live = "pytheory.live_tui:main" [dependency-groups] dev = ["pytest"] diff --git a/pytheory/live_tui.py b/pytheory/live_tui.py new file mode 100644 index 0000000..46f3e30 --- /dev/null +++ b/pytheory/live_tui.py @@ -0,0 +1,730 @@ +"""PyTheory Live — interactive MIDI synthesizer with TUI.""" + +import curses +import random +import sys +import threading +import time +import os + +from pytheory.live import LiveEngine +from pytheory.rhythm import INSTRUMENTS, Pattern + + +NOTE_NAMES = ['C', 'C#', 'D', 'D#', 'E', 'F', 'F#', 'G', 'G#', 'A', 'A#', 'B'] + + +def note_name(midi): + return f"{NOTE_NAMES[midi % 12]}{midi // 12 - 1}" + + +class LiveTUI: + def __init__(self, seed=None, port="OP-XY", n_channels=8, + drum_pattern="rock", buffer_size=128): + self.seed = seed or random.randint(0, 9999) + self.port = port + self.n_channels = n_channels + self.buffer_size = buffer_size + self.engine = None + self.log_lines = [] + self.max_log = 500 + self.running = True + self.instruments = sorted([k for k in INSTRUMENTS.keys() + if k not in ("808_bass",)]) + self.drum_patterns = sorted(Pattern.list_presets()) + self.current_drum = drum_pattern + self.picks = [] + self.bpm = "—" + self.status = "Init" + self.status_color = 3 + self._build_engine() + + def _build_engine(self): + rng = random.Random(self.seed) + self.picks = rng.sample(self.instruments, min(self.n_channels, len(self.instruments))) + self.engine = LiveEngine(buffer_size=self.buffer_size) + for i, inst in enumerate(self.picks, 1): + self.engine.channel(i, instrument=inst, reverb=0.3) + if self.current_drum and self.current_drum not in ("none", "-"): + self.engine.drums(self.current_drum, volume=0.5) + self.engine.cc(0, "lowpass", min_val=300, max_val=8000) + + def log(self, msg, color=0): + self.log_lines.append((time.time(), msg, color)) + if len(self.log_lines) > self.max_log: + self.log_lines = self.log_lines[-self.max_log:] + + def _patch_engine_logging(self): + original_cb = self.engine._midi_callback + + def logging_cb(event, data=None): + msg, _ = event + if len(msg) == 0: + return + if msg[0] == 0xF8: + if self.engine._bpm > 10: + self.bpm = f"{self.engine._bpm:.0f}" + elif msg[0] == 0xFA: + self.log("▶ Start", 5) + self.status = "Playing" + self.status_color = 1 + elif msg[0] == 0xFC: + self.log("■ Stop", 4) + self.status = "Stopped" + self.status_color = 4 + elif msg[0] == 0xFB: + self.log("▶ Continue", 5) + self.status = "Playing" + self.status_color = 1 + elif len(msg) >= 3: + status = msg[0] + ch = (status & 0x0F) + 1 + msg_type = status & 0xF0 + if msg_type == 0x90 and msg[2] > 0: + inst = self.picks[ch - 1] if 1 <= ch <= 8 else "?" + self.log(f"♪ {ch}:{inst} {note_name(msg[1])} v={msg[2]}", 1) + elif msg_type == 0xB0: + self.log(f"⚙ CC{msg[1]}={msg[2]}", 3) + elif msg_type == 0xE0: + bend = ((msg[2] << 7) | msg[1]) - 8192 + self.log(f"↕ Bend ch{ch} {bend:+d}", 3) + original_cb(event, data) + + self.engine._midi_callback = logging_cb + + def run(self, stdscr): + curses.curs_set(0) + curses.use_default_colors() + curses.init_pair(1, curses.COLOR_GREEN, -1) + curses.init_pair(2, curses.COLOR_CYAN, -1) + curses.init_pair(3, curses.COLOR_YELLOW, -1) + curses.init_pair(4, curses.COLOR_RED, -1) + curses.init_pair(5, curses.COLOR_MAGENTA, -1) + curses.init_pair(6, curses.COLOR_BLACK, curses.COLOR_BLUE) + curses.init_pair(7, curses.COLOR_BLACK, curses.COLOR_GREEN) + curses.init_pair(8, curses.COLOR_BLACK, curses.COLOR_YELLOW) + curses.init_pair(9, curses.COLOR_BLACK, curses.COLOR_RED) + stdscr.nodelay(True) + stdscr.timeout(60) + + self._patch_engine_logging() + + # Pre-render with progress + self.status = "Rendering" + self.status_color = 3 + stdscr.erase() + stdscr.addstr(1, 2, "PyTheory Live", curses.A_BOLD) + stdscr.addstr(2, 2, "Pre-rendering wavetables...", curses.color_pair(3)) + stdscr.refresh() + + n_samples = 44100 * 3 + count = 0 + for _, channel in self.engine.channels.items(): + if channel.is_drums: + continue + for midi_note in range(36, 97): + channel._get_wave(midi_note, n_samples) + count += 1 + if count % 50 == 0: + stdscr.addstr(3, 2, f" {count} wavetables...", curses.color_pair(2)) + stdscr.refresh() + + self.log(f"Cached {count} wavetables", 1) + self.log("Starting engine...", 3) + self.status = "Starting" + self.status_color = 3 + + # Start engine + def run_engine(): + import io + capture = io.StringIO() + old = sys.stdout + sys.stdout = capture + try: + self.engine.start(port=self.port) + except Exception as e: + self.log(f"Engine error: {e}", 4) + finally: + sys.stdout = old + output = capture.getvalue() + if output.strip(): + for line in output.strip().split('\n'): + self.log(line.strip(), 2) + + engine_thread = threading.Thread(target=run_engine, daemon=True) + engine_thread.start() + + cmd_buf = "" + cursor_pos = 0 + cmd_history = [] + history_idx = -1 + tab_matches = [] + tab_idx = -1 + tab_prefix = "" + self.kbd_active = False + self._kbd_held = {} # key → last_press_time + + while self.running: + try: + # Update status from engine state + if (self.engine._stream and self.engine._stream.active + and self.status == "Starting"): + self.status = "Listening" + self.status_color = 2 + self.log("Audio stream active", 1) + + h, w = stdscr.getmaxyx() + if h < 10 or w < 40: + time.sleep(0.1) + continue + stdscr.erase() + + div = max(30, w * 3 // 5) + cfg_x = div + 2 + + # ═══ HEADER BAR ═══ + header = f" PyTheory Live " + stdscr.addstr(0, 0, header, curses.color_pair(6) | curses.A_BOLD) + + # Status badge + badge_colors = {1: 7, 2: 6, 3: 8, 4: 9} + badge_cp = badge_colors.get(self.status_color, 6) + badge = f" {self.status} " + x = len(header) + stdscr.addstr(0, x, badge, curses.color_pair(badge_cp) | curses.A_BOLD) + x += len(badge) + + kbd_mode = "" + if self.engine._keyboard_channel: + kbd_mode = f" KBD:ch{self.engine._keyboard_channel}" + rec_mode = " ●REC" if self.engine._recording else "" + info = f" BPM:{self.bpm} drums:{self.current_drum} seed:{self.seed}{kbd_mode}{rec_mode}" + try: + stdscr.addstr(0, x, info[:w - x], curses.color_pair(6)) + # Fill rest of header + remaining = w - x - len(info) + if remaining > 0: + stdscr.addstr(0, x + len(info), " " * remaining, curses.color_pair(6)) + except curses.error: + pass + + # ═══ DIVIDER ═══ + for y in range(1, h - 2): + try: + stdscr.addch(y, div, '│', curses.color_pair(2) | curses.A_DIM) + except curses.error: + pass + + # ═══ LEFT: EVENTS ═══ + try: + stdscr.addstr(1, 1, " Events ", curses.color_pair(2) | curses.A_BOLD) + except curses.error: + pass + + log_h = h - 5 + visible = self.log_lines[-log_h:] if log_h > 0 else [] + for i, (ts, msg, color) in enumerate(visible): + ly = 2 + i + if ly >= h - 3: + break + # Fade old messages + age = time.time() - ts + attr = curses.A_DIM if age > 8 else 0 + try: + stdscr.addstr(ly, 1, msg[:div - 2], + curses.color_pair(color) | attr) + except curses.error: + pass + + # ═══ RIGHT: CONFIG ═══ + try: + stdscr.addstr(1, cfg_x, " Config ", + curses.color_pair(2) | curses.A_BOLD) + except curses.error: + pass + + y = 3 + rw = w - cfg_x - 1 + for i, inst in enumerate(self.picks, 1): + try: + stdscr.addstr(y, cfg_x, f"{i}", curses.A_BOLD) + stdscr.addstr(y, cfg_x + 1, ":", curses.A_DIM) + stdscr.addstr(y, cfg_x + 2, inst[:rw - 2], + curses.color_pair(1)) + except curses.error: + pass + y += 1 + + y += 1 + # VU meters per channel + y += 1 + try: + stdscr.addstr(y, cfg_x, "Levels:", curses.A_BOLD | curses.A_DIM) + except curses.error: + pass + y += 1 + for i, inst in enumerate(self.picks, 1): + if i in self.engine.channels: + lv = self.engine.channels[i].level + bars = int(min(lv, 1.0) * 16) + meter = "|" * bars + "-" * (16 - bars) + color = 1 if bars < 15 else 3 if bars < 18 else 4 + try: + stdscr.addstr(y, cfg_x, f"{i}", curses.A_BOLD) + stdscr.addstr(y, cfg_x + 1, f" {meter}", curses.color_pair(color)) + except curses.error: + pass + y += 1 + + y += 1 + rec_indicator = " ● REC " if self.engine._recording else "" + kbd_indicator = f" kbd:ch{self.engine._keyboard_channel} oct{self.engine._keyboard_octave}" if self.engine._keyboard_channel else "" + + pairs = [ + ("Drums", self.current_drum, 5), + ("BPM", str(self.bpm), 0), + ("Latency", f"{self.engine.buffer_size / 44100 * 1000:.1f}ms", 0), + ("MIDI", self.port, 0), + ("Seed", str(self.seed), 2), + ] + if rec_indicator: + pairs.append(("", rec_indicator, 4)) + if kbd_indicator: + pairs.append(("", kbd_indicator, 2)) + for label, val, cp in pairs: + try: + stdscr.addstr(y, cfg_x, f"{label}:", curses.A_BOLD) + stdscr.addstr(y, cfg_x + len(label) + 1, f" {val}"[:rw], + curses.color_pair(cp)) + except curses.error: + pass + y += 1 + + y += 1 + cmds = [ + "ch [inst]", + "fx ", + "pan <-1..1>", + "drums [pattern|-]", + "kbd [ch] [oct]", + "rec / stop / export", + "save/load ", + "seed [n] / list", + "exit", + ] + try: + stdscr.addstr(y, cfg_x, "Commands:", curses.color_pair(3)) + except curses.error: + pass + for i, c in enumerate(cmds): + try: + stdscr.addstr(y + 1 + i, cfg_x + 1, c[:rw], + curses.color_pair(2) | curses.A_DIM) + except curses.error: + pass + + # ═══ INPUT BAR ═══ + iy = h - 2 + try: + stdscr.addstr(iy - 1, 0, "─" * (w - 1), curses.A_DIM) + if self.kbd_active: + stdscr.addstr(iy, 0, " KEYBOARD ", + curses.color_pair(7) | curses.A_BOLD) + stdscr.addstr(iy, 10, " Esc=exit Up/Down=octave ", + curses.color_pair(3)) + else: + stdscr.addstr(iy, 0, " $ ", + curses.color_pair(1) | curses.A_BOLD) + stdscr.addstr(iy, 3, cmd_buf[:w - 5]) + # Cursor at position + cx = 3 + cursor_pos + if cx < w - 1: + # Show character under cursor inverted, or block at end + if cursor_pos < len(cmd_buf): + stdscr.addstr(iy, cx, cmd_buf[cursor_pos], + curses.A_REVERSE) + else: + stdscr.addstr(iy, cx, " ", curses.A_REVERSE) + except curses.error: + pass + + stdscr.refresh() + + # ═══ INPUT ═══ + ch = stdscr.getch() + if ch == -1: + continue + + # KEYBOARD MODE: all keys go to MIDI + if self.kbd_active: + if ch == 27: # Escape exits keyboard mode + # Release all held notes + for k in list(self._kbd_held): + self.engine.keyboard_note(k, on=False) + self._kbd_held.clear() + self.kbd_active = False + self.engine._keyboard_channel = None + self.log("Keyboard off (Esc)", 3) + elif ch == curses.KEY_UP: + self.engine._keyboard_octave = min(8, self.engine._keyboard_octave + 1) + self.log(f"Octave ↑ {self.engine._keyboard_octave}", 2) + elif ch == curses.KEY_DOWN: + self.engine._keyboard_octave = max(0, self.engine._keyboard_octave - 1) + self.log(f"Octave ↓ {self.engine._keyboard_octave}", 2) + elif 32 <= ch < 127: + key = chr(ch).lower() + now = time.time() + if key in self._kbd_held: + # Key repeat — just refresh the timer + self._kbd_held[key] = now + else: + # New key press + played = self.engine.keyboard_note(key, on=True) + if played: + self._kbd_held[key] = now + + # Release keys that haven't been pressed for 150ms + now = time.time() + expired = [k for k, t in self._kbd_held.items() + if now - t > 0.15] + for k in expired: + self.engine.keyboard_note(k, on=False) + del self._kbd_held[k] + + continue + + if ch == 10 or ch == 13: + if cmd_buf.strip(): + cmd_history.append(cmd_buf) + self._handle_command(cmd_buf.strip()) + cmd_buf = "" + cursor_pos = 0 + history_idx = -1 + elif ch == 27: + if self.engine._keyboard_channel and not cmd_buf: + # Escape exits keyboard mode + self.engine._keyboard_channel = None + self.log("Keyboard off (Esc)", 3) + else: + cmd_buf = "" + cursor_pos = 0 + elif ch == curses.KEY_BACKSPACE or ch == 127: + if cursor_pos > 0: + cmd_buf = cmd_buf[:cursor_pos - 1] + cmd_buf[cursor_pos:] + cursor_pos -= 1 + elif ch == curses.KEY_LEFT: + cursor_pos = max(0, cursor_pos - 1) + elif ch == curses.KEY_RIGHT: + cursor_pos = min(len(cmd_buf), cursor_pos + 1) + elif ch == curses.KEY_HOME or ch == 1: # Ctrl-A + cursor_pos = 0 + elif ch == curses.KEY_END or ch == 5: # Ctrl-E + cursor_pos = len(cmd_buf) + elif ch == curses.KEY_UP: + if cmd_history and history_idx < len(cmd_history) - 1: + history_idx += 1 + cmd_buf = cmd_history[-(history_idx + 1)] + cursor_pos = len(cmd_buf) + elif ch == curses.KEY_DOWN: + if history_idx > 0: + history_idx -= 1 + cmd_buf = cmd_history[-(history_idx + 1)] + cursor_pos = len(cmd_buf) + else: + history_idx = -1 + cmd_buf = "" + cursor_pos = 0 + elif ch == 9: # Tab + if tab_matches and tab_prefix == cmd_buf: + tab_idx = (tab_idx + 1) % len(tab_matches) + cmd_buf = tab_matches[tab_idx] + else: + tab_matches = self._complete(cmd_buf) + tab_prefix = cmd_buf + if len(tab_matches) == 1: + cmd_buf = tab_matches[0] + tab_matches = [] + elif tab_matches: + tab_idx = 0 + cmd_buf = tab_matches[0] + else: + tab_matches = [] + cursor_pos = len(cmd_buf) + elif 32 <= ch < 127: + cmd_buf = cmd_buf[:cursor_pos] + chr(ch) + cmd_buf[cursor_pos:] + cursor_pos += 1 + tab_matches = [] + tab_idx = -1 + + except KeyboardInterrupt: + self.running = False + + self.engine.stop() + + def _complete(self, text): + """Return list of completions for current input.""" + parts = text.split() + commands = ["ch", "fx", "pan", "drums", "kbd", "rec", "stop", "export", + "save", "load", "seed", "list", "patterns", "octave", "help", "exit"] + fx_params = ["volume", "pan", "lowpass", "lowpass_q", "reverb", "chorus", + "detune", "spread", "analog", "distortion", "delay", + "tremolo_depth", "saturation", "phaser", "sub_osc", "noise_mix"] + + if not parts: + return [c + " " for c in commands] + + # Completing first word + if len(parts) == 1 and not text.endswith(" "): + prefix = parts[0].lower() + return [c + " " for c in commands if c.startswith(prefix)] + + verb = parts[0].lower() + + # ch + if verb == "ch" and len(parts) == 3 and not text.endswith(" "): + prefix = parts[2].lower() + return [f"ch {parts[1]} {i} " for i in self.instruments + if i.startswith(prefix)] + if verb == "ch" and len(parts) == 2 and text.endswith(" "): + return [f"ch {parts[1]} {i} " for i in self.instruments] + + # drums + if verb == "drums" and len(parts) == 2 and not text.endswith(" "): + prefix = parts[1].lower() + matches = [p for p in self.drum_patterns if p.startswith(prefix)] + return [f"drums {m} " for m in matches] + if verb == "drums" and len(parts) == 1 and text.endswith(" "): + return [f"drums {p} " for p in self.drum_patterns] + + # fx + if verb == "fx" and len(parts) == 3 and not text.endswith(" "): + prefix = parts[2].lower() + return [f"fx {parts[1]} {p} " for p in fx_params + if p.startswith(prefix)] + if verb == "fx" and len(parts) == 2 and text.endswith(" "): + return [f"fx {parts[1]} {p} " for p in fx_params] + + return [] + + def _handle_command(self, cmd): + parts = cmd.split() + if not parts: + return + verb = parts[0].lower() + + if verb in ("quit", "q", "exit"): + self.running = False + elif verb in ("help", "h"): + self.log("ch [inst] | fx | drums [pat|-]", 2) + self.log("seed [n] | list | patterns | exit", 2) + elif verb == "ch" and len(parts) == 2: + try: + n = int(parts[1]) + if 1 <= n <= len(self.picks): + self.log(f"Ch {n}: {self.picks[n-1]}", 2) + else: + self.log(f"Channel 1-{len(self.picks)}", 4) + except ValueError: + self.log("ch <1-8> [instrument]", 4) + elif verb == "ch" and len(parts) >= 3: + try: + n = int(parts[1]) + inst = parts[2] + if inst not in INSTRUMENTS: + self.log(f"Unknown: {inst}", 4) + return + if not (1 <= n <= len(self.picks)): + self.log(f"Channel 1-{len(self.picks)}", 4) + return + self.picks[n - 1] = inst + self.engine.channel(n, instrument=inst, reverb=0.3) + self.log(f"Ch {n} → {inst}", 1) + except (ValueError, IndexError): + self.log("ch <1-8> ", 4) + elif verb == "fx" and len(parts) >= 4: + try: + n = int(parts[1]) + param = parts[2] + val = float(parts[3]) + if not (1 <= n <= len(self.picks)): + self.log(f"Channel 1-{len(self.picks)}", 4) + return + if n not in self.engine.channels: + self.log(f"Channel {n} not active", 4) + return + channel = self.engine.channels[n] + if param == "reverb": + channel.reverb = val + channel._cache.clear() + elif param == "lowpass": + channel.lowpass = val + channel._cache.clear() + elif param == "volume": + channel.volume = val + elif hasattr(channel, param): + setattr(channel, param, val) + channel._cache.clear() + else: + self.log(f"Unknown param: {param}", 4) + return + self.log(f"Ch {n} {param}={val}", 1) + except (ValueError, IndexError): + self.log("fx <1-8> ", 4) + elif verb == "fx" and len(parts) == 2: + try: + n = int(parts[1]) + if n in self.engine.channels: + ch = self.engine.channels[n] + self.log(f"Ch {n}: vol={ch.volume} lp={ch.lowpass} rev={ch.reverb}", 2) + else: + self.log(f"Channel {n} not active", 4) + except ValueError: + self.log("fx ", 4) + elif verb == "fx" and len(parts) <= 1: + self.log("Volume/Mix:", 3) + self.log(" volume pan reverb delay", 2) + self.log("Filter:", 3) + self.log(" lowpass lowpass_q", 2) + self.log("Modulation:", 3) + self.log(" chorus detune spread tremolo_depth", 2) + self.log(" phaser analog", 2) + self.log("Drive:", 3) + self.log(" distortion saturation", 2) + self.log("Synth:", 3) + self.log(" sub_osc noise_mix", 2) + self.log("", 0) + self.log("fx ", 2) + elif verb == "drums" and len(parts) == 1: + self.log(f"Current: {self.current_drum}", 5) + for i in range(0, len(self.drum_patterns), 4): + row = " ".join(f"{x:17s}" for x in self.drum_patterns[i:i+4]) + self.log(f" {row}", 2) + elif verb == "drums" and len(parts) >= 2: + pat = " ".join(parts[1:]) + if pat in ("none", "off", "mute", "-"): + self.current_drum = "none" + self.engine._drum_pattern = None + self.engine._drum_channel = None + self.log("Drums off", 4) + else: + try: + self.current_drum = pat + self.engine.drums(pat, volume=0.5) + self.log(f"Drums → {pat}", 5) + except Exception as e: + self.log(f"Error: {e}", 4) + elif verb == "seed" and len(parts) == 1: + self.log(f"Seed: {self.seed}", 2) + elif verb == "seed" and len(parts) >= 2: + try: + self.seed = int(parts[1]) + rng = random.Random(self.seed) + self.picks = rng.sample(self.instruments, 8) + for i, inst in enumerate(self.picks, 1): + self.engine.channel(i, instrument=inst, reverb=0.3) + self.log(f"Seed → {self.seed}", 1) + except ValueError: + self.log("seed ", 4) + elif verb == "list": + for i in range(0, len(self.instruments), 3): + row = " ".join(f"{x:22s}" for x in self.instruments[i:i+3]) + self.log(f" {row}", 2) + elif verb == "patterns": + for i in range(0, len(self.drum_patterns), 4): + row = " ".join(f"{x:17s}" for x in self.drum_patterns[i:i+4]) + self.log(f" {row}", 2) + elif verb == "pan" and len(parts) >= 3: + try: + n = int(parts[1]) + val = float(parts[2]) + val = max(-1.0, min(1.0, val)) + if n in self.engine.channels: + self.engine.channels[n].pan = val + self.log(f"Ch {n} pan={val:+.1f}", 1) + else: + self.log(f"Channel {n} not active", 4) + except ValueError: + self.log("pan <1-8> <-1..1>", 4) + elif verb == "kbd": + if len(parts) >= 2: + try: + ch_num = int(parts[1]) + self.engine._keyboard_channel = ch_num + if len(parts) >= 3: + self.engine._keyboard_octave = int(parts[2]) + except ValueError: + self.log("kbd [octave]", 4) + return + else: + self.engine._keyboard_channel = self.engine._keyboard_channel or 1 + self.kbd_active = True + # Make sure wavetables are cached for this channel + ch_num = self.engine._keyboard_channel + if ch_num in self.engine.channels: + channel = self.engine.channels[ch_num] + if not channel._cache: + self.log("Rendering wavetables...", 3) + for midi_note in range(36, 97): + channel._get_wave(midi_note, 44100 * 3) + self.log(f"♪ Keyboard ON ch{ch_num} oct{self.engine._keyboard_octave} (Esc=exit, ↑↓=octave)", 1) + elif verb == "rec": + self.engine.start_recording() + self.log("● Recording...", 4) + elif verb == "stop" and self.engine._recording: + self.engine.stop_recording() + n = len(self.engine._record_events) + self.log(f"■ Stopped ({n} events)", 3) + elif verb == "export": + fname = parts[1] if len(parts) > 1 else "recording.mid" + score = self.engine.export_recording(fname) + if score: + self.log(f"Exported → {fname}", 1) + else: + self.log("Nothing to export", 4) + elif verb == "save" and len(parts) >= 2: + fname = parts[1] + if not fname.endswith(".json"): + fname += ".json" + self.engine.seed = self.seed + self.engine.save_config(fname) + self.log(f"Saved → {fname}", 1) + elif verb == "load" and len(parts) >= 2: + fname = parts[1] + if not fname.endswith(".json"): + fname += ".json" + try: + self.engine.load_config(fname) + self.log(f"Loaded ← {fname}", 1) + # Update picks from channels + for ch, channel in self.engine.channels.items(): + if 1 <= ch <= 8: + self.picks[ch - 1] = channel.synth_name + except Exception as e: + self.log(f"Error: {e}", 4) + elif verb == "octave" and len(parts) >= 2: + try: + self.engine._keyboard_octave = int(parts[1]) + self.log(f"Keyboard octave → {self.engine._keyboard_octave}", 2) + except ValueError: + self.log("octave <0-8>", 4) + else: + self.log(f"? {cmd}", 4) + + +def main(): + import argparse + parser = argparse.ArgumentParser(description="PyTheory Live — real-time MIDI synthesizer") + parser.add_argument("seed", nargs="?", type=int, default=None, help="Random seed for instruments") + parser.add_argument("--port", "-p", default="OP-XY", help="MIDI port name (default: OP-XY)") + parser.add_argument("--channels", "-c", type=int, default=8, help="Number of channels (default: 8)") + parser.add_argument("--drums", "-d", default="rock", help="Drum pattern (default: rock, 'none' to disable)") + parser.add_argument("--buffer", "-b", type=int, default=128, help="Audio buffer size (default: 128)") + args = parser.parse_args() + + tui = LiveTUI(seed=args.seed, port=args.port, n_channels=args.channels, + drum_pattern=args.drums, buffer_size=args.buffer) + curses.wrapper(tui.run) + + +if __name__ == "__main__": + main()