diff --git a/pytheory/live.py b/pytheory/live.py index 83885a9..c441df2 100644 --- a/pytheory/live.py +++ b/pytheory/live.py @@ -74,6 +74,7 @@ class _Channel: self.lowpass_q = kwargs.get('lowpass_q', 0.707) self.reverb = kwargs.get('reverb', 0) self.volume = kwargs.get('volume', 0.5) + self.pan = kwargs.get('pan', 0.0) # -1 left, 0 center, 1 right self.chorus = kwargs.get('chorus', 0) self.detune = kwargs.get('detune', 0) self.spread = kwargs.get('spread', 0) @@ -89,6 +90,7 @@ class _Channel: self.voices = [] # active _Voice objects self._cache = {} # MIDI note -> pre-rendered wave self._lock = threading.Lock() + self.level = 0.0 # current output level (for VU meter) def _get_wave(self, midi_note, n_samples): """Get or render a waveform for a MIDI note.""" @@ -188,9 +190,9 @@ class _Channel: v.releasing = True v.release_pos = 0 - def render(self, n_frames): - """Mix all active voices into a buffer.""" - buf = numpy.zeros(n_frames, dtype=numpy.float32) + def render_stereo(self, n_frames): + """Mix all active voices into a stereo buffer (n_frames, 2).""" + mono = numpy.zeros(n_frames, dtype=numpy.float32) dead = [] with self._lock: @@ -207,19 +209,20 @@ class _Channel: dead.append(i) continue - # Pitch bend: read at variable rate through the wavetable + # Pitch bend: variable-rate read if abs(v.pitch_ratio - 1.0) > 0.001: read_positions = v.pos + numpy.arange(chunk) * v.pitch_ratio read_positions = numpy.clip(read_positions, 0, len(v.wave) - 2) idx = read_positions.astype(numpy.int64) frac = (read_positions - idx).astype(numpy.float32) - samples = (v.wave[idx] * (1 - frac) + v.wave[numpy.minimum(idx + 1, len(v.wave) - 1)] * frac) + samples = (v.wave[idx] * (1 - frac) + + v.wave[numpy.minimum(idx + 1, len(v.wave) - 1)] * frac) samples *= v.velocity * self.volume else: int_pos = int(v.pos) samples = v.wave[int_pos:int_pos + chunk] * v.velocity * self.volume - # Release fade + # Release crossfade if v.releasing: fade_chunk = min(chunk, v.release_len - v.release_pos) if fade_chunk > 0: @@ -234,7 +237,7 @@ class _Channel: v.active = False samples[fade_chunk:] = 0 - buf[:chunk] += samples + mono[:chunk] += samples v.pos += chunk * v.pitch_ratio # Clean up dead voices @@ -242,7 +245,20 @@ class _Channel: if i < len(self.voices): self.voices.pop(i) - return buf + # VU meter + peak = numpy.abs(mono).max() if len(mono) > 0 else 0 + self.level = self.level * 0.7 + peak * 0.3 # smooth + + # Stereo pan (constant power) + import math + angle = (self.pan + 1) * math.pi / 4 # 0 to pi/2 + l_gain = math.cos(angle) + r_gain = math.sin(angle) + stereo = numpy.zeros((n_frames, 2), dtype=numpy.float32) + stereo[:, 0] = mono * l_gain + stereo[:, 1] = mono * r_gain + + return stereo # ── LiveEngine ─────────────────────────────────────────────────────────── @@ -270,6 +286,13 @@ class LiveEngine: self._midi_in = None self._stream = None self._stop_event = threading.Event() + # Recording + self._recording = False + self._record_events = [] # (timestamp, ch, note, velocity, on/off) + self._record_start = 0 + # Keyboard MIDI + self._keyboard_channel = None + self._keyboard_octave = 4 # Clock sync self._clock_count = 0 # MIDI clock pulses (24 per quarter note) self._clock_times = [] # timestamps for BPM calculation @@ -460,8 +483,16 @@ class LiveEngine: if msg_type == 0x90 and velocity > 0: channel.note_on(note, velocity) + if self._recording: + import time as _t + self._record_events.append( + (_t.perf_counter() - self._record_start, ch, note, velocity, True)) elif msg_type == 0x80 or (msg_type == 0x90 and velocity == 0): channel.note_off(note) + if self._recording: + import time as _t + self._record_events.append( + (_t.perf_counter() - self._record_start, ch, note, 0, False)) elif msg_type == 0xB0: self._apply_cc(ch, note, velocity) elif msg_type == 0xE0: @@ -474,16 +505,17 @@ class LiveEngine: v.pitch_ratio = ratio def _audio_callback(self, outdata, frames, time_info, status): - """sounddevice callback - mix all channels.""" - buf = numpy.zeros(frames, dtype=numpy.float32) + """sounddevice callback - mix all channels to stereo.""" + stereo = numpy.zeros((frames, 2), dtype=numpy.float32) for channel in self.channels.values(): - buf += channel.render(frames) + stereo += channel.render_stereo(frames) if self._drum_channel: - buf += self._drum_channel.render(frames) + stereo += self._drum_channel.render_stereo(frames) - buf = numpy.tanh(buf) - outdata[:, 0] = buf - outdata[:, 1] = buf + # Soft clip per channel + stereo[:, 0] = numpy.tanh(stereo[:, 0]) + stereo[:, 1] = numpy.tanh(stereo[:, 1]) + outdata[:] = stereo def list_ports(self): """List available MIDI input ports.""" @@ -595,6 +627,167 @@ class LiveEngine: self._midi_in.delete() self._midi_in = None + def keyboard_play(self, ch=1): + """Enable computer keyboard as MIDI input on a channel.""" + self._keyboard_channel = ch + return self + + def keyboard_note(self, key, on=True): + """Translate a keyboard key to a MIDI note and play it. + + QWERTY layout: ZSXDCVGBHNJM = C through B (lower octave) + Q2W3ER5T6Y7U = C through B (upper octave) + """ + # Lower row: Z=C, S=C#, X=D, D=D#, C=E, V=F, G=F#, B=G, H=G#, N=A, J=A#, M=B + lower = {'z': 0, 's': 1, 'x': 2, 'd': 3, 'c': 4, 'v': 5, + 'g': 6, 'b': 7, 'h': 8, 'n': 9, 'j': 10, 'm': 11} + # Upper row: Q=C, 2=C#, W=D, 3=D#, E=E, R=F, 5=F#, T=G, 6=G#, Y=A, 7=A#, U=B + upper = {'q': 0, '2': 1, 'w': 2, '3': 3, 'e': 4, 'r': 5, + '5': 6, 't': 7, '6': 8, 'y': 9, '7': 10, 'u': 11} + + if self._keyboard_channel is None: + return False + + ch = self._keyboard_channel + if ch not in self.channels: + return False + + midi_note = None + if key in lower: + midi_note = (self._keyboard_octave + 1) * 12 + lower[key] + elif key in upper: + midi_note = (self._keyboard_octave + 2) * 12 + upper[key] + + if midi_note is not None: + channel = self.channels[ch] + if on: + channel.note_on(midi_note, 100) + if self._recording: + import time as _t + self._record_events.append( + (_t.perf_counter() - self._record_start, + ch, midi_note, 100, True)) + else: + channel.note_off(midi_note) + if self._recording: + import time as _t + self._record_events.append( + (_t.perf_counter() - self._record_start, + ch, midi_note, 0, False)) + return True + return False + + def start_recording(self): + """Start recording MIDI events.""" + import time as _t + self._record_events = [] + self._record_start = _t.perf_counter() + self._recording = True + + def stop_recording(self): + """Stop recording.""" + self._recording = False + + def export_recording(self, filename="recording.mid", bpm=None): + """Export recorded events to a MIDI file. + + Returns a pytheory Score if no filename given. + """ + if not self._record_events: + return None + + use_bpm = bpm or (self._bpm if self._bpm > 10 else 120) + + from .rhythm import Score, Duration + + score = Score("4/4", bpm=int(use_bpm)) + + # Group events by channel + by_channel = {} + for ts, ch, note, vel, is_on in self._record_events: + if ch not in by_channel: + by_channel[ch] = [] + by_channel[ch].append((ts, note, vel, is_on)) + + # Build parts + for ch, events in sorted(by_channel.items()): + inst = self.picks[ch - 1] if 1 <= ch <= 8 else "piano" + part = score.part(f"ch{ch}", instrument=inst) + + # Convert to note-on/off pairs + active = {} + notes = [] + for ts, note, vel, is_on in events: + if is_on: + active[note] = (ts, vel) + elif note in active: + start_ts, start_vel = active.pop(note) + dur_sec = ts - start_ts + dur_beats = dur_sec * use_bpm / 60.0 + notes.append((start_ts, note, max(0.125, dur_beats), start_vel)) + + notes.sort(key=lambda x: x[0]) + + beat_pos = 0.0 + for ts, midi_note, dur, vel in notes: + note_beat = ts * use_bpm / 60.0 + if note_beat > beat_pos: + part.rest(note_beat - beat_pos) + # Convert MIDI note to name + name = NOTE_NAMES[midi_note % 12] + octave = midi_note // 12 - 1 + part.add(f"{name}{octave}", dur, velocity=vel) + beat_pos = note_beat + dur + + if filename: + score.save_midi(filename) + + return score + + def save_config(self, filename): + """Save current configuration to JSON.""" + import json + config = { + "seed": self.seed if hasattr(self, 'seed') else None, + "buffer_size": self.buffer_size, + "drums": getattr(self, '_drum_pattern_name', None), + "channels": {}, + } + for ch, channel in self.channels.items(): + config["channels"][str(ch)] = { + "synth": channel.synth_name, + "envelope": channel.envelope_name, + "volume": channel.volume, + "pan": channel.pan, + "reverb": channel.reverb, + "lowpass": channel.lowpass, + "chorus": channel.chorus, + "distortion": channel.distortion, + "is_drums": channel.is_drums, + } + with open(filename, 'w') as f: + json.dump(config, f, indent=2) + + def load_config(self, filename): + """Load configuration from JSON.""" + import json + with open(filename) as f: + config = json.load(f) + for ch_str, ch_cfg in config.get("channels", {}).items(): + ch = int(ch_str) + self.channel(ch, + synth=ch_cfg.get("synth"), + envelope=ch_cfg.get("envelope"), + drums=ch_cfg.get("is_drums", False), + volume=ch_cfg.get("volume", 0.5), + pan=ch_cfg.get("pan", 0.0), + reverb=ch_cfg.get("reverb", 0), + lowpass=ch_cfg.get("lowpass", 0), + chorus=ch_cfg.get("chorus", 0), + distortion=ch_cfg.get("distortion", 0)) + if config.get("drums"): + self.drums(config["drums"]) + def stop(self): """Stop the engine.""" self._stop_event.set() diff --git a/test_live.py b/test_live.py index 35e67c1..aa68baf 100644 --- a/test_live.py +++ b/test_live.py @@ -235,6 +235,30 @@ class LiveTUI: y += 1 y += 1 + # VU meters per channel + y += 1 + try: + stdscr.addstr(y, cfg_x, "Levels:", curses.A_BOLD | curses.A_DIM) + except curses.error: + pass + y += 1 + for i, inst in enumerate(self.picks, 1): + if i in self.engine.channels: + lv = self.engine.channels[i].level + bars = int(lv * 20) + meter = "█" * bars + "░" * (20 - bars) + color = 1 if bars < 15 else 3 if bars < 18 else 4 + try: + stdscr.addstr(y, cfg_x, f"{i}", curses.A_BOLD) + stdscr.addstr(y, cfg_x + 1, f" {meter}", curses.color_pair(color)) + except curses.error: + pass + y += 1 + + y += 1 + rec_indicator = " ● REC " if self.engine._recording else "" + kbd_indicator = f" kbd:ch{self.engine._keyboard_channel} oct{self.engine._keyboard_octave}" if self.engine._keyboard_channel else "" + pairs = [ ("Drums", self.current_drum, 5), ("BPM", str(self.bpm), 0), @@ -242,6 +266,10 @@ class LiveTUI: ("MIDI", self.port, 0), ("Seed", str(self.seed), 2), ] + if rec_indicator: + pairs.append(("", rec_indicator, 4)) + if kbd_indicator: + pairs.append(("", kbd_indicator, 2)) for label, val, cp in pairs: try: stdscr.addstr(y, cfg_x, f"{label}:", curses.A_BOLD) @@ -255,9 +283,12 @@ class LiveTUI: cmds = [ "ch [inst]", "fx ", + "pan <-1..1>", "drums [pattern|-]", - "seed [n]", - "list patterns", + "kbd [ch] [oct]", + "rec / stop / export", + "save/load ", + "seed [n] / list", "exit", ] try: @@ -349,8 +380,19 @@ class LiveTUI: tab_matches = [] cursor_pos = len(cmd_buf) elif 32 <= ch < 127: - cmd_buf = cmd_buf[:cursor_pos] + chr(ch) + cmd_buf[cursor_pos:] - cursor_pos += 1 + key = chr(ch) + # Keyboard-as-MIDI: if keyboard mode is on and not typing a command + if (self.engine._keyboard_channel and + not cmd_buf and + self.engine.keyboard_note(key.lower(), on=True)): + # Schedule note-off after 200ms + def _off(k=key.lower()): + time.sleep(0.2) + self.engine.keyboard_note(k, on=False) + threading.Thread(target=_off, daemon=True).start() + else: + cmd_buf = cmd_buf[:cursor_pos] + key + cmd_buf[cursor_pos:] + cursor_pos += 1 tab_matches = [] tab_idx = -1 @@ -362,10 +404,11 @@ class LiveTUI: def _complete(self, text): """Return list of completions for current input.""" parts = text.split() - commands = ["ch", "fx", "drums", "seed", "list", "patterns", "help", "exit"] - fx_params = ["volume", "lowpass", "reverb", "chorus", "detune", "spread", - "analog", "distortion", "delay", "tremolo_depth", - "saturation", "phaser", "sub_osc", "noise_mix"] + commands = ["ch", "fx", "pan", "drums", "kbd", "rec", "stop", "export", + "save", "load", "seed", "list", "patterns", "octave", "help", "exit"] + fx_params = ["volume", "pan", "lowpass", "lowpass_q", "reverb", "chorus", + "detune", "spread", "analog", "distortion", "delay", + "tremolo_depth", "saturation", "phaser", "sub_osc", "noise_mix"] if not parts: return [c + " " for c in commands] @@ -478,10 +521,18 @@ class LiveTUI: except ValueError: self.log("fx ", 4) elif verb == "fx" and len(parts) <= 1: - self.log("Params: volume lowpass reverb", 2) - self.log(" chorus detune spread analog", 2) - self.log(" distortion delay tremolo_depth", 2) - self.log(" saturation phaser sub_osc noise_mix", 2) + self.log("Volume/Mix:", 3) + self.log(" volume pan reverb delay", 2) + self.log("Filter:", 3) + self.log(" lowpass lowpass_q", 2) + self.log("Modulation:", 3) + self.log(" chorus detune spread tremolo_depth", 2) + self.log(" phaser analog", 2) + self.log("Drive:", 3) + self.log(" distortion saturation", 2) + self.log("Synth:", 3) + self.log(" sub_osc noise_mix", 2) + self.log("", 0) self.log("fx ", 2) elif verb == "drums" and len(parts) == 1: self.log(f"Current: {self.current_drum}", 5) @@ -522,6 +573,74 @@ class LiveTUI: for i in range(0, len(self.drum_patterns), 4): row = " ".join(f"{x:17s}" for x in self.drum_patterns[i:i+4]) self.log(f" {row}", 2) + elif verb == "pan" and len(parts) >= 3: + try: + n = int(parts[1]) + val = float(parts[2]) + val = max(-1.0, min(1.0, val)) + if n in self.engine.channels: + self.engine.channels[n].pan = val + self.log(f"Ch {n} pan={val:+.1f}", 1) + else: + self.log(f"Channel {n} not active", 4) + except ValueError: + self.log("pan <1-8> <-1..1>", 4) + elif verb == "kbd": + if len(parts) >= 2: + try: + ch_num = int(parts[1]) + self.engine._keyboard_channel = ch_num + if len(parts) >= 3: + self.engine._keyboard_octave = int(parts[2]) + self.log(f"Keyboard → ch{ch_num} oct{self.engine._keyboard_octave}", 1) + except ValueError: + self.log("kbd [octave]", 4) + elif self.engine._keyboard_channel: + self.engine._keyboard_channel = None + self.log("Keyboard off", 3) + else: + self.engine._keyboard_channel = 1 + self.log(f"Keyboard → ch1 oct{self.engine._keyboard_octave}", 1) + elif verb == "rec": + self.engine.start_recording() + self.log("● Recording...", 4) + elif verb == "stop" and self.engine._recording: + self.engine.stop_recording() + n = len(self.engine._record_events) + self.log(f"■ Stopped ({n} events)", 3) + elif verb == "export": + fname = parts[1] if len(parts) > 1 else "recording.mid" + score = self.engine.export_recording(fname) + if score: + self.log(f"Exported → {fname}", 1) + else: + self.log("Nothing to export", 4) + elif verb == "save" and len(parts) >= 2: + fname = parts[1] + if not fname.endswith(".json"): + fname += ".json" + self.engine.seed = self.seed + self.engine.save_config(fname) + self.log(f"Saved → {fname}", 1) + elif verb == "load" and len(parts) >= 2: + fname = parts[1] + if not fname.endswith(".json"): + fname += ".json" + try: + self.engine.load_config(fname) + self.log(f"Loaded ← {fname}", 1) + # Update picks from channels + for ch, channel in self.engine.channels.items(): + if 1 <= ch <= 8: + self.picks[ch - 1] = channel.synth_name + except Exception as e: + self.log(f"Error: {e}", 4) + elif verb == "octave" and len(parts) >= 2: + try: + self.engine._keyboard_octave = int(parts[1]) + self.log(f"Keyboard octave → {self.engine._keyboard_octave}", 2) + except ValueError: + self.log("octave <0-8>", 4) else: self.log(f"? {cmd}", 4)