diff --git a/pyproject.toml b/pyproject.toml index d0fdf2a..7f28f5c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,6 +23,7 @@ classifiers = [ dependencies = [ "sounddevice", "scipy", + "rich>=14.3.3", ] [project.optional-dependencies] diff --git a/pytheory/live.py b/pytheory/live.py index f1d593e..83885a9 100644 --- a/pytheory/live.py +++ b/pytheory/live.py @@ -74,6 +74,17 @@ class _Channel: self.lowpass_q = kwargs.get('lowpass_q', 0.707) self.reverb = kwargs.get('reverb', 0) self.volume = kwargs.get('volume', 0.5) + self.chorus = kwargs.get('chorus', 0) + self.detune = kwargs.get('detune', 0) + self.spread = kwargs.get('spread', 0) + self.analog = kwargs.get('analog', 0) + self.distortion = kwargs.get('distortion', 0) + self.delay = kwargs.get('delay', 0) + self.tremolo_depth = kwargs.get('tremolo_depth', 0) + self.saturation = kwargs.get('saturation', 0) + self.phaser = kwargs.get('phaser', 0) + self.sub_osc = kwargs.get('sub_osc', 0) + self.noise_mix = kwargs.get('noise_mix', 0) self.voices = [] # active _Voice objects self._cache = {} # MIDI note -> pre-rendered wave @@ -129,6 +140,30 @@ class _Channel: fb_delay = int(fb_delay * 1.5) wave_f = wave_f * (1.0 - wet) + reverbed * wet + # Apply distortion/saturation + if self.distortion > 0: + drive = 3.0 + wave_f = numpy.tanh(wave_f * drive * (1 + self.distortion * 3)) / drive + if self.saturation > 0: + wave_f = numpy.tanh(wave_f * (1 + self.saturation * 2)) + + # Apply tremolo + if self.tremolo_depth > 0: + t = numpy.arange(len(wave_f), dtype=numpy.float32) / SAMPLE_RATE + trem = 1.0 - self.tremolo_depth * 0.5 * (1 + numpy.sin(2 * numpy.pi * 5.0 * t)) + wave_f *= trem + + # Apply chorus (simple delay modulation) + if self.chorus > 0: + t = numpy.arange(len(wave_f), dtype=numpy.float32) / SAMPLE_RATE + mod = (numpy.sin(2 * numpy.pi * 1.5 * t) * 0.002 * SAMPLE_RATE).astype(int) + chorus_buf = numpy.zeros_like(wave_f) + for i in range(len(wave_f)): + idx = i - abs(mod[i]) - int(SAMPLE_RATE * 0.015) + if 0 <= idx < len(wave_f): + chorus_buf[i] = wave_f[idx] + wave_f = wave_f * (1 - self.chorus * 0.5) + chorus_buf * self.chorus * 0.5 + self._cache[midi_note] = wave_f return wave_f diff --git a/test_live.py b/test_live.py new file mode 100644 index 0000000..5c506fc --- /dev/null +++ b/test_live.py @@ -0,0 +1,451 @@ +"""PyTheory Live — interactive MIDI synthesizer with TUI.""" + +import curses +import random +import sys +import threading +import time +import os + +from pytheory.live import LiveEngine +from pytheory.rhythm import INSTRUMENTS, Pattern + + +NOTE_NAMES = ['C', 'C#', 'D', 'D#', 'E', 'F', 'F#', 'G', 'G#', 'A', 'A#', 'B'] + + +def note_name(midi): + return f"{NOTE_NAMES[midi % 12]}{midi // 12 - 1}" + + +class LiveTUI: + def __init__(self, seed=None, port="OP-XY"): + self.seed = seed or random.randint(0, 9999) + self.port = port + self.engine = None + self.log_lines = [] + self.max_log = 500 + self.running = True + self.instruments = sorted([k for k in INSTRUMENTS.keys() + if k not in ("808_bass",)]) + self.drum_patterns = sorted(Pattern.list_presets()) + self.current_drum = "rock" + self.picks = [] + self.bpm = "—" + self.status = "Init" + self.status_color = 3 # yellow + self._build_engine() + + def _build_engine(self): + rng = random.Random(self.seed) + self.picks = rng.sample(self.instruments, 8) + self.engine = LiveEngine(buffer_size=128) + for i, inst in enumerate(self.picks, 1): + self.engine.channel(i, instrument=inst, reverb=0.3) + self.engine.drums(self.current_drum, volume=0.5) + self.engine.cc(0, "lowpass", min_val=300, max_val=8000) + + def log(self, msg, color=0): + self.log_lines.append((time.time(), msg, color)) + if len(self.log_lines) > self.max_log: + self.log_lines = self.log_lines[-self.max_log:] + + def _patch_engine_logging(self): + original_cb = self.engine._midi_callback + + def logging_cb(event, data=None): + msg, _ = event + if len(msg) == 0: + return + if msg[0] == 0xF8: + if self.engine._bpm > 10: + self.bpm = f"{self.engine._bpm:.0f}" + elif msg[0] == 0xFA: + self.log("▶ Start", 5) + self.status = "Playing" + self.status_color = 1 + elif msg[0] == 0xFC: + self.log("■ Stop", 4) + self.status = "Stopped" + self.status_color = 4 + elif msg[0] == 0xFB: + self.log("▶ Continue", 5) + self.status = "Playing" + self.status_color = 1 + elif len(msg) >= 3: + status = msg[0] + ch = (status & 0x0F) + 1 + msg_type = status & 0xF0 + if msg_type == 0x90 and msg[2] > 0: + inst = self.picks[ch - 1] if 1 <= ch <= 8 else "?" + self.log(f"♪ {ch}:{inst} {note_name(msg[1])} v={msg[2]}", 1) + elif msg_type == 0xB0: + self.log(f"⚙ CC{msg[1]}={msg[2]}", 3) + elif msg_type == 0xE0: + bend = ((msg[2] << 7) | msg[1]) - 8192 + self.log(f"↕ Bend ch{ch} {bend:+d}", 3) + original_cb(event, data) + + self.engine._midi_callback = logging_cb + + def run(self, stdscr): + curses.curs_set(0) + curses.use_default_colors() + curses.init_pair(1, curses.COLOR_GREEN, -1) + curses.init_pair(2, curses.COLOR_CYAN, -1) + curses.init_pair(3, curses.COLOR_YELLOW, -1) + curses.init_pair(4, curses.COLOR_RED, -1) + curses.init_pair(5, curses.COLOR_MAGENTA, -1) + curses.init_pair(6, curses.COLOR_BLACK, curses.COLOR_BLUE) + curses.init_pair(7, curses.COLOR_BLACK, curses.COLOR_GREEN) + curses.init_pair(8, curses.COLOR_BLACK, curses.COLOR_YELLOW) + curses.init_pair(9, curses.COLOR_BLACK, curses.COLOR_RED) + stdscr.nodelay(True) + stdscr.timeout(60) + + self._patch_engine_logging() + + # Pre-render with progress + self.status = "Rendering" + self.status_color = 3 + stdscr.erase() + stdscr.addstr(1, 2, "PyTheory Live", curses.A_BOLD) + stdscr.addstr(2, 2, "Pre-rendering wavetables...", curses.color_pair(3)) + stdscr.refresh() + + n_samples = 44100 * 3 + count = 0 + for _, channel in self.engine.channels.items(): + if channel.is_drums: + continue + for midi_note in range(36, 97): + channel._get_wave(midi_note, n_samples) + count += 1 + if count % 50 == 0: + stdscr.addstr(3, 2, f" {count} wavetables...", curses.color_pair(2)) + stdscr.refresh() + + self.log(f"Cached {count} wavetables", 1) + self.status = "Listening" + self.status_color = 2 + + # Start engine + def run_engine(): + devnull = open(os.devnull, 'w') + old = sys.stdout + sys.stdout = devnull + try: + self.engine.start(port=self.port) + except Exception as e: + sys.stdout = old + self.log(f"Error: {e}", 4) + finally: + sys.stdout = old + devnull.close() + + engine_thread = threading.Thread(target=run_engine, daemon=True) + engine_thread.start() + + cmd_buf = "" + cmd_history = [] + history_idx = -1 + + while self.running: + try: + h, w = stdscr.getmaxyx() + if h < 10 or w < 40: + time.sleep(0.1) + continue + stdscr.erase() + + div = max(30, w * 3 // 5) + cfg_x = div + 2 + + # ═══ HEADER BAR ═══ + header = f" PyTheory Live " + stdscr.addstr(0, 0, header, curses.color_pair(6) | curses.A_BOLD) + + # Status badge + badge_colors = {1: 7, 2: 6, 3: 8, 4: 9} + badge_cp = badge_colors.get(self.status_color, 6) + badge = f" {self.status} " + x = len(header) + stdscr.addstr(0, x, badge, curses.color_pair(badge_cp) | curses.A_BOLD) + x += len(badge) + + info = f" BPM:{self.bpm} drums:{self.current_drum} seed:{self.seed}" + try: + stdscr.addstr(0, x, info[:w - x], curses.color_pair(6)) + # Fill rest of header + remaining = w - x - len(info) + if remaining > 0: + stdscr.addstr(0, x + len(info), " " * remaining, curses.color_pair(6)) + except curses.error: + pass + + # ═══ DIVIDER ═══ + for y in range(1, h - 2): + try: + stdscr.addch(y, div, '│', curses.color_pair(2) | curses.A_DIM) + except curses.error: + pass + + # ═══ LEFT: EVENTS ═══ + try: + stdscr.addstr(1, 1, " Events ", curses.color_pair(2) | curses.A_BOLD) + except curses.error: + pass + + log_h = h - 5 + visible = self.log_lines[-log_h:] if log_h > 0 else [] + for i, (ts, msg, color) in enumerate(visible): + ly = 2 + i + if ly >= h - 3: + break + # Fade old messages + age = time.time() - ts + attr = curses.A_DIM if age > 8 else 0 + try: + stdscr.addstr(ly, 1, msg[:div - 2], + curses.color_pair(color) | attr) + except curses.error: + pass + + # ═══ RIGHT: CONFIG ═══ + try: + stdscr.addstr(1, cfg_x, " Config ", + curses.color_pair(2) | curses.A_BOLD) + except curses.error: + pass + + y = 3 + rw = w - cfg_x - 1 + for i, inst in enumerate(self.picks, 1): + try: + stdscr.addstr(y, cfg_x, f"{i}", curses.A_BOLD) + stdscr.addstr(y, cfg_x + 1, ":", curses.A_DIM) + stdscr.addstr(y, cfg_x + 2, inst[:rw - 2], + curses.color_pair(1)) + except curses.error: + pass + y += 1 + + y += 1 + pairs = [ + ("Drums", self.current_drum, 5), + ("BPM", str(self.bpm), 0), + ("Latency", f"{self.engine.buffer_size / 44100 * 1000:.1f}ms", 0), + ("MIDI", self.port, 0), + ("Seed", str(self.seed), 2), + ] + for label, val, cp in pairs: + try: + stdscr.addstr(y, cfg_x, f"{label}:", curses.A_BOLD) + stdscr.addstr(y, cfg_x + len(label) + 1, f" {val}"[:rw], + curses.color_pair(cp)) + except curses.error: + pass + y += 1 + + y += 1 + cmds = [ + "ch [inst]", + "fx ", + "drums [pattern|-]", + "seed [n]", + "list patterns", + "exit", + ] + try: + stdscr.addstr(y, cfg_x, "Commands:", curses.color_pair(3)) + except curses.error: + pass + for i, c in enumerate(cmds): + try: + stdscr.addstr(y + 1 + i, cfg_x + 1, c[:rw], + curses.color_pair(2) | curses.A_DIM) + except curses.error: + pass + + # ═══ INPUT BAR ═══ + iy = h - 2 + try: + stdscr.addstr(iy - 1, 0, "─" * (w - 1), curses.A_DIM) + stdscr.addstr(iy, 0, " $ ", + curses.color_pair(1) | curses.A_BOLD) + stdscr.addstr(iy, 3, cmd_buf[:w - 5]) + # Cursor + cx = 3 + len(cmd_buf) + if cx < w - 1: + stdscr.addstr(iy, cx, "█", + curses.color_pair(1) | curses.A_BOLD) + except curses.error: + pass + + stdscr.refresh() + + # ═══ INPUT ═══ + ch = stdscr.getch() + if ch == -1: + continue + elif ch == 10 or ch == 13: + if cmd_buf.strip(): + cmd_history.append(cmd_buf) + self._handle_command(cmd_buf.strip()) + cmd_buf = "" + history_idx = -1 + elif ch == 27: + cmd_buf = "" + elif ch == curses.KEY_BACKSPACE or ch == 127: + cmd_buf = cmd_buf[:-1] + elif ch == curses.KEY_UP: + if cmd_history and history_idx < len(cmd_history) - 1: + history_idx += 1 + cmd_buf = cmd_history[-(history_idx + 1)] + elif ch == curses.KEY_DOWN: + if history_idx > 0: + history_idx -= 1 + cmd_buf = cmd_history[-(history_idx + 1)] + else: + history_idx = -1 + cmd_buf = "" + elif 32 <= ch < 127: + cmd_buf += chr(ch) + + except KeyboardInterrupt: + self.running = False + + self.engine.stop() + + def _handle_command(self, cmd): + parts = cmd.split() + if not parts: + return + verb = parts[0].lower() + + if verb in ("quit", "q", "exit"): + self.running = False + elif verb in ("help", "h"): + self.log("ch [inst] | fx | drums [pat|-]", 2) + self.log("seed [n] | list | patterns | exit", 2) + elif verb == "ch" and len(parts) == 2: + try: + n = int(parts[1]) + if 1 <= n <= 8: + self.log(f"Ch {n}: {self.picks[n-1]}", 2) + else: + self.log("Channel 1-8", 4) + except ValueError: + self.log("ch <1-8> [instrument]", 4) + elif verb == "ch" and len(parts) >= 3: + try: + n = int(parts[1]) + inst = parts[2] + if inst not in INSTRUMENTS: + self.log(f"Unknown: {inst}", 4) + return + if not (1 <= n <= 8): + self.log("Channel 1-8", 4) + return + self.picks[n - 1] = inst + self.engine.channel(n, instrument=inst, reverb=0.3) + self.log(f"Ch {n} → {inst}", 1) + except (ValueError, IndexError): + self.log("ch <1-8> ", 4) + elif verb == "fx" and len(parts) >= 4: + try: + n = int(parts[1]) + param = parts[2] + val = float(parts[3]) + if not (1 <= n <= 8): + self.log("Channel 1-8", 4) + return + if n not in self.engine.channels: + self.log(f"Channel {n} not active", 4) + return + channel = self.engine.channels[n] + if param == "reverb": + channel.reverb = val + channel._cache.clear() + elif param == "lowpass": + channel.lowpass = val + channel._cache.clear() + elif param == "volume": + channel.volume = val + elif hasattr(channel, param): + setattr(channel, param, val) + channel._cache.clear() + else: + self.log(f"Unknown param: {param}", 4) + return + self.log(f"Ch {n} {param}={val}", 1) + except (ValueError, IndexError): + self.log("fx <1-8> ", 4) + elif verb == "fx" and len(parts) == 2: + try: + n = int(parts[1]) + if n in self.engine.channels: + ch = self.engine.channels[n] + self.log(f"Ch {n}: vol={ch.volume} lp={ch.lowpass} rev={ch.reverb}", 2) + else: + self.log(f"Channel {n} not active", 4) + except ValueError: + self.log("fx ", 4) + elif verb == "fx" and len(parts) <= 1: + self.log("Params: volume lowpass reverb", 2) + self.log(" chorus detune spread analog", 2) + self.log(" distortion delay tremolo_depth", 2) + self.log(" saturation phaser sub_osc noise_mix", 2) + self.log("fx ", 2) + elif verb == "drums" and len(parts) == 1: + self.log(f"Current: {self.current_drum}", 5) + for i in range(0, len(self.drum_patterns), 4): + row = " ".join(f"{x:17s}" for x in self.drum_patterns[i:i+4]) + self.log(f" {row}", 2) + elif verb == "drums" and len(parts) >= 2: + pat = " ".join(parts[1:]) + if pat in ("none", "off", "mute", "-"): + self.current_drum = "none" + self.engine._drum_pattern = None + self.engine._drum_channel = None + self.log("Drums off", 4) + else: + try: + self.current_drum = pat + self.engine.drums(pat, volume=0.5) + self.log(f"Drums → {pat}", 5) + except Exception as e: + self.log(f"Error: {e}", 4) + elif verb == "seed" and len(parts) == 1: + self.log(f"Seed: {self.seed}", 2) + elif verb == "seed" and len(parts) >= 2: + try: + self.seed = int(parts[1]) + rng = random.Random(self.seed) + self.picks = rng.sample(self.instruments, 8) + for i, inst in enumerate(self.picks, 1): + self.engine.channel(i, instrument=inst, reverb=0.3) + self.log(f"Seed → {self.seed}", 1) + except ValueError: + self.log("seed ", 4) + elif verb == "list": + for i in range(0, len(self.instruments), 3): + row = " ".join(f"{x:22s}" for x in self.instruments[i:i+3]) + self.log(f" {row}", 2) + elif verb == "patterns": + for i in range(0, len(self.drum_patterns), 4): + row = " ".join(f"{x:17s}" for x in self.drum_patterns[i:i+4]) + self.log(f" {row}", 2) + else: + self.log(f"? {cmd}", 4) + + +def main(): + seed = int(sys.argv[1]) if len(sys.argv) > 1 else None + port = sys.argv[2] if len(sys.argv) > 2 else "OP-XY" + tui = LiveTUI(seed=seed, port=port) + curses.wrapper(tui.run) + + +if __name__ == "__main__": + main() diff --git a/uv.lock b/uv.lock index 53a4acc..6d04582 100644 --- a/uv.lock +++ b/uv.lock @@ -693,6 +693,7 @@ name = "pytheory" version = "0.40.0" source = { editable = "." } dependencies = [ + { name = "rich" }, { name = "scipy", version = "1.15.3", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.11'" }, { name = "scipy", version = "1.17.1", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.11'" }, { name = "sounddevice" }, @@ -718,6 +719,7 @@ docs = [ [package.metadata] requires-dist = [ { name = "python-rtmidi", marker = "extra == 'live'", specifier = ">=1.5.8" }, + { name = "rich", specifier = ">=14.3.3" }, { name = "scipy" }, { name = "sounddevice" }, ] @@ -832,6 +834,20 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/1e/db/4254e3eabe8020b458f1a747140d32277ec7a271daf1d235b70dc0b4e6e3/requests-2.32.5-py3-none-any.whl", hash = "sha256:2462f94637a34fd532264295e186976db0f5d453d1cdd31473c85a6a161affb6", size = 64738, upload-time = "2025-08-18T20:46:00.542Z" }, ] +[[package]] +name = "rich" +version = "14.3.3" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "markdown-it-py", version = "3.0.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.11'" }, + { name = "markdown-it-py", version = "4.0.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.11'" }, + { name = "pygments" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/b3/c6/f3b320c27991c46f43ee9d856302c70dc2d0fb2dba4842ff739d5f46b393/rich-14.3.3.tar.gz", hash = "sha256:b8daa0b9e4eef54dd8cf7c86c03713f53241884e814f4e2f5fb342fe520f639b", size = 230582, upload-time = "2026-02-19T17:23:12.474Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/14/25/b208c5683343959b670dc001595f2f3737e051da617f66c31f7c4fa93abc/rich-14.3.3-py3-none-any.whl", hash = "sha256:793431c1f8619afa7d3b52b2cdec859562b950ea0d4b6b505397612db8d5362d", size = 310458, upload-time = "2026-02-19T17:23:13.732Z" }, +] + [[package]] name = "roman-numerals" version = "4.1.0"